YARN-4597. Introduce ContainerScheduler and a SCHEDULED state to NodeManager container lifecycle. (asuresh)

This commit is contained in:
Arun Suresh 2016-11-15 07:48:55 -08:00
parent 7ffb9943b8
commit 3219b7b4ac
48 changed files with 1456 additions and 1326 deletions

View File

@ -68,15 +68,6 @@ public class TestMROpportunisticMaps {
doTest(4, 1, 1, 2);
}
/**
* Test will run with 6 Maps and 2 Reducers. All the Maps are OPPORTUNISTIC.
* @throws Exception
*/
@Test
public void testMultipleReducers() throws Exception {
doTest(6, 2, 1, 6);
}
public void doTest(int numMappers, int numReducers, int numNodes,
int percent) throws Exception {
doTest(numMappers, numReducers, numNodes, 1000, percent);
@ -94,7 +85,8 @@ public class TestMROpportunisticMaps {
conf.setBoolean(YarnConfiguration.
OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
conf.setInt(
YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
dfsCluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numNodes).build();
fileSystem = dfsCluster.getFileSystem();
@ -104,11 +96,7 @@ public class TestMROpportunisticMaps {
createInput(fileSystem, numMappers, numLines);
// Run the test.
Configuration jobConf = mrCluster.getConfig();
jobConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
runMergeTest(new JobConf(jobConf), fileSystem,
runMergeTest(new JobConf(conf), fileSystem,
numMappers, numReducers, numLines, percent);
} finally {
if (dfsCluster != null) {

View File

@ -72,4 +72,10 @@ public class ContainerExitStatus {
*/
public static final int KILLED_AFTER_APP_COMPLETION = -107;
/**
* Container was terminated by the ContainerScheduler to make room
* for another container...
*/
public static final int KILLED_BY_CONTAINER_SCHEDULER = -108;
}

View File

@ -36,6 +36,6 @@ public enum ContainerState {
/** Completed container */
COMPLETE,
/** Queued at the NM. */
QUEUED
/** Scheduled (awaiting resources) at the NM. */
SCHEDULED
}

View File

@ -390,12 +390,16 @@ public class YarnConfiguration extends Configuration {
public static final float NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT =
1.0f;
/** Min length of container queue at NodeManager. */
/** Min length of container queue at NodeManager. This is a cluster-wide
* configuration that acts as the lower-bound of optimal queue length
* calculated by the NodeQueueLoadMonitor */
public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH =
YARN_PREFIX + "nm-container-queuing.min-queue-length";
public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT = 1;
/** Max length of container queue at NodeManager. */
/** Max length of container queue at NodeManager. This is a cluster-wide
* configuration that acts as the upper-bound of optimal queue length
* calculated by the NodeQueueLoadMonitor */
public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH =
YARN_PREFIX + "nm-container-queuing.max-queue-length";
public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT = 10;
@ -834,10 +838,11 @@ public class YarnConfiguration extends Configuration {
/** Prefix for all node manager configs.*/
public static final String NM_PREFIX = "yarn.nodemanager.";
/** Enable Queuing of <code>OPPORTUNISTIC</code> containers. */
public static final String NM_CONTAINER_QUEUING_ENABLED = NM_PREFIX
+ "container-queuing-enabled";
public static final boolean NM_CONTAINER_QUEUING_ENABLED_DEFAULT = false;
/** Max Queue length of <code>OPPORTUNISTIC</code> containers on the NM. */
public static final String NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH =
NM_PREFIX + "opportunistic-containers-max-queue-length";
public static final int NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH_DEFAULT =
0;
/** Environment variables that will be sent to containers.*/
public static final String NM_ADMIN_USER_ENV = NM_PREFIX + "admin-env";

View File

@ -82,7 +82,7 @@ enum ContainerStateProto {
C_NEW = 1;
C_RUNNING = 2;
C_COMPLETE = 3;
C_QUEUED = 4;
C_SCHEDULED = 4;
}
message ContainerProto {

View File

@ -108,7 +108,8 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
conf.setBoolean(YarnConfiguration.
OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
conf.setInt(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
10);
cluster.init(conf);
cluster.start();
yarnConf = cluster.getConfig();

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -330,6 +331,12 @@ public class TestNMClient {
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
ContainerLaunchContext clc =
Records.newRecord(ContainerLaunchContext.class);
if (Shell.WINDOWS) {
clc.setCommands(
Arrays.asList("ping", "-n", "100", "127.0.0.1", ">nul"));
} else {
clc.setCommands(Arrays.asList("sleep", "10"));
}
clc.setTokens(securityTokens);
try {
nmClient.startContainer(container, clc);
@ -415,7 +422,7 @@ public class TestNMClient {
try {
nmClient.increaseContainerResource(container);
} catch (YarnException e) {
// NM container will only be in LOCALIZED state, so expect the increase
// NM container will only be in SCHEDULED state, so expect the increase
// action to fail.
if (!e.getMessage().contains(
"can only be changed when a container is in RUNNING state")) {

View File

@ -111,6 +111,8 @@ public class TestOpportunisticContainerAllocation {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
conf.setBoolean(
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
conf.setInt(
YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
yarnCluster =
new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);

View File

@ -1000,10 +1000,10 @@
</property>
<property>
<description>Enable Queuing of OPPORTUNISTIC containers on the
<description>Max number of OPPORTUNISTIC containers to queue at the
nodemanager.</description>
<name>yarn.nodemanager.container-queuing-enabled</name>
<value>false</value>
<name>yarn.nodemanager.opportunistic-containers-max-queue-length</name>
<value>0</value>
</property>
<property>

View File

@ -27,12 +27,12 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
@ -46,15 +46,6 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
*/
public interface Context {
/**
* Interface exposing methods related to the queuing of containers in the NM.
*/
interface QueuingContext {
ConcurrentMap<ContainerId, ContainerTokenIdentifier> getQueuedContainers();
ConcurrentMap<ContainerTokenIdentifier, String> getKilledQueuedContainers();
}
/**
* Return the nodeId. Usable only when the ContainerManager is started.
*
@ -112,13 +103,6 @@ public interface Context {
NodeStatusUpdater getNodeStatusUpdater();
/**
* Returns a <code>QueuingContext</code> that provides information about the
* number of Containers Queued as well as the number of Containers that were
* queued and killed.
*/
QueuingContext getQueuingContext();
boolean isDistributedSchedulingEnabled();
OpportunisticContainerAllocator getContainerAllocator();

View File

@ -56,7 +56,6 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
@ -64,7 +63,6 @@ import org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorSer
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing.QueuingContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
@ -177,15 +175,9 @@ public class NodeManager extends CompositeService
ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
if (getConfig().getBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED,
YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED_DEFAULT)) {
return new QueuingContainerManagerImpl(context, exec, del,
nodeStatusUpdater, metrics, dirsHandler);
} else {
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
metrics, dirsHandler);
}
}
protected NMCollectorService createNMCollectorService(Context ctxt) {
return new NMCollectorService(ctxt);
@ -510,7 +502,6 @@ public class NodeManager extends CompositeService
private OpportunisticContainerAllocator containerAllocator;
private final QueuingContext queuingContext;
private ContainerExecutor executor;
private NMTimelinePublisher nmTimelinePublisher;
@ -533,7 +524,6 @@ public class NodeManager extends CompositeService
this.stateStore = stateStore;
this.logAggregationReportForApps = new ConcurrentLinkedQueue<
LogAggregationReport>();
this.queuingContext = new QueuingNMContext();
this.isDistSchedulingEnabled = isDistSchedulingEnabled;
this.conf = conf;
}
@ -662,11 +652,6 @@ public class NodeManager extends CompositeService
this.nodeStatusUpdater = nodeStatusUpdater;
}
@Override
public QueuingContext getQueuingContext() {
return this.queuingContext;
}
public boolean isDistributedSchedulingEnabled() {
return isDistSchedulingEnabled;
}
@ -715,29 +700,6 @@ public class NodeManager extends CompositeService
}
}
/**
* Class that keeps the context for containers queued at the NM.
*/
public static class QueuingNMContext implements Context.QueuingContext {
protected final ConcurrentMap<ContainerId, ContainerTokenIdentifier>
queuedContainers = new ConcurrentSkipListMap<>();
protected final ConcurrentMap<ContainerTokenIdentifier, String>
killedQueuedContainers = new ConcurrentHashMap<>();
@Override
public ConcurrentMap<ContainerId, ContainerTokenIdentifier>
getQueuedContainers() {
return this.queuedContainers;
}
@Override
public ConcurrentMap<ContainerTokenIdentifier, String>
getKilledQueuedContainers() {
return this.killedQueuedContainers;
}
}
/**
* @return the node health checker
*/

View File

@ -47,7 +47,6 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -62,7 +61,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
@ -89,7 +87,6 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
@ -570,9 +567,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
}
}
// Account for all containers that got killed while they were still queued.
pendingCompletedContainers.putAll(getKilledQueuedContainerStatuses());
containerStatuses.addAll(pendingCompletedContainers.values());
if (LOG.isDebugEnabled()) {
@ -582,43 +576,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
return containerStatuses;
}
/**
* Add to the container statuses the status of the containers that got killed
* while they were queued.
*/
private Map<ContainerId, ContainerStatus> getKilledQueuedContainerStatuses() {
Map<ContainerId, ContainerStatus> killedQueuedContainerStatuses =
new HashMap<>();
for (Map.Entry<ContainerTokenIdentifier, String> killedQueuedContainer :
this.context.getQueuingContext().
getKilledQueuedContainers().entrySet()) {
ContainerTokenIdentifier containerTokenId = killedQueuedContainer
.getKey();
ContainerId containerId = containerTokenId.getContainerID();
ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
containerId, ContainerState.COMPLETE,
killedQueuedContainer.getValue(), ContainerExitStatus.ABORTED,
containerTokenId.getResource(), containerTokenId.getExecutionType());
ApplicationId applicationId = containerId.getApplicationAttemptId()
.getApplicationId();
if (isApplicationStopped(applicationId)) {
if (LOG.isDebugEnabled()) {
LOG.debug(applicationId + " is completing, " + " remove "
+ containerId + " from NM context.");
}
this.context.getQueuingContext().getKilledQueuedContainers()
.remove(containerTokenId);
killedQueuedContainerStatuses.put(containerId, containerStatus);
} else {
if (!isContainerRecentlyStopped(containerId)) {
killedQueuedContainerStatuses.put(containerId, containerStatus);
}
}
addCompletedContainer(containerId);
}
return killedQueuedContainerStatuses;
}
private List<ApplicationId> getRunningApplications() {
List<ApplicationId> runningApplications = new ArrayList<ApplicationId>();
runningApplications.addAll(this.context.getApplications().keySet());
@ -703,17 +660,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
}
}
// Remove null containers from queuing context for killed queued containers.
Iterator<ContainerTokenIdentifier> killedQueuedContIter =
context.getQueuingContext().getKilledQueuedContainers().keySet().
iterator();
while (killedQueuedContIter.hasNext()) {
if (removedNullContainers.contains(
killedQueuedContIter.next().getContainerID())) {
killedQueuedContIter.remove();
}
}
if (!removedContainers.isEmpty()) {
LOG.info("Removed completed containers from NM context: "
+ removedContainers);

View File

@ -26,6 +26,8 @@ import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler
.ContainerScheduler;
/**
* The ContainerManager is an entity that manages the life cycle of Containers.
@ -42,4 +44,6 @@ public interface ContainerManager extends ServiceStateChangeListener,
void setBlockNewContainerRequests(boolean blockNewContainerRequests);
ContainerScheduler getContainerScheduler();
}

View File

@ -136,6 +136,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Change
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
@ -205,6 +208,7 @@ public class ContainerManagerImpl extends CompositeService implements
private final WriteLock writeLock;
private AMRMProxyService amrmProxyService;
protected boolean amrmProxyEnabled = false;
private final ContainerScheduler containerScheduler;
private long waitForContainersOnShutdownMillis;
@ -231,6 +235,8 @@ public class ContainerManagerImpl extends CompositeService implements
addService(containersLauncher);
this.nodeStatusUpdater = nodeStatusUpdater;
this.containerScheduler = createContainerScheduler(context);
addService(containerScheduler);
// Start configurable services
auxiliaryServices = new AuxServices();
@ -259,6 +265,7 @@ public class ContainerManagerImpl extends CompositeService implements
dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
dispatcher.register(ContainerSchedulerEventType.class, containerScheduler);
addService(dispatcher);
@ -311,6 +318,14 @@ public class ContainerManagerImpl extends CompositeService implements
}
}
@VisibleForTesting
protected ContainerScheduler createContainerScheduler(Context cntxt) {
// Currently, this dispatcher is shared by the ContainerManager,
// all the containers, the container monitor and all the container.
// The ContainerScheduler may use its own dispatcher.
return new ContainerScheduler(cntxt, dispatcher, metrics);
}
protected ContainersMonitor createContainersMonitor(ContainerExecutor exec) {
return new ContainersMonitorImpl(exec, dispatcher, this.context);
}
@ -1263,10 +1278,8 @@ public class ContainerManagerImpl extends CompositeService implements
}
} else {
context.getNMStateStore().storeContainerKilled(containerID);
dispatcher.getEventHandler().handle(
new ContainerKillEvent(containerID,
ContainerExitStatus.KILLED_BY_APPMASTER,
"Container killed by the ApplicationMaster."));
container.sendKillEvent(ContainerExitStatus.KILLED_BY_APPMASTER,
"Container killed by the ApplicationMaster.");
NMAuditLogger.logSuccess(container.getUser(),
AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID
@ -1521,12 +1534,12 @@ public class ContainerManagerImpl extends CompositeService implements
@Override
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
return OpportunisticContainersStatus.newInstance();
return this.containerScheduler.getOpportunisticContainersStatus();
}
@Override
public void updateQueuingLimit(ContainerQueuingLimit queuingLimit) {
LOG.trace("Implementation does not support queuing of Containers!!");
this.containerScheduler.updateQueuingLimit(queuingLimit);
}
@SuppressWarnings("unchecked")
@ -1687,4 +1700,9 @@ public class ContainerManagerImpl extends CompositeService implements
LOG.info("Container " + containerId + " no longer exists");
}
}
@Override
public ContainerScheduler getContainerScheduler() {
return this.containerScheduler;
}
}

View File

@ -83,7 +83,13 @@ public interface Container extends EventHandler<ContainerEvent> {
boolean isReInitializing();
boolean isMarkedForKilling();
boolean canRollback();
void commitUpgrade();
void sendLaunchEvent();
void sendKillEvent(int exitStatus, String description);
}

View File

@ -74,6 +74,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
@ -164,6 +166,7 @@ public class ContainerImpl implements Container {
private String ips;
private volatile ReInitializationContext reInitContext;
private volatile boolean isReInitializing = false;
private volatile boolean isMarkeForKilling = false;
/** The NM-wide configuration - not specific to this container */
private final Configuration daemonConf;
@ -286,7 +289,7 @@ public class ContainerImpl implements Container {
// From NEW State
.addTransition(ContainerState.NEW,
EnumSet.of(ContainerState.LOCALIZING,
ContainerState.LOCALIZED,
ContainerState.SCHEDULED,
ContainerState.LOCALIZATION_FAILED,
ContainerState.DONE),
ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())
@ -298,7 +301,7 @@ public class ContainerImpl implements Container {
// From LOCALIZING State
.addTransition(ContainerState.LOCALIZING,
EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED),
EnumSet.of(ContainerState.LOCALIZING, ContainerState.SCHEDULED),
ContainerEventType.RESOURCE_LOCALIZED, new LocalizedTransition())
.addTransition(ContainerState.LOCALIZING,
ContainerState.LOCALIZATION_FAILED,
@ -309,7 +312,7 @@ public class ContainerImpl implements Container {
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER,
new KillDuringLocalizationTransition())
new KillBeforeRunningTransition())
// From LOCALIZATION_FAILED State
.addTransition(ContainerState.LOCALIZATION_FAILED,
@ -334,17 +337,18 @@ public class ContainerImpl implements Container {
ContainerState.LOCALIZATION_FAILED,
ContainerEventType.RESOURCE_FAILED)
// From LOCALIZED State
.addTransition(ContainerState.LOCALIZED, ContainerState.RUNNING,
// From SCHEDULED State
.addTransition(ContainerState.SCHEDULED, ContainerState.RUNNING,
ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
.addTransition(ContainerState.LOCALIZED, ContainerState.EXITED_WITH_FAILURE,
.addTransition(ContainerState.SCHEDULED, ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition(true))
.addTransition(ContainerState.LOCALIZED, ContainerState.LOCALIZED,
.addTransition(ContainerState.SCHEDULED, ContainerState.SCHEDULED,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.LOCALIZED, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
.addTransition(ContainerState.SCHEDULED, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER,
new KillBeforeRunningTransition())
// From RUNNING State
.addTransition(ContainerState.RUNNING,
@ -353,7 +357,7 @@ public class ContainerImpl implements Container {
new ExitedWithSuccessTransition(true))
.addTransition(ContainerState.RUNNING,
EnumSet.of(ContainerState.RELAUNCHING,
ContainerState.LOCALIZED,
ContainerState.SCHEDULED,
ContainerState.EXITED_WITH_FAILURE),
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new RetryFailureTransition())
@ -402,7 +406,7 @@ public class ContainerImpl implements Container {
.addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
.addTransition(ContainerState.REINITIALIZING,
ContainerState.LOCALIZED,
ContainerState.SCHEDULED,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new KilledForReInitializationTransition())
@ -520,9 +524,11 @@ public class ContainerImpl implements Container {
case NEW:
case LOCALIZING:
case LOCALIZATION_FAILED:
case LOCALIZED:
case SCHEDULED:
return org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED;
case RUNNING:
case RELAUNCHING:
case REINITIALIZING:
case EXITED_WITH_SUCCESS:
case EXITED_WITH_FAILURE:
case KILLING:
@ -553,7 +559,7 @@ public class ContainerImpl implements Container {
public Map<Path, List<String>> getLocalizedResources() {
this.readLock.lock();
try {
if (ContainerState.LOCALIZED == getContainerState()
if (ContainerState.SCHEDULED == getContainerState()
|| ContainerState.RELAUNCHING == getContainerState()) {
return resourceSet.getLocalizedResources();
} else {
@ -690,6 +696,9 @@ public class ContainerImpl implements Container {
ContainerStatus containerStatus = cloneAndGetContainerStatus();
eventHandler.handle(new ApplicationContainerFinishedEvent(containerStatus));
// Tell the scheduler the container is Done
eventHandler.handle(new ContainerSchedulerEvent(this,
ContainerSchedulerEventType.CONTAINER_COMPLETED));
// Remove the container from the resource-monitor
eventHandler.handle(new ContainerStopMonitoringEvent(containerId));
// Tell the logService too
@ -698,7 +707,8 @@ public class ContainerImpl implements Container {
}
@SuppressWarnings("unchecked") // dispatcher not typed
private void sendLaunchEvent() {
@Override
public void sendLaunchEvent() {
ContainersLauncherEventType launcherEvent =
ContainersLauncherEventType.LAUNCH_CONTAINER;
if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
@ -710,6 +720,22 @@ public class ContainerImpl implements Container {
new ContainersLauncherEvent(this, launcherEvent));
}
@SuppressWarnings("unchecked") // dispatcher not typed
private void sendScheduleEvent() {
dispatcher.getEventHandler().handle(
new ContainerSchedulerEvent(this,
ContainerSchedulerEventType.SCHEDULE_CONTAINER)
);
}
@SuppressWarnings("unchecked") // dispatcher not typed
@Override
public void sendKillEvent(int exitStatus, String description) {
this.isMarkeForKilling = true;
dispatcher.getEventHandler().handle(
new ContainerKillEvent(containerId, exitStatus, description));
}
@SuppressWarnings("unchecked") // dispatcher not typed
private void sendRelaunchEvent() {
ContainersLauncherEventType launcherEvent =
@ -781,7 +807,7 @@ public class ContainerImpl implements Container {
* to the ResourceLocalizationManager and enters LOCALIZING state.
*
* If there are no resources to localize, sends LAUNCH_CONTAINER event
* and enters LOCALIZED state directly.
* and enters SCHEDULED state directly.
*
* If there are any invalid resources specified, enters LOCALIZATION_FAILED
* directly.
@ -847,9 +873,9 @@ public class ContainerImpl implements Container {
}
return ContainerState.LOCALIZING;
} else {
container.sendLaunchEvent();
container.sendScheduleEvent();
container.metrics.endInitingContainer();
return ContainerState.LOCALIZED;
return ContainerState.SCHEDULED;
}
}
}
@ -889,7 +915,7 @@ public class ContainerImpl implements Container {
new ContainerLocalizationEvent(LocalizationEventType.
CONTAINER_RESOURCES_LOCALIZED, container));
container.sendLaunchEvent();
container.sendScheduleEvent();
container.metrics.endInitingContainer();
// If this is a recovered container that has already launched, skip
@ -909,7 +935,7 @@ public class ContainerImpl implements Container {
SharedCacheUploadEventType.UPLOAD));
}
return ContainerState.LOCALIZED;
return ContainerState.SCHEDULED;
}
}
@ -1099,7 +1125,7 @@ public class ContainerImpl implements Container {
}
/**
* Transition from LOCALIZED state to RUNNING state upon receiving
* Transition from SCHEDULED state to RUNNING state upon receiving
* a CONTAINER_LAUNCHED event.
*/
static class LaunchTransition extends ContainerTransition {
@ -1257,7 +1283,7 @@ public class ContainerImpl implements Container {
container.containerId.getApplicationAttemptId().getApplicationId(),
container.containerId);
new KilledForReInitializationTransition().transition(container, event);
return ContainerState.LOCALIZED;
return ContainerState.SCHEDULED;
} else {
new ExitedWithFailureTransition(true).transition(container, event);
return ContainerState.EXITED_WITH_FAILURE;
@ -1339,7 +1365,7 @@ public class ContainerImpl implements Container {
}
/**
* Transition to LOCALIZED and wait for RE-LAUNCH
* Transition to SCHEDULED and wait for RE-LAUNCH
*/
static class KilledForReInitializationTransition extends ContainerTransition {
@ -1363,8 +1389,8 @@ public class ContainerImpl implements Container {
container.resourceSet =
container.reInitContext.mergedResourceSet(container.resourceSet);
container.sendLaunchEvent();
container.isMarkeForKilling = false;
container.sendScheduleEvent();
}
}
@ -1392,7 +1418,7 @@ public class ContainerImpl implements Container {
* Transition from LOCALIZING to KILLING upon receiving
* KILL_CONTAINER event.
*/
static class KillDuringLocalizationTransition implements
static class KillBeforeRunningTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
@ -1424,7 +1450,7 @@ public class ContainerImpl implements Container {
/**
* Transitions upon receiving KILL_CONTAINER.
* - LOCALIZED -> KILLING.
* - SCHEDULED -> KILLING.
* - RUNNING -> KILLING.
* - REINITIALIZING -> KILLING.
*/
@ -1651,7 +1677,8 @@ public class ContainerImpl implements Container {
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitionException e) {
LOG.warn("Can't handle this event at current state: Current: ["
+ oldState + "], eventType: [" + event.getType() + "]", e);
+ oldState + "], eventType: [" + event.getType() + "]," +
" container: [" + containerID + "]", e);
}
if (oldState != newState) {
LOG.info("Container " + containerID + " transitioned from "
@ -1714,6 +1741,11 @@ public class ContainerImpl implements Container {
return this.isReInitializing;
}
@Override
public boolean isMarkedForKilling() {
return this.isMarkeForKilling;
}
@Override
public boolean canRollback() {
return (this.reInitContext != null)

View File

@ -19,7 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
public enum ContainerState {
NEW, LOCALIZING, LOCALIZATION_FAILED, LOCALIZED, RUNNING, RELAUNCHING,
NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING,
REINITIALIZING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE
}

View File

@ -104,9 +104,10 @@ public class ContainerLaunch implements Callable<Integer> {
private final Context context;
private final ContainerManagerImpl containerManager;
protected AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);
protected AtomicBoolean containerAlreadyLaunched = new AtomicBoolean(false);
protected AtomicBoolean completed = new AtomicBoolean(false);
private volatile boolean killedBeforeStart = false;
private long sleepDelayBeforeSigKill = 250;
private long maxKillWaitTime = 2000;
@ -401,7 +402,12 @@ public class ContainerLaunch implements Callable<Integer> {
@SuppressWarnings("unchecked")
protected int launchContainer(ContainerStartContext ctx) throws IOException {
ContainerId containerId = container.getContainerId();
if (container.isMarkedForKilling()) {
LOG.info("Container " + containerId + " not launched as it has already "
+ "been marked for Killing");
this.killedBeforeStart = true;
return ExitCode.TERMINATED.getExitCode();
}
// LaunchContainer is a blocking call. We are here almost means the
// container is launched, so send out the event.
dispatcher.getEventHandler().handle(new ContainerEvent(
@ -410,7 +416,7 @@ public class ContainerLaunch implements Callable<Integer> {
context.getNMStateStore().storeContainerLaunched(containerId);
// Check if the container is signalled to be killed.
if (!shouldLaunchContainer.compareAndSet(false, true)) {
if (!containerAlreadyLaunched.compareAndSet(false, true)) {
LOG.info("Container " + containerId + " not launched as "
+ "cleanup already called");
return ExitCode.TERMINATED.getExitCode();
@ -451,10 +457,14 @@ public class ContainerLaunch implements Callable<Integer> {
|| exitCode == ExitCode.TERMINATED.getExitCode()) {
// If the process was killed, Send container_cleanedup_after_kill and
// just break out of this method.
// If Container was killed before starting... NO need to do this.
if (!killedBeforeStart) {
dispatcher.getEventHandler().handle(
new ContainerExitEvent(containerId,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, exitCode,
diagnosticInfo.toString()));
}
} else if (exitCode != 0) {
handleContainerExitWithFailure(containerId, exitCode, containerLogDir,
diagnosticInfo);
@ -565,7 +575,8 @@ public class ContainerLaunch implements Callable<Integer> {
}
// launch flag will be set to true if process already launched
boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true);
boolean alreadyLaunched =
!containerAlreadyLaunched.compareAndSet(false, true);
if (!alreadyLaunched) {
LOG.info("Container " + containerIdStr + " not launched."
+ " No cleanup needed to be done");
@ -660,7 +671,8 @@ public class ContainerLaunch implements Callable<Integer> {
LOG.info("Sending signal " + command + " to container " + containerIdStr);
boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true);
boolean alreadyLaunched =
!containerAlreadyLaunched.compareAndSet(false, true);
if (!alreadyLaunched) {
LOG.info("Container " + containerIdStr + " not launched."
+ " Not sending the signal");

View File

@ -39,7 +39,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
import org.apache.hadoop.yarn.util.ConverterUtils;
/**
* This is a ContainerLaunch which has been recovered after an NM restart (for
@ -57,7 +57,7 @@ public class RecoveredContainerLaunch extends ContainerLaunch {
{
super(context, configuration, dispatcher, exec, app, container, dirsHandler,
containerManager);
this.shouldLaunchContainer.set(true);
this.containerAlreadyLaunched.set(true);
}
/**

View File

@ -19,29 +19,51 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
public interface ContainersMonitor extends Service,
EventHandler<ContainersMonitorEvent>, ResourceView {
public ResourceUtilization getContainersUtilization();
ResourceUtilization getContainersUtilization();
ResourceUtilization getContainersAllocation();
boolean hasResourcesAvailable(ProcessTreeInfo pti);
void increaseContainersAllocation(ProcessTreeInfo pti);
void decreaseContainersAllocation(ProcessTreeInfo pti);
void increaseResourceUtilization(ResourceUtilization resourceUtil,
ProcessTreeInfo pti);
void decreaseResourceUtilization(ResourceUtilization resourceUtil,
ProcessTreeInfo pti);
float getVmemRatio();
void subtractNodeResourcesFromResourceUtilization(
ResourceUtilization resourceUtil);
/**
* Utility method to add a {@link Resource} to the
* {@link ResourceUtilization}.
* @param containersMonitor Containers Monitor.
* @param resourceUtil Resource Utilization.
* @param resource Resource.
*/
static void increaseResourceUtilization(
ContainersMonitor containersMonitor, ResourceUtilization resourceUtil,
Resource resource) {
float vCores = (float) resource.getVirtualCores() /
containersMonitor.getVCoresAllocatedForContainers();
int vmem = (int) (resource.getMemorySize()
* containersMonitor.getVmemRatio());
resourceUtil.addTo((int)resource.getMemorySize(), vmem, vCores);
}
/**
* Utility method to subtract a {@link Resource} from the
* {@link ResourceUtilization}.
* @param containersMonitor Containers Monitor.
* @param resourceUtil Resource Utilization.
* @param resource Resource.
*/
static void decreaseResourceUtilization(
ContainersMonitor containersMonitor, ResourceUtilization resourceUtil,
Resource resource) {
float vCores = (float) resource.getVirtualCores() /
containersMonitor.getVCoresAllocatedForContainers();
int vmem = (int) (resource.getMemorySize()
* containersMonitor.getVmemRatio());
resourceUtil.subtractFrom((int)resource.getMemorySize(), vmem, vCores);
}
}

View File

@ -97,9 +97,6 @@ public class ContainersMonitorImpl extends AbstractService implements
}
private ResourceUtilization containersUtilization;
// Tracks the aggregated allocation of the currently allocated containers
// when queuing of containers at the NMs is enabled.
private final ResourceUtilization containersAllocation;
private volatile boolean stopped = false;
@ -114,7 +111,6 @@ public class ContainersMonitorImpl extends AbstractService implements
this.monitoringThread = new MonitoringThread();
this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f);
this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f);
}
@Override
@ -743,6 +739,8 @@ public class ContainersMonitorImpl extends AbstractService implements
LOG.warn("Container " + containerId.toString() + "does not exist");
return;
}
// YARN-5860: Route this through the ContainerScheduler to
// fix containerAllocation
container.setResource(resource);
}
@ -842,67 +840,6 @@ public class ContainersMonitorImpl extends AbstractService implements
this.containersUtilization = utilization;
}
public ResourceUtilization getContainersAllocation() {
return this.containersAllocation;
}
/**
* @return true if there are available allocated resources for the given
* container to start.
*/
@Override
public boolean hasResourcesAvailable(ProcessTreeInfo pti) {
synchronized (this.containersAllocation) {
// Check physical memory.
if (this.containersAllocation.getPhysicalMemory() +
(int) (pti.getPmemLimit() >> 20) >
(int) (getPmemAllocatedForContainers() >> 20)) {
return false;
}
// Check virtual memory.
if (isVmemCheckEnabled() &&
this.containersAllocation.getVirtualMemory() +
(int) (pti.getVmemLimit() >> 20) >
(int) (getVmemAllocatedForContainers() >> 20)) {
return false;
}
// Check CPU.
if (this.containersAllocation.getCPU()
+ allocatedCpuUsage(pti) > 1.0f) {
return false;
}
}
return true;
}
@Override
public void increaseContainersAllocation(ProcessTreeInfo pti) {
synchronized (this.containersAllocation) {
increaseResourceUtilization(this.containersAllocation, pti);
}
}
@Override
public void decreaseContainersAllocation(ProcessTreeInfo pti) {
synchronized (this.containersAllocation) {
decreaseResourceUtilization(this.containersAllocation, pti);
}
}
@Override
public void increaseResourceUtilization(ResourceUtilization resourceUtil,
ProcessTreeInfo pti) {
resourceUtil.addTo((int) (pti.getPmemLimit() >> 20),
(int) (pti.getVmemLimit() >> 20), allocatedCpuUsage(pti));
}
@Override
public void decreaseResourceUtilization(ResourceUtilization resourceUtil,
ProcessTreeInfo pti) {
resourceUtil.subtractFrom((int) (pti.getPmemLimit() >> 20),
(int) (pti.getVmemLimit() >> 20), allocatedCpuUsage(pti));
}
@Override
public void subtractNodeResourcesFromResourceUtilization(
ResourceUtilization resourceUtil) {
@ -910,14 +847,9 @@ public class ContainersMonitorImpl extends AbstractService implements
(int) (getVmemAllocatedForContainers() >> 20), 1.0f);
}
/**
* Calculates the vCores CPU usage that is assigned to the given
* {@link ProcessTreeInfo}. In particular, it takes into account the number of
* vCores that are allowed to be used by the NM and returns the CPU usage
* as a normalized value between {@literal >=} 0 and {@literal <=} 1.
*/
private float allocatedCpuUsage(ProcessTreeInfo pti) {
return (float) pti.getCpuVcores() / getVCoresAllocatedForContainers();
@Override
public float getVmemRatio() {
return vmemRatio;
}
@Override
@ -988,5 +920,4 @@ public class ContainersMonitorImpl extends AbstractService implements
startEvent.getVmemLimit(), startEvent.getPmemLimit(),
startEvent.getCpuVcores()));
}
}

View File

@ -1,686 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* Class extending {@link ContainerManagerImpl} and is used when queuing at the
* NM is enabled.
*/
public class QueuingContainerManagerImpl extends ContainerManagerImpl {
private static final Logger LOG = LoggerFactory
.getLogger(QueuingContainerManagerImpl.class);
private ConcurrentMap<ContainerId, AllocatedContainerInfo>
allocatedGuaranteedContainers;
private ConcurrentMap<ContainerId, AllocatedContainerInfo>
allocatedOpportunisticContainers;
private long allocatedMemoryOpportunistic;
private int allocatedVCoresOpportunistic;
private Queue<AllocatedContainerInfo> queuedGuaranteedContainers;
private Queue<AllocatedContainerInfo> queuedOpportunisticContainers;
private Set<ContainerId> opportunisticContainersToKill;
private final OpportunisticContainersStatus opportunisticContainersStatus;
private final ContainerQueuingLimit queuingLimit;
public QueuingContainerManagerImpl(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
super(context, exec, deletionContext, nodeStatusUpdater, metrics,
dirsHandler);
this.allocatedGuaranteedContainers = new ConcurrentHashMap<>();
this.allocatedOpportunisticContainers = new ConcurrentHashMap<>();
this.allocatedMemoryOpportunistic = 0;
this.allocatedVCoresOpportunistic = 0;
this.queuedGuaranteedContainers = new ConcurrentLinkedQueue<>();
this.queuedOpportunisticContainers = new ConcurrentLinkedQueue<>();
this.opportunisticContainersToKill = Collections.synchronizedSet(
new HashSet<ContainerId>());
this.opportunisticContainersStatus =
OpportunisticContainersStatus.newInstance();
this.queuingLimit = ContainerQueuingLimit.newInstance();
}
@Override
protected EventHandler<ApplicationEvent> createApplicationEventDispatcher() {
return new QueuingApplicationEventDispatcher(
super.createApplicationEventDispatcher());
}
@Override
protected void startContainerInternal(
ContainerTokenIdentifier containerTokenIdentifier,
StartContainerRequest request) throws YarnException, IOException {
this.context.getQueuingContext().getQueuedContainers().put(
containerTokenIdentifier.getContainerID(), containerTokenIdentifier);
AllocatedContainerInfo allocatedContInfo = new AllocatedContainerInfo(
containerTokenIdentifier, request,
containerTokenIdentifier.getExecutionType(), containerTokenIdentifier
.getResource(), getConfig());
// If there are already free resources for the container to start, and
// there are no queued containers waiting to be executed, start this
// container immediately.
if (queuedGuaranteedContainers.isEmpty() &&
queuedOpportunisticContainers.isEmpty() &&
getContainersMonitor().
hasResourcesAvailable(allocatedContInfo.getPti())) {
startAllocatedContainer(allocatedContInfo);
} else {
ContainerId cIdToStart = containerTokenIdentifier.getContainerID();
this.context.getNMStateStore().storeContainer(cIdToStart,
containerTokenIdentifier.getVersion(), request);
this.context.getNMStateStore().storeContainerQueued(cIdToStart);
LOG.info("No available resources for container {} to start its execution "
+ "immediately.", cIdToStart);
if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) {
queuedGuaranteedContainers.add(allocatedContInfo);
// Kill running opportunistic containers to make space for
// guaranteed container.
killOpportunisticContainers(allocatedContInfo);
} else {
LOG.info("Opportunistic container {} will be queued at the NM.",
cIdToStart);
queuedOpportunisticContainers.add(allocatedContInfo);
}
}
}
@Override
protected void stopContainerInternal(ContainerId containerID)
throws YarnException, IOException {
Container container = this.context.getContainers().get(containerID);
// If container is null and distributed scheduling is enabled, container
// might be queued. Otherwise, container might not be handled by this NM.
if (container == null && this.context.getQueuingContext()
.getQueuedContainers().containsKey(containerID)) {
ContainerTokenIdentifier containerTokenId = this.context
.getQueuingContext().getQueuedContainers().remove(containerID);
boolean foundInQueue = removeQueuedContainer(containerID,
containerTokenId.getExecutionType());
if (foundInQueue) {
LOG.info("Removing queued container with ID " + containerID);
this.context.getQueuingContext().getKilledQueuedContainers().put(
containerTokenId,
"Queued container request removed by ApplicationMaster.");
this.context.getNMStateStore().storeContainerKilled(containerID);
} else {
// The container started execution in the meanwhile.
try {
stopContainerInternalIfRunning(containerID);
} catch (YarnException | IOException e) {
LOG.error("Container did not get removed successfully.", e);
}
}
nodeStatusUpdater.sendOutofBandHeartBeat();
} else {
super.stopContainerInternal(containerID);
}
}
/**
* Start the execution of the given container. Also add it to the allocated
* containers, and update allocated resource utilization.
*/
private void startAllocatedContainer(
AllocatedContainerInfo allocatedContainerInfo) {
ProcessTreeInfo pti = allocatedContainerInfo.getPti();
if (allocatedContainerInfo.getExecutionType() ==
ExecutionType.GUARANTEED) {
allocatedGuaranteedContainers.put(pti.getContainerId(),
allocatedContainerInfo);
} else {
allocatedOpportunisticContainers.put(pti.getContainerId(),
allocatedContainerInfo);
allocatedMemoryOpportunistic += pti.getPmemLimit();
allocatedVCoresOpportunistic += pti.getCpuVcores();
}
getContainersMonitor().increaseContainersAllocation(pti);
// Start execution of container.
ContainerId containerId = allocatedContainerInfo
.getContainerTokenIdentifier().getContainerID();
this.context.getQueuingContext().getQueuedContainers().remove(containerId);
try {
LOG.info("Starting container [" + containerId + "]");
super.startContainerInternal(
allocatedContainerInfo.getContainerTokenIdentifier(),
allocatedContainerInfo.getStartRequest());
} catch (YarnException | IOException e) {
containerFailedToStart(pti.getContainerId(),
allocatedContainerInfo.getContainerTokenIdentifier());
LOG.error("Container failed to start.", e);
}
}
private void containerFailedToStart(ContainerId containerId,
ContainerTokenIdentifier containerTokenId) {
this.context.getQueuingContext().getQueuedContainers().remove(containerId);
removeAllocatedContainer(containerId);
this.context.getQueuingContext().getKilledQueuedContainers().put(
containerTokenId,
"Container removed from queue as it failed to start.");
}
/**
* Remove the given container from the container queues.
*
* @return true if the container was found in one of the queues.
*/
private boolean removeQueuedContainer(ContainerId containerId,
ExecutionType executionType) {
Queue<AllocatedContainerInfo> queue =
(executionType == ExecutionType.GUARANTEED) ?
queuedGuaranteedContainers : queuedOpportunisticContainers;
boolean foundInQueue = false;
Iterator<AllocatedContainerInfo> iter = queue.iterator();
while (iter.hasNext() && !foundInQueue) {
if (iter.next().getPti().getContainerId().equals(containerId)) {
iter.remove();
foundInQueue = true;
}
}
return foundInQueue;
}
/**
* Remove the given container from the allocated containers, and update
* allocated container utilization accordingly.
*/
private void removeAllocatedContainer(ContainerId containerId) {
AllocatedContainerInfo contToRemove = null;
contToRemove = allocatedGuaranteedContainers.remove(containerId);
if (contToRemove == null) {
contToRemove = allocatedOpportunisticContainers.remove(containerId);
}
// If container was indeed running, update allocated resource utilization.
if (contToRemove != null) {
getContainersMonitor().decreaseContainersAllocation(contToRemove
.getPti());
if (contToRemove.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
allocatedMemoryOpportunistic -= contToRemove.getPti().getPmemLimit();
allocatedVCoresOpportunistic -= contToRemove.getPti().getCpuVcores();
}
}
}
/**
* Stop a container only if it is currently running. If queued, do not stop
* it.
*/
private void stopContainerInternalIfRunning(ContainerId containerID)
throws YarnException, IOException {
if (this.context.getContainers().containsKey(containerID)) {
stopContainerInternal(containerID);
}
}
/**
* Kill opportunistic containers to free up resources for running the given
* container.
*
* @param allocatedContInfo
* the container whose execution needs to start by freeing up
* resources occupied by opportunistic containers.
*/
private void killOpportunisticContainers(
AllocatedContainerInfo allocatedContInfo) {
ContainerId containerToStartId = allocatedContInfo.getPti()
.getContainerId();
List<ContainerId> extraOpportContainersToKill =
pickOpportunisticContainersToKill(containerToStartId);
// Kill the opportunistic containers that were chosen.
for (ContainerId contIdToKill : extraOpportContainersToKill) {
try {
stopContainerInternalIfRunning(contIdToKill);
} catch (YarnException | IOException e) {
LOG.error("Container did not get removed successfully.", e);
}
LOG.info(
"Opportunistic container {} will be killed in order to start the "
+ "execution of guaranteed container {}.",
contIdToKill, containerToStartId);
}
}
/**
* Choose the opportunistic containers to kill in order to free up resources
* for running the given container.
*
* @param containerToStartId
* the container whose execution needs to start by freeing up
* resources occupied by opportunistic containers.
* @return the additional opportunistic containers that need to be killed.
*/
protected List<ContainerId> pickOpportunisticContainersToKill(
ContainerId containerToStartId) {
// The additional opportunistic containers that need to be killed for the
// given container to start.
List<ContainerId> extraOpportContainersToKill = new ArrayList<>();
// Track resources that need to be freed.
ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(
containerToStartId);
// Go over the running opportunistic containers. Avoid containers that have
// already been marked for killing.
boolean hasSufficientResources = false;
for (Map.Entry<ContainerId, AllocatedContainerInfo> runningOpportCont :
allocatedOpportunisticContainers.entrySet()) {
ContainerId runningOpportContId = runningOpportCont.getKey();
// If there are sufficient resources to execute the given container, do
// not kill more opportunistic containers.
if (resourcesToFreeUp.getPhysicalMemory() <= 0 &&
resourcesToFreeUp.getVirtualMemory() <= 0 &&
resourcesToFreeUp.getCPU() <= 0.0f) {
hasSufficientResources = true;
break;
}
if (!opportunisticContainersToKill.contains(runningOpportContId)) {
extraOpportContainersToKill.add(runningOpportContId);
opportunisticContainersToKill.add(runningOpportContId);
getContainersMonitor().decreaseResourceUtilization(resourcesToFreeUp,
runningOpportCont.getValue().getPti());
}
}
if (!hasSufficientResources) {
LOG.info(
"There are no sufficient resources to start guaranteed {} even after "
+ "attempting to kill any running opportunistic containers.",
containerToStartId);
}
return extraOpportContainersToKill;
}
/**
* Calculates the amount of resources that need to be freed up (by killing
* opportunistic containers) in order for the given guaranteed container to
* start its execution. Resource allocation to be freed up =
* <code>containersAllocation</code> -
* allocation of <code>opportunisticContainersToKill</code> +
* allocation of <code>queuedGuaranteedContainers</code> that will start
* before the given container +
* allocation of given container -
* total resources of node.
*
* @param containerToStartId
* the ContainerId of the guaranteed container for which we need to
* free resources, so that its execution can start.
* @return the resources that need to be freed up for the given guaranteed
* container to start.
*/
private ResourceUtilization resourcesToFreeUp(
ContainerId containerToStartId) {
// Get allocation of currently allocated containers.
ResourceUtilization resourceAllocationToFreeUp = ResourceUtilization
.newInstance(getContainersMonitor().getContainersAllocation());
// Subtract from the allocation the allocation of the opportunistic
// containers that are marked for killing.
for (ContainerId opportContId : opportunisticContainersToKill) {
if (allocatedOpportunisticContainers.containsKey(opportContId)) {
getContainersMonitor().decreaseResourceUtilization(
resourceAllocationToFreeUp,
allocatedOpportunisticContainers.get(opportContId).getPti());
}
}
// Add to the allocation the allocation of the pending guaranteed
// containers that will start before the current container will be started.
for (AllocatedContainerInfo guarContInfo : queuedGuaranteedContainers) {
getContainersMonitor().increaseResourceUtilization(
resourceAllocationToFreeUp, guarContInfo.getPti());
if (guarContInfo.getPti().getContainerId().equals(containerToStartId)) {
break;
}
}
// Subtract the overall node resources.
getContainersMonitor().subtractNodeResourcesFromResourceUtilization(
resourceAllocationToFreeUp);
return resourceAllocationToFreeUp;
}
/**
* If there are available resources, try to start as many pending containers
* as possible.
*/
private void startPendingContainers() {
// Start pending guaranteed containers, if resources available.
boolean resourcesAvailable =
startContainersFromQueue(queuedGuaranteedContainers);
// Start opportunistic containers, if resources available.
if (resourcesAvailable) {
startContainersFromQueue(queuedOpportunisticContainers);
}
}
private boolean startContainersFromQueue(
Queue<AllocatedContainerInfo> queuedContainers) {
Iterator<AllocatedContainerInfo> guarIter = queuedContainers.iterator();
boolean resourcesAvailable = true;
while (guarIter.hasNext() && resourcesAvailable) {
AllocatedContainerInfo allocatedContInfo = guarIter.next();
if (getContainersMonitor().hasResourcesAvailable(
allocatedContInfo.getPti())) {
startAllocatedContainer(allocatedContInfo);
guarIter.remove();
} else {
resourcesAvailable = false;
}
}
return resourcesAvailable;
}
@Override
protected ContainerStatus getContainerStatusInternal(ContainerId containerID,
NMTokenIdentifier nmTokenIdentifier) throws YarnException {
Container container = this.context.getContainers().get(containerID);
if (container == null) {
ContainerTokenIdentifier containerTokenId = this.context
.getQueuingContext().getQueuedContainers().get(containerID);
if (containerTokenId != null) {
ExecutionType executionType = this.context.getQueuingContext()
.getQueuedContainers().get(containerID).getExecutionType();
return BuilderUtils.newContainerStatus(containerID,
org.apache.hadoop.yarn.api.records.ContainerState.QUEUED, "",
ContainerExitStatus.INVALID, this.context.getQueuingContext()
.getQueuedContainers().get(containerID).getResource(),
executionType);
} else {
// Check if part of the stopped/killed queued containers.
for (ContainerTokenIdentifier cTokenId : this.context
.getQueuingContext().getKilledQueuedContainers().keySet()) {
if (cTokenId.getContainerID().equals(containerID)) {
return BuilderUtils.newContainerStatus(containerID,
org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
this.context.getQueuingContext().getKilledQueuedContainers()
.get(cTokenId), ContainerExitStatus.ABORTED, cTokenId
.getResource(), cTokenId.getExecutionType());
}
}
}
}
return super.getContainerStatusInternal(containerID, nmTokenIdentifier);
}
/**
* Recover running or queued container.
*/
@Override
protected void recoverActiveContainer(
ContainerLaunchContext launchContext, ContainerTokenIdentifier token,
RecoveredContainerState rcs) throws IOException {
if (rcs.getStatus() ==
RecoveredContainerStatus.QUEUED && !rcs.getKilled()) {
LOG.info(token.getContainerID()
+ "will be added to the queued containers.");
AllocatedContainerInfo allocatedContInfo = new AllocatedContainerInfo(
token, rcs.getStartRequest(), token.getExecutionType(),
token.getResource(), getConfig());
this.context.getQueuingContext().getQueuedContainers().put(
token.getContainerID(), token);
if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) {
queuedGuaranteedContainers.add(allocatedContInfo);
// Kill running opportunistic containers to make space for
// guaranteed container.
killOpportunisticContainers(allocatedContInfo);
} else {
queuedOpportunisticContainers.add(allocatedContInfo);
}
} else {
super.recoverActiveContainer(launchContext, token, rcs);
}
}
@VisibleForTesting
public int getNumAllocatedGuaranteedContainers() {
return allocatedGuaranteedContainers.size();
}
@VisibleForTesting
public int getNumAllocatedOpportunisticContainers() {
return allocatedOpportunisticContainers.size();
}
@VisibleForTesting
public int getNumQueuedGuaranteedContainers() {
return queuedGuaranteedContainers.size();
}
@VisibleForTesting
public int getNumQueuedOpportunisticContainers() {
return queuedOpportunisticContainers.size();
}
class QueuingApplicationEventDispatcher implements
EventHandler<ApplicationEvent> {
private EventHandler<ApplicationEvent> applicationEventDispatcher;
public QueuingApplicationEventDispatcher(
EventHandler<ApplicationEvent> applicationEventDispatcher) {
this.applicationEventDispatcher = applicationEventDispatcher;
}
@Override
public void handle(ApplicationEvent event) {
if (event.getType() ==
ApplicationEventType.APPLICATION_CONTAINER_FINISHED) {
if (!(event instanceof ApplicationContainerFinishedEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
ApplicationContainerFinishedEvent finishEvent =
(ApplicationContainerFinishedEvent) event;
// Remove finished container from the allocated containers, and
// attempt to start new containers.
ContainerId contIdToRemove = finishEvent.getContainerID();
removeAllocatedContainer(contIdToRemove);
opportunisticContainersToKill.remove(contIdToRemove);
startPendingContainers();
}
this.applicationEventDispatcher.handle(event);
}
}
@Override
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
opportunisticContainersStatus
.setRunningOpportContainers(allocatedOpportunisticContainers.size());
opportunisticContainersStatus
.setOpportMemoryUsed(allocatedMemoryOpportunistic);
opportunisticContainersStatus
.setOpportCoresUsed(allocatedVCoresOpportunistic);
opportunisticContainersStatus
.setQueuedOpportContainers(queuedOpportunisticContainers.size());
opportunisticContainersStatus.setWaitQueueLength(
queuedGuaranteedContainers.size() +
queuedOpportunisticContainers.size());
return opportunisticContainersStatus;
}
@Override
public void updateQueuingLimit(ContainerQueuingLimit limit) {
this.queuingLimit.setMaxQueueLength(limit.getMaxQueueLength());
// TODO: Include wait time as well once it is implemented
if (this.queuingLimit.getMaxQueueLength() > -1) {
shedQueuedOpportunisticContainers();
}
}
private void shedQueuedOpportunisticContainers() {
int numAllowed = this.queuingLimit.getMaxQueueLength();
Iterator<AllocatedContainerInfo> containerIter =
queuedOpportunisticContainers.iterator();
while (containerIter.hasNext()) {
AllocatedContainerInfo cInfo = containerIter.next();
if (numAllowed <= 0) {
containerIter.remove();
ContainerTokenIdentifier containerTokenIdentifier = this.context
.getQueuingContext().getQueuedContainers().remove(
cInfo.getContainerTokenIdentifier().getContainerID());
// The Container might have already started while we were
// iterating..
if (containerTokenIdentifier != null) {
this.context.getQueuingContext().getKilledQueuedContainers()
.putIfAbsent(cInfo.getContainerTokenIdentifier(),
"Container de-queued to meet NM queuing limits. "
+ "Max Queue length["
+ this.queuingLimit.getMaxQueueLength() + "]");
}
}
numAllowed--;
}
}
static class AllocatedContainerInfo {
private final ContainerTokenIdentifier containerTokenIdentifier;
private final StartContainerRequest startRequest;
private final ExecutionType executionType;
private final ProcessTreeInfo pti;
AllocatedContainerInfo(ContainerTokenIdentifier containerTokenIdentifier,
StartContainerRequest startRequest, ExecutionType executionType,
Resource resource, Configuration conf) {
this.containerTokenIdentifier = containerTokenIdentifier;
this.startRequest = startRequest;
this.executionType = executionType;
this.pti = createProcessTreeInfo(containerTokenIdentifier
.getContainerID(), resource, conf);
}
private ContainerTokenIdentifier getContainerTokenIdentifier() {
return this.containerTokenIdentifier;
}
private StartContainerRequest getStartRequest() {
return this.startRequest;
}
private ExecutionType getExecutionType() {
return this.executionType;
}
protected ProcessTreeInfo getPti() {
return this.pti;
}
private ProcessTreeInfo createProcessTreeInfo(ContainerId containerId,
Resource resource, Configuration conf) {
long pmemBytes = resource.getMemorySize() * 1024 * 1024L;
float pmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
long vmemBytes = (long) (pmemRatio * pmemBytes);
int cpuVcores = resource.getVirtualCores();
return new ProcessTreeInfo(containerId, null, null, vmemBytes, pmemBytes,
cpuVcores);
}
@Override
public boolean equals(Object obj) {
boolean equal = false;
if (obj instanceof AllocatedContainerInfo) {
AllocatedContainerInfo otherContInfo = (AllocatedContainerInfo) obj;
equal = this.getPti().getContainerId()
.equals(otherContInfo.getPti().getContainerId());
}
return equal;
}
@Override
public int hashCode() {
return this.getPti().getContainerId().hashCode();
}
}
}

View File

@ -1,23 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
/**
* This package contains classes related to the queuing of containers at
* the NM.
*
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing;

View File

@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An implementation of the {@link ResourceUtilizationTracker} that equates
* resource utilization with the total resource allocated to the container.
*/
public class AllocationBasedResourceUtilizationTracker implements
ResourceUtilizationTracker {
private static final Logger LOG =
LoggerFactory.getLogger(AllocationBasedResourceUtilizationTracker.class);
private ResourceUtilization containersAllocation;
private ContainerScheduler scheduler;
AllocationBasedResourceUtilizationTracker(ContainerScheduler scheduler) {
this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f);
this.scheduler = scheduler;
}
/**
* Get the accumulation of totally allocated resources to a container.
* @return ResourceUtilization Resource Utilization.
*/
@Override
public ResourceUtilization getCurrentUtilization() {
return this.containersAllocation;
}
/**
* Add Container's resources to the accumulated Utilization.
* @param container Container.
*/
@Override
public void addContainerResources(Container container) {
ContainersMonitor.increaseResourceUtilization(
getContainersMonitor(), this.containersAllocation,
container.getResource());
}
/**
* Subtract Container's resources to the accumulated Utilization.
* @param container Container.
*/
@Override
public void subtractContainerResource(Container container) {
ContainersMonitor.decreaseResourceUtilization(
getContainersMonitor(), this.containersAllocation,
container.getResource());
}
/**
* Check if NM has resources available currently to run the container.
* @param container Container.
* @return True, if NM has resources available currently to run the container.
*/
@Override
public boolean hasResourcesAvailable(Container container) {
long pMemBytes = container.getResource().getMemorySize() * 1024 * 1024L;
return hasResourcesAvailable(pMemBytes,
(long) (getContainersMonitor().getVmemRatio()* pMemBytes),
container.getResource().getVirtualCores());
}
private boolean hasResourcesAvailable(long pMemBytes, long vMemBytes,
int cpuVcores) {
// Check physical memory.
if (LOG.isDebugEnabled()) {
LOG.debug("pMemCheck [current={} + asked={} > allowed={}]",
this.containersAllocation.getPhysicalMemory(),
(pMemBytes >> 20),
(getContainersMonitor().getPmemAllocatedForContainers() >> 20));
}
if (this.containersAllocation.getPhysicalMemory() +
(int) (pMemBytes >> 20) >
(int) (getContainersMonitor()
.getPmemAllocatedForContainers() >> 20)) {
return false;
}
if (LOG.isDebugEnabled()) {
LOG.debug("before vMemCheck" +
"[isEnabled={}, current={} + asked={} > allowed={}]",
getContainersMonitor().isVmemCheckEnabled(),
this.containersAllocation.getVirtualMemory(), (vMemBytes >> 20),
(getContainersMonitor().getVmemAllocatedForContainers() >> 20));
}
// Check virtual memory.
if (getContainersMonitor().isVmemCheckEnabled() &&
this.containersAllocation.getVirtualMemory() +
(int) (vMemBytes >> 20) >
(int) (getContainersMonitor()
.getVmemAllocatedForContainers() >> 20)) {
return false;
}
float vCores = (float) cpuVcores /
getContainersMonitor().getVCoresAllocatedForContainers();
if (LOG.isDebugEnabled()) {
LOG.debug("before cpuCheck [asked={} > allowed={}]",
this.containersAllocation.getCPU(), vCores);
}
// Check CPU.
if (this.containersAllocation.getCPU() + vCores > 1.0f) {
return false;
}
return true;
}
public ContainersMonitor getContainersMonitor() {
return this.scheduler.getContainersMonitor();
}
}

View File

@ -0,0 +1,419 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* The ContainerScheduler manages a collection of runnable containers. It
* ensures that a container is launched only if all its launch criteria are
* met. It also ensures that OPPORTUNISTIC containers are killed to make
* room for GUARANTEED containers.
*/
public class ContainerScheduler extends AbstractService implements
EventHandler<ContainerSchedulerEvent> {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerScheduler.class);
private final Context context;
private final int maxOppQueueLength;
// Queue of Guaranteed Containers waiting for resources to run
private final LinkedHashMap<ContainerId, Container>
queuedGuaranteedContainers = new LinkedHashMap<>();
// Queue of Opportunistic Containers waiting for resources to run
private final LinkedHashMap<ContainerId, Container>
queuedOpportunisticContainers = new LinkedHashMap<>();
// Used to keep track of containers that have been marked to be killed
// to make room for a guaranteed container.
private final Map<ContainerId, Container> oppContainersToKill =
new HashMap<>();
// Containers launched by the Scheduler will take a while to actually
// move to the RUNNING state, but should still be fair game for killing
// by the scheduler to make room for guaranteed containers. This holds
// containers that are in RUNNING as well as those in SCHEDULED state that
// have been marked to run, but not yet RUNNING.
private final LinkedHashMap<ContainerId, Container> runningContainers =
new LinkedHashMap<>();
private final ContainerQueuingLimit queuingLimit =
ContainerQueuingLimit.newInstance();
private final OpportunisticContainersStatus opportunisticContainersStatus;
// Resource Utilization Tracker that decides how utilization of the cluster
// increases / decreases based on container start / finish
private ResourceUtilizationTracker utilizationTracker;
private final AsyncDispatcher dispatcher;
private final NodeManagerMetrics metrics;
/**
* Instantiate a Container Scheduler.
* @param context NodeManager Context.
* @param dispatcher AsyncDispatcher.
* @param metrics NodeManagerMetrics.
*/
public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
NodeManagerMetrics metrics) {
this(context, dispatcher, metrics, context.getConf().getInt(
YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
YarnConfiguration.
NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH_DEFAULT));
}
@VisibleForTesting
public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
NodeManagerMetrics metrics, int qLength) {
super(ContainerScheduler.class.getName());
this.context = context;
this.dispatcher = dispatcher;
this.metrics = metrics;
this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength;
this.utilizationTracker =
new AllocationBasedResourceUtilizationTracker(this);
this.opportunisticContainersStatus =
OpportunisticContainersStatus.newInstance();
}
/**
* Handle ContainerSchedulerEvents.
* @param event ContainerSchedulerEvent.
*/
@Override
public void handle(ContainerSchedulerEvent event) {
switch (event.getType()) {
case SCHEDULE_CONTAINER:
scheduleContainer(event.getContainer());
break;
case CONTAINER_COMPLETED:
onContainerCompleted(event.getContainer());
break;
case SHED_QUEUED_CONTAINERS:
shedQueuedOpportunisticContainers();
break;
default:
LOG.error("Unknown event arrived at ContainerScheduler: "
+ event.toString());
}
}
/**
* Return number of queued containers.
* @return Number of queued containers.
*/
public int getNumQueuedContainers() {
return this.queuedGuaranteedContainers.size()
+ this.queuedOpportunisticContainers.size();
}
@VisibleForTesting
public int getNumQueuedGuaranteedContainers() {
return this.queuedGuaranteedContainers.size();
}
@VisibleForTesting
public int getNumQueuedOpportunisticContainers() {
return this.queuedOpportunisticContainers.size();
}
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
this.opportunisticContainersStatus.setQueuedOpportContainers(
getNumQueuedOpportunisticContainers());
this.opportunisticContainersStatus.setWaitQueueLength(
getNumQueuedContainers());
this.opportunisticContainersStatus.setOpportMemoryUsed(
metrics.getOpportMemoryUsed());
this.opportunisticContainersStatus.setOpportCoresUsed(
metrics.getOpportCoresUsed());
this.opportunisticContainersStatus.setRunningOpportContainers(
metrics.getRunningOpportContainers());
return this.opportunisticContainersStatus;
}
private void onContainerCompleted(Container container) {
oppContainersToKill.remove(container.getContainerId());
// This could be killed externally for eg. by the ContainerManager,
// in which case, the container might still be queued.
Container queued =
queuedOpportunisticContainers.remove(container.getContainerId());
if (queued == null) {
queuedGuaranteedContainers.remove(container.getContainerId());
}
// decrement only if it was a running container
Container completedContainer = runningContainers.remove(container
.getContainerId());
if (completedContainer != null) {
this.utilizationTracker.subtractContainerResource(container);
if (container.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
this.metrics.opportunisticContainerCompleted(container);
}
startPendingContainers();
}
}
private void startPendingContainers() {
// Start pending guaranteed containers, if resources available.
boolean resourcesAvailable =
startContainersFromQueue(queuedGuaranteedContainers.values());
// Start opportunistic containers, if resources available.
if (resourcesAvailable) {
startContainersFromQueue(queuedOpportunisticContainers.values());
}
}
private boolean startContainersFromQueue(
Collection<Container> queuedContainers) {
Iterator<Container> cIter = queuedContainers.iterator();
boolean resourcesAvailable = true;
while (cIter.hasNext() && resourcesAvailable) {
Container container = cIter.next();
if (this.utilizationTracker.hasResourcesAvailable(container)) {
startAllocatedContainer(container);
cIter.remove();
} else {
resourcesAvailable = false;
}
}
return resourcesAvailable;
}
@VisibleForTesting
protected void scheduleContainer(Container container) {
if (maxOppQueueLength <= 0) {
startAllocatedContainer(container);
return;
}
if (queuedGuaranteedContainers.isEmpty() &&
queuedOpportunisticContainers.isEmpty() &&
this.utilizationTracker.hasResourcesAvailable(container)) {
startAllocatedContainer(container);
} else {
LOG.info("No available resources for container {} to start its execution "
+ "immediately.", container.getContainerId());
boolean isQueued = true;
if (container.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.GUARANTEED) {
queuedGuaranteedContainers.put(container.getContainerId(), container);
// Kill running opportunistic containers to make space for
// guaranteed container.
killOpportunisticContainers(container);
} else {
if (queuedOpportunisticContainers.size() <= maxOppQueueLength) {
LOG.info("Opportunistic container {} will be queued at the NM.",
container.getContainerId());
queuedOpportunisticContainers.put(
container.getContainerId(), container);
} else {
isQueued = false;
LOG.info("Opportunistic container [{}] will not be queued at the NM" +
"since max queue length [{}] has been reached",
container.getContainerId(), maxOppQueueLength);
container.sendKillEvent(
ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
"Opportunistic container queue is full.");
}
}
if (isQueued) {
try {
this.context.getNMStateStore().storeContainerQueued(
container.getContainerId());
} catch (IOException e) {
LOG.warn("Could not store container [" + container.getContainerId()
+ "] state. The Container has been queued.", e);
}
}
}
}
private void killOpportunisticContainers(Container container) {
List<Container> extraOpportContainersToKill =
pickOpportunisticContainersToKill(container.getContainerId());
// Kill the opportunistic containers that were chosen.
for (Container contToKill : extraOpportContainersToKill) {
contToKill.sendKillEvent(
ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
"Container Killed to make room for Guaranteed Container.");
oppContainersToKill.put(contToKill.getContainerId(), contToKill);
LOG.info(
"Opportunistic container {} will be killed in order to start the "
+ "execution of guaranteed container {}.",
contToKill.getContainerId(), container.getContainerId());
}
}
private void startAllocatedContainer(Container container) {
LOG.info("Starting container [" + container.getContainerId()+ "]");
runningContainers.put(container.getContainerId(), container);
this.utilizationTracker.addContainerResources(container);
if (container.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
this.metrics.opportunisticContainerStarted(container);
}
container.sendLaunchEvent();
}
private List<Container> pickOpportunisticContainersToKill(
ContainerId containerToStartId) {
// The opportunistic containers that need to be killed for the
// given container to start.
List<Container> extraOpportContainersToKill = new ArrayList<>();
// Track resources that need to be freed.
ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(
containerToStartId);
// Go over the running opportunistic containers.
// Use a descending iterator to kill more recently started containers.
Iterator<Container> lifoIterator = new LinkedList<>(
runningContainers.values()).descendingIterator();
while(lifoIterator.hasNext() &&
!hasSufficientResources(resourcesToFreeUp)) {
Container runningCont = lifoIterator.next();
if (runningCont.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
if (oppContainersToKill.containsKey(
runningCont.getContainerId())) {
// These containers have already been marked to be killed.
// So exclude them..
continue;
}
extraOpportContainersToKill.add(runningCont);
ContainersMonitor.decreaseResourceUtilization(
getContainersMonitor(), resourcesToFreeUp,
runningCont.getResource());
}
}
if (!hasSufficientResources(resourcesToFreeUp)) {
LOG.warn("There are no sufficient resources to start guaranteed [{}]" +
"at the moment. Opportunistic containers are in the process of" +
"being killed to make room.", containerToStartId);
}
return extraOpportContainersToKill;
}
private boolean hasSufficientResources(
ResourceUtilization resourcesToFreeUp) {
return resourcesToFreeUp.getPhysicalMemory() <= 0 &&
resourcesToFreeUp.getVirtualMemory() <= 0 &&
resourcesToFreeUp.getCPU() <= 0.0f;
}
private ResourceUtilization resourcesToFreeUp(
ContainerId containerToStartId) {
// Get allocation of currently allocated containers.
ResourceUtilization resourceAllocationToFreeUp = ResourceUtilization
.newInstance(this.utilizationTracker.getCurrentUtilization());
// Add to the allocation the allocation of the pending guaranteed
// containers that will start before the current container will be started.
for (Container container : queuedGuaranteedContainers.values()) {
ContainersMonitor.increaseResourceUtilization(
getContainersMonitor(), resourceAllocationToFreeUp,
container.getResource());
if (container.getContainerId().equals(containerToStartId)) {
break;
}
}
// These resources are being freed, likely at the behest of another
// guaranteed container..
for (Container container : oppContainersToKill.values()) {
ContainersMonitor.decreaseResourceUtilization(
getContainersMonitor(), resourceAllocationToFreeUp,
container.getResource());
}
// Subtract the overall node resources.
getContainersMonitor().subtractNodeResourcesFromResourceUtilization(
resourceAllocationToFreeUp);
return resourceAllocationToFreeUp;
}
@SuppressWarnings("unchecked")
public void updateQueuingLimit(ContainerQueuingLimit limit) {
this.queuingLimit.setMaxQueueLength(limit.getMaxQueueLength());
// YARN-2886 should add support for wait-times. Include wait time as
// well once it is implemented
if ((queuingLimit.getMaxQueueLength() > -1) &&
(queuingLimit.getMaxQueueLength() <
queuedOpportunisticContainers.size())) {
dispatcher.getEventHandler().handle(
new ContainerSchedulerEvent(null,
ContainerSchedulerEventType.SHED_QUEUED_CONTAINERS));
}
}
private void shedQueuedOpportunisticContainers() {
int numAllowed = this.queuingLimit.getMaxQueueLength();
Iterator<Container> containerIter =
queuedOpportunisticContainers.values().iterator();
while (containerIter.hasNext()) {
Container container = containerIter.next();
if (numAllowed <= 0) {
container.sendKillEvent(
ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
"Container De-queued to meet NM queuing limits.");
containerIter.remove();
LOG.info(
"Opportunistic container {} will be killed to meet NM queuing" +
" limits.", container.getContainerId());
}
numAllowed--;
}
}
public ContainersMonitor getContainersMonitor() {
return this.context.getContainerManager().getContainersMonitor();
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container
.Container;
/**
* Events consumed by the {@link ContainerScheduler}.
*/
public class ContainerSchedulerEvent extends
AbstractEvent<ContainerSchedulerEventType> {
private final Container container;
/**
* Create instance of Event.
* @param container Container.
* @param eventType EventType.
*/
public ContainerSchedulerEvent(Container container,
ContainerSchedulerEventType eventType) {
super(eventType);
this.container = container;
}
/**
* Get the container associated with the event.
* @return Container.
*/
public Container getContainer() {
return container;
}
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
/**
* Event types associated with {@link ContainerSchedulerEvent}.
*/
public enum ContainerSchedulerEventType {
SCHEDULE_CONTAINER,
CONTAINER_COMPLETED,
// Producer: Node HB response - RM has asked to shed the queue
SHED_QUEUED_CONTAINERS,
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
/**
* This interface abstracts out how a container contributes to
* Resource Utilization of the node.
* It is used by the {@link ContainerScheduler} to determine which
* OPPORTUNISTIC containers to be killed to make room for a GUARANTEED
* container.
*/
public interface ResourceUtilizationTracker {
/**
* Get the current total utilization of all the Containers running on
* the node.
* @return ResourceUtilization Resource Utilization.
*/
ResourceUtilization getCurrentUtilization();
/**
* Add Container's resources to Node Utilization.
* @param container Container.
*/
void addContainerResources(Container container);
/**
* Subtract Container's resources to Node Utilization.
* @param container Container.
*/
void subtractContainerResource(Container container);
/**
* Check if NM has resources available currently to run the container.
* @param container Container.
* @return True, if NM has resources available currently to run the container.
*/
boolean hasResourcesAvailable(Container container);
}

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Container Scheduler
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;

View File

@ -23,11 +23,14 @@ import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.yarn.api.records.Resource;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container
.Container;
@Metrics(about="Metrics for node manager", context="yarn")
public class NodeManagerMetrics {
@ -60,6 +63,14 @@ public class NodeManagerMetrics {
MutableGaugeInt goodLocalDirsDiskUtilizationPerc;
@Metric("Disk utilization % on good log dirs")
MutableGaugeInt goodLogDirsDiskUtilizationPerc;
@Metric("Memory used by Opportunistic Containers in MB")
MutableGaugeLong opportMemoryUsed;
@Metric("# of Virtual Cores used by opportunistic containers")
MutableGaugeInt opportCoresUsed;
@Metric("# of running opportunistic containers")
MutableGaugeInt runningOpportContainers;
// CHECKSTYLE:ON:VisibilityModifier
private JvmMetrics jvmMetrics = null;
@ -130,6 +141,30 @@ public class NodeManagerMetrics {
containersReIniting.decr();
}
public long getOpportMemoryUsed() {
return opportMemoryUsed.value();
}
public int getOpportCoresUsed() {
return opportCoresUsed.value();
}
public int getRunningOpportContainers() {
return runningOpportContainers.value();
}
public void opportunisticContainerCompleted(Container container) {
opportMemoryUsed.decr(container.getResource().getMemorySize());
opportCoresUsed.decr(container.getResource().getVirtualCores());
runningOpportContainers.decr();
}
public void opportunisticContainerStarted(Container container) {
opportMemoryUsed.incr(container.getResource().getMemorySize());
opportCoresUsed.incr(container.getResource().getVirtualCores());
runningOpportContainers.incr();
}
public void allocateContainer(Resource res) {
allocatedContainers.incr();
allocatedMB = allocatedMB + res.getMemorySize();

View File

@ -21,7 +21,6 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
@ -38,7 +37,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -149,7 +148,7 @@ public class ContainerLogsUtils {
private static void checkState(ContainerState state) {
if (state == ContainerState.NEW || state == ContainerState.LOCALIZING ||
state == ContainerState.LOCALIZED) {
state == ContainerState.SCHEDULED) {
throw new NotFoundException("Container is not yet running. Current state is "
+ state);
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.FileContext;
@ -158,7 +159,7 @@ public class TestEventFlow {
containerManager.startContainers(allRequests);
BaseContainerManagerTest.waitForContainerState(containerManager, cID,
ContainerState.RUNNING);
Arrays.asList(ContainerState.RUNNING, ContainerState.SCHEDULED), 20);
List<ContainerId> containerIds = new ArrayList<ContainerId>();
containerIds.add(cID);

View File

@ -454,6 +454,14 @@ public class TestNodeManagerResync {
if (containersShouldBePreserved) {
Assert.assertFalse(containers.isEmpty());
Assert.assertTrue(containers.containsKey(existingCid));
ContainerState state = containers.get(existingCid)
.cloneAndGetContainerStatus().getState();
// Wait till RUNNING state...
int counter = 50;
while (state != ContainerState.RUNNING && counter > 0) {
Thread.sleep(100);
counter--;
}
Assert.assertEquals(ContainerState.RUNNING,
containers.get(existingCid)
.cloneAndGetContainerStatus().getState());

View File

@ -28,6 +28,7 @@ import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -255,7 +256,9 @@ public class TestNodeManagerShutdown {
GetContainerStatusesRequest.newInstance(containerIds);
ContainerStatus containerStatus =
containerManager.getContainerStatuses(request).getContainerStatuses().get(0);
Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState());
Assert.assertTrue(
EnumSet.of(ContainerState.RUNNING, ContainerState.SCHEDULED)
.contains(containerStatus.getState()));
}
public static ContainerId createContainerId() {

View File

@ -65,7 +65,6 @@ import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
@ -1080,128 +1079,6 @@ public class TestNodeStatusUpdater {
Assert.assertTrue(containerIdSet.contains(runningContainerId));
}
@Test(timeout = 90000)
public void testKilledQueuedContainers() throws Exception {
NodeManager nm = new NodeManager();
YarnConfiguration conf = new YarnConfiguration();
conf.set(
NodeStatusUpdaterImpl
.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
"10000");
nm.init(conf);
NodeStatusUpdaterImpl nodeStatusUpdater =
(NodeStatusUpdaterImpl) nm.getNodeStatusUpdater();
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 0);
// Add application to context.
nm.getNMContext().getApplications().putIfAbsent(appId,
mock(Application.class));
// Create a running container and add it to the context.
ContainerId runningContainerId =
ContainerId.newContainerId(appAttemptId, 1);
Token runningContainerToken =
BuilderUtils.newContainerToken(runningContainerId, 0, "anyHost",
1234, "anyUser", BuilderUtils.newResource(1024, 1), 0, 123,
"password".getBytes(), 0);
Container runningContainer =
new ContainerImpl(conf, null, null, null, null,
BuilderUtils.newContainerTokenIdentifier(runningContainerToken),
nm.getNMContext()) {
@Override
public ContainerState getCurrentState() {
return ContainerState.RUNNING;
}
@Override
public org.apache.hadoop.yarn.server.nodemanager.containermanager.
container.ContainerState getContainerState() {
return org.apache.hadoop.yarn.server.nodemanager.containermanager.
container.ContainerState.RUNNING;
}
};
nm.getNMContext().getContainers()
.put(runningContainerId, runningContainer);
// Create two killed queued containers and add them to the queuing context.
ContainerId killedQueuedContainerId1 = ContainerId.newContainerId(
appAttemptId, 2);
ContainerTokenIdentifier killedQueuedContainerTokenId1 = BuilderUtils
.newContainerTokenIdentifier(BuilderUtils.newContainerToken(
killedQueuedContainerId1, 0, "anyHost", 1234, "anyUser",
BuilderUtils.newResource(1024, 1), 0, 123,
"password".getBytes(), 0));
ContainerId killedQueuedContainerId2 = ContainerId.newContainerId(
appAttemptId, 3);
ContainerTokenIdentifier killedQueuedContainerTokenId2 = BuilderUtils
.newContainerTokenIdentifier(BuilderUtils.newContainerToken(
killedQueuedContainerId2, 0, "anyHost", 1234, "anyUser",
BuilderUtils.newResource(1024, 1), 0, 123,
"password".getBytes(), 0));
nm.getNMContext().getQueuingContext().getKilledQueuedContainers().put(
killedQueuedContainerTokenId1, "Queued container killed.");
nm.getNMContext().getQueuingContext().getKilledQueuedContainers().put(
killedQueuedContainerTokenId2, "Queued container killed.");
List<ContainerStatus> containerStatuses = nodeStatusUpdater
.getContainerStatuses();
Assert.assertEquals(3, containerStatuses.size());
ContainerStatus runningContainerStatus = null;
ContainerStatus killedQueuedContainerStatus1 = null;
ContainerStatus killedQueuedContainerStatus2 = null;
for (ContainerStatus cStatus : containerStatuses) {
if (ContainerState.RUNNING == cStatus.getState()) {
runningContainerStatus = cStatus;
}
if (ContainerState.COMPLETE == cStatus.getState()) {
if (killedQueuedContainerId1.equals(cStatus.getContainerId())) {
killedQueuedContainerStatus1 = cStatus;
} else {
killedQueuedContainerStatus2 = cStatus;
}
}
}
// Check container IDs and Container Status.
Assert.assertNotNull(runningContainerId);
Assert.assertNotNull(killedQueuedContainerId1);
Assert.assertNotNull(killedQueuedContainerId2);
// Killed queued container should have ABORTED exit status.
Assert.assertEquals(ContainerExitStatus.ABORTED,
killedQueuedContainerStatus1.getExitStatus());
Assert.assertEquals(ContainerExitStatus.ABORTED,
killedQueuedContainerStatus2.getExitStatus());
// Killed queued container should appear in the recentlyStoppedContainers.
Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(
killedQueuedContainerId1));
Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(
killedQueuedContainerId2));
// Check if killed queued containers are successfully removed from the
// queuing context.
List<ContainerId> ackedContainers = new ArrayList<ContainerId>();
ackedContainers.add(killedQueuedContainerId1);
ackedContainers.add(killedQueuedContainerId2);
nodeStatusUpdater.removeOrTrackCompletedContainersFromContext(
ackedContainers);
containerStatuses = nodeStatusUpdater.getContainerStatuses();
// Only the running container should be in the container statuses now.
Assert.assertEquals(1, containerStatuses.size());
Assert.assertEquals(ContainerState.RUNNING,
containerStatuses.get(0).getState());
}
@Test(timeout = 10000)
public void testCompletedContainersIsRecentlyStopped() throws Exception {
NodeManager nm = new NodeManager();

View File

@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
@ -153,7 +154,7 @@ public abstract class BaseAMRMProxyTest {
* rest. So the responses returned can be less than the number of end points
* specified
*
* @param testContext
* @param testContexts
* @param func
* @return
*/
@ -697,11 +698,6 @@ public abstract class BaseAMRMProxyTest {
return null;
}
@Override
public QueuingContext getQueuingContext() {
return null;
}
public boolean isDistributedSchedulingEnabled() {
return false;
}

View File

@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -192,10 +193,10 @@ public abstract class BaseContainerManagerTest {
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
// Default delSrvc
exec = createContainerExecutor();
delSrvc = createDeletionService();
delSrvc.init(conf);
exec = createContainerExecutor();
dirsHandler = new LocalDirsHandlerService();
nodeHealthChecker = new NodeHealthCheckerService(
NodeManager.getNodeHealthScriptRunner(conf), dirsHandler);
@ -288,32 +289,43 @@ public abstract class BaseContainerManagerTest {
ContainerManagementProtocol containerManager, ContainerId containerID,
ContainerState finalState)
throws InterruptedException, YarnException, IOException {
waitForContainerState(containerManager, containerID, finalState, 20);
waitForContainerState(containerManager, containerID,
Arrays.asList(finalState), 20);
}
public static void waitForContainerState(
ContainerManagementProtocol containerManager, ContainerId containerID,
ContainerState finalState, int timeOutMax)
throws InterruptedException, YarnException, IOException {
waitForContainerState(containerManager, containerID,
Arrays.asList(finalState), timeOutMax);
}
public static void waitForContainerState(
ContainerManagementProtocol containerManager, ContainerId containerID,
List<ContainerState> finalStates, int timeOutMax)
throws InterruptedException, YarnException, IOException {
List<ContainerId> list = new ArrayList<ContainerId>();
list.add(containerID);
GetContainerStatusesRequest request =
GetContainerStatusesRequest.newInstance(list);
ContainerStatus containerStatus = null;
HashSet<ContainerState> fStates =
new HashSet<>(finalStates);
int timeoutSecs = 0;
do {
Thread.sleep(2000);
containerStatus =
containerManager.getContainerStatuses(request)
.getContainerStatuses().get(0);
LOG.info("Waiting for container to get into state " + finalState
LOG.info("Waiting for container to get into one of states " + fStates
+ ". Current state is " + containerStatus.getState());
timeoutSecs += 2;
} while (!containerStatus.getState().equals(finalState)
} while (!fStates.contains(containerStatus.getState())
&& timeoutSecs < timeOutMax);
LOG.info("Container state is " + containerStatus.getState());
Assert.assertEquals("ContainerState is not correct (timedout)",
finalState, containerStatus.getState());
Assert.assertTrue("ContainerState is not correct (timedout)",
fStates.contains(containerStatus.getState()));
}
public static void waitForApplicationState(

View File

@ -94,6 +94,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@ -101,7 +105,6 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecret
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Before;
import org.junit.Test;
@ -551,6 +554,35 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
throw new YarnException("Reject this container");
}
}
@Override
protected ContainerScheduler createContainerScheduler(Context context) {
return new ContainerScheduler(context, dispatcher, metrics){
@Override
public ContainersMonitor getContainersMonitor() {
return new ContainersMonitorImpl(null, null, null) {
@Override
public float getVmemRatio() {
return 2.0f;
}
@Override
public long getVmemAllocatedForContainers() {
return 20480;
}
@Override
public long getPmemAllocatedForContainers() {
return 10240;
}
@Override
public long getVCoresAllocatedForContainers() {
return 4;
}
};
}
};
}
};
}

View File

@ -1,84 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing.QueuingContainerManagerImpl;
/**
* Test class that invokes all test cases of {@link TestContainerManager} while
* using the {@link QueuingContainerManagerImpl}. The goal is to assert that
* no regression is introduced in the existing cases when no queuing of tasks at
* the NMs is involved.
*/
public class TestContainerManagerRegression extends TestContainerManager {
public TestContainerManagerRegression()
throws UnsupportedFileSystemException {
super();
}
static {
LOG = LogFactory.getLog(TestContainerManagerRegression.class);
}
@Override
protected ContainerManagerImpl createContainerManager(
DeletionService delSrvc) {
return new QueuingContainerManagerImpl(context, exec, delSrvc,
nodeStatusUpdater, metrics, dirsHandler) {
@Override
public void
setBlockNewContainerRequests(boolean blockNewContainerRequests) {
// do nothing
}
@Override
protected UserGroupInformation getRemoteUgi() throws YarnException {
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
appId, 1);
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(
appAttemptId.toString());
ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
.getNodeId(), user, context.getNMTokenSecretManager()
.getCurrentKey().getKeyId()));
return ugi;
}
@Override
protected void authorizeGetAndStopContainerRequest(
ContainerId containerId, Container container, boolean stopRequest,
NMTokenIdentifier identifier) throws YarnException {
if (container == null || container.getUser().equals("Fail")) {
throw new YarnException("Reject this container");
}
}
};
}
}

View File

@ -27,6 +27,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.atLeastOnce;
import java.io.IOException;
import java.net.URISyntaxException;
@ -90,6 +91,11 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
@ -143,7 +149,7 @@ public class TestContainer {
Map<Path, List<String>> localPaths = wc.localizeResources();
// all resources should be localized
assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
assertEquals(ContainerState.SCHEDULED, wc.c.getContainerState());
assertNotNull(wc.c.getLocalizedResources());
for (Entry<Path, List<String>> loc : wc.c.getLocalizedResources()
.entrySet()) {
@ -421,7 +427,7 @@ public class TestContainer {
wc = new WrappedContainer(17, 314159265358979L, 4344, "yak");
wc.initContainer();
wc.localizeResources();
assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
assertEquals(ContainerState.SCHEDULED, wc.c.getContainerState());
ContainerLaunch launcher = wc.launcher.running.get(wc.c.getContainerId());
wc.killContainer();
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
@ -452,7 +458,7 @@ public class TestContainer {
wc = new WrappedContainer(17, 314159265358979L, 4344, "yak");
wc.initContainer();
wc.localizeResources();
assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
assertEquals(ContainerState.SCHEDULED, wc.c.getContainerState());
wc.killContainer();
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
wc.containerSuccessful();
@ -480,7 +486,7 @@ public class TestContainer {
wc = new WrappedContainer(17, 314159265358979L, 4344, "yak");
wc.initContainer();
wc.localizeResources();
assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
assertEquals(ContainerState.SCHEDULED, wc.c.getContainerState());
wc.killContainer();
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
wc.containerFailed(ExitCode.FORCE_KILLED.getExitCode());
@ -507,7 +513,7 @@ public class TestContainer {
wc = new WrappedContainer(17, 314159265358979L, 4344, "yak");
wc.initContainer();
wc.localizeResources();
assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
assertEquals(ContainerState.SCHEDULED, wc.c.getContainerState());
ContainerLaunch launcher = wc.launcher.running.get(wc.c.getContainerId());
launcher.call();
wc.drainDispatcherEvents();
@ -764,7 +770,7 @@ public class TestContainer {
new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
LocalResourceVisibility.PUBLIC, LocalResourceVisibility.PRIVATE,
LocalResourceVisibility.APPLICATION));
verify(wc.localizerBus).handle(argThat(matchesReq));
verify(wc.localizerBus, atLeastOnce()).handle(argThat(matchesReq));
}
private void verifyOutofBandHeartBeat(WrappedContainer wc) {
@ -890,6 +896,7 @@ public class TestContainer {
final EventHandler<AuxServicesEvent> auxBus;
final EventHandler<ApplicationEvent> appBus;
final EventHandler<LogHandlerEvent> LogBus;
final EventHandler<ContainerSchedulerEvent> schedBus;
final ContainersLauncher launcher;
final ContainerLaunchContext ctxt;
@ -927,9 +934,16 @@ public class TestContainer {
auxBus = mock(EventHandler.class);
appBus = mock(EventHandler.class);
LogBus = mock(EventHandler.class);
schedBus = new ContainerScheduler(context, dispatcher, metrics, 0) {
@Override
protected void scheduleContainer(Container container) {
container.sendLaunchEvent();
}
};
dispatcher.register(LocalizationEventType.class, localizerBus);
dispatcher.register(ContainersLauncherEventType.class, launcherBus);
dispatcher.register(ContainersMonitorEventType.class, monitorBus);
dispatcher.register(ContainerSchedulerEventType.class, schedBus);
dispatcher.register(AuxServicesEventType.class, auxBus);
dispatcher.register(ApplicationEventType.class, appBus);
dispatcher.register(LogHandlerEventType.class, LogBus);

View File

@ -16,15 +16,15 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing;
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
@ -40,35 +40,41 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Test;
import static org.mockito.Mockito.spy;
/**
* Class for testing the {@link QueuingContainerManagerImpl}.
* Tests to verify that the {@link ContainerScheduler} is able to queue and
* make room for containers.
*/
public class TestQueuingContainerManager extends BaseContainerManagerTest {
public TestQueuingContainerManager() throws UnsupportedFileSystemException {
public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
public TestContainerSchedulerQueuing() throws UnsupportedFileSystemException {
super();
}
static {
LOG = LogFactory.getLog(TestQueuingContainerManager.class);
LOG = LogFactory.getLog(TestContainerSchedulerQueuing.class);
}
boolean shouldDeleteWait = false;
private boolean delayContainers = true;
@Override
protected ContainerManagerImpl createContainerManager(
DeletionService delSrvc) {
return new QueuingContainerManagerImpl(context, exec, delSrvc,
return new ContainerManagerImpl(context, exec, delSrvc,
nodeStatusUpdater, metrics, dirsHandler) {
@Override
public void
@ -117,33 +123,29 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
}
@Override
protected DeletionService createDeletionService() {
return new DeletionService(exec) {
protected ContainerExecutor createContainerExecutor() {
DefaultContainerExecutor exec = new DefaultContainerExecutor() {
@Override
public void delete(String user, Path subDir, Path... baseDirs) {
// Don't do any deletions.
if (shouldDeleteWait) {
public int launchContainer(ContainerStartContext ctx) throws IOException {
if (delayContainers) {
try {
Thread.sleep(10000);
LOG.info("\n\nSleeping Pseudo delete : user - " + user + ", " +
"subDir - " + subDir + ", " +
"baseDirs - " + Arrays.asList(baseDirs));
} catch (InterruptedException e) {
e.printStackTrace();
// Nothing..
}
} else {
LOG.info("\n\nPseudo delete : user - " + user + ", " +
"subDir - " + subDir + ", " +
"baseDirs - " + Arrays.asList(baseDirs));
}
return super.launchContainer(ctx);
}
};
exec.setConf(conf);
return spy(exec);
}
@Override
public void setup() throws IOException {
conf.setInt(
YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
super.setup();
shouldDeleteWait = false;
}
/**
@ -152,7 +154,6 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
*/
@Test
public void testStartMultipleContainers() throws Exception {
shouldDeleteWait = true;
containerManager.start();
ContainerLaunchContext containerLaunchContext =
@ -209,7 +210,6 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
*/
@Test
public void testQueueMultipleContainers() throws Exception {
shouldDeleteWait = true;
containerManager.start();
ContainerLaunchContext containerLaunchContext =
@ -248,17 +248,18 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
.getContainerStatuses(statRequest).getContainerStatuses();
for (ContainerStatus status : containerStatuses) {
Assert.assertEquals(
org.apache.hadoop.yarn.api.records.ContainerState.QUEUED,
org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED,
status.getState());
}
ContainerScheduler containerScheduler =
containerManager.getContainerScheduler();
// Ensure both containers are properly queued.
Assert.assertEquals(2, containerManager.getContext().getQueuingContext()
.getQueuedContainers().size());
Assert.assertEquals(1, ((QueuingContainerManagerImpl) containerManager)
.getNumQueuedGuaranteedContainers());
Assert.assertEquals(1, ((QueuingContainerManagerImpl) containerManager)
.getNumQueuedOpportunisticContainers());
Assert.assertEquals(2, containerScheduler.getNumQueuedContainers());
Assert.assertEquals(1,
containerScheduler.getNumQueuedGuaranteedContainers());
Assert.assertEquals(1,
containerScheduler.getNumQueuedOpportunisticContainers());
}
/**
@ -268,7 +269,6 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
*/
@Test
public void testStartAndQueueMultipleContainers() throws Exception {
shouldDeleteWait = true;
containerManager.start();
ContainerLaunchContext containerLaunchContext =
@ -319,18 +319,19 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
status.getState());
} else {
Assert.assertEquals(
org.apache.hadoop.yarn.api.records.ContainerState.QUEUED,
org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED,
status.getState());
}
}
ContainerScheduler containerScheduler =
containerManager.getContainerScheduler();
// Ensure two containers are properly queued.
Assert.assertEquals(2, containerManager.getContext().getQueuingContext()
.getQueuedContainers().size());
Assert.assertEquals(0, ((QueuingContainerManagerImpl) containerManager)
.getNumQueuedGuaranteedContainers());
Assert.assertEquals(2, ((QueuingContainerManagerImpl) containerManager)
.getNumQueuedOpportunisticContainers());
Assert.assertEquals(2, containerScheduler.getNumQueuedContainers());
Assert.assertEquals(0,
containerScheduler.getNumQueuedGuaranteedContainers());
Assert.assertEquals(2,
containerScheduler.getNumQueuedOpportunisticContainers());
}
/**
@ -344,7 +345,6 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
*/
@Test
public void testKillOpportunisticForGuaranteedContainer() throws Exception {
shouldDeleteWait = true;
containerManager.start();
ContainerLaunchContext containerLaunchContext =
@ -393,11 +393,11 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
.getContainerStatuses(statRequest).getContainerStatuses();
for (ContainerStatus status : containerStatuses) {
if (status.getContainerId().equals(createContainerId(0))) {
Assert.assertTrue(status.getDiagnostics()
.contains("Container killed by the ApplicationMaster"));
Assert.assertTrue(status.getDiagnostics().contains(
"Container Killed to make room for Guaranteed Container"));
} else if (status.getContainerId().equals(createContainerId(1))) {
Assert.assertEquals(
org.apache.hadoop.yarn.api.records.ContainerState.QUEUED,
org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED,
status.getState());
} else if (status.getContainerId().equals(createContainerId(2))) {
Assert.assertEquals(
@ -420,6 +420,197 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
contStatus1.getState());
}
/**
* 1. Submit a long running GUARANTEED container to hog all NM resources.
* 2. Submit 6 OPPORTUNISTIC containers, all of which will be queued.
* 3. Update the Queue Limit to 2.
* 4. Ensure only 2 containers remain in the Queue, and 4 are de-Queued.
* @throws Exception
*/
@Test
public void testQueueShedding() throws Exception {
containerManager.start();
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
containerLaunchContext.setCommands(Arrays.asList("sleep 100"));
List<StartContainerRequest> list = new ArrayList<>();
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
context.getNodeId(),
user, BuilderUtils.newResource(2048, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.GUARANTEED)));
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
list = new ArrayList<>();
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
context.getNodeId(),
user, BuilderUtils.newResource(512, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.OPPORTUNISTIC)));
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER,
context.getNodeId(),
user, BuilderUtils.newResource(512, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.OPPORTUNISTIC)));
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(3), DUMMY_RM_IDENTIFIER,
context.getNodeId(),
user, BuilderUtils.newResource(512, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.OPPORTUNISTIC)));
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(4), DUMMY_RM_IDENTIFIER,
context.getNodeId(),
user, BuilderUtils.newResource(512, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.OPPORTUNISTIC)));
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(5), DUMMY_RM_IDENTIFIER,
context.getNodeId(),
user, BuilderUtils.newResource(512, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.OPPORTUNISTIC)));
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(6), DUMMY_RM_IDENTIFIER,
context.getNodeId(),
user, BuilderUtils.newResource(512, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.OPPORTUNISTIC)));
allRequests = StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
ContainerScheduler containerScheduler =
containerManager.getContainerScheduler();
// Ensure all containers are properly queued.
int numTries = 30;
while ((containerScheduler.getNumQueuedContainers() < 6) &&
(numTries-- > 0)) {
Thread.sleep(100);
}
Assert.assertEquals(6, containerScheduler.getNumQueuedContainers());
ContainerQueuingLimit containerQueuingLimit = ContainerQueuingLimit
.newInstance();
containerQueuingLimit.setMaxQueueLength(2);
containerScheduler.updateQueuingLimit(containerQueuingLimit);
numTries = 30;
while ((containerScheduler.getNumQueuedContainers() > 2) &&
(numTries-- > 0)) {
Thread.sleep(100);
}
Assert.assertEquals(2, containerScheduler.getNumQueuedContainers());
List<ContainerId> statList = new ArrayList<ContainerId>();
for (int i = 1; i < 7; i++) {
statList.add(createContainerId(i));
}
GetContainerStatusesRequest statRequest =
GetContainerStatusesRequest.newInstance(statList);
List<ContainerStatus> containerStatuses = containerManager
.getContainerStatuses(statRequest).getContainerStatuses();
int deQueuedContainers = 0;
int numQueuedOppContainers = 0;
for (ContainerStatus status : containerStatuses) {
if (status.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
if (status.getDiagnostics().contains(
"Container De-queued to meet NM queuing limits")) {
deQueuedContainers++;
}
if (status.getState() ==
org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED) {
numQueuedOppContainers++;
}
}
}
Assert.assertEquals(4, deQueuedContainers);
Assert.assertEquals(2, numQueuedOppContainers);
}
/**
* 1. Submit a long running GUARANTEED container to hog all NM resources.
* 2. Submit 2 OPPORTUNISTIC containers, both of which will be queued.
* 3. Send Stop Container to one of the queued containers.
* 4. Ensure container is removed from the queue.
* @throws Exception
*/
@Test
public void testContainerDeQueuedAfterAMKill() throws Exception {
containerManager.start();
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
containerLaunchContext.setCommands(Arrays.asList("sleep 100"));
List<StartContainerRequest> list = new ArrayList<>();
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
context.getNodeId(),
user, BuilderUtils.newResource(2048, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.GUARANTEED)));
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
list = new ArrayList<>();
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
context.getNodeId(),
user, BuilderUtils.newResource(512, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.OPPORTUNISTIC)));
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER,
context.getNodeId(),
user, BuilderUtils.newResource(512, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.OPPORTUNISTIC)));
allRequests = StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
ContainerScheduler containerScheduler =
containerManager.getContainerScheduler();
// Ensure both containers are properly queued.
int numTries = 30;
while ((containerScheduler.getNumQueuedContainers() < 2) &&
(numTries-- > 0)) {
Thread.sleep(100);
}
Assert.assertEquals(2, containerScheduler.getNumQueuedContainers());
containerManager.stopContainers(
StopContainersRequest.newInstance(Arrays.asList(createContainerId(2))));
numTries = 30;
while ((containerScheduler.getNumQueuedContainers() > 1) &&
(numTries-- > 0)) {
Thread.sleep(100);
}
Assert.assertEquals(1, containerScheduler.getNumQueuedContainers());
}
/**
* Submit three OPPORTUNISTIC containers that can run concurrently, and one
* GUARANTEED that needs to kill two of the OPPORTUNISTIC for it to run.
@ -427,7 +618,6 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
*/
@Test
public void testKillMultipleOpportunisticContainers() throws Exception {
shouldDeleteWait = true;
containerManager.start();
ContainerLaunchContext containerLaunchContext =
@ -455,6 +645,12 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
user, BuilderUtils.newResource(512, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.OPPORTUNISTIC)));
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
list = new ArrayList<>();
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(3), DUMMY_RM_IDENTIFIER,
@ -463,8 +659,7 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
context.getContainerTokenSecretManager(), null,
ExecutionType.GUARANTEED)));
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
allRequests = StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
BaseContainerManagerTest.waitForNMContainerState(
@ -486,7 +681,77 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
.getContainerStatuses(statRequest).getContainerStatuses();
for (ContainerStatus status : containerStatuses) {
if (status.getDiagnostics().contains(
"Container killed by the ApplicationMaster")) {
"Container Killed to make room for Guaranteed Container")) {
killedContainers++;
}
System.out.println("\nStatus : [" + status + "]\n");
}
Assert.assertEquals(2, killedContainers);
}
/**
* Submit four OPPORTUNISTIC containers that can run concurrently, and then
* two GUARANTEED that needs to kill Exactly two of the OPPORTUNISTIC for
* it to run. Make sure only 2 are killed.
* @throws Exception
*/
@Test
public void testKillOnlyRequiredOpportunisticContainers() throws Exception {
containerManager.start();
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
List<StartContainerRequest> list = new ArrayList<>();
// Fill NM with Opportunistic containers
for (int i = 0; i < 4; i++) {
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(i), DUMMY_RM_IDENTIFIER,
context.getNodeId(),
user, BuilderUtils.newResource(512, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.OPPORTUNISTIC)));
}
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
list = new ArrayList<>();
// Now ask for two Guaranteed containers
for (int i = 4; i < 6; i++) {
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(i), DUMMY_RM_IDENTIFIER,
context.getNodeId(),
user, BuilderUtils.newResource(512, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.GUARANTEED)));
}
allRequests = StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
BaseContainerManagerTest.waitForNMContainerState(containerManager,
createContainerId(0), ContainerState.DONE, 40);
Thread.sleep(5000);
// Get container statuses. Container 0 should be killed, container 1
// should be queued and container 2 should be running.
int killedContainers = 0;
List<ContainerId> statList = new ArrayList<ContainerId>();
for (int i = 0; i < 6; i++) {
statList.add(createContainerId(i));
}
GetContainerStatusesRequest statRequest =
GetContainerStatusesRequest.newInstance(statList);
List<ContainerStatus> containerStatuses = containerManager
.getContainerStatuses(statRequest).getContainerStatuses();
for (ContainerStatus status : containerStatuses) {
if (status.getDiagnostics().contains(
"Container Killed to make room for Guaranteed Container")) {
killedContainers++;
}
System.out.println("\nStatus : [" + status + "]\n");
@ -502,7 +767,6 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
*/
@Test
public void testStopQueuedContainer() throws Exception {
shouldDeleteWait = true;
containerManager.start();
ContainerLaunchContext containerLaunchContext =
@ -553,7 +817,7 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) {
runningContainersNo++;
} else if (status.getState() ==
org.apache.hadoop.yarn.api.records.ContainerState.QUEUED) {
org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED) {
queuedContainersNo++;
}
System.out.println("\nStatus : [" + status + "]\n");
@ -574,23 +838,35 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
for (int i = 0; i < 3; i++) {
statList.add(createContainerId(i));
}
statRequest = GetContainerStatusesRequest.newInstance(statList);
HashMap<org.apache.hadoop.yarn.api.records.ContainerState, ContainerStatus>
map = new HashMap<>();
for (int i=0; i < 10; i++) {
containerStatuses = containerManager.getContainerStatuses(statRequest)
.getContainerStatuses();
for (ContainerStatus status : containerStatuses) {
if (status.getContainerId().equals(createContainerId(0))) {
Assert.assertEquals(
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
status.getState());
} else if (status.getContainerId().equals(createContainerId(1))) {
Assert.assertTrue(status.getDiagnostics().contains(
"Queued container request removed"));
} else if (status.getContainerId().equals(createContainerId(2))) {
Assert.assertEquals(
org.apache.hadoop.yarn.api.records.ContainerState.QUEUED,
status.getState());
}
System.out.println("\nStatus : [" + status + "]\n");
map.put(status.getState(), status);
if (map.containsKey(
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) &&
map.containsKey(
org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED) &&
map.containsKey(
org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE)) {
break;
}
Thread.sleep(1000);
}
}
Assert.assertEquals(createContainerId(0),
map.get(org.apache.hadoop.yarn.api.records.ContainerState.RUNNING)
.getContainerId());
Assert.assertEquals(createContainerId(1),
map.get(org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE)
.getContainerId());
Assert.assertEquals(createContainerId(2),
map.get(org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED)
.getContainerId());
}
}

View File

@ -215,4 +215,19 @@ public class MockContainer implements Container {
public void commitUpgrade() {
}
@Override
public boolean isMarkedForKilling() {
return false;
}
@Override
public void sendLaunchEvent() {
}
@Override
public void sendKillEvent(int exitStatus, String description) {
}
}

View File

@ -313,9 +313,11 @@ public class OpportunisticContainerAllocatorAMService
appAttempt.getApplicationAttemptId(), container.getNodeId(),
appAttempt.getUser(), rmContext, isRemotelyAllocated);
appAttempt.addRMContainer(container.getId(), rmContainer);
((AbstractYarnScheduler) rmContext.getScheduler()).getNode(
container.getNodeId()).allocateContainer(rmContainer);
rmContainer.handle(
new RMContainerEvent(container.getId(),
RMContainerEventType.LAUNCHED));
RMContainerEventType.ACQUIRED));
}
}

View File

@ -80,8 +80,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
RMContainerEventType.KILL)
.addTransition(RMContainerState.NEW, RMContainerState.RESERVED,
RMContainerEventType.RESERVED, new ContainerReservedTransition())
.addTransition(RMContainerState.NEW, RMContainerState.RUNNING,
RMContainerEventType.LAUNCHED)
.addTransition(RMContainerState.NEW, RMContainerState.ACQUIRED,
RMContainerEventType.ACQUIRED, new AcquiredTransition())
.addTransition(RMContainerState.NEW,
EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED),
RMContainerEventType.RECOVER, new ContainerRecoveredTransition())

View File

@ -1394,9 +1394,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
}
// Process running containers
if (remoteContainer.getState() == ContainerState.RUNNING) {
// Process only GUARANTEED containers in the RM.
if (remoteContainer.getExecutionType() == ExecutionType.GUARANTEED) {
if (remoteContainer.getState() == ContainerState.RUNNING ||
remoteContainer.getState() == ContainerState.SCHEDULED) {
++numRemoteRunningContainers;
if (!launchedContainers.contains(containerId)) {
// Just launched container. RM knows about it the first time.
@ -1406,20 +1405,15 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
containerAllocationExpirer
.unregister(new AllocationExpirationInfo(containerId));
}
}
} else {
if (remoteContainer.getExecutionType() == ExecutionType.GUARANTEED) {
// A finished container
launchedContainers.remove(containerId);
// Unregister from containerAllocationExpirer.
containerAllocationExpirer
.unregister(new AllocationExpirationInfo(containerId));
}
// Completed containers should also include the OPPORTUNISTIC containers
// so that the AM gets properly notified.
if (completedContainers.add(containerId)) {
newlyCompletedContainers.add(remoteContainer);
}
// Unregister from containerAllocationExpirer.
containerAllocationExpirer
.unregister(new AllocationExpirationInfo(containerId));
}
}

View File

@ -587,6 +587,8 @@ public abstract class AbstractYarnScheduler
LOG.debug("Completed container: " + rmContainer.getContainerId() +
" in state: " + rmContainer.getState() + " event:" + event);
}
getSchedulerNode(rmContainer.getNodeId()).releaseContainer(
rmContainer.getContainer());
}
// If the container is getting killed in ACQUIRED state, the requester (AM

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
@ -148,7 +149,9 @@ public abstract class SchedulerNode {
*/
public synchronized void allocateContainer(RMContainer rmContainer) {
Container container = rmContainer.getContainer();
if (rmContainer.getExecutionType() != ExecutionType.OPPORTUNISTIC) {
deductUnallocatedResource(container.getResource());
}
++numContainers;
launchedContainers.put(container.getId(), rmContainer);
@ -246,7 +249,9 @@ public abstract class SchedulerNode {
*/
protected synchronized void updateResourceForReleasedContainer(
Container container) {
if (container.getExecutionType() == ExecutionType.GUARANTEED) {
addUnallocatedResource(container.getResource());
}
--numContainers;
}

View File

@ -78,7 +78,8 @@ import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing.QueuingContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
@ -723,8 +724,9 @@ public class MiniYARNCluster extends CompositeService {
ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
if (getConfig().getBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED,
YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED_DEFAULT)) {
if (getConfig().getInt(
YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 0)
> 0) {
return new CustomQueueingContainerManagerImpl(context, exec, del,
nodeStatusUpdater, metrics, dirsHandler);
} else {
@ -864,7 +866,7 @@ public class MiniYARNCluster extends CompositeService {
}
private class CustomQueueingContainerManagerImpl extends
QueuingContainerManagerImpl {
ContainerManagerImpl {
public CustomQueueingContainerManagerImpl(Context context,
ContainerExecutor exec, DeletionService del, NodeStatusUpdater
@ -873,25 +875,6 @@ public class MiniYARNCluster extends CompositeService {
super(context, exec, del, nodeStatusUpdater, metrics, dirsHandler);
}
@Override
protected ContainersMonitor createContainersMonitor(ContainerExecutor
exec) {
return new ContainersMonitorImpl(exec, dispatcher, this.context) {
@Override
public void increaseContainersAllocation(ProcessTreeInfo pti) { }
@Override
public void decreaseContainersAllocation(ProcessTreeInfo pti) { }
@Override
public boolean hasResourcesAvailable(
ContainersMonitorImpl.ProcessTreeInfo pti) {
return true;
}
};
}
@Override
protected void createAMRMProxyService(Configuration conf) {
this.amrmProxyEnabled =
@ -910,6 +893,32 @@ public class MiniYARNCluster extends CompositeService {
LOG.info("CustomAMRMProxyService is disabled");
}
}
@Override
protected ContainersMonitor createContainersMonitor(ContainerExecutor
exec) {
return new ContainersMonitorImpl(exec, dispatcher, this.context) {
@Override
public float getVmemRatio() {
return 2.0f;
}
@Override
public long getVmemAllocatedForContainers() {
return 16 * 1024L * 1024L * 1024L;
}
@Override
public long getPmemAllocatedForContainers() {
return 8 * 1024L * 1024L * 1024L;
}
@Override
public long getVCoresAllocatedForContainers() {
return 10;
}
};
}
}
private class ShortCircuitedAMRMProxy extends AMRMProxyService {