YARN-11015. Decouple queue capacity with ability to run OPPORTUNISTIC container (#3779)

This commit is contained in:
Andrew Chung 2022-01-24 11:03:36 -05:00 committed by GitHub
parent b795f6f9a8
commit 3ed3c74a6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 711 additions and 193 deletions

View File

@ -1255,7 +1255,21 @@ public class YarnConfiguration extends Configuration {
/** Prefix for all node manager configs.*/
public static final String NM_PREFIX = "yarn.nodemanager.";
/** Max Queue length of <code>OPPORTUNISTIC</code> containers on the NM. */
/**
* At the NM, the policy to determine whether to queue an
* <code>OPPORTUNISTIC</code> container or not.
* If set to <code>BY_QUEUE_LEN</code>, uses the queue capacity, as set by
* {@link YarnConfiguration#NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH},
* to limit how many containers to accept/queue.
* If set to <code>BY_RESOURCES</code>, limits the number of containers
* accepted based on the resource capacity of the node.
*/
public static final String NM_OPPORTUNISTIC_CONTAINERS_QUEUE_POLICY =
NM_PREFIX + "opportunistic-containers-queue-policy";
/** Max Queue length of <code>OPPORTUNISTIC</code> containers on the NM.
* Only enforced when the queue policy is <code>BY_QUEUE_LEN</code>.
* If set to 0 or a negative value, NM does not accept any
* <code>OPPORTUNISTIC</code> containers.
* If set to {@literal > 0}, enforces the queue capacity. */
public static final String NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH =
NM_PREFIX + "opportunistic-containers-max-queue-length";
public static final int DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH =

View File

@ -1292,8 +1292,27 @@
</property>
<property>
<description>Max number of OPPORTUNISTIC containers to queue at the
nodemanager.</description>
<description>
At the NM, the policy to determine whether to queue an
OPPORTUNISTIC container or not.
If set to BY_QUEUE_LEN, uses the queue capacity, as set by
yarn.nodemanager.opportunistic-containers-max-queue-length
to limit how many containers to accept/queue.
If set to BY_RESOURCES, limits the number of containers
accepted based on the resource capacity of the node.
</description>
<name>yarn.nodemanager.opportunistic-containers-queue-policy</name>
<value>BY_QUEUE_LEN</value>
</property>
<property>
<description>
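Max number of OPPORTUNISTIC containers to queue at the
nodemanager (NM). If the value is 0 or negative,
NMs do not allow any OPPORTUNISTIC containers.
If the value is positive, the NM caps the number of OPPORTUNISTIC
containers that can be queued at the NM.
This value is only used when
yarn.nodemanager.opportunistic-containers-queue-policy is BY_QUEUE_LEN;
it is ignored when the policy is BY_RESOURCES.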
</description>
<name>yarn.nodemanager.opportunistic-containers-max-queue-length</name>
<value>0</value>
</property>

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
@ -34,6 +35,9 @@ public class AllocationBasedResourceUtilizationTracker implements
private static final Logger LOG =
LoggerFactory.getLogger(AllocationBasedResourceUtilizationTracker.class);
private static final long LEFT_SHIFT_MB_IN_BYTES = 20;
private static final int RIGHT_SHIFT_BYTES_IN_MB = 20;
private ResourceUtilization containersAllocation;
private ContainerScheduler scheduler;
@ -80,10 +84,34 @@ public class AllocationBasedResourceUtilizationTracker implements
*/
@Override
public boolean hasResourcesAvailable(Container container) {
long pMemBytes = container.getResource().getMemorySize() * 1024 * 1024L;
return hasResourcesAvailable(pMemBytes,
(long) (getContainersMonitor().getVmemRatio()* pMemBytes),
container.getResource().getVirtualCores());
return hasResourcesAvailable(container.getResource());
}
/**
 * Converts a memory amount expressed in megabytes into bytes.
 * The conversion is a left shift by {@code LEFT_SHIFT_MB_IN_BYTES} bits.
 * @param memMB the memory in megabytes
 * @return the memory in bytes
 */
private static long convertMBToBytes(final long memMB) {
  final long memBytes = memMB << LEFT_SHIFT_MB_IN_BYTES;
  return memBytes;
}
/**
 * Converts a memory amount expressed in bytes into megabytes.
 * The conversion is a signed right shift by
 * {@code RIGHT_SHIFT_BYTES_IN_MB} bits, so any remainder smaller than one
 * megabyte is dropped.
 * @param bytes the memory in bytes
 * @return the memory in megabytes
 */
private static long convertBytesToMB(final long bytes) {
  final long memMB = bytes >> RIGHT_SHIFT_BYTES_IN_MB;
  return memMB;
}
/**
 * Checks whether the given amount of resources fits on this node.
 * The physical memory of {@code resource} is converted to bytes, the
 * virtual memory demand is derived from it using the containers monitor's
 * vmem ratio, and both are checked together with the requested vcores.
 * @param resource the resources being asked for
 * @return true if the byte/vcore based availability check passes
 */
@Override
public boolean hasResourcesAvailable(Resource resource) {
  final long physicalBytes = convertMBToBytes(resource.getMemorySize());
  // Virtual memory demand scales with the physical demand.
  final long virtualBytes =
      (long) (getContainersMonitor().getVmemRatio() * physicalBytes);
  return hasResourcesAvailable(
      physicalBytes, virtualBytes, resource.getVirtualCores());
}
private boolean hasResourcesAvailable(long pMemBytes, long vMemBytes,
@ -92,13 +120,13 @@ public class AllocationBasedResourceUtilizationTracker implements
if (LOG.isDebugEnabled()) {
LOG.debug("pMemCheck [current={} + asked={} > allowed={}]",
this.containersAllocation.getPhysicalMemory(),
(pMemBytes >> 20),
(getContainersMonitor().getPmemAllocatedForContainers() >> 20));
convertBytesToMB(pMemBytes),
convertBytesToMB(
getContainersMonitor().getPmemAllocatedForContainers()));
}
if (this.containersAllocation.getPhysicalMemory() +
(int) (pMemBytes >> 20) >
(int) (getContainersMonitor()
.getPmemAllocatedForContainers() >> 20)) {
convertBytesToMB(pMemBytes) > convertBytesToMB(
getContainersMonitor().getPmemAllocatedForContainers())) {
return false;
}
@ -106,15 +134,17 @@ public class AllocationBasedResourceUtilizationTracker implements
LOG.debug("before vMemCheck" +
"[isEnabled={}, current={} + asked={} > allowed={}]",
getContainersMonitor().isVmemCheckEnabled(),
this.containersAllocation.getVirtualMemory(), (vMemBytes >> 20),
(getContainersMonitor().getVmemAllocatedForContainers() >> 20));
this.containersAllocation.getVirtualMemory(),
convertBytesToMB(vMemBytes),
convertBytesToMB(
getContainersMonitor().getVmemAllocatedForContainers()));
}
// Check virtual memory.
if (getContainersMonitor().isVmemCheckEnabled() &&
this.containersAllocation.getVirtualMemory() +
(int) (vMemBytes >> 20) >
(int) (getContainersMonitor()
.getVmemAllocatedForContainers() >> 20)) {
convertBytesToMB(vMemBytes) >
convertBytesToMB(getContainersMonitor()
.getVmemAllocatedForContainers())) {
return false;
}

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService
.RecoveredContainerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -74,6 +76,7 @@ public class ContainerScheduler extends AbstractService implements
private final Context context;
// Capacity of the queue for opportunistic Containers.
private final int maxOppQueueLength;
private final boolean forceStartGuaranteedContainers;
// Queue of Guaranteed Containers waiting for resources to run
private final LinkedHashMap<ContainerId, Container>
@ -106,9 +109,37 @@ public class ContainerScheduler extends AbstractService implements
private final AsyncDispatcher dispatcher;
private final NodeManagerMetrics metrics;
private final OpportunisticContainersQueuePolicy oppContainersQueuePolicy;
private Boolean usePauseEventForPreemption = false;
/**
 * Reads the maximum opportunistic queue length from the NM configuration.
 * @param context the NM context; may be null or carry a null configuration
 * @return the configured value, or the default when no configuration is
 *         reachable through {@code context} or the key is unset
 */
private static int getMaxOppQueueLengthFromConf(final Context context) {
  final boolean hasConf = context != null && context.getConf() != null;
  return hasConf
      ? context.getConf().getInt(
          YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
          YarnConfiguration
              .DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH)
      : YarnConfiguration
          .DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH;
}
/**
 * Reads the opportunistic container queue policy from the NM configuration.
 * @param context the NM context; may be null or carry a null configuration
 * @return the configured policy, or
 *         {@link OpportunisticContainersQueuePolicy#DEFAULT} when no
 *         configuration is reachable through {@code context}
 */
private static OpportunisticContainersQueuePolicy
    getOppContainersQueuePolicyFromConf(final Context context) {
  if (context == null || context.getConf() == null) {
    return OpportunisticContainersQueuePolicy.DEFAULT;
  }
  return context.getConf().getEnum(
      YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_QUEUE_POLICY,
      OpportunisticContainersQueuePolicy.DEFAULT);
}
@VisibleForTesting
ResourceHandlerChain resourceHandlerChain = null;
@ -120,10 +151,9 @@ public class ContainerScheduler extends AbstractService implements
*/
public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
NodeManagerMetrics metrics) {
this(context, dispatcher, metrics, context.getConf().getInt(
YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
YarnConfiguration.
DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH));
this(context, dispatcher, metrics,
getOppContainersQueuePolicyFromConf(context),
getMaxOppQueueLengthFromConf(context));
}
@ -149,13 +179,35 @@ public class ContainerScheduler extends AbstractService implements
@VisibleForTesting
public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
NodeManagerMetrics metrics, int qLength) {
this(context, dispatcher, metrics,
getOppContainersQueuePolicyFromConf(context), qLength);
}
@VisibleForTesting
public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
NodeManagerMetrics metrics,
OpportunisticContainersQueuePolicy oppContainersQueuePolicy,
int qLength) {
super(ContainerScheduler.class.getName());
this.context = context;
this.dispatcher = dispatcher;
this.metrics = metrics;
this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength;
this.utilizationTracker =
new AllocationBasedResourceUtilizationTracker(this);
this.oppContainersQueuePolicy = oppContainersQueuePolicy;
switch (oppContainersQueuePolicy) {
case BY_RESOURCES:
this.maxOppQueueLength = 0;
this.forceStartGuaranteedContainers = false;
LOG.info("Setting max opportunistic queue length to 0,"
+ " as {} is incompatible with queue length",
oppContainersQueuePolicy);
break;
case BY_QUEUE_LEN:
default:
this.maxOppQueueLength = qLength;
this.forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
}
this.opportunisticContainersStatus =
OpportunisticContainersStatus.newInstance();
}
@ -187,7 +239,7 @@ public class ContainerScheduler extends AbstractService implements
shedQueuedOpportunisticContainers();
break;
case RECOVERY_COMPLETED:
startPendingContainers(maxOppQueueLength <= 0);
startPendingContainers(forceStartGuaranteedContainers);
metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
queuedGuaranteedContainers.size());
break;
@ -243,7 +295,7 @@ public class ContainerScheduler extends AbstractService implements
LOG.warn(String.format("Could not update resources on " +
"continer update of %s", containerId), ex);
}
startPendingContainers(maxOppQueueLength <= 0);
startPendingContainers(forceStartGuaranteedContainers);
metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
queuedGuaranteedContainers.size());
}
@ -371,7 +423,6 @@ public class ContainerScheduler extends AbstractService implements
ExecutionType.OPPORTUNISTIC) {
this.metrics.completeOpportunisticContainer(container.getResource());
}
boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
startPendingContainers(forceStartGuaranteedContainers);
}
this.metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
@ -380,13 +431,13 @@ public class ContainerScheduler extends AbstractService implements
/**
* Start pending containers in the queue.
* @param forceStartGuaranteedContaieners When this is true, start guaranteed
* @param forceStartGContainers When this is true, start guaranteed
* container without looking at available resource
*/
private void startPendingContainers(boolean forceStartGuaranteedContaieners) {
private void startPendingContainers(boolean forceStartGContainers) {
// Start guaranteed containers that are paused, if resources available.
boolean resourcesAvailable = startContainers(
queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
queuedGuaranteedContainers.values(), forceStartGContainers);
// Start opportunistic containers, if resources available.
if (resourcesAvailable) {
startContainers(queuedOpportunisticContainers.values(), false);
@ -429,6 +480,21 @@ public class ContainerScheduler extends AbstractService implements
return this.utilizationTracker.hasResourcesAvailable(container);
}
/**
 * Decides whether a new opportunistic container may be queued when queuing
 * is governed by node resources.
 * Adds up the resources of every queued guaranteed container, every queued
 * opportunistic container and the new container, then asks the utilization
 * tracker whether that total is available.
 * @param newOppContainer the opportunistic container asking to be queued
 * @return the utilization tracker's answer for the summed demand
 */
private boolean resourceAvailableToQueueOppContainer(
    Container newOppContainer) {
  // Start from a fresh, mutable zero resource and accumulate into it.
  final Resource totalDemand = Resource.newInstance(Resources.none());

  queuedGuaranteedContainers.values().forEach(
      queued -> Resources.addTo(totalDemand, queued.getResource()));
  queuedOpportunisticContainers.values().forEach(
      queued -> Resources.addTo(totalDemand, queued.getResource()));
  Resources.addTo(totalDemand, newOppContainer.getResource());

  return this.utilizationTracker.hasResourcesAvailable(totalDemand);
}
private boolean enqueueContainer(Container container) {
boolean isGuaranteedContainer = container.getContainerTokenIdentifier().
getExecutionType() == ExecutionType.GUARANTEED;
@ -438,7 +504,21 @@ public class ContainerScheduler extends AbstractService implements
queuedGuaranteedContainers.put(container.getContainerId(), container);
isQueued = true;
} else {
if (queuedOpportunisticContainers.size() < maxOppQueueLength) {
switch (oppContainersQueuePolicy) {
case BY_RESOURCES:
isQueued = resourceAvailableToQueueOppContainer(container);
break;
case BY_QUEUE_LEN:
default:
if (maxOppQueueLength <= 0) {
isQueued = false;
} else {
isQueued =
queuedOpportunisticContainers.size() < maxOppQueueLength;
}
}
if (isQueued) {
LOG.info("Opportunistic container {} will be queued at the NM.",
container.getContainerId());
queuedOpportunisticContainers.put(
@ -451,7 +531,6 @@ public class ContainerScheduler extends AbstractService implements
container.sendKillEvent(
ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
"Opportunistic container queue is full.");
isQueued = false;
}
}
@ -484,7 +563,6 @@ public class ContainerScheduler extends AbstractService implements
// When opportunistic containers are not allowed (i.e. the queue policy is
// BY_QUEUE_LEN and the max-queue length of pending opportunistic
// containers is <= 0), start guaranteed containers without looking at
// available resources.
boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
startPendingContainers(forceStartGuaranteedContainers);
// if the guaranteed container is queued, we need to preempt opportunistic

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
 * Policy the NodeManager's container scheduler uses to decide whether an
 * incoming opportunistic container is queued or rejected.
 */
public enum OpportunisticContainersQueuePolicy {
/**
 * Queue-length based admission, bounded by
 * {@link YarnConfiguration#NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH}.
 * The container is queued only while the number of already queued
 * opportunistic containers is below that (positive) limit; otherwise it
 * is rejected. A limit of zero or less rejects every opportunistic
 * container.
 */
BY_QUEUE_LEN,
/**
 * Resource based admission.
 * The resources of all containers already queued at the node (guaranteed
 * and opportunistic) plus those of the new container are summed, and the
 * container is queued only if the node's resource utilization tracker
 * reports that this total is available. The configured max queue length
 * is not used under this policy.
 */
BY_RESOURCES;
/** Policy applied when none is configured. */
public static final OpportunisticContainersQueuePolicy DEFAULT = BY_QUEUE_LEN;
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@ -56,4 +57,10 @@ public interface ResourceUtilizationTracker {
*/
boolean hasResourcesAvailable(Container container);
/**
 * Checks whether the NM currently has enough resources available to
 * accommodate the given amount of resources.
 * @param resource the amount of resources being asked for.
 * @return True, if NM has enough available resources.
 */
boolean hasResourcesAvailable(Resource resource);
}

View File

@ -0,0 +1,208 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ConfigurationException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.TestContainerSchedulerQueuing;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.mockito.Mockito.spy;
/**
 * Shared fixture for {@link ContainerScheduler} tests.
 * Provides a {@link ContainerStateTransitionListener} that records state
 * transitions, a container manager whose monitor reports a fixed node
 * capacity, and a container executor that can delay container launches.
 */
public class BaseContainerSchedulerTest extends BaseContainerManagerTest {
  /** Physical memory, in bytes, reported as available for containers. */
  private static final long TWO_GB = 2048 * 1024 * 1024L;

  /** Length, in milliseconds, of each artificial launch delay. */
  private static final long LAUNCH_DELAY_MS = 10000;

  /** When true, the test executor sleeps before launching a container. */
  private boolean delayContainers = true;

  static {
    LOG = LoggerFactory.getLogger(TestContainerSchedulerQueuing.class);
  }

  public BaseContainerSchedulerTest() throws UnsupportedFileSystemException {
    super();
  }

  /**
   * Records, per container, the states it went through and the event types
   * that were processed.
   */
  public static class Listener implements ContainerStateTransitionListener {
    private final Map<ContainerId, List<ContainerState>> states =
        new HashMap<>();
    private final Map<ContainerId, List<ContainerEventType>> events =
        new HashMap<>();

    public Map<ContainerId, List<ContainerEventType>> getEvents() {
      return events;
    }

    public Map<ContainerId, List<ContainerState>> getStates() {
      return states;
    }

    @Override
    public void init(Context context) {}

    @Override
    public void preTransition(ContainerImpl op,
        ContainerState beforeState,
        ContainerEvent eventToBeProcessed) {
      final ContainerId containerId = op.getContainerId();
      if (states.containsKey(containerId)) {
        // Already tracked; only the first transition seeds the history.
        return;
      }
      final List<ContainerState> stateHistory = new ArrayList<>();
      stateHistory.add(beforeState);
      states.put(containerId, stateHistory);
      events.put(containerId, new ArrayList<>());
    }

    @Override
    public void postTransition(ContainerImpl op, ContainerState beforeState,
        ContainerState afterState, ContainerEvent processedEvent) {
      final ContainerId containerId = op.getContainerId();
      states.get(containerId).add(afterState);
      events.get(containerId).add(processedEvent.getType());
    }
  }

  /**
   * Turns the artificial launch delay of the test executor on or off.
   * @param delayContainersParam true to delay container launches
   */
  protected void setDelayContainers(final boolean delayContainersParam) {
    this.delayContainers = delayContainersParam;
  }

  @Override
  protected ContainerManagerImpl createContainerManager(
      DeletionService delSrvc) {
    return new ContainerManagerImpl(context, exec, delSrvc,
        getNodeStatusUpdater(), metrics, dirsHandler) {

      @Override
      protected UserGroupInformation getRemoteUgi() throws YarnException {
        final ApplicationAttemptId attemptId =
            ApplicationAttemptId.newInstance(
                ApplicationId.newInstance(0, 0), 1);
        final UserGroupInformation remoteUgi =
            UserGroupInformation.createRemoteUser(attemptId.toString());
        remoteUgi.addTokenIdentifier(new NMTokenIdentifier(
            attemptId,
            context.getNodeId(),
            user,
            context.getNMTokenSecretManager().getCurrentKey().getKeyId()));
        return remoteUgi;
      }

      @Override
      protected ContainersMonitor createContainersMonitor(
          ContainerExecutor exec) {
        // Monitor reporting a fixed capacity for containers.
        return new ContainersMonitorImpl(exec, dispatcher, this.context) {
          @Override
          public long getPmemAllocatedForContainers() {
            return TWO_GB;
          }

          @Override
          public long getVmemAllocatedForContainers() {
            final float vmemPmemRatio = getConfig().getFloat(
                YarnConfiguration.NM_VMEM_PMEM_RATIO,
                YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
            return (long) (vmemPmemRatio * getPmemAllocatedForContainers());
          }

          @Override
          public long getVCoresAllocatedForContainers() {
            return 4;
          }
        };
      }
    };
  }

  @Override
  protected ContainerExecutor createContainerExecutor() {
    DefaultContainerExecutor delayingExecutor =
        new DefaultContainerExecutor() {
          // Per container id: whether a pause was requested, in which case
          // the launch sleeps a second time.
          ConcurrentMap<String, Boolean> oversleepMap =
              new ConcurrentHashMap<>();

          /**
           * Launches the container.
           * If delayContainers is turned on, sleeps before delegating to
           * the real launch, and sleeps once more if the container was
           * paused during the first sleep.
           */
          @Override
          public int launchContainer(ContainerStartContext ctx)
              throws IOException, ConfigurationException {
            final String id =
                ctx.getContainer().getContainerId().toString();
            oversleepMap.put(id, false);
            if (delayContainers) {
              try {
                Thread.sleep(LAUNCH_DELAY_MS);
                if (oversleepMap.get(id)) {
                  Thread.sleep(LAUNCH_DELAY_MS);
                }
              } catch (InterruptedException e) {
                // Nothing..
              }
            }
            return super.launchContainer(ctx);
          }

          @Override
          public void pauseContainer(Container container) {
            // Mimic pausing by making the launch sleep a little longer.
            oversleepMap.put(container.getContainerId().toString(), true);
            LOG.info("Container was paused");
          }

          @Override
          public void resumeContainer(Container container) {
            LOG.info("Container was resumed");
          }
        };
    delayingExecutor.setConf(conf);
    return spy(delayingExecutor);
  }
}

View File

@ -0,0 +1,263 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerSubState;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerSchedulerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeoutException;
/**
 * Tests the behavior of {@link ContainerScheduler} when its queueing policy
 * is set to {@link OpportunisticContainersQueuePolicy#BY_RESOURCES}
 * such that the NM only queues containers if there are enough resources
 * on the node to start all queued containers.
 */
public class TestContainerSchedulerOppContainersByResources
    extends BaseContainerSchedulerTest {

  /** Diagnostics fragment expected on containers rejected by the NM. */
  private static final String QUEUE_FULL_DIAGNOSTICS =
      "Opportunistic container queue is full";

  public TestContainerSchedulerOppContainersByResources()
      throws UnsupportedFileSystemException {
  }

  @Override
  public void setup() throws IOException {
    // The policy must be set before the base class builds the NM services.
    conf.set(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_QUEUE_POLICY,
        OpportunisticContainersQueuePolicy.BY_RESOURCES.name());
    super.setup();
    containerManager.start();
  }

  /**
   * Checks if a container is in a running or successfully run state.
   * @param containerStatus the container status
   * @return true if the container is running or completed
   * with a successful state, false if the container has not started or failed
   */
  private static boolean isContainerInSuccessfulState(
      final ContainerStatus containerStatus) {
    final org.apache.hadoop.yarn.api.records.ContainerState state =
        containerStatus.getState();
    final ContainerSubState subState = containerStatus.getContainerSubState();
    switch (subState) {
    case RUNNING:
    case COMPLETING:
      return true;
    case DONE:
      // If the state is not COMPLETE, then the
      // container is a failed container
      return state ==
          org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE;
    default:
      return false;
    }
  }

  /**
   * Builds a start request for a container with an empty launch context.
   * @param containerId the id of the container to start
   * @param memoryMB the memory of the container, in megabytes
   * @param vcores the number of virtual cores of the container
   * @param executionType the execution type carried by the container token
   * @return the start request
   * @throws Exception if the container token cannot be created
   */
  private StartContainerRequest createStartContainerRequest(
      final ContainerId containerId, final int memoryMB, final int vcores,
      final ExecutionType executionType) throws Exception {
    return StartContainerRequest.newInstance(
        recordFactory.newRecordInstance(ContainerLaunchContext.class),
        createContainerToken(containerId, DUMMY_RM_IDENTIFIER,
            context.getNodeId(),
            user, BuilderUtils.newResource(memoryMB, vcores),
            context.getContainerTokenSecretManager(), null,
            executionType));
  }

  /**
   * Polls the container manager until every container in {@code statList}
   * is reported, each expected-to-run container is in a successful state and
   * each expected-to-be-killed container carries the "queue is full"
   * diagnostics.
   * @param statList the ids of the containers to query
   * @param numExpectedContainers the number of statuses expected back
   * @param runContainers the containers expected to run
   * @param killedContainers the containers expected to be rejected
   * @throws TimeoutException if the condition is not met in time
   * @throws InterruptedException if interrupted while waiting
   */
  private void verifyRunAndKilledContainers(
      final List<ContainerId> statList,
      final int numExpectedContainers, final Set<ContainerId> runContainers,
      final Set<ContainerId> killedContainers)
      throws TimeoutException, InterruptedException {
    GenericTestUtils.waitFor(
        () -> {
          GetContainerStatusesRequest statRequest =
              GetContainerStatusesRequest.newInstance(statList);
          final List<ContainerStatus> containerStatuses;
          try {
            containerStatuses = containerManager
                .getContainerStatuses(statRequest).getContainerStatuses();
          } catch (final Exception e) {
            // Treat a failed query as "not ready yet" and poll again.
            return false;
          }

          if (numExpectedContainers != containerStatuses.size()) {
            return false;
          }

          for (final ContainerStatus status : containerStatuses) {
            if (runContainers.contains(status.getContainerId())) {
              if (!isContainerInSuccessfulState(status)) {
                return false;
              }
            } else if (killedContainers.contains(status.getContainerId())) {
              if (!status.getDiagnostics()
                  .contains(QUEUE_FULL_DIAGNOSTICS)) {
                return false;
              }
            } else {
              // A container that belongs to neither expectation set.
              return false;
            }
          }
          return true;
        }, 1000, 10000);
  }

  /**
   * Verifies that nothing is queued at the container scheduler.
   */
  private void verifyNothingQueued() {
    ContainerScheduler containerScheduler =
        containerManager.getContainerScheduler();
    Assert.assertEquals(0,
        containerScheduler.getNumQueuedContainers());
    Assert.assertEquals(0,
        containerScheduler.getNumQueuedGuaranteedContainers());
    Assert.assertEquals(0,
        containerScheduler.getNumQueuedOpportunisticContainers());
    Assert.assertEquals(0,
        metrics.getQueuedOpportunisticContainers());
    Assert.assertEquals(0, metrics.getQueuedGuaranteedContainers());
  }

  /**
   * Tests that an opportunistic container arriving after the node's
   * resources are filled up gets killed and never runs.
   */
  @Test
  public void testKillOpportunisticWhenNoResourcesAvailable() throws Exception {
    final ContainerId guaranteedId = createContainerId(0);
    final ContainerId opportunisticId = createContainerId(1);

    List<StartContainerRequest> startContainerRequests = new ArrayList<>();

    // GContainer that takes up the whole node
    startContainerRequests.add(createStartContainerRequest(
        guaranteedId, 2048, 1, ExecutionType.GUARANTEED));

    // OContainer that should be killed
    startContainerRequests.add(createStartContainerRequest(
        opportunisticId, 2048, 1, ExecutionType.OPPORTUNISTIC));

    StartContainersRequest allRequests =
        StartContainersRequest.newInstance(startContainerRequests);
    containerManager.startContainers(allRequests);

    BaseContainerManagerTest.waitForNMContainerState(containerManager,
        guaranteedId, ContainerState.RUNNING, 40);

    // Wait for the OContainer to get killed
    BaseContainerManagerTest.waitForNMContainerState(containerManager,
        opportunisticId, ContainerState.DONE, 40);

    // Get container statuses.
    // Container 0 should be running and container 1 should be killed
    List<ContainerId> statList =
        ImmutableList.of(guaranteedId, opportunisticId);

    verifyRunAndKilledContainers(
        statList, 2,
        Collections.singleton(guaranteedId),
        Collections.singleton(opportunisticId)
    );

    verifyNothingQueued();
  }

  /**
   * Tests that opportunistic containers run while the node has resources
   * for them, and that the containers arriving after the resources are
   * filled up get killed and never run.
   * This scenario is more granular and runs more, smaller containers than
   * {@link #testKillOpportunisticWhenNoResourcesAvailable()}.
   */
  @Test
  public void testOpportunisticRunsWhenResourcesAvailable() throws Exception {
    final int numContainers = 8;
    // Only the first half of the requested containers is expected to run;
    // the rest is expected to be rejected.
    final int numContainersExpectedToRun = 4;
    final Set<ContainerId> runContainers = new HashSet<>();
    final Set<ContainerId> killedContainers = new HashSet<>();

    List<StartContainerRequest> startContainerRequests = new ArrayList<>();
    for (int i = 0; i < numContainers; i++) {
      startContainerRequests.add(createStartContainerRequest(
          createContainerId(i), 512, 1, ExecutionType.OPPORTUNISTIC));
    }

    StartContainersRequest allRequests =
        StartContainersRequest.newInstance(startContainerRequests);
    containerManager.startContainers(allRequests);

    // Wait for containers to start
    for (int i = 0; i < numContainersExpectedToRun; i++) {
      final ContainerId containerId = createContainerId(i);
      BaseContainerManagerTest
          .waitForNMContainerState(containerManager, containerId,
              ContainerState.RUNNING, 40);
      runContainers.add(containerId);
    }

    // Wait for containers to be killed
    for (int i = numContainersExpectedToRun; i < numContainers; i++) {
      final ContainerId containerId = createContainerId(i);
      BaseContainerManagerTest
          .waitForNMContainerState(containerManager, containerId,
              ContainerState.DONE, 40);
      killedContainers.add(containerId);
    }

    // NOTE(review): fixed settling delay kept from the original test; the
    // status verification below already polls, so confirm whether it is
    // still required.
    Thread.sleep(5000);

    // Get container statuses.
    List<ContainerId> statList = new ArrayList<>();
    for (int i = 0; i < numContainers; i++) {
      statList.add(createContainerId(i));
    }

    verifyRunAndKilledContainers(
        statList, numContainers, runContainers, killedContainers);

    verifyNothingQueued();
  }
}

View File

@ -18,18 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
@ -37,8 +26,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -46,35 +33,27 @@ import org.apache.hadoop.yarn.api.records.ContainerSubState;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ConfigurationException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerSchedulerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -82,137 +61,11 @@ import static org.mockito.Mockito.verify;
* Tests to verify that the {@link ContainerScheduler} is able to queue and
* make room for containers.
*/
public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
public class TestContainerSchedulerQueuing extends BaseContainerSchedulerTest {
public TestContainerSchedulerQueuing() throws UnsupportedFileSystemException {
super();
}
// Point the LOG field (declared outside this class body; presumably
// inherited from the base test class) at a logger named after this test
// class.
static {
LOG = LoggerFactory.getLogger(TestContainerSchedulerQueuing.class);
}
/**
 * Transition listener that records, per container, the states it has been
 * in and the types of the events it has processed, so that tests can assert
 * on the exact sequences.
 */
private static class Listener implements ContainerStateTransitionListener {

  /** State history per container: the first observed state, then each
   *  state reached after a transition. */
  private final Map<ContainerId, List<ContainerState>> states =
      new HashMap<>();

  /** Types of the events processed so far, per container. */
  private final Map<ContainerId, List<ContainerEventType>> events =
      new HashMap<>();

  @Override
  public void init(Context context) {
    // Nothing to initialise.
  }

  /**
   * On the first transition seen for a container, seeds its state history
   * with the state it is in before the transition and creates an empty
   * event history. Later calls for the same container are no-ops.
   */
  @Override
  public void preTransition(ContainerImpl op,
      org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
          ContainerState beforeState,
      ContainerEvent eventToBeProcessed) {
    final ContainerId id = op.getContainerId();
    if (states.containsKey(id)) {
      return;
    }
    final List<ContainerState> history = new ArrayList<>();
    history.add(beforeState);
    states.put(id, history);
    events.put(id, new ArrayList<>());
  }

  /**
   * Appends the resulting state and the processed event's type to the
   * container's histories. Relies on {@code preTransition} having been
   * called for the container beforehand.
   */
  @Override
  public void postTransition(ContainerImpl op,
      org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
          ContainerState beforeState,
      org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
          ContainerState afterState,
      ContainerEvent processedEvent) {
    final ContainerId id = op.getContainerId();
    states.get(id).add(afterState);
    events.get(id).add(processedEvent.getType());
  }
}
private boolean delayContainers = true;
/**
 * Builds the {@link ContainerManagerImpl} used by these tests. The returned
 * instance overrides two hooks: {@code getRemoteUgi()} returns a fixed
 * remote user carrying an NM token identifier, and
 * {@code createContainersMonitor()} returns a monitor that reports fixed
 * resource limits for containers.
 *
 * @param delSrvc deletion service handed to the container manager
 * @return the customised container manager
 */
@Override
protected ContainerManagerImpl createContainerManager(
    DeletionService delSrvc) {
  return new ContainerManagerImpl(context, exec, delSrvc,
      getNodeStatusUpdater(), metrics, dirsHandler) {

    /** Returns a remote user named after attempt 1 of application (0, 0). */
    @Override
    protected UserGroupInformation getRemoteUgi() throws YarnException {
      final ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
          ApplicationId.newInstance(0, 0), 1);
      final UserGroupInformation remoteUgi =
          UserGroupInformation.createRemoteUser(attemptId.toString());
      final NMTokenIdentifier nmTokenId = new NMTokenIdentifier(attemptId,
          context.getNodeId(), user,
          context.getNMTokenSecretManager().getCurrentKey().getKeyId());
      remoteUgi.addTokenIdentifier(nmTokenId);
      return remoteUgi;
    }

    /** Returns a monitor whose container resource limits are hard-coded. */
    @Override
    protected ContainersMonitor createContainersMonitor(
        ContainerExecutor exec) {
      return new ContainersMonitorImpl(exec, dispatcher, this.context) {

        // Define resources available for containers to be executed.
        @Override
        public long getPmemAllocatedForContainers() {
          // 2048 * 1024 * 1024 = 2^31 (presumably bytes, i.e. 2 GiB).
          return 2048L * 1024L * 1024L;
        }

        @Override
        public long getVmemAllocatedForContainers() {
          // Virtual memory limit = configured vmem/pmem ratio times the
          // physical memory limit above.
          final float vmemPmemRatio = getConfig().getFloat(
              YarnConfiguration.NM_VMEM_PMEM_RATIO,
              YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
          final long pmemLimit = getPmemAllocatedForContainers();
          return (long) (vmemPmemRatio * pmemLimit);
        }

        @Override
        public long getVCoresAllocatedForContainers() {
          return 4;
        }
      };
    }
  };
}
/**
 * Creates a Mockito spy around a {@link DefaultContainerExecutor} whose
 * launches can be artificially delayed. While {@code delayContainers} is
 * set, each launch sleeps for 10 seconds, and for a further 10 seconds if
 * the container was paused in the meantime. Pause and resume only log.
 *
 * @return the spied executor, configured with {@code conf}
 */
@Override
protected ContainerExecutor createContainerExecutor() {
  DefaultContainerExecutor delayingExecutor = new DefaultContainerExecutor() {

    // Keyed by container id string; the value becomes true once
    // pauseContainer() has been called for that container.
    private final ConcurrentMap<String, Boolean> oversleepMap =
        new ConcurrentHashMap<>();

    @Override
    public int launchContainer(ContainerStartContext ctx)
        throws IOException, ConfigurationException {
      final String containerKey =
          ctx.getContainer().getContainerId().toString();
      oversleepMap.put(containerKey, false);
      if (!delayContainers) {
        return super.launchContainer(ctx);
      }
      try {
        Thread.sleep(10000);
        if (oversleepMap.get(containerKey)) {
          Thread.sleep(10000);
        }
      } catch (InterruptedException e) {
        // Nothing to do: an interrupt only cuts the artificial delay
        // short, the launch below still proceeds.
      }
      return super.launchContainer(ctx);
    }

    @Override
    public void pauseContainer(Container container) {
      // To mimic pausing we force the container to be in the PAUSED state
      // a little longer by oversleeping.
      oversleepMap.put(container.getContainerId().toString(), true);
      LOG.info("Container was paused");
    }

    @Override
    public void resumeContainer(Container container) {
      LOG.info("Container was resumed");
    }
  };
  delayingExecutor.setConf(conf);
  return spy(delayingExecutor);
}
@Override
public void setup() throws IOException {
conf.setInt(
@ -408,7 +261,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
* @throws Exception
*/
@Test
public void testStartOpportunistcsWhenOppQueueIsFull() throws Exception {
public void testStartOpportunisticsWhenOppQueueIsFull() throws Exception {
containerManager.start();
List<StartContainerRequest> list = new ArrayList<>();
@ -655,7 +508,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState> containerStates =
listener.states.get(createContainerId(0));
listener.getStates().get(createContainerId(0));
Assert.assertEquals(Arrays.asList(
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.NEW,
@ -676,7 +529,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.DONE), containerStates);
List<ContainerEventType> containerEventTypes =
listener.events.get(createContainerId(0));
listener.getEvents().get(createContainerId(0));
Assert.assertEquals(Arrays.asList(ContainerEventType.INIT_CONTAINER,
ContainerEventType.CONTAINER_LAUNCHED,
ContainerEventType.PAUSE_CONTAINER,
@ -1230,7 +1083,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState> containerStates =
listener.states.get(createContainerId(1));
listener.getStates().get(createContainerId(1));
Assert.assertEquals(Arrays.asList(
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.NEW,
@ -1241,7 +1094,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.RUNNING), containerStates);
List<ContainerEventType> containerEventTypes =
listener.events.get(createContainerId(1));
listener.getEvents().get(createContainerId(1));
Assert.assertEquals(Arrays.asList(
ContainerEventType.INIT_CONTAINER,
ContainerEventType.UPDATE_CONTAINER_TOKEN,
@ -1254,7 +1107,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
@Test
public void testContainerUpdateExecTypeGuaranteedToOpportunistic()
throws Exception {
delayContainers = true;
setDelayContainers(true);
containerManager.start();
// Construct the Container-id
ContainerId cId = createContainerId(0);