diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java index dfe85f27c04..021863b5e42 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java @@ -91,6 +91,8 @@ public class TestMROpportunisticMaps { Configuration conf = new Configuration(); // Start the mini-MR and mini-DFS clusters conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); + conf.setBoolean(YarnConfiguration. + OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true); conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true); dfsCluster = new MiniDFSCluster.Builder(conf) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 6c921cd839b..d1f410bfca2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -306,55 +306,60 @@ public class YarnConfiguration extends Configuration { YARN_PREFIX + "distributed-scheduling.enabled"; public static final boolean DIST_SCHEDULING_ENABLED_DEFAULT = false; - /** Minimum memory (in MB) used for allocating a container through distributed - * scheduling. */ - public static final String DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB = - YARN_PREFIX + "distributed-scheduling.min-container-memory-mb"; - public static final int DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT = 512; + /** Setting that controls whether opportunistic container allocation + * is enabled or not. */ + public static final String OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED = + YARN_PREFIX + "opportunistic-container-allocation.enabled"; + public static final boolean + OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT = false; - /** Minimum virtual CPU cores used for allocating a container through - * distributed scheduling. */ - public static final String DIST_SCHEDULING_MIN_CONTAINER_VCORES = - YARN_PREFIX + "distributed-scheduling.min-container-vcores"; - public static final int DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT = 1; + /** Minimum memory (in MB) used for allocating an opportunistic container. */ + public static final String OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB = + YARN_PREFIX + "opportunistic-containers.min-memory-mb"; + public static final int OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT = 512; - /** Maximum memory (in MB) used for allocating a container through distributed - * scheduling. */ - public static final String DIST_SCHEDULING_MAX_MEMORY_MB = - YARN_PREFIX + "distributed-scheduling.max-container-memory-mb"; - public static final int DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT = 2048; + /** Minimum virtual CPU cores used for allocating an opportunistic container. 
+ * */ + public static final String OPPORTUNISTIC_CONTAINERS_MIN_VCORES = + YARN_PREFIX + "opportunistic-containers.min-vcores"; + public static final int OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT = 1; - /** Maximum virtual CPU cores used for allocating a container through - * distributed scheduling. */ - public static final String DIST_SCHEDULING_MAX_CONTAINER_VCORES = - YARN_PREFIX + "distributed-scheduling.max-container-vcores"; - public static final int DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT = 4; + /** Maximum memory (in MB) used for allocating an opportunistic container. */ + public static final String OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB = + YARN_PREFIX + "opportunistic-containers.max-memory-mb"; + public static final int OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT = 2048; - /** Incremental memory (in MB) used for allocating a container through - * distributed scheduling. */ - public static final String DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB = - YARN_PREFIX + "distributed-scheduling.incr-container-memory-mb"; - public static final int DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT = + /** Maximum virtual CPU cores used for allocating an opportunistic container. + * */ + public static final String OPPORTUNISTIC_CONTAINERS_MAX_VCORES = + YARN_PREFIX + "opportunistic-containers.max-vcores"; + public static final int OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT = 4; + + /** Incremental memory (in MB) used for allocating an opportunistic container. + * */ + public static final String OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB = + YARN_PREFIX + "opportunistic-containers.incr-memory-mb"; + public static final int OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT = 512; - /** Incremental virtual CPU cores used for allocating a container through - * distributed scheduling. */ - public static final String DIST_SCHEDULING_INCR_CONTAINER_VCORES = - YARN_PREFIX + "distributed-scheduling.incr-vcores"; - public static final int DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT = 1; + /** Incremental virtual CPU cores used for allocating an opportunistic + * container. */ + public static final String OPPORTUNISTIC_CONTAINERS_INCR_VCORES = + YARN_PREFIX + "opportunistic-containers.incr-vcores"; + public static final int OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT = 1; - /** Container token expiry for container allocated via distributed - * scheduling. */ - public static final String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS = - YARN_PREFIX + "distributed-scheduling.container-token-expiry-ms"; - public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT = + /** Container token expiry for opportunistic containers. */ + public static final String OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS = + YARN_PREFIX + "opportunistic-containers.container-token-expiry-ms"; + public static final int OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT = 600000; - /** Number of nodes to be used by the LocalScheduler of a NodeManager for - * dispatching containers during distributed scheduling. */ - public static final String DIST_SCHEDULING_NODES_NUMBER_USED = - YARN_PREFIX + "distributed-scheduling.nodes-used"; - public static final int DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT = 10; + /** Number of nodes to be used by the Opportunistic Container allocator for + * dispatching containers during container allocation. 
+   */
+  public static final String OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED =
+      YARN_PREFIX + "opportunistic-container-allocation.nodes-used";
+  public static final int OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED_DEFAULT =
+      10;
 
   /** Frequency for computing least loaded NMs. */
   public static final String NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS =
@@ -2829,6 +2834,18 @@ public class YarnConfiguration extends Configuration {
     return clusterId;
   }
 
+  public static boolean isDistSchedulingEnabled(Configuration conf) {
+    return conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
+        YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT);
+  }
+
+  public static boolean isOpportunisticContainerAllocationEnabled(
+      Configuration conf) {
+    return conf.getBoolean(
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED,
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT);
+  }
+
   // helper methods for timeline service configuration
   /**
    * Returns whether the timeline service is enabled via configuration.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
index 71321e3f51d..40390d40adc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
@@ -84,7 +84,7 @@ import static org.mockito.Mockito.when;
 * specifying OPPORTUNISTIC containers in its resource requests,
 * the AMRMProxyService on the NM, the DistributedScheduler RequestInterceptor
 * on the NM and the DistributedSchedulingProtocol used by the framework to talk
- * to the DistributedSchedulingAMService running on the RM.
+ * to the OpportunisticContainerAllocatorAMService running on the RM.
 */
 public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
 
@@ -105,6 +105,8 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
     conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.
+        OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
     conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
     conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
     cluster.init(conf);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 3ebdc9988fa..7305283d692 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2761,72 +2761,76 @@
   <property>
     <description>
-      Minimum memory (in MB) used for allocating a container through distributed
-      scheduling.
+      Setting that controls whether opportunistic container allocation
+      is enabled.
     </description>
-    <name>yarn.distributed-scheduling.min-container-memory-mb</name>
+    <name>yarn.opportunistic-container-allocation.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>
+      Minimum memory (in MB) used for allocating an opportunistic container.
+    </description>
+    <name>yarn.opportunistic-containers.min-memory-mb</name>
     <value>512</value>
   </property>
 
   <property>
     <description>
-      Minimum virtual CPU cores used for allocating a container through
-      distributed scheduling.
+      Minimum virtual CPU cores used for allocating an opportunistic container.
     </description>
-    <name>yarn.distributed-scheduling.min-container-vcores</name>
+    <name>yarn.opportunistic-containers.min-vcores</name>
     <value>1</value>
   </property>
 
   <property>
     <description>
-      Maximum memory (in MB) used for allocating a container through distributed
-      scheduling.
+      Maximum memory (in MB) used for allocating an opportunistic container.
    </description>
-    <name>yarn.distributed-scheduling.max-container-memory-mb</name>
+    <name>yarn.opportunistic-containers.max-memory-mb</name>
     <value>2048</value>
   </property>
 
   <property>
     <description>
-      Maximum virtual CPU cores used for allocating a container through
-      distributed scheduling.
+      Maximum virtual CPU cores used for allocating an opportunistic container.
     </description>
-    <name>yarn.distributed-scheduling.max-container-vcores</name>
+    <name>yarn.opportunistic-containers.max-vcores</name>
     <value>4</value>
   </property>
 
   <property>
     <description>
-      Incremental memory (in MB) used for allocating a container through
-      distributed scheduling.
+      Incremental memory (in MB) used for allocating an opportunistic container.
     </description>
-    <name>yarn.distributed-scheduling.incr-container-memory-mb</name>
+    <name>yarn.opportunistic-containers.incr-memory-mb</name>
     <value>512</value>
   </property>
 
   <property>
     <description>
-      Incremental virtual CPU cores used for allocating a container through
-      distributed scheduling.
+      Incremental virtual CPU cores used for allocating an opportunistic
+      container.
     </description>
-    <name>yarn.distributed-scheduling.incr-vcores</name>
+    <name>yarn.opportunistic-containers.incr-vcores</name>
     <value>1</value>
   </property>
 
   <property>
     <description>
-      Container token expiry for container allocated via distributed scheduling.
+      Container token expiry for opportunistic containers.
     </description>
-    <name>yarn.distributed-scheduling.container-token-expiry-ms</name>
+    <name>yarn.opportunistic-containers.container-token-expiry-ms</name>
     <value>600000</value>
   </property>
 
   <property>
     <description>
-      Number of nodes to be used by the LocalScheduler of a NodeManager for
-      dispatching containers during distributed scheduling.
+      Number of nodes to be used by the Opportunistic Container Allocator for
+      dispatching containers during container allocation.
     </description>
-    <name>yarn.distributed-scheduling.nodes-used</name>
+    <name>yarn.opportunistic-container-allocation.nodes-used</name>
     <value>10</value>
   </property>
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
new file mode 100644
index 00000000000..41b5d566108
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.yarn.server.scheduler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerType; + +import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +/** + *
<p>
+ * The OpportunisticContainerAllocator allocates containers on a given list of
+ * nodes, after modifying the container sizes to respect the limits set by the
+ * ResourceManager. It tries to distribute the containers as evenly as
+ * possible.
+ * </p>
+ */
+public class OpportunisticContainerAllocator {
+
+  /**
+   * This class encapsulates application specific parameters used to build a
+   * Container.
+   */
+  public static class AllocationParams {
+    private Resource maxResource;
+    private Resource minResource;
+    private Resource incrementResource;
+    private int containerTokenExpiryInterval;
+
+    /**
+     * Return Max Resource.
+     * @return Resource
+     */
+    public Resource getMaxResource() {
+      return maxResource;
+    }
+
+    /**
+     * Set Max Resource.
+     * @param maxResource Resource
+     */
+    public void setMaxResource(Resource maxResource) {
+      this.maxResource = maxResource;
+    }
+
+    /**
+     * Get Min Resource.
+     * @return Resource
+     */
+    public Resource getMinResource() {
+      return minResource;
+    }
+
+    /**
+     * Set Min Resource.
+     * @param minResource Resource
+     */
+    public void setMinResource(Resource minResource) {
+      this.minResource = minResource;
+    }
+
+    /**
+     * Get Incremental Resource.
+     * @return Incremental Resource
+     */
+    public Resource getIncrementResource() {
+      return incrementResource;
+    }
+
+    /**
+     * Set Incremental resource.
+     * @param incrementResource Resource
+     */
+    public void setIncrementResource(Resource incrementResource) {
+      this.incrementResource = incrementResource;
+    }
+
+    /**
+     * Get Container Token Expiry interval.
+     * @return Container Token Expiry interval
+     */
+    public int getContainerTokenExpiryInterval() {
+      return containerTokenExpiryInterval;
+    }
+
+    /**
+     * Set Container Token Expiry time in ms.
+     * @param containerTokenExpiryInterval Container Token Expiry in ms
+     */
+    public void setContainerTokenExpiryInterval(
+        int containerTokenExpiryInterval) {
+      this.containerTokenExpiryInterval = containerTokenExpiryInterval;
+    }
+  }
+
+  /**
+   * A Container Id Generator.
+   */
+  public static class ContainerIdGenerator {
+
+    protected volatile AtomicLong containerIdCounter = new AtomicLong(1);
+
+    /**
+     * This method can reset the generator to a specific value.
+     * @param containerIdStart containerId
+     */
+    public void resetContainerIdCounter(long containerIdStart) {
+      this.containerIdCounter.set(containerIdStart);
+    }
+
+    /**
+     * Sets the underlying Atomic Long. To be used when implementation needs to
+     * share the underlying AtomicLong of an existing counter.
+     * @param counter AtomicLong
+     */
+    public void setContainerIdCounter(AtomicLong counter) {
+      this.containerIdCounter = counter;
+    }
+
+    /**
+     * Generates a new long value. Default implementation increments the
+     * underlying AtomicLong. Subclasses are encouraged to override this
+     * behaviour.
+     * @return Counter.
+     */
+    public long generateContainerId() {
+      return this.containerIdCounter.incrementAndGet();
+    }
+  }
+
+  static class PartitionedResourceRequests {
+    private List<ResourceRequest> guaranteed = new ArrayList<>();
+    private List<ResourceRequest> opportunistic = new ArrayList<>();
+    public List<ResourceRequest> getGuaranteed() {
+      return guaranteed;
+    }
+    public List<ResourceRequest> getOpportunistic() {
+      return opportunistic;
+    }
+  }
+
+  private static final Log LOG =
+      LogFactory.getLog(OpportunisticContainerAllocator.class);
+
+  private static final ResourceCalculator RESOURCE_CALCULATOR =
+      new DominantResourceCalculator();
+
+  private final BaseContainerTokenSecretManager tokenSecretManager;
+  private int webpagePort;
+
+  /**
+   * Create a new Opportunistic Container Allocator.
+   * @param tokenSecretManager TokenSecretManager
+   * @param webpagePort Webpage Port
+   */
+  public OpportunisticContainerAllocator(
+      BaseContainerTokenSecretManager tokenSecretManager, int webpagePort) {
+    this.tokenSecretManager = tokenSecretManager;
+    this.webpagePort = webpagePort;
+  }
+
+  /**
+   * Entry point into the Opportunistic Container Allocator.
+   * @param request AllocateRequest
+   * @param applicationAttemptId ApplicationAttemptId
+   * @param appContext App Specific OpportunisticContainerContext
+   * @param rmIdentifier RM Identifier
+   * @param appSubmitter App Submitter
+   * @return List of Containers.
+   * @throws YarnException YarnException
+   */
+  public List<Container> allocateContainers(
+      AllocateRequest request, ApplicationAttemptId applicationAttemptId,
+      OpportunisticContainerContext appContext, long rmIdentifier,
+      String appSubmitter) throws YarnException {
+    // Partition requests into GUARANTEED and OPPORTUNISTIC reqs
+    PartitionedResourceRequests partitionedAsks =
+        partitionAskList(request.getAskList());
+
+    List<ContainerId> releasedContainers = request.getReleaseList();
+    int numReleasedContainers = releasedContainers.size();
+    if (numReleasedContainers > 0) {
+      LOG.info("AttemptID: " + applicationAttemptId + " released: "
+          + numReleasedContainers);
+      appContext.getContainersAllocated().removeAll(releasedContainers);
+    }
+
+    // Also, update black list
+    ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest();
+    if (rbr != null) {
+      appContext.getBlacklist().removeAll(rbr.getBlacklistRemovals());
+      appContext.getBlacklist().addAll(rbr.getBlacklistAdditions());
+    }
+
+    // Add OPPORTUNISTIC reqs to the outstanding reqs
+    appContext.addToOutstandingReqs(partitionedAsks.getOpportunistic());
+
+    List<Container> allocatedContainers = new ArrayList<>();
+    for (Priority priority :
+        appContext.getOutstandingOpReqs().descendingKeySet()) {
+      // Allocated containers :
+      //   Key = Requested Capability,
+      //   Value = List of Containers of given Cap (The actual container size
+      //          might be different than what is requested.. which is why
+      //          we need the requested capability (key) to match against
+      //          the outstanding reqs)
+      Map<Resource, List<Container>> allocated = allocate(rmIdentifier,
+          appContext, priority, applicationAttemptId, appSubmitter);
+      for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
+        appContext.matchAllocationToOutstandingRequest(
+            e.getKey(), e.getValue());
+        allocatedContainers.addAll(e.getValue());
+      }
+    }
+
+    // Send all the GUARANTEED Reqs to RM
+    request.setAskList(partitionedAsks.getGuaranteed());
+    return allocatedContainers;
+  }
+
+  private Map<Resource, List<Container>> allocate(long rmIdentifier,
+      OpportunisticContainerContext appContext, Priority priority,
+      ApplicationAttemptId appAttId, String userName) throws YarnException {
+    Map<Resource, List<Container>> containers = new HashMap<>();
+    for (ResourceRequest anyAsk :
+        appContext.getOutstandingOpReqs().get(priority).values()) {
+      allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
+          appContext.getContainerIdGenerator(), appContext.getBlacklist(),
+          appAttId, appContext.getNodeMap(), userName, containers, anyAsk);
+      LOG.info("Opportunistic allocation requested for ["
+          + "priority=" + anyAsk.getPriority()
+          + ", num_containers=" + anyAsk.getNumContainers()
+          + ", capability=" + anyAsk.getCapability() + "]"
+          + " allocated = " + containers.get(anyAsk.getCapability()).size());
+    }
+    return containers;
+  }
+
+  private void allocateContainersInternal(long rmIdentifier,
+      AllocationParams appParams, ContainerIdGenerator idCounter,
+      Set<String> blacklist, ApplicationAttemptId id,
+      Map<String, NodeId> allNodes, String userName,
+      Map<Resource, List<Container>> containers, ResourceRequest anyAsk)
+      throws YarnException {
+    int toAllocate = anyAsk.getNumContainers()
+        - (containers.isEmpty() ? 0 :
+            containers.get(anyAsk.getCapability()).size());
+
+    List<NodeId> nodesForScheduling = new ArrayList<>();
+    for (Entry<String, NodeId> nodeEntry : allNodes.entrySet()) {
+      // Do not use blacklisted nodes for scheduling.
+      if (blacklist.contains(nodeEntry.getKey())) {
+        continue;
+      }
+      nodesForScheduling.add(nodeEntry.getValue());
+    }
+    int numAllocated = 0;
+    int nextNodeToSchedule = 0;
+    for (int numCont = 0; numCont < toAllocate; numCont++) {
+      nextNodeToSchedule++;
+      nextNodeToSchedule %= nodesForScheduling.size();
+      NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule);
+      Container container = buildContainer(rmIdentifier, appParams, idCounter,
+          anyAsk, id, userName, nodeId);
+      List<Container> cList = containers.get(anyAsk.getCapability());
+      if (cList == null) {
+        cList = new ArrayList<>();
+        containers.put(anyAsk.getCapability(), cList);
+      }
+      cList.add(container);
+      numAllocated++;
+      LOG.info("Allocated [" + container.getId() + "] as opportunistic.");
+    }
+    LOG.info("Allocated " + numAllocated + " opportunistic containers.");
+  }
+
+  private Container buildContainer(long rmIdentifier,
+      AllocationParams appParams, ContainerIdGenerator idCounter,
+      ResourceRequest rr, ApplicationAttemptId id, String userName,
+      NodeId nodeId) throws YarnException {
+    ContainerId cId =
+        ContainerId.newContainerId(id, idCounter.generateContainerId());
+
+    // Normalize the resource asks (Similar to what the RM scheduler does
+    // before accepting an ask)
+    Resource capability = normalizeCapability(appParams, rr);
+
+    long currTime = System.currentTimeMillis();
+    ContainerTokenIdentifier containerTokenIdentifier =
+        new ContainerTokenIdentifier(
+            cId, nodeId.getHost() + ":" + nodeId.getPort(), userName,
+            capability, currTime + appParams.containerTokenExpiryInterval,
+            tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier,
+            rr.getPriority(), currTime,
+            null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
+            ExecutionType.OPPORTUNISTIC);
+    byte[] pwd =
+        tokenSecretManager.createPassword(containerTokenIdentifier);
+    Token containerToken = newContainerToken(nodeId, pwd,
+        containerTokenIdentifier);
+    Container container = BuilderUtils.newContainer(
+        cId, nodeId, nodeId.getHost() + ":" + webpagePort,
+        capability, rr.getPriority(), containerToken,
+        containerTokenIdentifier.getExecutionType(),
+        rr.getAllocationRequestId());
+    return container;
+  }
+
+  private Resource normalizeCapability(AllocationParams appParams,
+      ResourceRequest ask) {
+    return Resources.normalize(RESOURCE_CALCULATOR,
+        ask.getCapability(), appParams.minResource, appParams.maxResource,
+        appParams.incrementResource);
+  }
+
+  private static Token newContainerToken(NodeId nodeId, byte[] password,
+      ContainerTokenIdentifier tokenIdentifier) {
+    // RPC layer client expects ip:port as service for tokens
+    InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(),
+        nodeId.getPort());
+    // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
+    Token containerToken = Token.newInstance(tokenIdentifier.getBytes(),
+        ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
+            .buildTokenService(addr).toString());
+    return containerToken;
+  }
+
+  private PartitionedResourceRequests partitionAskList(List<ResourceRequest>
+      askList) {
+    PartitionedResourceRequests partitionedRequests =
+        new PartitionedResourceRequests();
+    for (ResourceRequest rr : askList) {
+      if (rr.getExecutionTypeRequest().getExecutionType() ==
+          ExecutionType.OPPORTUNISTIC) {
+        partitionedRequests.getOpportunistic().add(rr);
+      } else {
+        partitionedRequests.getGuaranteed().add(rr);
+      }
+    }
+    return partitionedRequests;
+  }
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
new file mode 100644
index 00000000000..1b701eaaa3a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.AllocationParams;
+import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.ContainerIdGenerator;
+
+/**
+ * This encapsulates application specific information used by the
+ * Opportunistic Container Allocator to allocate containers.
+ */
+public class OpportunisticContainerContext {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(OpportunisticContainerContext.class);
+
+  // Currently just used to keep track of allocated containers.
+  // Can be used for reporting stats later.
+  private Set<ContainerId> containersAllocated = new HashSet<>();
+  private AllocationParams appParams =
+      new AllocationParams();
+  private ContainerIdGenerator containerIdGenerator =
+      new ContainerIdGenerator();
+
+  private Map<String, NodeId> nodeMap = new LinkedHashMap<>();
+
+  // Mapping of NodeId to NodeTokens. Populated either from RM response or
+  // generated locally if required.
+  private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
+  private final Set<String> blacklist = new HashSet<>();
+
+  // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority,
+  // Resource Name (Host/rack/any) and capability. This mapping is required
+  // to match a received Container to an outstanding OPPORTUNISTIC
+  // ResourceRequest (ask).
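+  // Entries are added by addToOutstandingReqs() and decremented as incoming
+  // allocations are matched in matchAllocationToOutstandingRequest().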
+  private final TreeMap<Priority, Map<Resource, ResourceRequest>>
+      outstandingOpReqs = new TreeMap<>();
+
+  public Set<ContainerId> getContainersAllocated() {
+    return containersAllocated;
+  }
+
+  public OpportunisticContainerAllocator.AllocationParams getAppParams() {
+    return appParams;
+  }
+
+  public ContainerIdGenerator getContainerIdGenerator() {
+    return containerIdGenerator;
+  }
+
+  public void setContainerIdGenerator(
+      ContainerIdGenerator containerIdGenerator) {
+    this.containerIdGenerator = containerIdGenerator;
+  }
+
+  public Map<String, NodeId> getNodeMap() {
+    return nodeMap;
+  }
+
+  public Map<NodeId, NMToken> getNodeTokens() {
+    return nodeTokens;
+  }
+
+  public Set<String> getBlacklist() {
+    return blacklist;
+  }
+
+  public TreeMap<Priority, Map<Resource, ResourceRequest>>
+      getOutstandingOpReqs() {
+    return outstandingOpReqs;
+  }
+
+  /**
+   * Takes a list of ResourceRequests (asks), extracts the key information viz.
+   * (Priority, ResourceName, Capability) and adds to the outstanding
+   * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce
+   * the current YARN constraint that only a single ResourceRequest can exist at
+   * a given Priority and Capability.
+   *
+   * @param resourceAsks the list with the {@link ResourceRequest}s
+   */
+  public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
+    for (ResourceRequest request : resourceAsks) {
+      Priority priority = request.getPriority();
+
+      // TODO: Extend for Node/Rack locality. We only handle ANY requests now
+      if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
+        continue;
+      }
+
+      if (request.getNumContainers() == 0) {
+        continue;
+      }
+
+      Map<Resource, ResourceRequest> reqMap =
+          outstandingOpReqs.get(priority);
+      if (reqMap == null) {
+        reqMap = new HashMap<>();
+        outstandingOpReqs.put(priority, reqMap);
+      }
+
+      ResourceRequest resourceRequest = reqMap.get(request.getCapability());
+      if (resourceRequest == null) {
+        resourceRequest = request;
+        reqMap.put(request.getCapability(), request);
+      } else {
+        resourceRequest.setNumContainers(
+            resourceRequest.getNumContainers() + request.getNumContainers());
+      }
+      if (ResourceRequest.isAnyLocation(request.getResourceName())) {
+        LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority
+            + ", with capability = " + request.getCapability() + " ) : "
+            + resourceRequest.getNumContainers());
+      }
+    }
+  }
+
+  /**
+   * This method matches a returned list of Container Allocations to any
+   * outstanding OPPORTUNISTIC ResourceRequest.
+   * @param capability Capability
+   * @param allocatedContainers Allocated Containers
+   */
+  public void matchAllocationToOutstandingRequest(Resource capability,
+      List<Container> allocatedContainers) {
+    for (Container c : allocatedContainers) {
+      containersAllocated.add(c.getId());
+      Map<Resource, ResourceRequest> asks =
+          outstandingOpReqs.get(c.getPriority());
+
+      if (asks == null) {
+        continue;
+      }
+
+      ResourceRequest rr = asks.get(capability);
+      if (rr != null) {
+        rr.setNumContainers(rr.getNumContainers() - 1);
+        if (rr.getNumContainers() == 0) {
+          asks.remove(capability);
+        }
+      }
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/package-info.java
new file mode 100644
index 00000000000..dd56829dc1b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utility classes used for Scheduling.
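+ * They are used by the DistributedScheduler on the NodeManager and by the
+ * OpportunisticContainerAllocatorAMService on the ResourceManager.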
+ */ +package org.apache.hadoop.yarn.server.scheduler; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 131eaa3df05..88bc29cb870 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; -import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 5bfbb8df1a3..280a086abe1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -72,7 +72,7 @@ import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeLabel import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; -import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; @@ -336,7 +336,8 @@ public class NodeManager extends CompositeService addService(nodeHealthChecker); boolean isDistSchedulingEnabled = - conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, + conf.getBoolean(YarnConfiguration. 
+          OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED,
         YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT);
 
     this.context = createNMContext(containerTokenSecretManager,
@@ -370,8 +371,8 @@ public class NodeManager extends CompositeService
     ((NMContext) context).setWebServer(webServer);
 
     ((NMContext) context).setQueueableContainerAllocator(
-        new OpportunisticContainerAllocator(nodeStatusUpdater, context,
-            webServer.getPort()));
+        new OpportunisticContainerAllocator(
+            context.getContainerTokenSecretManager(), webServer.getPort()));
 
     dispatcher.register(ContainerManagerEventType.class, containerManager);
     dispatcher.register(NodeManagerEventType.class, this);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
index 75fe022b3a2..efbdfb4e8ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -62,7 +62,7 @@ public final class DefaultRequestInterceptor extends
     AbstractRequestInterceptor {
   private static final Logger LOG = LoggerFactory
       .getLogger(DefaultRequestInterceptor.class);
-  private DistributedSchedulingAMProtocol rmClient;
+  private ApplicationMasterProtocol rmClient;
   private UserGroupInformation user = null;
 
   @Override
@@ -76,15 +76,7 @@ public final class DefaultRequestInterceptor extends
       user.addToken(appContext.getAMRMToken());
       final Configuration conf = this.getConf();
 
-      rmClient = user.doAs(
-          new PrivilegedExceptionAction<DistributedSchedulingAMProtocol>() {
-            @Override
-            public DistributedSchedulingAMProtocol run() throws Exception {
-              setAMRMTokenService(conf);
-              return ServerRMProxy.createRMProxy(conf,
-                  DistributedSchedulingAMProtocol.class);
-            }
-          });
+      rmClient = createRMClient(appContext, conf);
     } catch (IOException e) {
       String message =
           "Error while creating of RM app master service proxy for attemptId:"
@@ -100,6 +92,32 @@ public final class DefaultRequestInterceptor extends
     }
   }
 
+  private ApplicationMasterProtocol createRMClient(
+      AMRMProxyApplicationContext appContext, final Configuration conf)
+      throws IOException, InterruptedException {
+    if (appContext.getNMCotext().isDistributedSchedulingEnabled()) {
+      return user.doAs(
+          new PrivilegedExceptionAction<DistributedSchedulingAMProtocol>() {
+            @Override
+            public DistributedSchedulingAMProtocol run() throws Exception {
+              setAMRMTokenService(conf);
+              return ServerRMProxy.createRMProxy(conf,
+                  DistributedSchedulingAMProtocol.class);
+            }
+          });
+    } else {
+      return user.doAs(
+          new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
+            @Override
+            public ApplicationMasterProtocol run() throws Exception {
+              setAMRMTokenService(conf);
+              return ClientRMProxy.createRMProxy(conf,
+                  ApplicationMasterProtocol.class);
+            }
+          });
+    }
+  }
+
   @Override
   public RegisterApplicationMasterResponse registerApplicationMaster(
       final RegisterApplicationMasterRequest request)
@@ -127,9 +145,15 @@ public final class DefaultRequestInterceptor extends
       registerApplicationMasterForDistributedScheduling
       (RegisterApplicationMasterRequest request)
       throws YarnException, IOException {
-    LOG.info("Forwarding
        registerApplicationMasterForDistributedScheduling" +
-        "request to the real YARN RM");
-    return rmClient.registerApplicationMasterForDistributedScheduling(request);
+    if (getApplicationContext().getNMCotext()
+        .isDistributedSchedulingEnabled()) {
+      LOG.info("Forwarding registerApplicationMasterForDistributedScheduling" +
+          " request to the real YARN RM");
+      return ((DistributedSchedulingAMProtocol)rmClient)
+          .registerApplicationMasterForDistributedScheduling(request);
+    } else {
+      throw new YarnException("Distributed Scheduling is not enabled !!");
+    }
   }
 
   @Override
@@ -140,13 +164,18 @@ public final class DefaultRequestInterceptor extends
       LOG.debug("Forwarding allocateForDistributedScheduling request"
          + "to the real YARN RM");
     }
-    DistributedSchedulingAllocateResponse allocateResponse =
-        rmClient.allocateForDistributedScheduling(request);
-    if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
-      updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken());
+    if (getApplicationContext().getNMCotext()
+        .isDistributedSchedulingEnabled()) {
+      DistributedSchedulingAllocateResponse allocateResponse =
+          ((DistributedSchedulingAMProtocol)rmClient)
+              .allocateForDistributedScheduling(request);
+      if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
+        updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken());
+      }
+      return allocateResponse;
+    } else {
+      throw new YarnException("Distributed Scheduling is not enabled !!");
     }
-
-    return allocateResponse;
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
index bfb12eefdfd..368858c43d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
@@ -32,34 +32,23 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyApplicationContext;
 import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor;
-
-
-
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
+import
org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; /** *
<p>
The DistributedScheduler runs on the NodeManager and is modeled as an
@@ -76,74 +65,49 @@ import java.util.TreeMap;
  */
 public final class DistributedScheduler extends AbstractRequestInterceptor {
 
-  static class PartitionedResourceRequests {
-    private List<ResourceRequest> guaranteed = new ArrayList<>();
-    private List<ResourceRequest> opportunistic = new ArrayList<>();
-    public List<ResourceRequest> getGuaranteed() {
-      return guaranteed;
-    }
-    public List<ResourceRequest> getOpportunistic() {
-      return opportunistic;
-    }
-  }
-
-  static class DistributedSchedulerParams {
-    Resource maxResource;
-    Resource minResource;
-    Resource incrementResource;
-    int containerTokenExpiryInterval;
-  }
-
   private static final Logger LOG = LoggerFactory
       .getLogger(DistributedScheduler.class);
 
   private final static RecordFactory RECORD_FACTORY =
       RecordFactoryProvider.getRecordFactory(null);
 
-  // Currently just used to keep track of allocated containers.
-  // Can be used for reporting stats later.
-  private Set<ContainerId> containersAllocated = new HashSet<>();
-
-  private DistributedSchedulerParams appParams =
-      new DistributedSchedulerParams();
-  private final OpportunisticContainerAllocator.ContainerIdCounter
-      containerIdCounter =
-          new OpportunisticContainerAllocator.ContainerIdCounter();
-  private Map<String, NodeId> nodeList = new LinkedHashMap<>();
-
-  // Mapping of NodeId to NodeTokens. Populated either from RM response or
-  // generated locally if required.
-  private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
-  final Set<String> blacklist = new HashSet<>();
-
-  // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority,
-  // Resource Name (Host/rack/any) and capability. This mapping is required
-  // to match a received Container to an outstanding OPPORTUNISTIC
-  // ResourceRequest (ask).
-  final TreeMap<Priority, Map<Resource, ResourceRequest>>
-      outstandingOpReqs = new TreeMap<>();
+  private OpportunisticContainerContext oppContainerContext =
+      new OpportunisticContainerContext();
 
   private ApplicationAttemptId applicationAttemptId;
   private OpportunisticContainerAllocator containerAllocator;
   private NMTokenSecretManagerInNM nmSecretManager;
   private String appSubmitter;
+  private long rmIdentifier;
 
-  public void init(AMRMProxyApplicationContext appContext) {
-    super.init(appContext);
-    initLocal(appContext.getApplicationAttemptId(),
-        appContext.getNMCotext().getContainerAllocator(),
-        appContext.getNMCotext().getNMTokenSecretManager(),
-        appContext.getUser());
+  public void init(AMRMProxyApplicationContext applicationContext) {
+    super.init(applicationContext);
+    initLocal(applicationContext.getNMCotext().getNodeStatusUpdater()
+            .getRMIdentifier(),
+        applicationContext.getApplicationAttemptId(),
+        applicationContext.getNMCotext().getContainerAllocator(),
+        applicationContext.getNMCotext().getNMTokenSecretManager(),
+        applicationContext.getUser());
   }
 
   @VisibleForTesting
-  void initLocal(ApplicationAttemptId applicationAttemptId,
-      OpportunisticContainerAllocator containerAllocator,
+  void initLocal(long rmId, ApplicationAttemptId appAttemptId,
+      OpportunisticContainerAllocator oppContainerAllocator,
       NMTokenSecretManagerInNM nmSecretManager, String appSubmitter) {
-    this.applicationAttemptId = applicationAttemptId;
-    this.containerAllocator = containerAllocator;
+    this.rmIdentifier = rmId;
+    this.applicationAttemptId = appAttemptId;
+    this.containerAllocator = oppContainerAllocator;
     this.nmSecretManager = nmSecretManager;
     this.appSubmitter = appSubmitter;
+
+    // Overrides the Generator to decrement container id.
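+    // Ids are generated downwards from the start value supplied by the RM in
+    // updateParameters(), rather than upwards as in the default generator.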
+    this.oppContainerContext.setContainerIdGenerator(
+        new OpportunisticContainerAllocator.ContainerIdGenerator() {
+          @Override
+          public long generateContainerId() {
+            return this.containerIdCounter.decrementAndGet();
+          }
+        });
   }
 
   /**
@@ -202,7 +166,8 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
       if (allocatedContainers.size() > 0) {
         response.getAllocatedContainers().addAll(allocatedContainers);
         for (Container alloc : allocatedContainers) {
-          if (!nodeTokens.containsKey(alloc.getNodeId())) {
+          if (!oppContainerContext.getNodeTokens().containsKey(
+              alloc.getNodeId())) {
             newTokens.add(nmSecretManager.generateNMToken(appSubmitter, alloc));
           }
         }
@@ -212,115 +177,34 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
     }
   }
 
-  private PartitionedResourceRequests partitionAskList(List<ResourceRequest>
-      askList) {
-    PartitionedResourceRequests partitionedRequests =
-        new PartitionedResourceRequests();
-    for (ResourceRequest rr : askList) {
-      if (rr.getExecutionTypeRequest().getExecutionType() ==
-          ExecutionType.OPPORTUNISTIC) {
-        partitionedRequests.getOpportunistic().add(rr);
-      } else {
-        partitionedRequests.getGuaranteed().add(rr);
-      }
-    }
-    return partitionedRequests;
-  }
-
   private void updateParameters(
       RegisterDistributedSchedulingAMResponse registerResponse) {
-    appParams.minResource = registerResponse.getMinContainerResource();
-    appParams.maxResource = registerResponse.getMaxContainerResource();
-    appParams.incrementResource =
-        registerResponse.getIncrContainerResource();
-    if (appParams.incrementResource == null) {
-      appParams.incrementResource = appParams.minResource;
+    oppContainerContext.getAppParams().setMinResource(
+        registerResponse.getMinContainerResource());
+    oppContainerContext.getAppParams().setMaxResource(
+        registerResponse.getMaxContainerResource());
+    oppContainerContext.getAppParams().setIncrementResource(
+        registerResponse.getIncrContainerResource());
+    if (oppContainerContext.getAppParams().getIncrementResource() == null) {
+      oppContainerContext.getAppParams().setIncrementResource(
+          oppContainerContext.getAppParams().getMinResource());
     }
-    appParams.containerTokenExpiryInterval = registerResponse
-        .getContainerTokenExpiryInterval();
+    oppContainerContext.getAppParams().setContainerTokenExpiryInterval(
+        registerResponse.getContainerTokenExpiryInterval());
 
-    containerIdCounter
+    oppContainerContext.getContainerIdGenerator()
         .resetContainerIdCounter(registerResponse.getContainerIdStart());
     setNodeList(registerResponse.getNodesForScheduling());
   }
 
-  /**
-   * Takes a list of ResourceRequests (asks), extracts the key information viz.
-   * (Priority, ResourceName, Capability) and adds to the outstanding
-   * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce
-   * the current YARN constraint that only a single ResourceRequest can exist at
-   * a give Priority and Capability.
-   *
-   * @param resourceAsks the list with the {@link ResourceRequest}s
-   */
-  public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
-    for (ResourceRequest request : resourceAsks) {
-      Priority priority = request.getPriority();
-
-      // TODO: Extend for Node/Rack locality. We only handle ANY requests now
-      if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
-        continue;
-      }
-
-      if (request.getNumContainers() == 0) {
-        continue;
-      }
-
-      Map<Resource, ResourceRequest> reqMap =
-          this.outstandingOpReqs.get(priority);
-      if (reqMap == null) {
-        reqMap = new HashMap<>();
-        this.outstandingOpReqs.put(priority, reqMap);
-      }
-
-      ResourceRequest resourceRequest = reqMap.get(request.getCapability());
-      if (resourceRequest == null) {
-        resourceRequest = request;
-        reqMap.put(request.getCapability(), request);
-      } else {
-        resourceRequest.setNumContainers(
-            resourceRequest.getNumContainers() + request.getNumContainers());
-      }
-      if (ResourceRequest.isAnyLocation(request.getResourceName())) {
-        LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority
-            + ", with capability = " + request.getCapability() + " ) : "
-            + resourceRequest.getNumContainers());
-      }
-    }
-  }
-
-  /**
-   * This method matches a returned list of Container Allocations to any
-   * outstanding OPPORTUNISTIC ResourceRequest.
-   */
-  private void matchAllocationToOutstandingRequest(Resource capability,
-      List<Container> allocatedContainers) {
-    for (Container c : allocatedContainers) {
-      containersAllocated.add(c.getId());
-      Map<Resource, ResourceRequest> asks =
-          outstandingOpReqs.get(c.getPriority());
-
-      if (asks == null)
-        continue;
-
-      ResourceRequest rr = asks.get(capability);
-      if (rr != null) {
-        rr.setNumContainers(rr.getNumContainers() - 1);
-        if (rr.getNumContainers() == 0) {
-          asks.remove(capability);
-        }
-      }
-    }
-  }
-
   private void setNodeList(List<NodeId> nodeList) {
-    this.nodeList.clear();
+    oppContainerContext.getNodeMap().clear();
     addToNodeList(nodeList);
   }
 
   private void addToNodeList(List<NodeId> nodes) {
     for (NodeId n : nodes) {
-      this.nodeList.put(n.getHost(), n);
+      oppContainerContext.getNodeMap().put(n.getHost(), n);
     }
   }
 
@@ -345,52 +229,13 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
       LOG.debug("Forwarding allocate request to the"
          + "Distributed Scheduler Service on YARN RM");
     }
-    // Partition requests into GUARANTEED and OPPORTUNISTIC reqs
-    PartitionedResourceRequests partitionedAsks =
-        partitionAskList(request.getAllocateRequest().getAskList());
-
-    List<ContainerId> releasedContainers =
-        request.getAllocateRequest().getReleaseList();
-    int numReleasedContainers = releasedContainers.size();
-    if (numReleasedContainers > 0) {
-      LOG.info("AttemptID: " + applicationAttemptId + " released: "
-          + numReleasedContainers);
-      containersAllocated.removeAll(releasedContainers);
-    }
-
-    // Also, update black list
-    ResourceBlacklistRequest rbr =
-        request.getAllocateRequest().getResourceBlacklistRequest();
-    if (rbr != null) {
-      blacklist.removeAll(rbr.getBlacklistRemovals());
-      blacklist.addAll(rbr.getBlacklistAdditions());
-    }
-
-    // Add OPPORTUNISTIC reqs to the outstanding reqs
-    addToOutstandingReqs(partitionedAsks.getOpportunistic());
-
-    List<Container> allocatedContainers = new ArrayList<>();
-    for (Priority priority : outstandingOpReqs.descendingKeySet()) {
-      // Allocated containers :
-      //   Key = Requested Capability,
-      //   Value = List of Containers of given Cap (The actual container size
-      //          might be different than what is requested.. which is why
-      //          we need the requested capability (key) to match against
-      //          the outstanding reqs)
-      Map<Resource, List<Container>> allocated =
-          containerAllocator.allocate(this.appParams, containerIdCounter,
-              outstandingOpReqs.get(priority).values(), blacklist,
-              applicationAttemptId, nodeList, appSubmitter);
-      for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
-        matchAllocationToOutstandingRequest(e.getKey(), e.getValue());
-        allocatedContainers.addAll(e.getValue());
-      }
-    }
+    List<Container> allocatedContainers =
+        containerAllocator.allocateContainers(
+            request.getAllocateRequest(), applicationAttemptId,
+            oppContainerContext, rmIdentifier, appSubmitter);
 
     request.setAllocatedContainers(allocatedContainers);
 
-    // Send all the GUARANTEED Reqs to RM
-    request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed());
     DistributedSchedulingAllocateResponse dsResp =
         getNextInterceptor().allocateForDistributedScheduling(request);
 
@@ -398,7 +243,7 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
     setNodeList(dsResp.getNodesForScheduling());
     List<NMToken> nmTokens = dsResp.getAllocateResponse().getNMTokens();
     for (NMToken nmToken : nmTokens) {
-      nodeTokens.put(nmToken.getNodeId(), nmToken);
+      oppContainerContext.getNodeTokens().put(nmToken.getNodeId(), nmToken);
     }
 
     List<ContainerStatus> completedContainers =
@@ -407,7 +252,8 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
     // Only account for opportunistic containers
     for (ContainerStatus cs : completedContainers) {
       if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
-        containersAllocated.remove(cs.getContainerId());
+        oppContainerContext.getContainersAllocated()
+            .remove(cs.getContainerId());
       }
     }
 
@@ -417,9 +263,9 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
         dsResp.getAllocateResponse(), nmTokens, allocatedContainers);
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug(
-          "Number of opportunistic containers currently allocated by"
-              + "application: " + containersAllocated.size());
+      LOG.debug("Number of opportunistic containers currently"
+          + " allocated by application: " + oppContainerContext
+              .getContainersAllocated().size());
     }
     return dsResp;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
deleted file mode 100644
index 4723233ab1b..00000000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
deleted file mode 100644
index 4723233ab1b..00000000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.scheduler;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.api.ContainerType;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
-import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler.DistributedSchedulerParams;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import java.net.InetSocketAddress;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * <p>
- * The OpportunisticContainerAllocator allocates containers on a given list of
- * nodes, after modifying the container sizes to respect the limits set by the
- * ResourceManager. It tries to distribute the containers as evenly as
- * possible. It also uses the NMTokenSecretManagerInNM to generate the
- * required NM tokens for the allocated containers.
- * </p>
- */
-public class OpportunisticContainerAllocator {
-
-  private static final Log LOG =
-      LogFactory.getLog(OpportunisticContainerAllocator.class);
-
-  private static final ResourceCalculator RESOURCE_CALCULATOR =
-      new DominantResourceCalculator();
-
-  static class ContainerIdCounter {
-    final AtomicLong containerIdCounter = new AtomicLong(1);
-
-    void resetContainerIdCounter(long containerIdStart) {
-      this.containerIdCounter.set(containerIdStart);
-    }
-
-    long generateContainerId() {
-      return this.containerIdCounter.decrementAndGet();
-    }
-  }
-
-  private final NodeStatusUpdater nodeStatusUpdater;
-  private final Context context;
-  private int webpagePort;
-
-  public OpportunisticContainerAllocator(NodeStatusUpdater nodeStatusUpdater,
-      Context context, int webpagePort) {
-    this.nodeStatusUpdater = nodeStatusUpdater;
-    this.context = context;
-    this.webpagePort = webpagePort;
-  }
-
-  public Map<Resource, List<Container>> allocate(
-      DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
-      Collection<ResourceRequest> resourceAsks, Set<String> blacklist,
-      ApplicationAttemptId appAttId, Map<String, NodeId> allNodes,
-      String userName) throws YarnException {
-    Map<Resource, List<Container>> containers = new HashMap<>();
-    for (ResourceRequest anyAsk : resourceAsks) {
-      allocateOpportunisticContainers(appParams, idCounter, blacklist,
-          appAttId, allNodes, userName, containers, anyAsk);
-      LOG.info("Opportunistic allocation requested for ["
-          + "priority=" + anyAsk.getPriority()
-          + ", num_containers=" + anyAsk.getNumContainers()
-          + ", capability=" + anyAsk.getCapability() + "]"
-          + " allocated = " + containers.get(anyAsk.getCapability()).size());
-    }
-    return containers;
-  }
-
-  private void allocateOpportunisticContainers(
-      DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
-      Set<String> blacklist, ApplicationAttemptId id,
-      Map<String, NodeId> allNodes, String userName,
-      Map<Resource, List<Container>> containers, ResourceRequest anyAsk)
-      throws YarnException {
-    int toAllocate = anyAsk.getNumContainers()
-        - (containers.isEmpty() ? 0 :
-            containers.get(anyAsk.getCapability()).size());
-
-    List<NodeId> nodesForScheduling = new ArrayList<>();
-    for (Entry<String, NodeId> nodeEntry : allNodes.entrySet()) {
-      // Do not use blacklisted nodes for scheduling.
-      if (blacklist.contains(nodeEntry.getKey())) {
-        continue;
-      }
-      nodesForScheduling.add(nodeEntry.getValue());
-    }
-    int numAllocated = 0;
-    int nextNodeToSchedule = 0;
-    for (int numCont = 0; numCont < toAllocate; numCont++) {
-      nextNodeToSchedule++;
-      nextNodeToSchedule %= nodesForScheduling.size();
-      NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule);
-      Container container = buildContainer(appParams, idCounter, anyAsk, id,
-          userName, nodeId);
-      List<Container> cList = containers.get(anyAsk.getCapability());
-      if (cList == null) {
-        cList = new ArrayList<>();
-        containers.put(anyAsk.getCapability(), cList);
-      }
-      cList.add(container);
-      numAllocated++;
-      LOG.info("Allocated [" + container.getId() + "] as opportunistic.");
-    }
-    LOG.info("Allocated " + numAllocated + " opportunistic containers.");
-  }
-
-  private Container buildContainer(DistributedSchedulerParams appParams,
-      ContainerIdCounter idCounter, ResourceRequest rr,
-      ApplicationAttemptId id, String userName, NodeId nodeId)
-      throws YarnException {
-    ContainerId cId =
-        ContainerId.newContainerId(id, idCounter.generateContainerId());
-
-    // Normalize the resource asks (similar to what the RM scheduler does
-    // before accepting an ask)
-    Resource capability = normalizeCapability(appParams, rr);
-
-    long currTime = System.currentTimeMillis();
-    ContainerTokenIdentifier containerTokenIdentifier =
-        new ContainerTokenIdentifier(
-            cId, nodeId.getHost() + ":" + nodeId.getPort(), userName,
-            capability, currTime + appParams.containerTokenExpiryInterval,
-            context.getContainerTokenSecretManager().getCurrentKey()
-                .getKeyId(),
-            nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime,
-            null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
-            ExecutionType.OPPORTUNISTIC);
-    byte[] pwd =
-        context.getContainerTokenSecretManager().createPassword(
-            containerTokenIdentifier);
-    Token containerToken = newContainerToken(nodeId, pwd,
-        containerTokenIdentifier);
-    Container container = BuilderUtils.newContainer(
-        cId, nodeId, nodeId.getHost() + ":" + webpagePort,
-        capability, rr.getPriority(), containerToken,
-        containerTokenIdentifier.getExecutionType(),
-        rr.getAllocationRequestId());
-    return container;
-  }
-
-  private Resource normalizeCapability(DistributedSchedulerParams appParams,
-      ResourceRequest ask) {
-    return Resources.normalize(RESOURCE_CALCULATOR,
-        ask.getCapability(), appParams.minResource, appParams.maxResource,
-        appParams.incrementResource);
-  }
-
-  public static Token newContainerToken(NodeId nodeId, byte[] password,
-      ContainerTokenIdentifier tokenIdentifier) {
-    // RPC layer client expects ip:port as service for tokens
-    InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(),
-        nodeId.getPort());
-    // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
-    Token containerToken = Token.newInstance(tokenIdentifier.getBytes(),
-        ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
-            .buildTokenService(addr).toString());
-    return containerToken;
-  }
-
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index f716d443283..d8660dd1d7d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -67,7 +67,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
-import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
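Beyond the package move reflected in the import change above, the allocator's constructor shrinks: the test diff below drops the mocked NM Context and NodeStatusUpdater because the allocator now takes the NMContainerTokenSecretManager directly. A hedged sketch of the new wiring follows; the second argument is assumed here to be the web port, by analogy with the removed webpagePort field, and the AllocatorWiringSketch class is an illustrative name, not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;

final class AllocatorWiringSketch {
  // Mirrors the construction in TestDistributedScheduler below; 77 is the
  // test's placeholder value (assumed to be the NM web port).
  static OpportunisticContainerAllocator create(Configuration conf) {
    NMContainerTokenSecretManager secretManager =
        new NMContainerTokenSecretManager(conf);
    // The test also installs a MasterKey on the secret manager before using
    // it to mint container tokens; omitted here for brevity.
    return new OpportunisticContainerAllocator(secretManager, 77);
  }
}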
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
index b093b3b6764..8f1ae7fb431 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
@@ -38,11 +38,12 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAl
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
@@ -189,7 +190,6 @@ public class TestDistributedScheduler {
       DistributedScheduler distributedScheduler) {
     NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class);
     Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l);
-    Context context = Mockito.mock(Context.class);
     NMContainerTokenSecretManager nmContainerTokenSecretManager = new
         NMContainerTokenSecretManager(conf);
     MasterKey mKey = new MasterKey() {
@@ -207,15 +207,13 @@ public class TestDistributedScheduler {
       public void setBytes(ByteBuffer bytes) {}
     };
     nmContainerTokenSecretManager.setMasterKey(mKey);
-    Mockito.when(context.getContainerTokenSecretManager()).thenReturn
-        (nmContainerTokenSecretManager);
 
     OpportunisticContainerAllocator containerAllocator =
-        new OpportunisticContainerAllocator(nodeStatusUpdater, context, 7777);
+        new OpportunisticContainerAllocator(nmContainerTokenSecretManager, 77);
 
     NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
         new NMTokenSecretManagerInNM();
     nmTokenSecretManagerInNM.setMasterKey(mKey);
-    distributedScheduler.initLocal(
+    distributedScheduler.initLocal(1234,
         ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
         containerAllocator, nmTokenSecretManagerInNM, "test");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
similarity index 83%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index 843ac099ec8..a473b14d0da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -73,18 +73,19 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * The DistributedSchedulingAMService is started instead of the
+ * The OpportunisticContainerAllocatorAMService is started instead of the
  * ApplicationMasterService if distributed scheduling is enabled for the YARN
  * cluster.
  * It extends the functionality of the ApplicationMasterService by servicing
  * clients (AMs and AMRMProxy request interceptors) that understand the
  * DistributedSchedulingProtocol.
  */
-public class DistributedSchedulingAMService extends ApplicationMasterService
-    implements DistributedSchedulingAMProtocol, EventHandler {
+public class OpportunisticContainerAllocatorAMService
+    extends ApplicationMasterService implements DistributedSchedulingAMProtocol,
+    EventHandler {
 
   private static final Log LOG =
-      LogFactory.getLog(DistributedSchedulingAMService.class);
+      LogFactory.getLog(OpportunisticContainerAllocatorAMService.class);
 
   private final NodeQueueLoadMonitor nodeMonitor;
 
@@ -94,12 +95,13 @@ public class DistributedSchedulingAMService extends ApplicationMasterService
       new ConcurrentHashMap<>();
   private final int k;
 
-  public DistributedSchedulingAMService(RMContext rmContext,
-      YarnScheduler scheduler) {
-    super(DistributedSchedulingAMService.class.getName(), rmContext, scheduler);
+  public OpportunisticContainerAllocatorAMService(RMContext rmContext,
+      YarnScheduler scheduler) {
+    super(OpportunisticContainerAllocatorAMService.class.getName(),
+        rmContext, scheduler);
     this.k = rmContext.getYarnConfiguration().getInt(
-        YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED,
-        YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT);
+        YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED,
+        YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED_DEFAULT);
     long nodeSortInterval = rmContext.getYarnConfiguration().getLong(
         YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
         YarnConfiguration.
@@ -149,18 +151,21 @@ public class DistributedSchedulingAMService extends ApplicationMasterService
   @Override
   public Server getServer(YarnRPC rpc, Configuration serverConf,
       InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
-    Server server = rpc.getServer(DistributedSchedulingAMProtocol.class, this,
-        addr, serverConf, secretManager,
-        serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
-    // To support application running on NMs that DO NOT support
-    // Dist Scheduling... The server multiplexes both the
-    // ApplicationMasterProtocol as well as the DistributedSchedulingProtocol
-    ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
-        ApplicationMasterProtocolPB.class,
-        ApplicationMasterProtocolService.newReflectiveBlockingService(
-            new ApplicationMasterProtocolPBServiceImpl(this)));
-    return server;
+    if (YarnConfiguration.isDistSchedulingEnabled(serverConf)) {
+      Server server = rpc.getServer(DistributedSchedulingAMProtocol.class, this,
+          addr, serverConf, secretManager,
+          serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
+              YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
+      // To support application running on NMs that DO NOT support
+      // Dist Scheduling... The server multiplexes both the
+      // ApplicationMasterProtocol as well as the DistributedSchedulingProtocol
+      ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+          ApplicationMasterProtocolPB.class,
+          ApplicationMasterProtocolService.newReflectiveBlockingService(
+              new ApplicationMasterProtocolPBServiceImpl(this)));
+      return server;
+    }
+    return super.getServer(rpc, serverConf, addr, secretManager);
   }
 
   @Override
@@ -196,40 +201,41 @@ public class DistributedSchedulingAMService extends ApplicationMasterService
     dsResp.setMinContainerResource(
         Resource.newInstance(
             getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB,
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB,
                 YarnConfiguration.
-                    DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT),
+                    OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT),
             getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES,
-                YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT)
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES,
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT)
         )
     );
     dsResp.setMaxContainerResource(
         Resource.newInstance(
             getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB,
-                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT),
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB,
+                YarnConfiguration
+                    .OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT),
             getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES,
-                YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT)
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES,
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT)
        )
    );
     dsResp.setIncrContainerResource(
         Resource.newInstance(
             getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB,
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB,
                 YarnConfiguration.
-                    DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT),
+                    OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT),
             getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES,
-                YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT)
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES,
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT)
         )
     );
     dsResp.setContainerTokenExpiryInterval(
         getConfig().getInt(
-            YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS,
+            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS,
             YarnConfiguration.
-                DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT));
+                OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT));
     dsResp.setContainerIdStart(
         this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
@@ -349,8 +355,8 @@ public class DistributedSchedulingAMService extends ApplicationMasterService
       break;
     // <-- IGNORED EVENTS : END -->
     default:
-      LOG.error("Unknown event arrived at DistributedSchedulingAMService: "
-          + event.toString());
+      LOG.error("Unknown event arrived at "
+          + "OpportunisticContainerAllocatorAMService: " + event.toString());
     }
   }
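The registration hunks above advertise the opportunistic container sizing limits to AMs from the renamed keys. As a hedged sketch of what those limits mean to a consumer, an ask can be rounded into the [min, max] range in steps of the increment with the same Resources.normalize helper the deleted allocator used; the OpportunisticLimitsSketch class and normalizeAsk method are illustrative names, not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

final class OpportunisticLimitsSketch {
  // Rounds an opportunistic ask the way the deleted allocator's
  // normalizeCapability() did, reading the renamed configuration keys.
  static Resource normalizeAsk(Configuration conf, Resource ask) {
    Resource min = Resource.newInstance(
        conf.getInt(YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB,
            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT),
        conf.getInt(YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES,
            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT));
    Resource max = Resource.newInstance(
        conf.getInt(YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB,
            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT),
        conf.getInt(YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES,
            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT));
    Resource incr = Resource.newInstance(
        conf.getInt(YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB,
            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT),
        conf.getInt(YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES,
            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT));
    // Clamp to [min, max] and round up to a multiple of incr.
    return Resources.normalize(new DominantResourceCalculator(),
        ask, min, max, incr);
  }
}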
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 4509045df66..bf72fc1465e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -116,7 +116,6 @@ import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
 import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;
 import org.apache.hadoop.yarn.webapp.WebApps.Builder;
@@ -1177,24 +1176,27 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }
 
   protected ApplicationMasterService createApplicationMasterService() {
-    if (this.rmContext.getYarnConfiguration().getBoolean(
-        YarnConfiguration.DIST_SCHEDULING_ENABLED,
-        YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
-      DistributedSchedulingAMService distributedSchedulingService = new
-          DistributedSchedulingAMService(this.rmContext, scheduler);
-      EventDispatcher distSchedulerEventDispatcher =
-          new EventDispatcher(distributedSchedulingService,
-              DistributedSchedulingAMService.class.getName());
-      // Add an event dispatcher for the DistributedSchedulingAMService
-      // to handle node updates/additions and removals.
+    Configuration config = this.rmContext.getYarnConfiguration();
+    if (YarnConfiguration.isOpportunisticContainerAllocationEnabled(config)
+        || YarnConfiguration.isDistSchedulingEnabled(config)) {
+      OpportunisticContainerAllocatorAMService
+          oppContainerAllocatingAMService =
+          new OpportunisticContainerAllocatorAMService(this.rmContext,
+              scheduler);
+      EventDispatcher oppContainerAllocEventDispatcher =
+          new EventDispatcher(oppContainerAllocatingAMService,
+              OpportunisticContainerAllocatorAMService.class.getName());
+      // Add an event dispatcher for the
+      // OpportunisticContainerAllocatorAMService to handle node
+      // updates/additions and removals.
       // Since the SchedulerEvent is currently a super set of these,
       // we register interest for it.
-      addService(distSchedulerEventDispatcher);
+      addService(oppContainerAllocEventDispatcher);
       rmDispatcher.register(SchedulerEventType.class,
-          distSchedulerEventDispatcher);
+          oppContainerAllocEventDispatcher);
       this.rmContext.setContainerQueueLimitCalculator(
-          distributedSchedulingService.getNodeManagerQueueLimitCalculator());
-      return distributedSchedulingService;
+          oppContainerAllocatingAMService.getNodeManagerQueueLimitCalculator());
+      return oppContainerAllocatingAMService;
     }
     return new ApplicationMasterService(this.rmContext, scheduler);
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 7d1b3c33b9e..5856e595f6f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -820,9 +820,10 @@ public class MockRM extends ResourceManager {
   @Override
   protected ApplicationMasterService createApplicationMasterService() {
     if (this.rmContext.getYarnConfiguration().getBoolean(
-        YarnConfiguration.DIST_SCHEDULING_ENABLED,
-        YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
-      return new DistributedSchedulingAMService(getRMContext(), scheduler) {
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED,
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT)) {
+      return new OpportunisticContainerAllocatorAMService(getRMContext(),
+          scheduler) {
         @Override
         protected void serviceStart() {
           // override to not start rpc handler
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
similarity index 95%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index 0213a94bd63..07c6b54f1fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -71,12 +71,12 @@ import java.util.Arrays;
 import java.util.List;
 
 /**
- * Test cases for {@link DistributedSchedulingAMService}.
+ * Test cases for {@link OpportunisticContainerAllocatorAMService}.
  */
-public class TestDistributedSchedulingAMService {
+public class TestOpportunisticContainerAllocatorAMService {
 
-  // Test if the DistributedSchedulingAMService can handle both DSProtocol as
-  // well as AMProtocol clients
+  // Test if the OpportunisticContainerAllocatorAMService can handle both
+  // DSProtocol as well as AMProtocol clients
   @Test
   public void testRPCWrapping() throws Exception {
     Configuration conf = new Configuration();
@@ -111,8 +111,9 @@ public class TestDistributedSchedulingAMService {
             Resource.newInstance(1, 2), 1, true, "exp",
             ExecutionTypeRequest.newInstance(
                 ExecutionType.OPPORTUNISTIC, true))));
-    DistributedSchedulingAMService service =
+    OpportunisticContainerAllocatorAMService service =
         createService(factory, rmContext, c);
+    conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
     Server server = service.getServer(rpc, conf, addr, null);
     server.start();
@@ -195,9 +196,10 @@ public class TestDistributedSchedulingAMService {
         false, dsfinishResp.getIsUnregistered());
   }
 
-  private DistributedSchedulingAMService createService(final RecordFactory
-      factory, final RMContext rmContext, final Container c) {
-    return new DistributedSchedulingAMService(rmContext, null) {
+  private OpportunisticContainerAllocatorAMService createService(
+      final RecordFactory factory, final RMContext rmContext,
+      final Container c) {
+    return new OpportunisticContainerAllocatorAMService(rmContext, null) {
       @Override
       public RegisterApplicationMasterResponse registerApplicationMaster(
           RegisterApplicationMasterRequest request) throws