From b56fc51b705ed92823fe876b28f199020a093e44 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Wed, 20 Apr 2016 09:50:37 -0700 Subject: [PATCH] YARN-2883. Queuing of container requests in the NM. (Konstantinos Karanasos and Arun Suresh via kasha) (cherry picked from commit c8172f5f143d2fefafa5a412899ab7cd081b406d) --- .../yarn/api/records/ContainerState.java | 5 +- .../hadoop/yarn/conf/YarnConfiguration.java | 5 + .../src/main/proto/yarn_protos.proto | 1 + .../src/main/resources/yarn-default.xml | 7 + .../yarn/server/utils/BuilderUtils.java | 9 + .../yarn/server/nodemanager/Context.java | 17 + .../yarn/server/nodemanager/NodeManager.java | 42 +- .../ContainerManagerImpl.java | 113 ++-- .../container/ContainerImpl.java | 3 +- .../monitor/ContainersMonitor.java | 18 + .../monitor/ContainersMonitorImpl.java | 160 ++++- .../queuing/QueuingContainerManagerImpl.java | 556 ++++++++++++++++++ .../queuing/package-info.java | 23 + .../nodemanager/TestNodeStatusUpdater.java | 1 - .../amrmproxy/BaseAMRMProxyTest.java | 7 +- .../BaseContainerManagerTest.java | 56 +- .../TestContainerManager.java | 78 ++- .../monitor/MockResourceCalculatorPlugin.java | 4 + .../TestContainersMonitorResourceChange.java | 9 +- .../queuing/TestQueuingContainerManager.java | 301 ++++++++++ 20 files changed, 1271 insertions(+), 144 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/package-info.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java index 323d31d4c63..582389fa113 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java @@ -34,5 +34,8 @@ public enum ContainerState { RUNNING, /** Completed container */ - COMPLETE + COMPLETE, + + /** Queued at the NM. */ + QUEUED } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 52355ab9933..34e8c1bfecd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -675,6 +675,11 @@ public class YarnConfiguration extends Configuration { /** Prefix for all node manager configs.*/ public static final String NM_PREFIX = "yarn.nodemanager."; + /** Enable Queuing of OPPORTUNISTIC containers. 
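+   * When set to true, the NodeManager creates a QueuingContainerManagerImpl
+   * (added in this patch) instead of the default ContainerManagerImpl, so
+   * that container start requests can be queued at the node.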
*/ + public static final String NM_CONTAINER_QUEUING_ENABLED = NM_PREFIX + + "container-queuing-enabled"; + public static final boolean NM_CONTAINER_QUEUING_ENABLED_DEFAULT = false; + /** Environment variables that will be sent to containers.*/ public static final String NM_ADMIN_USER_ENV = NM_PREFIX + "admin-env"; public static final String DEFAULT_NM_ADMIN_USER_ENV = "MALLOC_ARENA_MAX=$MALLOC_ARENA_MAX"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index e5d31dc5448..30902798863 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -82,6 +82,7 @@ enum ContainerStateProto { C_NEW = 1; C_RUNNING = 2; C_COMPLETE = 3; + C_QUEUED = 4; } message ContainerProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 9cae5fbd057..a38d0d828d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -972,6 +972,13 @@ 4 + + Enable Queuing of OPPORTUNISTIC containers on the + nodemanager. + yarn.nodemanager.container-queuing-enabled + false + + Number of seconds after an application finishes before the nodemanager's diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index 4fdd43c789d..a70d143c476 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -215,6 +216,13 @@ public class BuilderUtils { public static ContainerStatus newContainerStatus(ContainerId containerId, ContainerState containerState, String diagnostics, int exitStatus, Resource capability) { + return newContainerStatus(containerId, containerState, diagnostics, + exitStatus, capability, ExecutionType.GUARANTEED); + } + + public static ContainerStatus newContainerStatus(ContainerId containerId, + ContainerState containerState, String diagnostics, int exitStatus, + Resource capability, ExecutionType executionType) { ContainerStatus containerStatus = recordFactory .newRecordInstance(ContainerStatus.class); containerStatus.setState(containerState); @@ -222,6 +230,7 @@ public class BuilderUtils { containerStatus.setDiagnostics(diagnostics); containerStatus.setExitStatus(exitStatus); 
containerStatus.setCapability(capability); + containerStatus.setExecutionType(executionType); return containerStatus; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index d3251ae46a4..205e47572b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; @@ -42,6 +43,15 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; */ public interface Context { + /** + * Interface exposing methods related to the queuing of containers in the NM. + */ + interface QueuingContext { + ConcurrentMap getQueuedContainers(); + + ConcurrentMap getKilledQueuedContainers(); + } + /** * Return the nodeId. Usable only when the ContainerManager is started. * @@ -89,4 +99,11 @@ public interface Context { getLogAggregationStatusForApps(); NodeStatusUpdater getNodeStatusUpdater(); + + /** + * Returns a QueuingContext that provides information about the + * number of Containers Queued as well as the number of Containers that were + * queued and killed. 
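+   * The queuing container manager records accepted-but-not-yet-started
+   * containers in this context and uses it to report the {@code QUEUED}
+   * container state.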
+ */ + QueuingContext getQueuingContext(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 7c104d5ee69..e4036ea6f05 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -57,11 +57,13 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing.QueuingContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; @@ -170,8 +172,14 @@ public class NodeManager extends CompositeService ContainerExecutor exec, DeletionService del, NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, LocalDirsHandlerService dirsHandler) { - return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater, - metrics, dirsHandler); + if (getConfig().getBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, + YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED_DEFAULT)) { + return new QueuingContainerManagerImpl(context, exec, del, + nodeStatusUpdater, metrics, dirsHandler); + } else { + return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater, + metrics, dirsHandler); + } } protected WebServer createWebServer(Context nmContext, @@ -461,6 +469,8 @@ public class NodeManager extends CompositeService logAggregationReportForApps; private NodeStatusUpdater nodeStatusUpdater; + private final QueuingContext queuingContext; + public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager, @@ -475,6 +485,7 @@ public class NodeManager extends CompositeService this.stateStore = stateStore; this.logAggregationReportForApps = new ConcurrentLinkedQueue< LogAggregationReport>(); + this.queuingContext = new QueuingNMContext(); } /** @@ -595,8 +606,35 @@ public class NodeManager extends CompositeService public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) { this.nodeStatusUpdater = nodeStatusUpdater; } + + @Override + public QueuingContext getQueuingContext() { + return this.queuingContext; + } } + /** + * Class that keeps the context for containers queued at the NM. 
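+   * {@code queuedContainers} maps the ContainerId of every container waiting
+   * to start to its ContainerTokenIdentifier, while
+   * {@code killedQueuedContainers} maps the token of each queued container
+   * that was killed before starting to a diagnostic message.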
+ */ + public static class QueuingNMContext implements Context.QueuingContext { + protected final ConcurrentMap + queuedContainers = new ConcurrentSkipListMap<>(); + + protected final ConcurrentMap + killedQueuedContainers = new ConcurrentHashMap<>(); + + @Override + public ConcurrentMap + getQueuedContainers() { + return this.queuedContainers; + } + + @Override + public ConcurrentMap + getKilledQueuedContainers() { + return this.killedQueuedContainers; + } + } /** * @return the node health checker diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index bb9b3ee4483..162823c9cdb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -160,11 +160,11 @@ public class ContainerManagerImpl extends CompositeService implements private static final Log LOG = LogFactory.getLog(ContainerManagerImpl.class); - static final String INVALID_NMTOKEN_MSG = "Invalid NMToken"; + public static final String INVALID_NMTOKEN_MSG = "Invalid NMToken"; static final String INVALID_CONTAINERTOKEN_MSG = "Invalid ContainerToken"; - final Context context; + protected final Context context; private final ContainersMonitor containersMonitor; private Server server; private final ResourceLocalizationService rsrcLocalizationSrvc; @@ -172,7 +172,7 @@ public class ContainerManagerImpl extends CompositeService implements private final AuxServices auxiliaryServices; private final NodeManagerMetrics metrics; - private final NodeStatusUpdater nodeStatusUpdater; + protected final NodeStatusUpdater nodeStatusUpdater; protected LocalDirsHandlerService dirsHandler; protected final AsyncDispatcher dispatcher; @@ -213,14 +213,13 @@ public class ContainerManagerImpl extends CompositeService implements auxiliaryServices.registerServiceListener(this); addService(auxiliaryServices); - this.containersMonitor = - new ContainersMonitorImpl(exec, dispatcher, this.context); + this.containersMonitor = createContainersMonitor(exec); addService(this.containersMonitor); dispatcher.register(ContainerEventType.class, new ContainerEventDispatcher()); dispatcher.register(ApplicationEventType.class, - new ApplicationEventDispatcher()); + createApplicationEventDispatcher()); dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc); dispatcher.register(AuxServicesEventType.class, auxiliaryServices); dispatcher.register(ContainersMonitorEventType.class, containersMonitor); @@ -235,6 +234,7 @@ public class ContainerManagerImpl extends CompositeService implements @Override public void serviceInit(Configuration conf) throws Exception { + LogHandler logHandler = createLogHandler(conf, this.context, this.deletionService); addIfService(logHandler); @@ -276,6 +276,10 @@ public class ContainerManagerImpl extends CompositeService implements } } + protected ContainersMonitor createContainersMonitor(ContainerExecutor exec) { + return new ContainersMonitorImpl(exec, dispatcher, this.context); + } + 
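[Editor's illustration, not part of the patch.] The visibility changes above (protected context and nodeStatusUpdater, the new createContainersMonitor() hook, and the createApplicationEventDispatcher() hook added further below) exist so that a subclass can plug in its own monitoring and event handling; QueuingContainerManagerImpl later in this patch relies on them. A minimal sketch of such a subclass is shown here, assuming the constructor signature used by ContainerManagerImpl and the imports already present in that file; CustomContainerManager is a hypothetical name used only for illustration.

class CustomContainerManager extends ContainerManagerImpl {

  CustomContainerManager(Context context, ContainerExecutor exec,
      DeletionService del, NodeStatusUpdater nodeStatusUpdater,
      NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
    super(context, exec, del, nodeStatusUpdater, metrics, dirsHandler);
  }

  @Override
  protected EventHandler<ApplicationEvent> createApplicationEventDispatcher() {
    // Wrap the default dispatcher: a subclass can do extra bookkeeping and
    // then delegate to the handling inherited from ContainerManagerImpl.
    final EventHandler<ApplicationEvent> delegate =
        super.createApplicationEventDispatcher();
    return new EventHandler<ApplicationEvent>() {
      @Override
      public void handle(ApplicationEvent event) {
        delegate.handle(event);
      }
    };
  }
}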
@SuppressWarnings("unchecked") private void recover() throws IOException, URISyntaxException { NMStateStoreService stateStore = context.getNMStateStore(); @@ -417,6 +421,10 @@ public class ContainerManagerImpl extends CompositeService implements return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this); } + protected EventHandler createApplicationEventDispatcher() { + return new ApplicationEventDispatcher(); + } + @Override protected void serviceStart() throws Exception { @@ -801,7 +809,8 @@ public class ContainerManagerImpl extends CompositeService implements .equals(ContainerType.APPLICATION_MASTER)) { this.getAMRMProxyService().processApplicationStartRequest(request); } - + performContainerPreStartChecks(nmTokenIdentifier, request, + containerTokenIdentifier); startContainerInternal(nmTokenIdentifier, containerTokenIdentifier, request); succeededContainers.add(containerId); @@ -821,6 +830,42 @@ public class ContainerManagerImpl extends CompositeService implements } } + private void performContainerPreStartChecks( + NMTokenIdentifier nmTokenIdentifier, StartContainerRequest request, + ContainerTokenIdentifier containerTokenIdentifier) + throws YarnException, InvalidToken { + /* + * 1) It should save the NMToken into NMTokenSecretManager. This is done + * here instead of RPC layer because at the time of opening/authenticating + * the connection it doesn't know what all RPC calls user will make on it. + * Also new NMToken is issued only at startContainer (once it gets + * renewed). + * + * 2) It should validate containerToken. Need to check below things. a) It + * is signed by correct master key (part of retrieve password). b) It + * belongs to correct Node Manager (part of retrieve password). c) It has + * correct RMIdentifier. d) It is not expired. + */ + authorizeStartAndResourceIncreaseRequest( + nmTokenIdentifier, containerTokenIdentifier, true); + // update NMToken + updateNMTokenIdentifier(nmTokenIdentifier); + + ContainerLaunchContext launchContext = request.getContainerLaunchContext(); + + Map serviceData = getAuxServiceMetaData(); + if (launchContext.getServiceData()!=null && + !launchContext.getServiceData().isEmpty()) { + for (Entry meta : launchContext.getServiceData() + .entrySet()) { + if (null == serviceData.get(meta.getKey())) { + throw new InvalidAuxServiceException("The auxService:" + meta.getKey() + + " does not exist"); + } + } + } + } + private ContainerManagerApplicationProto buildAppProto(ApplicationId appId, String user, Credentials credentials, Map appAcls, @@ -863,26 +908,10 @@ public class ContainerManagerImpl extends CompositeService implements } @SuppressWarnings("unchecked") - private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, + protected void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, ContainerTokenIdentifier containerTokenIdentifier, StartContainerRequest request) throws YarnException, IOException { - /* - * 1) It should save the NMToken into NMTokenSecretManager. This is done - * here instead of RPC layer because at the time of opening/authenticating - * the connection it doesn't know what all RPC calls user will make on it. - * Also new NMToken is issued only at startContainer (once it gets renewed). - * - * 2) It should validate containerToken. Need to check below things. a) It - * is signed by correct master key (part of retrieve password). b) It - * belongs to correct Node Manager (part of retrieve password). c) It has - * correct RMIdentifier. d) It is not expired. 
- */ - authorizeStartAndResourceIncreaseRequest( - nmTokenIdentifier, containerTokenIdentifier, true); - // update NMToken - updateNMTokenIdentifier(nmTokenIdentifier); - ContainerId containerId = containerTokenIdentifier.getContainerID(); String containerIdStr = containerId.toString(); String user = containerTokenIdentifier.getApplicationSubmitter(); @@ -891,18 +920,6 @@ public class ContainerManagerImpl extends CompositeService implements ContainerLaunchContext launchContext = request.getContainerLaunchContext(); - Map serviceData = getAuxServiceMetaData(); - if (launchContext.getServiceData()!=null && - !launchContext.getServiceData().isEmpty()) { - for (Map.Entry meta : launchContext.getServiceData() - .entrySet()) { - if (null == serviceData.get(meta.getKey())) { - throw new InvalidAuxServiceException("The auxService:" + meta.getKey() - + " does not exist"); - } - } - } - Credentials credentials = YarnServerSecurityUtils.parseCredentials(launchContext); @@ -922,13 +939,14 @@ public class ContainerManagerImpl extends CompositeService implements this.readLock.lock(); try { - if (!serviceStopped) { + if (!isServiceStopped()) { // Create the application - Application application = - new ApplicationImpl(dispatcher, user, applicationID, credentials, context); + Application application = new ApplicationImpl(dispatcher, user, + applicationID, credentials, context); if (null == context.getApplications().putIfAbsent(applicationID, application)) { - LOG.info("Creating a new application reference for app " + applicationID); + LOG.info("Creating a new application reference for app " + + applicationID); LogAggregationContext logAggregationContext = containerTokenIdentifier.getLogAggregationContext(); Map appAcls = @@ -1146,7 +1164,9 @@ public class ContainerManagerImpl extends CompositeService implements } for (ContainerId id : requests.getContainerIds()) { try { - stopContainerInternal(identifier, id); + Container container = this.context.getContainers().get(id); + authorizeGetAndStopContainerRequest(id, container, true, identifier); + stopContainerInternal(id); succeededRequests.add(id); } catch (YarnException e) { failedRequests.put(id, SerializedException.newInstance(e)); @@ -1157,13 +1177,11 @@ public class ContainerManagerImpl extends CompositeService implements } @SuppressWarnings("unchecked") - private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier, - ContainerId containerID) throws YarnException, IOException { + protected void stopContainerInternal(ContainerId containerID) + throws YarnException, IOException { String containerIDStr = containerID.toString(); Container container = this.context.getContainers().get(containerID); LOG.info("Stopping container with container Id: " + containerIDStr); - authorizeGetAndStopContainerRequest(containerID, container, true, - nmTokenIdentifier); if (container == null) { if (!nodeStatusUpdater.isContainerRecentlyStopped(containerID)) { @@ -1210,7 +1228,7 @@ public class ContainerManagerImpl extends CompositeService implements failedRequests); } - private ContainerStatus getContainerStatusInternal(ContainerId containerID, + protected ContainerStatus getContainerStatusInternal(ContainerId containerID, NMTokenIdentifier nmTokenIdentifier) throws YarnException { String containerIDStr = containerID.toString(); Container container = this.context.getContainers().get(containerID); @@ -1406,4 +1424,7 @@ public class ContainerManagerImpl extends CompositeService implements this.amrmProxyService = amrmProxyService; } + protected boolean 
isServiceStopped() { + return serviceStopped; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index a493562d65d..b1ddc2ef952 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -491,7 +491,8 @@ public class ContainerImpl implements Container { this.readLock.lock(); try { return BuilderUtils.newContainerStatus(this.containerId, - getCurrentState(), diagnostics.toString(), exitCode, getResource()); + getCurrentState(), diagnostics.toString(), exitCode, getResource(), + this.containerTokenIdentifier.getExecutionType()); } finally { this.readLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java index 4d69dbfbc21..1069b4fbdf1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java @@ -22,8 +22,26 @@ import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.ResourceView; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo; public interface ContainersMonitor extends Service, EventHandler, ResourceView { public ResourceUtilization getContainersUtilization(); + + ResourceUtilization getContainersAllocation(); + + boolean hasResourcesAvailable(ProcessTreeInfo pti); + + void increaseContainersAllocation(ProcessTreeInfo pti); + + void decreaseContainersAllocation(ProcessTreeInfo pti); + + void increaseResourceUtilization(ResourceUtilization resourceUtil, + ProcessTreeInfo pti); + + void decreaseResourceUtilization(ResourceUtilization resourceUtil, + ProcessTreeInfo pti); + + void subtractNodeResourcesFromResourceUtilization( + ResourceUtilization resourceUtil); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 446e7a1270e..0feac3bff39 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -63,7 +63,7 @@ public class ContainersMonitorImpl extends AbstractService implements private final ContainerExecutor containerExecutor; private final Dispatcher eventDispatcher; - private final Context context; + protected final Context context; private ResourceCalculatorPlugin resourceCalculatorPlugin; private Configuration conf; private static float vmemRatio; @@ -82,6 +82,9 @@ public class ContainersMonitorImpl extends AbstractService implements private int nodeCpuPercentageForYARN; private ResourceUtilization containersUtilization; + // Tracks the aggregated allocation of the currently allocated containers + // when queuing of containers at the NMs is enabled. + private ResourceUtilization containersAllocation; private volatile boolean stopped = false; @@ -96,6 +99,7 @@ public class ContainersMonitorImpl extends AbstractService implements this.monitoringThread = new MonitoringThread(); this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f); + this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f); } @Override @@ -132,10 +136,11 @@ public class ContainersMonitorImpl extends AbstractService implements YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS); long configuredPMemForContainers = - NodeManagerHardwareUtils.getContainerMemoryMB(conf) * 1024 * 1024L; + NodeManagerHardwareUtils.getContainerMemoryMB( + this.resourceCalculatorPlugin, conf) * 1024 * 1024L; long configuredVCoresForContainers = - NodeManagerHardwareUtils.getVCores(conf); + NodeManagerHardwareUtils.getVCores(this.resourceCalculatorPlugin, conf); // Setting these irrespective of whether checks are enabled. Required in // the UI. @@ -233,8 +238,7 @@ public class ContainersMonitorImpl extends AbstractService implements super.serviceStop(); } - @VisibleForTesting - static class ProcessTreeInfo { + public static class ProcessTreeInfo { private ContainerId containerId; private String pid; private ResourceCalculatorProcessTree pTree; @@ -697,6 +701,82 @@ public class ContainersMonitorImpl extends AbstractService implements this.containersUtilization = utilization; } + public ResourceUtilization getContainersAllocation() { + return this.containersAllocation; + } + + /** + * @return true if there are available allocated resources for the given + * container to start. + */ + @Override + public boolean hasResourcesAvailable(ProcessTreeInfo pti) { + synchronized (this.containersAllocation) { + // Check physical memory. + if (this.containersAllocation.getPhysicalMemory() + + (int) (pti.getPmemLimit() >> 20) > + (int) (getPmemAllocatedForContainers() >> 20)) { + return false; + } + // Check virtual memory. + if (isVmemCheckEnabled() && + this.containersAllocation.getVirtualMemory() + + (int) (pti.getVmemLimit() >> 20) > + (int) (getVmemAllocatedForContainers() >> 20)) { + return false; + } + // Check CPU. 
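+      // allocatedCpuUsage() converts the container's requested vcores into a
+      // share of the CPU that YARN may use on this node; the aggregated
+      // shares of all allocated containers must stay at or below 1.0.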
+ if (this.containersAllocation.getCPU() + + allocatedCpuUsage(pti) > 1.0f) { + return false; + } + } + return true; + } + + @Override + public void increaseContainersAllocation(ProcessTreeInfo pti) { + synchronized (this.containersAllocation) { + increaseResourceUtilization(this.containersAllocation, pti); + } + } + + @Override + public void decreaseContainersAllocation(ProcessTreeInfo pti) { + synchronized (this.containersAllocation) { + decreaseResourceUtilization(this.containersAllocation, pti); + } + } + + @Override + public void increaseResourceUtilization(ResourceUtilization resourceUtil, + ProcessTreeInfo pti) { + resourceUtil.addTo((int) (pti.getPmemLimit() >> 20), + (int) (pti.getVmemLimit() >> 20), allocatedCpuUsage(pti)); + } + + @Override + public void decreaseResourceUtilization(ResourceUtilization resourceUtil, + ProcessTreeInfo pti) { + resourceUtil.subtractFrom((int) (pti.getPmemLimit() >> 20), + (int) (pti.getVmemLimit() >> 20), allocatedCpuUsage(pti)); + } + + @Override + public void subtractNodeResourcesFromResourceUtilization( + ResourceUtilization resourceUtil) { + resourceUtil.subtractFrom((int) (getPmemAllocatedForContainers() >> 20), + (int) (getVmemAllocatedForContainers() >> 20), 1.0f); + } + + private float allocatedCpuUsage(ProcessTreeInfo pti) { + float cpuUsagePercentPerCore = pti.getCpuVcores() * 100.0f; + float cpuUsageTotalCoresPercentage = cpuUsagePercentPerCore + / resourceCalculatorPlugin.getNumProcessors(); + return (cpuUsageTotalCoresPercentage * 1000 * + maxVCoresAllottedForContainers / nodeCpuPercentageForYARN) / 1000.0f; + } + @Override @SuppressWarnings("unchecked") public void handle(ContainersMonitorEvent monitoringEvent) { @@ -714,40 +794,56 @@ public class ContainersMonitorImpl extends AbstractService implements switch (monitoringEvent.getType()) { case START_MONITORING_CONTAINER: - ContainerStartMonitoringEvent startEvent = - (ContainerStartMonitoringEvent) monitoringEvent; - LOG.info("Starting resource-monitoring for " + containerId); - updateContainerMetrics(monitoringEvent); - trackingContainers.put(containerId, - new ProcessTreeInfo(containerId, null, null, - startEvent.getVmemLimit(), startEvent.getPmemLimit(), - startEvent.getCpuVcores())); + onStartMonitoringContainer(monitoringEvent, containerId); break; case STOP_MONITORING_CONTAINER: - LOG.info("Stopping resource-monitoring for " + containerId); - updateContainerMetrics(monitoringEvent); - trackingContainers.remove(containerId); + onStopMonitoringContainer(monitoringEvent, containerId); break; case CHANGE_MONITORING_CONTAINER_RESOURCE: - ChangeMonitoringContainerResourceEvent changeEvent = - (ChangeMonitoringContainerResourceEvent) monitoringEvent; - ProcessTreeInfo processTreeInfo = trackingContainers.get(containerId); - if (processTreeInfo == null) { - LOG.warn("Failed to track container " - + containerId.toString() - + ". It may have already completed."); - break; - } - LOG.info("Changing resource-monitoring for " + containerId); - updateContainerMetrics(monitoringEvent); - long pmemLimit = changeEvent.getResource().getMemory() * 1024L * 1024L; - long vmemLimit = (long) (pmemLimit * vmemRatio); - int cpuVcores = changeEvent.getResource().getVirtualCores(); - processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores); - changeContainerResource(containerId, changeEvent.getResource()); + onChangeMonitoringContainerResource(monitoringEvent, containerId); break; default: // TODO: Wrong event. 
} } + + protected void onChangeMonitoringContainerResource( + ContainersMonitorEvent monitoringEvent, ContainerId containerId) { + ChangeMonitoringContainerResourceEvent changeEvent = + (ChangeMonitoringContainerResourceEvent) monitoringEvent; + ProcessTreeInfo processTreeInfo = trackingContainers.get(containerId); + if (processTreeInfo == null) { + LOG.warn("Failed to track container " + + containerId.toString() + + ". It may have already completed."); + return; + } + LOG.info("Changing resource-monitoring for " + containerId); + updateContainerMetrics(monitoringEvent); + long pmemLimit = changeEvent.getResource().getMemory() * 1024L * 1024L; + long vmemLimit = (long) (pmemLimit * vmemRatio); + int cpuVcores = changeEvent.getResource().getVirtualCores(); + processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores); + changeContainerResource(containerId, changeEvent.getResource()); + } + + protected void onStopMonitoringContainer( + ContainersMonitorEvent monitoringEvent, ContainerId containerId) { + LOG.info("Stopping resource-monitoring for " + containerId); + updateContainerMetrics(monitoringEvent); + trackingContainers.remove(containerId); + } + + protected void onStartMonitoringContainer( + ContainersMonitorEvent monitoringEvent, ContainerId containerId) { + ContainerStartMonitoringEvent startEvent = + (ContainerStartMonitoringEvent) monitoringEvent; + LOG.info("Starting resource-monitoring for " + containerId); + updateContainerMetrics(monitoringEvent); + trackingContainers.put(containerId, + new ProcessTreeInfo(containerId, null, null, + startEvent.getVmemLimit(), startEvent.getPmemLimit(), + startEvent.getCpuVcores())); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java new file mode 100644 index 00000000000..ef4e57146c7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java @@ -0,0 +1,556 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo; +import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Class extending {@link ContainerManagerImpl} and is used when queuing at the + * NM is enabled. 
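+ * Containers that cannot start immediately are queued per execution type.
+ * A queued GUARANTEED container may cause running OPPORTUNISTIC containers
+ * to be killed in order to free up resources, whereas queued OPPORTUNISTIC
+ * containers simply wait until resources become available.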
+ */ +public class QueuingContainerManagerImpl extends ContainerManagerImpl { + + private static final Logger LOG = LoggerFactory + .getLogger(QueuingContainerManagerImpl.class); + + private ConcurrentMap + allocatedGuaranteedContainers; + private ConcurrentMap + allocatedOpportunisticContainers; + + private Queue queuedGuaranteedContainers; + private Queue queuedOpportunisticContainers; + + private Set opportunisticContainersToKill; + + public QueuingContainerManagerImpl(Context context, ContainerExecutor exec, + DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, + NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) { + super(context, exec, deletionContext, nodeStatusUpdater, metrics, + dirsHandler); + this.allocatedGuaranteedContainers = new ConcurrentHashMap<>(); + this.allocatedOpportunisticContainers = new ConcurrentHashMap<>(); + this.queuedGuaranteedContainers = new ConcurrentLinkedQueue<>(); + this.queuedOpportunisticContainers = new ConcurrentLinkedQueue<>(); + this.opportunisticContainersToKill = Collections.synchronizedSet( + new HashSet()); + } + + @Override + protected EventHandler createApplicationEventDispatcher() { + return new QueuingApplicationEventDispatcher( + super.createApplicationEventDispatcher()); + } + + @Override + protected void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, + ContainerTokenIdentifier containerTokenIdentifier, + StartContainerRequest request) throws YarnException, IOException { + this.context.getQueuingContext().getQueuedContainers().put( + containerTokenIdentifier.getContainerID(), containerTokenIdentifier); + + AllocatedContainerInfo allocatedContInfo = new AllocatedContainerInfo( + containerTokenIdentifier, nmTokenIdentifier, request, + containerTokenIdentifier.getExecutionType(), containerTokenIdentifier + .getResource(), getConfig()); + + // If there are already free resources for the container to start, and + // there are no queued containers waiting to be executed, start this + // container immediately. + if (queuedGuaranteedContainers.isEmpty() && + queuedOpportunisticContainers.isEmpty() && + getContainersMonitor(). + hasResourcesAvailable(allocatedContInfo.getPti())) { + startAllocatedContainer(allocatedContInfo); + } else { + if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) { + queuedGuaranteedContainers.add(allocatedContInfo); + // Kill running opportunistic containers to make space for + // guaranteed container. + killOpportunisticContainers(allocatedContInfo); + } else { + queuedOpportunisticContainers.add(allocatedContInfo); + } + } + } + + @Override + protected void stopContainerInternal(ContainerId containerID) + throws YarnException, IOException { + Container container = this.context.getContainers().get(containerID); + // If container is null and distributed scheduling is enabled, container + // might be queued. Otherwise, container might not be handled by this NM. + if (container == null && this.context.getQueuingContext() + .getQueuedContainers().containsKey(containerID)) { + ContainerTokenIdentifier containerTokenId = this.context + .getQueuingContext().getQueuedContainers().remove(containerID); + + boolean foundInQueue = removeQueuedContainer(containerID, + containerTokenId.getExecutionType()); + + if (foundInQueue) { + this.context.getQueuingContext().getKilledQueuedContainers().put( + containerTokenId, + "Queued container request removed by ApplicationMaster."); + } else { + // The container started execution in the meanwhile. 
+ try { + stopContainerInternalIfRunning(containerID); + } catch (YarnException | IOException e) { + LOG.error("Container did not get removed successfully.", e); + } + } + + nodeStatusUpdater.sendOutofBandHeartBeat(); + } + super.stopContainerInternal(containerID); + } + + /** + * Start the execution of the given container. Also add it to the allocated + * containers, and update allocated resource utilization. + */ + private void startAllocatedContainer( + AllocatedContainerInfo allocatedContainerInfo) { + ProcessTreeInfo pti = allocatedContainerInfo.getPti(); + + if (allocatedContainerInfo.getExecutionType() == + ExecutionType.GUARANTEED) { + allocatedGuaranteedContainers.put(pti.getContainerId(), + allocatedContainerInfo); + } else { + allocatedOpportunisticContainers.put(pti.getContainerId(), + allocatedContainerInfo); + } + + getContainersMonitor().increaseContainersAllocation(pti); + + // Start execution of container. + ContainerId containerId = allocatedContainerInfo + .getContainerTokenIdentifier().getContainerID(); + this.context.getQueuingContext().getQueuedContainers().remove(containerId); + try { + super.startContainerInternal( + allocatedContainerInfo.getNMTokenIdentifier(), + allocatedContainerInfo.getContainerTokenIdentifier(), + allocatedContainerInfo.getStartRequest()); + } catch (YarnException | IOException e) { + containerFailedToStart(pti.getContainerId(), + allocatedContainerInfo.getContainerTokenIdentifier()); + LOG.error("Container failed to start.", e); + } + } + + private void containerFailedToStart(ContainerId containerId, + ContainerTokenIdentifier containerTokenId) { + this.context.getQueuingContext().getQueuedContainers().remove(containerId); + + removeAllocatedContainer(containerId); + + this.context.getQueuingContext().getKilledQueuedContainers().put( + containerTokenId, + "Container removed from queue as it failed to start."); + } + + /** + * Remove the given container from the container queues. + * + * @return true if the container was found in one of the queues. + */ + private boolean removeQueuedContainer(ContainerId containerId, + ExecutionType executionType) { + Queue queue = + (executionType == ExecutionType.GUARANTEED) ? + queuedGuaranteedContainers : queuedOpportunisticContainers; + + boolean foundInQueue = false; + Iterator iter = queue.iterator(); + while (iter.hasNext() && !foundInQueue) { + if (iter.next().getPti().getContainerId().equals(containerId)) { + iter.remove(); + foundInQueue = true; + } + } + + return foundInQueue; + } + + /** + * Remove the given container from the allocated containers, and update + * allocated container utilization accordingly. + */ + private void removeAllocatedContainer(ContainerId containerId) { + AllocatedContainerInfo contToRemove = null; + + contToRemove = allocatedGuaranteedContainers.remove(containerId); + + if (contToRemove == null) { + contToRemove = allocatedOpportunisticContainers.remove(containerId); + } + + // If container was indeed running, update allocated resource utilization. + if (contToRemove != null) { + getContainersMonitor().decreaseContainersAllocation(contToRemove + .getPti()); + } + } + + /** + * Stop a container only if it is currently running. If queued, do not stop + * it. 
+ */ + private void stopContainerInternalIfRunning(ContainerId containerID) + throws YarnException, IOException { + if (this.context.getContainers().containsKey(containerID)) { + stopContainerInternal(containerID); + } + } + + /** + * Kill opportunistic containers to free up resources for running the given + * container. + * + * @param allocatedContInfo + * the container whose execution needs to start by freeing up + * resources occupied by opportunistic containers. + */ + private void killOpportunisticContainers( + AllocatedContainerInfo allocatedContInfo) { + ContainerId containerToStartId = allocatedContInfo.getPti() + .getContainerId(); + List extraOpportContainersToKill = + pickOpportunisticContainersToKill(containerToStartId); + + // Kill the opportunistic containers that were chosen. + for (ContainerId contIdToKill : extraOpportContainersToKill) { + try { + stopContainerInternalIfRunning(contIdToKill); + } catch (YarnException | IOException e) { + LOG.error("Container did not get removed successfully.", e); + } + LOG.info( + "Opportunistic container {} will be killed in order to start the " + + "execution of guaranteed container {}.", + contIdToKill, containerToStartId); + } + } + + /** + * Choose the opportunistic containers to kill in order to free up resources + * for running the given container. + * + * @param containerToStartId + * the container whose execution needs to start by freeing up + * resources occupied by opportunistic containers. + * @return the additional opportunistic containers that need to be killed. + */ + protected List pickOpportunisticContainersToKill( + ContainerId containerToStartId) { + // The additional opportunistic containers that need to be killed for the + // given container to start. + List extraOpportContainersToKill = new ArrayList<>(); + // Track resources that need to be freed. + ResourceUtilization resourcesToFreeUp = resourcesToFreeUp( + containerToStartId); + + // Go over the running opportunistic containers. Avoid containers that have + // already been marked for killing. + boolean hasSufficientResources = false; + for (Map.Entry runningOpportCont : + allocatedOpportunisticContainers.entrySet()) { + ContainerId runningOpportContId = runningOpportCont.getKey(); + + // If there are sufficient resources to execute the given container, do + // not kill more opportunistic containers. + if (resourcesToFreeUp.getPhysicalMemory() <= 0 && + resourcesToFreeUp.getVirtualMemory() <= 0 && + resourcesToFreeUp.getCPU() <= 0.0f) { + hasSufficientResources = true; + break; + } + + if (!opportunisticContainersToKill.contains(runningOpportContId)) { + extraOpportContainersToKill.add(runningOpportContId); + opportunisticContainersToKill.add(runningOpportContId); + getContainersMonitor().decreaseResourceUtilization(resourcesToFreeUp, + runningOpportCont.getValue().getPti()); + } + } + + if (!hasSufficientResources) { + LOG.info( + "There are no sufficient resources to start guaranteed {} even after " + + "attempting to kill any running opportunistic containers.", + containerToStartId); + } + + return extraOpportContainersToKill; + } + + /** + * Calculates the amount of resources that need to be freed up (by killing + * opportunistic containers) in order for the given guaranteed container to + * start its execution. 
Resource allocation to be freed up = + * containersAllocation - + * allocation of opportunisticContainersToKill + + * allocation of queuedGuaranteedContainers that will start + * before the given container + + * allocation of given container - + * total resources of node. + * + * @param containerToStartId + * the ContainerId of the guaranteed container for which we need to + * free resources, so that its execution can start. + * @return the resources that need to be freed up for the given guaranteed + * container to start. + */ + private ResourceUtilization resourcesToFreeUp( + ContainerId containerToStartId) { + // Get allocation of currently allocated containers. + ResourceUtilization resourceAllocationToFreeUp = ResourceUtilization + .newInstance(getContainersMonitor().getContainersAllocation()); + + // Subtract from the allocation the allocation of the opportunistic + // containers that are marked for killing. + for (ContainerId opportContId : opportunisticContainersToKill) { + if (allocatedOpportunisticContainers.containsKey(opportContId)) { + getContainersMonitor().decreaseResourceUtilization( + resourceAllocationToFreeUp, + allocatedOpportunisticContainers.get(opportContId).getPti()); + } + } + // Add to the allocation the allocation of the pending guaranteed + // containers that will start before the current container will be started. + for (AllocatedContainerInfo guarContInfo : queuedGuaranteedContainers) { + getContainersMonitor().increaseResourceUtilization( + resourceAllocationToFreeUp, guarContInfo.getPti()); + if (guarContInfo.getPti().getContainerId().equals(containerToStartId)) { + break; + } + } + // Subtract the overall node resources. + getContainersMonitor().subtractNodeResourcesFromResourceUtilization( + resourceAllocationToFreeUp); + + return resourceAllocationToFreeUp; + } + + /** + * If there are available resources, try to start as many pending containers + * as possible. + */ + private void startPendingContainers() { + // Start pending guaranteed containers, if resources available. + boolean resourcesAvailable = + startContainersFromQueue(queuedGuaranteedContainers); + + // Start opportunistic containers, if resources available. 
+ if (resourcesAvailable) { + startContainersFromQueue(queuedOpportunisticContainers); + } + } + + private boolean startContainersFromQueue( + Queue queuedContainers) { + Iterator guarIter = queuedContainers.iterator(); + boolean resourcesAvailable = true; + + while (guarIter.hasNext() && resourcesAvailable) { + AllocatedContainerInfo allocatedContInfo = guarIter.next(); + + if (getContainersMonitor().hasResourcesAvailable( + allocatedContInfo.getPti())) { + startAllocatedContainer(allocatedContInfo); + guarIter.remove(); + } else { + resourcesAvailable = false; + } + } + return resourcesAvailable; + } + + @Override + protected ContainerStatus getContainerStatusInternal(ContainerId containerID, + NMTokenIdentifier nmTokenIdentifier) throws YarnException { + Container container = this.context.getContainers().get(containerID); + if (container == null) { + ContainerTokenIdentifier containerTokenId = this.context + .getQueuingContext().getQueuedContainers().get(containerID); + if (containerTokenId != null) { + ExecutionType executionType = this.context.getQueuingContext() + .getQueuedContainers().get(containerID).getExecutionType(); + return BuilderUtils.newContainerStatus(containerID, + org.apache.hadoop.yarn.api.records.ContainerState.QUEUED, "", + ContainerExitStatus.INVALID, this.context.getQueuingContext() + .getQueuedContainers().get(containerID).getResource(), + executionType); + } + } + return super.getContainerStatusInternal(containerID, nmTokenIdentifier); + } + + @VisibleForTesting + public int getNumAllocatedGuaranteedContainers() { + return allocatedGuaranteedContainers.size(); + } + + @VisibleForTesting + public int getNumAllocatedOpportunisticContainers() { + return allocatedOpportunisticContainers.size(); + } + + class QueuingApplicationEventDispatcher implements + EventHandler { + private EventHandler applicationEventDispatcher; + + public QueuingApplicationEventDispatcher( + EventHandler applicationEventDispatcher) { + this.applicationEventDispatcher = applicationEventDispatcher; + } + + @Override + @SuppressWarnings("unchecked") + public void handle(ApplicationEvent event) { + if (event.getType() == + ApplicationEventType.APPLICATION_CONTAINER_FINISHED) { + if (!(event instanceof ApplicationContainerFinishedEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + ApplicationContainerFinishedEvent finishEvent = + (ApplicationContainerFinishedEvent) event; + // Remove finished container from the allocated containers, and + // attempt to start new containers. 
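+        // The finished container's share is removed from the tracked
+        // allocation, possibly freeing room for queued guaranteed or
+        // opportunistic containers to start.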
+ ContainerId contIdToRemove = finishEvent.getContainerID(); + removeAllocatedContainer(contIdToRemove); + opportunisticContainersToKill.remove(contIdToRemove); + startPendingContainers(); + } + this.applicationEventDispatcher.handle(event); + } + } + + static class AllocatedContainerInfo { + private final ContainerTokenIdentifier containerTokenIdentifier; + private final NMTokenIdentifier nmTokenIdentifier; + private final StartContainerRequest startRequest; + private final ExecutionType executionType; + private final ProcessTreeInfo pti; + + AllocatedContainerInfo(ContainerTokenIdentifier containerTokenIdentifier, + NMTokenIdentifier nmTokenIdentifier, StartContainerRequest startRequest, + ExecutionType executionType, Resource resource, Configuration conf) { + this.containerTokenIdentifier = containerTokenIdentifier; + this.nmTokenIdentifier = nmTokenIdentifier; + this.startRequest = startRequest; + this.executionType = executionType; + this.pti = createProcessTreeInfo(containerTokenIdentifier + .getContainerID(), resource, conf); + } + + private ContainerTokenIdentifier getContainerTokenIdentifier() { + return this.containerTokenIdentifier; + } + + private NMTokenIdentifier getNMTokenIdentifier() { + return this.nmTokenIdentifier; + } + + private StartContainerRequest getStartRequest() { + return this.startRequest; + } + + private ExecutionType getExecutionType() { + return this.executionType; + } + + protected ProcessTreeInfo getPti() { + return this.pti; + } + + private ProcessTreeInfo createProcessTreeInfo(ContainerId containerId, + Resource resource, Configuration conf) { + long pmemBytes = resource.getMemory() * 1024 * 1024L; + float pmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, + YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); + long vmemBytes = (long) (pmemRatio * pmemBytes); + int cpuVcores = resource.getVirtualCores(); + + return new ProcessTreeInfo(containerId, null, null, vmemBytes, pmemBytes, + cpuVcores); + } + + @Override + public boolean equals(Object obj) { + boolean equal = false; + if (obj instanceof AllocatedContainerInfo) { + AllocatedContainerInfo otherContInfo = (AllocatedContainerInfo) obj; + equal = this.getPti().getContainerId() + .equals(otherContInfo.getPti().getContainerId()); + } + return equal; + } + + @Override + public int hashCode() { + return this.getPti().getContainerId().hashCode(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/package-info.java new file mode 100644 index 00000000000..0250807bc0a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** + * This package contains classes related to the queuing of containers at + * the NM. + * + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index fa8d131c6b2..2fcce1d38ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 1aea9d27635..6c904ebef39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -683,5 +683,10 @@ public abstract class BaseAMRMProxyTest { public NodeStatusUpdater getNodeStatusUpdater() { return null; } + + @Override + public QueuingContext getQueuingContext() { + return null; + } } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 787778ef14d..f37129e40d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -280,21 +280,22 @@ public abstract class 
BaseContainerManagerTest { list.add(containerID); GetContainerStatusesRequest request = GetContainerStatusesRequest.newInstance(list); - ContainerStatus containerStatus = - containerManager.getContainerStatuses(request).getContainerStatuses() - .get(0); + ContainerStatus containerStatus = null; int timeoutSecs = 0; - while (!containerStatus.getState().equals(finalState) - && timeoutSecs++ < timeOutMax) { - Thread.sleep(1000); - LOG.info("Waiting for container to get into state " + finalState - + ". Current state is " + containerStatus.getState()); - containerStatus = containerManager.getContainerStatuses(request).getContainerStatuses().get(0); - } - LOG.info("Container state is " + containerStatus.getState()); - Assert.assertEquals("ContainerState is not correct (timedout)", - finalState, containerStatus.getState()); - } + do { + Thread.sleep(2000); + containerStatus = + containerManager.getContainerStatuses(request) + .getContainerStatuses().get(0); + LOG.info("Waiting for container to get into state " + finalState + + ". Current state is " + containerStatus.getState()); + timeoutSecs += 2; + } while (!containerStatus.getState().equals(finalState) + && timeoutSecs < timeOutMax); + LOG.info("Container state is " + containerStatus.getState()); + Assert.assertEquals("ContainerState is not correct (timedout)", + finalState, containerStatus.getState()); + } static void waitForApplicationState(ContainerManagerImpl containerManager, ApplicationId appID, ApplicationState finalState) @@ -328,19 +329,24 @@ public abstract class BaseContainerManagerTest { org.apache.hadoop.yarn.server.nodemanager.containermanager .container.ContainerState finalState, int timeOutMax) throws InterruptedException, YarnException, IOException { - Container container = - containerManager.getContext().getContainers().get(containerID); + Container container = null; org.apache.hadoop.yarn.server.nodemanager - .containermanager.container.ContainerState currentState = - container.getContainerState(); + .containermanager.container.ContainerState currentState = null; int timeoutSecs = 0; - while (!currentState.equals(finalState) - && timeoutSecs++ < timeOutMax) { - Thread.sleep(1000); - LOG.info("Waiting for NM container to get into state " + finalState - + ". Current state is " + currentState); - currentState = container.getContainerState(); - } + do { + Thread.sleep(2000); + container = + containerManager.getContext().getContainers().get(containerID); + if (container != null) { + currentState = container.getContainerState(); + } + if (currentState != null) { + LOG.info("Waiting for NM container to get into state " + finalState + + ". 
Current state is " + currentState); + } + timeoutSecs += 2; + } while (!currentState.equals(finalState) + && timeoutSecs++ < timeOutMax); LOG.info("Container state is " + currentState); Assert.assertEquals("ContainerState is not correct (timedout)", finalState, currentState); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 3f5fc825c59..702198e9a9a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidContainerException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent; @@ -176,7 +178,7 @@ public class TestContainerManager extends BaseContainerManagerTest { // Just do a query for a non-existing container. boolean throwsException = false; try { - List containerIds = new ArrayList(); + List containerIds = new ArrayList<>(); ContainerId id =createContainerId(0); containerIds.add(id); GetContainerStatusesRequest request = @@ -231,14 +233,14 @@ public class TestContainerManager extends BaseContainerManagerTest { containerLaunchContext, createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), user, context.getContainerTokenSecretManager())); - List list = new ArrayList(); + List list = new ArrayList<>(); list.add(scRequest); StartContainersRequest allRequests = StartContainersRequest.newInstance(list); containerManager.startContainers(allRequests); BaseContainerManagerTest.waitForContainerState(containerManager, cId, - ContainerState.COMPLETE); + ContainerState.COMPLETE, 40); // Now ascertain that the resources are localised correctly. 
ApplicationId appId = cId.getApplicationAttemptId().getApplicationId(); @@ -323,7 +325,7 @@ public class TestContainerManager extends BaseContainerManagerTest { createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), user, context.getContainerTokenSecretManager())); - List list = new ArrayList(); + List list = new ArrayList<>(); list.add(scRequest); StartContainersRequest allRequests = StartContainersRequest.newInstance(list); @@ -355,7 +357,7 @@ public class TestContainerManager extends BaseContainerManagerTest { Assert.assertTrue("Process is not alive!", DefaultContainerExecutor.containerIsAlive(pid)); - List containerIds = new ArrayList(); + List containerIds = new ArrayList<>(); containerIds.add(cId); StopContainersRequest stopRequest = StopContainersRequest.newInstance(containerIds); @@ -375,7 +377,7 @@ public class TestContainerManager extends BaseContainerManagerTest { DefaultContainerExecutor.containerIsAlive(pid)); } - private void testContainerLaunchAndExit(int exitCode) throws IOException, + protected void testContainerLaunchAndExit(int exitCode) throws IOException, InterruptedException, YarnException { File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile"); @@ -430,7 +432,7 @@ public class TestContainerManager extends BaseContainerManagerTest { containerLaunchContext, createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), user, context.getContainerTokenSecretManager())); - List list = new ArrayList(); + List list = new ArrayList<>(); list.add(scRequest); StartContainersRequest allRequests = StartContainersRequest.newInstance(list); @@ -439,12 +441,12 @@ public class TestContainerManager extends BaseContainerManagerTest { BaseContainerManagerTest.waitForContainerState(containerManager, cId, ContainerState.COMPLETE); - List containerIds = new ArrayList(); + List containerIds = new ArrayList<>(); containerIds.add(cId); GetContainerStatusesRequest gcsRequest = GetContainerStatusesRequest.newInstance(containerIds); - ContainerStatus containerStatus = - containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0); + ContainerStatus containerStatus = containerManager. 
+ getContainerStatuses(gcsRequest).getContainerStatuses().get(0); // Verify exit status matches exit state of script Assert.assertEquals(exitCode, @@ -520,7 +522,7 @@ public class TestContainerManager extends BaseContainerManagerTest { containerLaunchContext, createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), user, context.getContainerTokenSecretManager())); - List list = new ArrayList(); + List list = new ArrayList<>(); list.add(scRequest); StartContainersRequest allRequests = StartContainersRequest.newInstance(list); @@ -605,7 +607,7 @@ public class TestContainerManager extends BaseContainerManagerTest { createContainerToken(cId1, ResourceManagerConstants.RM_INVALID_IDENTIFIER, context.getNodeId(), user, context.getContainerTokenSecretManager())); - List list = new ArrayList(); + List list = new ArrayList<>(); list.add(startRequest1); StartContainersRequest allRequests = StartContainersRequest.newInstance(list); @@ -635,7 +637,7 @@ public class TestContainerManager extends BaseContainerManagerTest { createContainerToken(cId2, DUMMY_RM_IDENTIFIER, context.getNodeId(), user, context.getContainerTokenSecretManager())); - List list2 = new ArrayList(); + List list2 = new ArrayList<>(); list.add(startRequest2); StartContainersRequest allRequests2 = StartContainersRequest.newInstance(list2); @@ -655,7 +657,7 @@ public class TestContainerManager extends BaseContainerManagerTest { public void testMultipleContainersLaunch() throws Exception { containerManager.start(); - List list = new ArrayList(); + List list = new ArrayList<>(); ContainerLaunchContext containerLaunchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class); for (int i = 0; i < 10; i++) { @@ -679,6 +681,7 @@ public class TestContainerManager extends BaseContainerManagerTest { StartContainersResponse response = containerManager.startContainers(requestList); + Thread.sleep(5000); Assert.assertEquals(5, response.getSuccessfullyStartedContainers().size()); for (ContainerId id : response.getSuccessfullyStartedContainers()) { @@ -699,12 +702,11 @@ public class TestContainerManager extends BaseContainerManagerTest { @Test public void testMultipleContainersStopAndGetStatus() throws Exception { containerManager.start(); - List startRequest = - new ArrayList(); + List startRequest = new ArrayList<>(); ContainerLaunchContext containerLaunchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class); - List containerIds = new ArrayList(); + List containerIds = new ArrayList<>(); for (int i = 0; i < 10; i++) { ContainerId cId = createContainerId(i); String user = null; @@ -727,6 +729,7 @@ public class TestContainerManager extends BaseContainerManagerTest { StartContainersRequest requestList = StartContainersRequest.newInstance(startRequest); containerManager.startContainers(requestList); + Thread.sleep(5000); // Get container statuses GetContainerStatusesRequest statusRequest = @@ -777,8 +780,7 @@ public class TestContainerManager extends BaseContainerManagerTest { ServiceA.class, Service.class); containerManager.start(); - List startRequest = - new ArrayList(); + List startRequest = new ArrayList<>(); ContainerLaunchContext containerLaunchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class); @@ -803,8 +805,8 @@ public class TestContainerManager extends BaseContainerManagerTest { StartContainersResponse response = containerManager.startContainers(requestList); - Assert.assertTrue(response.getFailedRequests().size() == 1); - 
Assert.assertTrue(response.getSuccessfullyStartedContainers().size() == 0); + Assert.assertEquals(1, response.getFailedRequests().size()); + Assert.assertEquals(0, response.getSuccessfullyStartedContainers().size()); Assert.assertTrue(response.getFailedRequests().containsKey(cId)); Assert.assertTrue(response.getFailedRequests().get(cId).getMessage() .contains("The auxService:" + serviceName + " does not exist")); @@ -880,8 +882,7 @@ public class TestContainerManager extends BaseContainerManagerTest { ContainerManagerImpl.INVALID_NMTOKEN_MSG); Mockito.doNothing().when(spyContainerMgr).authorizeUser(ugInfo, null); - List reqList - = new ArrayList(); + List reqList = new ArrayList<>(); reqList.add(StartContainerRequest.newInstance(null, null)); StartContainersRequest reqs = new StartContainersRequestPBImpl(); reqs.setStartContainerRequests(reqList); @@ -925,7 +926,7 @@ public class TestContainerManager extends BaseContainerManagerTest { Thread.sleep(2000); // Construct container resource increase request, - List increaseTokens = new ArrayList(); + List increaseTokens = new ArrayList<>(); // Add increase request for container-0, the request will fail as the // container will have exited, and won't be in RUNNING state ContainerId cId0 = createContainerId(0); @@ -1012,7 +1013,7 @@ public class TestContainerManager extends BaseContainerManagerTest { containerLaunchContext, createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), user, context.getContainerTokenSecretManager())); - List list = new ArrayList(); + List list = new ArrayList<>(); list.add(scRequest); StartContainersRequest allRequests = StartContainersRequest.newInstance(list); @@ -1022,7 +1023,7 @@ public class TestContainerManager extends BaseContainerManagerTest { org.apache.hadoop.yarn.server.nodemanager. containermanager.container.ContainerState.RUNNING); // Construct container resource increase request, - List increaseTokens = new ArrayList(); + List increaseTokens = new ArrayList<>(); // Add increase request. The increase request should fail // as the current resource does not fit in the target resource Token containerToken = @@ -1096,7 +1097,7 @@ public class TestContainerManager extends BaseContainerManagerTest { createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), user, context.getContainerTokenSecretManager())); - List list = new ArrayList(); + List list = new ArrayList<>(); list.add(scRequest); StartContainersRequest allRequests = StartContainersRequest.newInstance(list); @@ -1106,7 +1107,7 @@ public class TestContainerManager extends BaseContainerManagerTest { org.apache.hadoop.yarn.server.nodemanager. containermanager.container.ContainerState.RUNNING); // Construct container resource increase request, - List increaseTokens = new ArrayList(); + List increaseTokens = new ArrayList<>(); // Add increase request. 
Resource targetResource = Resource.newInstance(4096, 2); Token containerToken = createContainerToken(cId, DUMMY_RM_IDENTIFIER, @@ -1184,6 +1185,21 @@ public class TestContainerManager extends BaseContainerManagerTest { containerTokenIdentifier); } + public static Token createContainerToken(ContainerId cId, long rmIdentifier, + NodeId nodeId, String user, Resource resource, + NMContainerTokenSecretManager containerTokenSecretManager, + LogAggregationContext logAggregationContext, ExecutionType executionType) + throws IOException { + ContainerTokenIdentifier containerTokenIdentifier = + new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource, + System.currentTimeMillis() + 100000L, 123, rmIdentifier, + Priority.newInstance(0), 0, logAggregationContext, null, + ContainerType.TASK, executionType); + return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager + .retrievePassword(containerTokenIdentifier), + containerTokenIdentifier); + } + @Test public void testOutputThreadDumpSignal() throws IOException, InterruptedException, YarnException { @@ -1241,7 +1257,7 @@ public class TestContainerManager extends BaseContainerManagerTest { new HashMap(); localResources.put(destinationFile, rsrc_alpha); containerLaunchContext.setLocalResources(localResources); - List commands = new ArrayList(); + List commands = new ArrayList<>(); commands.add("/bin/bash"); commands.add(scriptFile.getAbsolutePath()); containerLaunchContext.setCommands(commands); @@ -1250,7 +1266,7 @@ public class TestContainerManager extends BaseContainerManagerTest { containerLaunchContext, createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), user, context.getContainerTokenSecretManager())); - List list = new ArrayList(); + List list = new ArrayList<>(); list.add(scRequest); StartContainersRequest allRequests = StartContainersRequest.newInstance(list); @@ -1267,7 +1283,7 @@ public class TestContainerManager extends BaseContainerManagerTest { // Simulate NodeStatusUpdaterImpl sending CMgrSignalContainersEvent SignalContainerRequest signalReq = SignalContainerRequest.newInstance(cId, command); - List reqs = new ArrayList(); + List reqs = new ArrayList<>(); reqs.add(signalReq); containerManager.handle(new CMgrSignalContainersEvent(reqs)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java index bbde9ed7c59..0dc5c5b3ce7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java @@ -22,6 +22,10 @@ import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; public class MockResourceCalculatorPlugin extends ResourceCalculatorPlugin { + public MockResourceCalculatorPlugin() { + super(null); + } + @Override public long getVirtualMemorySize() { return 0; diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java index d7f89fc0b0e..1a0c690a7e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -173,8 +174,8 @@ public class TestContainersMonitorResourceChange { assertTrue(containerEventHandler .isContainerKilled(getContainerId(1))); // create container 2 - containersMonitor.handle(new ContainerStartMonitoringEvent( - getContainerId(2), 2202009L, 1048576L, 1, 0, 0)); + containersMonitor.handle(new ContainerStartMonitoringEvent(getContainerId( + 2), 2202009L, 1048576L, 1, 0, 0)); // verify that this container is properly tracked assertNotNull(getProcessTreeInfo(getContainerId(2))); assertEquals(1048576L, getProcessTreeInfo(getContainerId(2)) @@ -215,8 +216,8 @@ public class TestContainersMonitorResourceChange { // now waiting for the next monitor cycle Thread.sleep(1000); // create a container with id 3 - containersMonitor.handle(new ContainerStartMonitoringEvent( - getContainerId(3), 2202009L, 1048576L, 1, 0, 0)); + containersMonitor.handle(new ContainerStartMonitoringEvent(getContainerId( + 3), 2202009L, 1048576L, 1, 0, 0)); // Verify that this container has been tracked assertNotNull(getProcessTreeInfo(getContainerId(3))); // trigger a change resource event, check limit after change diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java new file mode 100644 index 00000000000..0d951f4eb6e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java @@ -0,0 +1,301 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing; + +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; + +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; + +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor + .ContainersMonitorImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorPlugin; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorProcessTree; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.junit.Assert; +import org.junit.Test; + + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TestQueuingContainerManager extends TestContainerManager { + + interface HasResources { + boolean decide(Context context, ContainerId cId); + } + + public TestQueuingContainerManager() throws UnsupportedFileSystemException { + super(); + } + + static { + LOG = LogFactory.getLog(TestQueuingContainerManager.class); + } + + HasResources hasResources = null; + boolean shouldDeleteWait = 
false; + + @Override + protected ContainerManagerImpl + createContainerManager(DeletionService delSrvc) { + return new QueuingContainerManagerImpl(context, exec, delSrvc, + nodeStatusUpdater, metrics, dirsHandler) { + + @Override + public void serviceInit(Configuration conf) throws Exception { + conf.set( + YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR, + MockResourceCalculatorPlugin.class.getCanonicalName()); + conf.set( + YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, + MockResourceCalculatorProcessTree.class.getCanonicalName()); + super.serviceInit(conf); + } + + @Override + public void + setBlockNewContainerRequests(boolean blockNewContainerRequests) { + // do nothing + } + + @Override + protected UserGroupInformation getRemoteUgi() throws YarnException { + ApplicationId appId = ApplicationId.newInstance(0, 0); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(appAttemptId.toString()); + ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context + .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey() + .getKeyId())); + return ugi; + } + + @Override + protected void authorizeGetAndStopContainerRequest(ContainerId containerId, + Container container, boolean stopRequest, NMTokenIdentifier identifier) throws YarnException { + if(container == null || container.getUser().equals("Fail")){ + throw new YarnException("Reject this container"); + } + } + + @Override + protected ContainersMonitor createContainersMonitor(ContainerExecutor + exec) { + return new ContainersMonitorImpl(exec, dispatcher, this.context) { + @Override + public boolean hasResourcesAvailable( + ContainersMonitorImpl.ProcessTreeInfo pti) { + return hasResources.decide(this.context, pti.getContainerId()); + } + }; + } + }; + } + + @Override + protected DeletionService createDeletionService() { + return new DeletionService(exec) { + @Override + public void delete(String user, Path subDir, Path... baseDirs) { + // Don't do any deletions. + if (shouldDeleteWait) { + try { + Thread.sleep(10000); + LOG.info("\n\nSleeping Pseudo delete : user - " + user + ", " + + "subDir - " + subDir + ", " + + "baseDirs - " + Arrays.asList(baseDirs)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } else { + LOG.info("\n\nPseudo delete : user - " + user + ", " + + "subDir - " + subDir + ", " + + "baseDirs - " + Arrays.asList(baseDirs)); + } + } + }; + } + + @Override + public void setup() throws IOException { + super.setup(); + shouldDeleteWait = false; + hasResources = new HasResources() { + @Override + public boolean decide(Context context, ContainerId cId) { + return true; + } + }; + } + + /** + * Test to verify that an OPPORTUNISTIC container is killed when + * a GUARANTEED container arrives and all the Node Resources are used up + * + * For this specific test case, 4 containers are requested (last one being + * guaranteed). Assumptions : + * 1) The first OPPORTUNISTIC Container will start running + * 2) The second and third OPP containers will be queued + * 3) When the GUARANTEED container comes in, the running OPP container + * will be killed to make room + * 4) After the GUARANTEED container finishes, the remaining 2 OPP + * containers will be dequeued and run. + * 5) Only the first OPP container will be killed. 
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testSimpleOpportunisticContainer() throws Exception {
+    shouldDeleteWait = true;
+    containerManager.start();
+
+    // ////// Create the resources for the container
+    File dir = new File(tmpDir, "dir");
+    dir.mkdirs();
+    File file = new File(dir, "file");
+    PrintWriter fileWriter = new PrintWriter(file);
+    fileWriter.write("Hello World!");
+    fileWriter.close();
+
+    // ////// Construct the container-spec.
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+    URL resource_alpha =
+        ConverterUtils.getYarnUrlFromPath(localFS
+            .makeQualified(new Path(file.getAbsolutePath())));
+    LocalResource rsrc_alpha =
+        recordFactory.newRecordInstance(LocalResource.class);
+    rsrc_alpha.setResource(resource_alpha);
+    rsrc_alpha.setSize(-1);
+    rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+    rsrc_alpha.setType(LocalResourceType.FILE);
+    rsrc_alpha.setTimestamp(file.lastModified());
+    String destinationFile = "dest_file";
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+    localResources.put(destinationFile, rsrc_alpha);
+    containerLaunchContext.setLocalResources(localResources);
+
+    // Start 3 OPPORTUNISTIC containers and 1 GUARANTEED container
+    List<StartContainerRequest> list = new ArrayList<>();
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(1024, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(1024, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(1024, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+    // GUARANTEED
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(3), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, context.getContainerTokenSecretManager())));
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+
+    // Plugin to simulate that the Node is full
+    // It only allows 1 container to run at a time.
+    hasResources = new HasResources() {
+      @Override
+      public boolean decide(Context context, ContainerId cId) {
+        int nOpp = ((QueuingContainerManagerImpl) containerManager)
+            .getNumAllocatedOpportunisticContainers();
+        int nGuar = ((QueuingContainerManagerImpl) containerManager)
+            .getNumAllocatedGuaranteedContainers();
+        boolean val = (nOpp + nGuar < 1);
+        System.out.println("\nHasResources : [" + cId + "],"
+            + "Opp[" + nOpp + "], Guar[" + nGuar + "], [" + val + "]\n");
+        return val;
+      }
+    };
+
+    containerManager.startContainers(allRequests);
+
+    BaseContainerManagerTest.waitForContainerState(containerManager,
+        createContainerId(3),
+        ContainerState.COMPLETE, 40);
+    List<ContainerId> statList = new ArrayList<ContainerId>();
+    for (int i = 0; i < 4; i++) {
+      statList.add(createContainerId(i));
+    }
+    GetContainerStatusesRequest statRequest =
+        GetContainerStatusesRequest.newInstance(statList);
+    List<ContainerStatus> containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+    for (ContainerStatus status : containerStatuses) {
+      // Ensure that the first opportunistic container is killed
+      if (status.getContainerId().equals(createContainerId(0))) {
+        Assert.assertTrue(status.getDiagnostics()
+            .contains("Container killed by the ApplicationMaster"));
+      }
+      System.out.println("\nStatus : [" + status + "]\n");
+    }
+  }
+}
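The scenario asserted above boils down to a small admission policy: run a request while there is head-room, queue OPPORTUNISTIC requests otherwise, kill a running OPPORTUNISTIC container to admit a GUARANTEED one, and drain the queue in FIFO order when capacity frees up. The following stand-alone sketch reproduces that policy in isolation; the QueuingPolicySketch class, its method names, and the fixed one-slot capacity are illustrative assumptions and are not part of this patch, whose real implementation gates admission on ContainersMonitor.hasResourcesAvailable() instead of a slot count.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Illustrative sketch only; not part of YARN-2883. */
public class QueuingPolicySketch {
  enum ExecType { GUARANTEED, OPPORTUNISTIC }

  private final int capacity;                        // stand-in for the node's head-room check
  private final List<String> running = new ArrayList<>();
  private final Queue<String> queuedOpportunistic = new ArrayDeque<>();
  private final List<String> killed = new ArrayList<>();

  QueuingPolicySketch(int capacity) {
    this.capacity = capacity;
  }

  void start(String id, ExecType type) {
    if (running.size() < capacity) {
      running.add(id);                               // head-room available: run immediately
    } else if (type == ExecType.GUARANTEED) {
      // Simplification: assumes the evicted running container is
      // OPPORTUNISTIC, as in the test scenario above.
      killed.add(running.remove(0));
      running.add(id);
    } else {
      queuedOpportunistic.add(id);                   // no room: queue the opportunistic request
    }
  }

  void finish(String id) {
    running.remove(id);
    while (running.size() < capacity && !queuedOpportunistic.isEmpty()) {
      running.add(queuedOpportunistic.poll());       // drain the queue in FIFO order
    }
  }

  public static void main(String[] args) {
    QueuingPolicySketch node = new QueuingPolicySketch(1);
    node.start("opp-0", ExecType.OPPORTUNISTIC);     // runs
    node.start("opp-1", ExecType.OPPORTUNISTIC);     // queued
    node.start("opp-2", ExecType.OPPORTUNISTIC);     // queued
    node.start("guar-3", ExecType.GUARANTEED);       // kills opp-0, runs
    node.finish("guar-3");                           // opp-1 dequeued and started
    System.out.println("killed=" + node.killed
        + " running=" + node.running
        + " queued=" + node.queuedOpportunistic);
    // Prints: killed=[opp-0] running=[opp-1] queued=[opp-2],
    // mirroring what testSimpleOpportunisticContainer asserts.
  }
}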