YARN-5457. Refactor DistributedScheduling framework to pull out common functionality. (asuresh)

(cherry picked from commit 82c9e06101)
Arun Suresh 2016-08-05 11:13:05 -07:00
parent 427b54086e
commit 5f7edb79d1
18 changed files with 854 additions and 556 deletions

View File

@@ -91,6 +91,8 @@ public class TestMROpportunisticMaps {
     Configuration conf = new Configuration();
     // Start the mini-MR and mini-DFS clusters
     conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.
+        OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
     conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
     conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
     dfsCluster = new MiniDFSCluster.Builder(conf)

View File

@@ -301,55 +301,60 @@ public class YarnConfiguration extends Configuration {
       YARN_PREFIX + "distributed-scheduling.enabled";
   public static final boolean DIST_SCHEDULING_ENABLED_DEFAULT = false;

-  /** Minimum memory (in MB) used for allocating a container through distributed
-   * scheduling. */
-  public static final String DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB =
-      YARN_PREFIX + "distributed-scheduling.min-container-memory-mb";
-  public static final int DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT = 512;
-
-  /** Minimum virtual CPU cores used for allocating a container through
-   * distributed scheduling. */
-  public static final String DIST_SCHEDULING_MIN_CONTAINER_VCORES =
-      YARN_PREFIX + "distributed-scheduling.min-container-vcores";
-  public static final int DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT = 1;
-
-  /** Maximum memory (in MB) used for allocating a container through distributed
-   * scheduling. */
-  public static final String DIST_SCHEDULING_MAX_MEMORY_MB =
-      YARN_PREFIX + "distributed-scheduling.max-container-memory-mb";
-  public static final int DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT = 2048;
-
-  /** Maximum virtual CPU cores used for allocating a container through
-   * distributed scheduling. */
-  public static final String DIST_SCHEDULING_MAX_CONTAINER_VCORES =
-      YARN_PREFIX + "distributed-scheduling.max-container-vcores";
-  public static final int DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT = 4;
-
-  /** Incremental memory (in MB) used for allocating a container through
-   * distributed scheduling. */
-  public static final String DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB =
-      YARN_PREFIX + "distributed-scheduling.incr-container-memory-mb";
-  public static final int DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT =
+  /** Setting that controls whether opportunistic container allocation
+   * is enabled or not. */
+  public static final String OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED =
+      YARN_PREFIX + "opportunistic-container-allocation.enabled";
+  public static final boolean
+      OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT = false;
+
+  /** Minimum memory (in MB) used for allocating an opportunistic container. */
+  public static final String OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB =
+      YARN_PREFIX + "opportunistic-containers.min-memory-mb";
+  public static final int OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT = 512;
+
+  /** Minimum virtual CPU cores used for allocating an opportunistic container.
+   * */
+  public static final String OPPORTUNISTIC_CONTAINERS_MIN_VCORES =
+      YARN_PREFIX + "opportunistic-containers.min-vcores";
+  public static final int OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT = 1;
+
+  /** Maximum memory (in MB) used for allocating an opportunistic container. */
+  public static final String OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB =
+      YARN_PREFIX + "opportunistic-containers.max-memory-mb";
+  public static final int OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT = 2048;
+
+  /** Maximum virtual CPU cores used for allocating an opportunistic container.
+   * */
+  public static final String OPPORTUNISTIC_CONTAINERS_MAX_VCORES =
+      YARN_PREFIX + "opportunistic-containers.max-vcores";
+  public static final int OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT = 4;
+
+  /** Incremental memory (in MB) used for allocating an opportunistic container.
+   * */
+  public static final String OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB =
+      YARN_PREFIX + "opportunistic-containers.incr-memory-mb";
+  public static final int OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT =
       512;

-  /** Incremental virtual CPU cores used for allocating a container through
-   * distributed scheduling. */
-  public static final String DIST_SCHEDULING_INCR_CONTAINER_VCORES =
-      YARN_PREFIX + "distributed-scheduling.incr-vcores";
-  public static final int DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT = 1;
+  /** Incremental virtual CPU cores used for allocating an opportunistic
+   * container. */
+  public static final String OPPORTUNISTIC_CONTAINERS_INCR_VCORES =
+      YARN_PREFIX + "opportunistic-containers.incr-vcores";
+  public static final int OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT = 1;

-  /** Container token expiry for container allocated via distributed
-   * scheduling. */
-  public static final String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS =
-      YARN_PREFIX + "distributed-scheduling.container-token-expiry-ms";
-  public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT =
+  /** Container token expiry for opportunistic containers. */
+  public static final String OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS =
+      YARN_PREFIX + "opportunistic-containers.container-token-expiry-ms";
+  public static final int OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT =
       600000;

-  /** Number of nodes to be used by the LocalScheduler of a NodeManager for
-   * dispatching containers during distributed scheduling. */
-  public static final String DIST_SCHEDULING_NODES_NUMBER_USED =
-      YARN_PREFIX + "distributed-scheduling.nodes-used";
-  public static final int DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT = 10;
+  /** Number of nodes to be used by the Opportunistic Container allocator for
+   * dispatching containers during container allocation. */
+  public static final String OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED =
+      YARN_PREFIX + "opportunistic-container-allocation.nodes-used";
+  public static final int OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED_DEFAULT =
+      10;

   /** Frequency for computing least loaded NMs. */
   public static final String NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS =
@@ -2733,6 +2738,18 @@ public class YarnConfiguration extends Configuration {
     return clusterId;
   }

+  public static boolean isDistSchedulingEnabled(Configuration conf) {
+    return conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
+        YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT);
+  }
+
+  public static boolean isOpportunisticContainerAllocationEnabled(
+      Configuration conf) {
+    return conf.getBoolean(
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED,
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT);
+  }
+
   /* For debugging. mp configurations to system output as XML format. */
   public static void main(String[] args) throws Exception {
     new YarnConfiguration(new Configuration()).writeXml(System.out);
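
The two helper predicates above make the feature gating reusable by other components. A minimal sketch of how a caller might use them (the class below is illustrative, not part of this commit); distributed scheduling builds on opportunistic allocation, so a component would typically check both flags:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class SchedulingFeatureCheck {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    conf.setBoolean(
        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);

    // Opportunistic allocation can be on while distributed scheduling is off.
    if (YarnConfiguration.isOpportunisticContainerAllocationEnabled(conf)) {
      System.out.println("Opportunistic container allocation is enabled");
    }
    // Distributed scheduling is the NM-side feature layered on top of it.
    if (YarnConfiguration.isDistSchedulingEnabled(conf)) {
      System.out.println("Distributed scheduling is enabled");
    }
  }
}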

View File

@@ -84,7 +84,7 @@ import static org.mockito.Mockito.when;
 * specifying OPPORTUNISTIC containers in its resource requests,
 * the AMRMProxyService on the NM, the DistributedScheduler RequestInterceptor
 * on the NM and the DistributedSchedulingProtocol used by the framework to talk
- * to the DistributedSchedulingAMService running on the RM.
+ * to the OpportunisticContainerAllocatorAMService running on the RM.
 */
public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
@@ -105,6 +105,8 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
     conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.
+        OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
     conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
     conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
     cluster.init(conf);

View File

@@ -2678,72 +2678,76 @@
 <property>
   <description>
-    Minimum memory (in MB) used for allocating a container through distributed
-    scheduling.
+    Setting that controls whether opportunistic container allocation
+    is enabled.
   </description>
-  <name>yarn.distributed-scheduling.min-container-memory-mb</name>
+  <name>yarn.opportunistic-container-allocation.enabled</name>
+  <value>false</value>
+</property>
+
+<property>
+  <description>
+    Minimum memory (in MB) used for allocating an opportunistic container.
+  </description>
+  <name>yarn.opportunistic-containers.min-memory-mb</name>
   <value>512</value>
 </property>

 <property>
   <description>
-    Minimum virtual CPU cores used for allocating a container through
-    distributed scheduling.
+    Minimum virtual CPU cores used for allocating an opportunistic container.
   </description>
-  <name>yarn.distributed-scheduling.min-container-vcores</name>
+  <name>yarn.opportunistic-containers.min-vcores</name>
   <value>1</value>
 </property>

 <property>
   <description>
-    Maximum memory (in MB) used for allocating a container through distributed
-    scheduling.
+    Maximum memory (in MB) used for allocating an opportunistic container.
   </description>
-  <name>yarn.distributed-scheduling.max-container-memory-mb</name>
+  <name>yarn.opportunistic-containers.max-memory-mb</name>
   <value>2048</value>
 </property>

 <property>
   <description>
-    Maximum virtual CPU cores used for allocating a container through
-    distributed scheduling.
+    Maximum virtual CPU cores used for allocating an opportunistic container.
   </description>
-  <name>yarn.distributed-scheduling.max-container-vcores</name>
+  <name>yarn.opportunistic-containers.max-vcores</name>
   <value>4</value>
 </property>

 <property>
   <description>
-    Incremental memory (in MB) used for allocating a container through
-    distributed scheduling.
+    Incremental memory (in MB) used for allocating an opportunistic container.
   </description>
-  <name>yarn.distributed-scheduling.incr-container-memory-mb</name>
+  <name>yarn.opportunistic-containers.incr-memory-mb</name>
   <value>512</value>
 </property>

 <property>
   <description>
-    Incremental virtual CPU cores used for allocating a container through
-    distributed scheduling.
+    Incremental virtual CPU cores used for allocating an opportunistic
+    container.
   </description>
-  <name>yarn.distributed-scheduling.incr-vcores</name>
+  <name>yarn.opportunistic-containers.incr-vcores</name>
   <value>1</value>
 </property>

 <property>
   <description>
-    Container token expiry for container allocated via distributed scheduling.
+    Container token expiry for opportunistic containers.
   </description>
-  <name>yarn.distributed-scheduling.container-token-expiry-ms</name>
+  <name>yarn.opportunistic-containers.container-token-expiry-ms</name>
   <value>600000</value>
 </property>

 <property>
   <description>
-    Number of nodes to be used by the LocalScheduler of a NodeManager for
-    dispatching containers during distributed scheduling.
+    Number of nodes to be used by the Opportunistic Container Allocator for
+    dispatching containers during container allocation.
   </description>
-  <name>yarn.distributed-scheduling.nodes-used</name>
+  <name>yarn.opportunistic-container-allocation.nodes-used</name>
   <value>10</value>
 </property>
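
These properties line up one-to-one with the AllocationParams bean added in the new OpportunisticContainerAllocator below. A plausible wiring sketch (this helper is hypothetical; the excerpt does not show where the keys are read into the bean):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.AllocationParams;

public final class AllocationParamsWiring {
  // Hypothetical helper: populate AllocationParams from the new config keys.
  public static AllocationParams fromConf(Configuration conf) {
    AllocationParams params = new AllocationParams();
    params.setMinResource(Resource.newInstance(
        conf.getInt(YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB,
            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT),
        conf.getInt(YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES,
            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT)));
    params.setMaxResource(Resource.newInstance(
        conf.getInt(YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB,
            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT),
        conf.getInt(YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES,
            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT)));
    params.setIncrementResource(Resource.newInstance(
        conf.getInt(YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB,
            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT),
        conf.getInt(YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES,
            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT)));
    params.setContainerTokenExpiryInterval(
        conf.getInt(YarnConfiguration.OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS,
            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT));
    return params;
  }
}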

View File

@@ -0,0 +1,378 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.scheduler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
/**
* <p>
* The OpportunisticContainerAllocator allocates containers on a given list of
* nodes, after modifying the container sizes to respect the limits set by the
* ResourceManager. It tries to distribute the containers as evenly as possible.
* </p>
*/
public class OpportunisticContainerAllocator {
/**
* This class encapsulates application specific parameters used to build a
* Container.
*/
public static class AllocationParams {
private Resource maxResource;
private Resource minResource;
private Resource incrementResource;
private int containerTokenExpiryInterval;
/**
* Return Max Resource.
* @return Resource
*/
public Resource getMaxResource() {
return maxResource;
}
/**
* Set Max Resource.
* @param maxResource Resource
*/
public void setMaxResource(Resource maxResource) {
this.maxResource = maxResource;
}
/**
* Get Min Resource.
* @return Resource
*/
public Resource getMinResource() {
return minResource;
}
/**
* Set Min Resource.
* @param minResource Resource
*/
public void setMinResource(Resource minResource) {
this.minResource = minResource;
}
/**
* Get Incremental Resource.
* @return Incremental Resource
*/
public Resource getIncrementResource() {
return incrementResource;
}
/**
* Set Incremental resource.
* @param incrementResource Resource
*/
public void setIncrementResource(Resource incrementResource) {
this.incrementResource = incrementResource;
}
/**
* Get Container Token Expiry interval.
* @return Container Token Expiry interval
*/
public int getContainerTokenExpiryInterval() {
return containerTokenExpiryInterval;
}
/**
* Set Container Token Expiry time in ms.
* @param containerTokenExpiryInterval Container Token Expiry in ms
*/
public void setContainerTokenExpiryInterval(
int containerTokenExpiryInterval) {
this.containerTokenExpiryInterval = containerTokenExpiryInterval;
}
}
/**
* A Container Id Generator.
*/
public static class ContainerIdGenerator {
protected volatile AtomicLong containerIdCounter = new AtomicLong(1);
/**
* This method can reset the generator to a specific value.
* @param containerIdStart containerId
*/
public void resetContainerIdCounter(long containerIdStart) {
this.containerIdCounter.set(containerIdStart);
}
/**
* Sets the underlying Atomic Long. To be used when implementation needs to
* share the underlying AtomicLong of an existing counter.
* @param counter AtomicLong
*/
public void setContainerIdCounter(AtomicLong counter) {
this.containerIdCounter = counter;
}
/**
* Generates a new long value. Default implementation increments the
* underlying AtomicLong. Subclasses are encouraged to override this
* behaviour.
* @return Counter.
*/
public long generateContainerId() {
return this.containerIdCounter.incrementAndGet();
}
}
static class PartitionedResourceRequests {
private List<ResourceRequest> guaranteed = new ArrayList<>();
private List<ResourceRequest> opportunistic = new ArrayList<>();
public List<ResourceRequest> getGuaranteed() {
return guaranteed;
}
public List<ResourceRequest> getOpportunistic() {
return opportunistic;
}
}
private static final Log LOG =
LogFactory.getLog(OpportunisticContainerAllocator.class);
private static final ResourceCalculator RESOURCE_CALCULATOR =
new DominantResourceCalculator();
private final BaseContainerTokenSecretManager tokenSecretManager;
private int webpagePort;
/**
* Create a new Opportunistic Container Allocator.
* @param tokenSecretManager TokenSecretManager
* @param webpagePort Webpage Port
*/
public OpportunisticContainerAllocator(
BaseContainerTokenSecretManager tokenSecretManager, int webpagePort) {
this.tokenSecretManager = tokenSecretManager;
this.webpagePort = webpagePort;
}
/**
* Entry point into the Opportunistic Container Allocator.
* @param request AllocateRequest
* @param applicationAttemptId ApplicationAttemptId
* @param appContext App Specific OpportunisticContainerContext
* @param rmIdentifier RM Identifier
* @param appSubmitter App Submitter
* @return List of Containers.
* @throws YarnException YarnException
*/
public List<Container> allocateContainers(
AllocateRequest request, ApplicationAttemptId applicationAttemptId,
OpportunisticContainerContext appContext, long rmIdentifier,
String appSubmitter) throws YarnException {
// Partition requests into GUARANTEED and OPPORTUNISTIC reqs
PartitionedResourceRequests partitionedAsks =
partitionAskList(request.getAskList());
List<ContainerId> releasedContainers = request.getReleaseList();
int numReleasedContainers = releasedContainers.size();
if (numReleasedContainers > 0) {
LOG.info("AttemptID: " + applicationAttemptId + " released: "
+ numReleasedContainers);
appContext.getContainersAllocated().removeAll(releasedContainers);
}
// Also, update black list
ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest();
if (rbr != null) {
appContext.getBlacklist().removeAll(rbr.getBlacklistRemovals());
appContext.getBlacklist().addAll(rbr.getBlacklistAdditions());
}
// Add OPPORTUNISTIC reqs to the outstanding reqs
appContext.addToOutstandingReqs(partitionedAsks.getOpportunistic());
List<Container> allocatedContainers = new ArrayList<>();
for (Priority priority :
appContext.getOutstandingOpReqs().descendingKeySet()) {
// Allocated containers :
// Key = Requested Capability,
// Value = List of Containers of given Cap (The actual container size
// might be different than what is requested.. which is why
// we need the requested capability (key) to match against
// the outstanding reqs)
Map<Resource, List<Container>> allocated = allocate(rmIdentifier,
appContext, priority, applicationAttemptId, appSubmitter);
for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
appContext.matchAllocationToOutstandingRequest(
e.getKey(), e.getValue());
allocatedContainers.addAll(e.getValue());
}
}
// Send all the GUARANTEED Reqs to RM
request.setAskList(partitionedAsks.getGuaranteed());
return allocatedContainers;
}
private Map<Resource, List<Container>> allocate(long rmIdentifier,
OpportunisticContainerContext appContext, Priority priority,
ApplicationAttemptId appAttId, String userName) throws YarnException {
Map<Resource, List<Container>> containers = new HashMap<>();
for (ResourceRequest anyAsk :
appContext.getOutstandingOpReqs().get(priority).values()) {
allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
appContext.getContainerIdGenerator(), appContext.getBlacklist(),
appAttId, appContext.getNodeMap(), userName, containers, anyAsk);
LOG.info("Opportunistic allocation requested for ["
+ "priority=" + anyAsk.getPriority()
+ ", num_containers=" + anyAsk.getNumContainers()
+ ", capability=" + anyAsk.getCapability() + "]"
+ " allocated = " + containers.get(anyAsk.getCapability()).size());
}
return containers;
}
private void allocateContainersInternal(long rmIdentifier,
AllocationParams appParams, ContainerIdGenerator idCounter,
Set<String> blacklist, ApplicationAttemptId id,
Map<String, NodeId> allNodes, String userName,
Map<Resource, List<Container>> containers, ResourceRequest anyAsk)
throws YarnException {
int toAllocate = anyAsk.getNumContainers()
- (containers.isEmpty() ? 0 :
containers.get(anyAsk.getCapability()).size());
List<NodeId> nodesForScheduling = new ArrayList<>();
for (Entry<String, NodeId> nodeEntry : allNodes.entrySet()) {
// Do not use blacklisted nodes for scheduling.
if (blacklist.contains(nodeEntry.getKey())) {
continue;
}
nodesForScheduling.add(nodeEntry.getValue());
}
int numAllocated = 0;
int nextNodeToSchedule = 0;
for (int numCont = 0; numCont < toAllocate; numCont++) {
nextNodeToSchedule++;
nextNodeToSchedule %= nodesForScheduling.size();
NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule);
Container container = buildContainer(rmIdentifier, appParams, idCounter,
anyAsk, id, userName, nodeId);
List<Container> cList = containers.get(anyAsk.getCapability());
if (cList == null) {
cList = new ArrayList<>();
containers.put(anyAsk.getCapability(), cList);
}
cList.add(container);
numAllocated++;
LOG.info("Allocated [" + container.getId() + "] as opportunistic.");
}
LOG.info("Allocated " + numAllocated + " opportunistic containers.");
}
private Container buildContainer(long rmIdentifier,
AllocationParams appParams, ContainerIdGenerator idCounter,
ResourceRequest rr, ApplicationAttemptId id, String userName,
NodeId nodeId) throws YarnException {
ContainerId cId =
ContainerId.newContainerId(id, idCounter.generateContainerId());
// Normalize the resource asks (similar to what the RM scheduler does
// before accepting an ask)
Resource capability = normalizeCapability(appParams, rr);
long currTime = System.currentTimeMillis();
ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(
cId, nodeId.getHost() + ":" + nodeId.getPort(), userName,
capability, currTime + appParams.containerTokenExpiryInterval,
tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier,
rr.getPriority(), currTime,
null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
ExecutionType.OPPORTUNISTIC);
byte[] pwd =
tokenSecretManager.createPassword(containerTokenIdentifier);
Token containerToken = newContainerToken(nodeId, pwd,
containerTokenIdentifier);
Container container = BuilderUtils.newContainer(
cId, nodeId, nodeId.getHost() + ":" + webpagePort,
capability, rr.getPriority(), containerToken,
containerTokenIdentifier.getExecutionType(),
rr.getAllocationRequestId());
return container;
}
private Resource normalizeCapability(AllocationParams appParams,
ResourceRequest ask) {
return Resources.normalize(RESOURCE_CALCULATOR,
ask.getCapability(), appParams.minResource, appParams.maxResource,
appParams.incrementResource);
}
private static Token newContainerToken(NodeId nodeId, byte[] password,
ContainerTokenIdentifier tokenIdentifier) {
// RPC layer client expects ip:port as service for tokens
InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(),
nodeId.getPort());
// NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
Token containerToken = Token.newInstance(tokenIdentifier.getBytes(),
ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
.buildTokenService(addr).toString());
return containerToken;
}
private PartitionedResourceRequests partitionAskList(List<ResourceRequest>
askList) {
PartitionedResourceRequests partitionedRequests =
new PartitionedResourceRequests();
for (ResourceRequest rr : askList) {
if (rr.getExecutionTypeRequest().getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
partitionedRequests.getOpportunistic().add(rr);
} else {
partitionedRequests.getGuaranteed().add(rr);
}
}
return partitionedRequests;
}
}
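
normalizeCapability defers to Resources.normalize, which clamps the ask to the [min, max] range and rounds it up to a multiple of the increment, mirroring what the RM scheduler does for guaranteed asks. A small standalone check using the default limits from yarn-default.xml (the demo class name is illustrative):

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

public class NormalizeDemo {
  public static void main(String[] args) {
    ResourceCalculator rc = new DominantResourceCalculator();
    Resource min = Resource.newInstance(512, 1);   // opportunistic-containers.min-*
    Resource max = Resource.newInstance(2048, 4);  // opportunistic-containers.max-*
    Resource incr = Resource.newInstance(512, 1);  // opportunistic-containers.incr-*

    // 1000 MB is rounded up to the next 512 MB step: 1024 MB.
    Resource ask = Resource.newInstance(1000, 1);
    System.out.println(Resources.normalize(rc, ask, min, max, incr));

    // 3000 MB exceeds the maximum and is capped at 2048 MB.
    Resource big = Resource.newInstance(3000, 1);
    System.out.println(Resources.normalize(rc, big, min, max, incr));
  }
}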

View File

@@ -0,0 +1,178 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.scheduler;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.AllocationParams;
import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.ContainerIdGenerator;
/**
* This encapsulates application specific information used by the
* Opportunistic Container Allocator to allocate containers.
*/
public class OpportunisticContainerContext {
private static final Logger LOG = LoggerFactory
.getLogger(OpportunisticContainerContext.class);
// Currently just used to keep track of allocated containers.
// Can be used for reporting stats later.
private Set<ContainerId> containersAllocated = new HashSet<>();
private AllocationParams appParams =
new AllocationParams();
private ContainerIdGenerator containerIdGenerator =
new ContainerIdGenerator();
private Map<String, NodeId> nodeMap = new LinkedHashMap<>();
// Mapping of NodeId to NodeTokens. Populated either from RM response or
// generated locally if required.
private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
private final Set<String> blacklist = new HashSet<>();
// This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority,
// Resource Name (Host/rack/any) and capability. This mapping is required
// to match a received Container to an outstanding OPPORTUNISTIC
// ResourceRequest (ask).
private final TreeMap<Priority, Map<Resource, ResourceRequest>>
outstandingOpReqs = new TreeMap<>();
public Set<ContainerId> getContainersAllocated() {
return containersAllocated;
}
public OpportunisticContainerAllocator.AllocationParams getAppParams() {
return appParams;
}
public ContainerIdGenerator getContainerIdGenerator() {
return containerIdGenerator;
}
public void setContainerIdGenerator(
ContainerIdGenerator containerIdGenerator) {
this.containerIdGenerator = containerIdGenerator;
}
public Map<String, NodeId> getNodeMap() {
return nodeMap;
}
public Map<NodeId, NMToken> getNodeTokens() {
return nodeTokens;
}
public Set<String> getBlacklist() {
return blacklist;
}
public TreeMap<Priority, Map<Resource, ResourceRequest>>
getOutstandingOpReqs() {
return outstandingOpReqs;
}
/**
* Takes a list of ResourceRequests (asks), extracts the key information viz.
* (Priority, ResourceName, Capability) and adds to the outstanding
* OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce
* the current YARN constraint that only a single ResourceRequest can exist at
* a given Priority and Capability.
*
* @param resourceAsks the list with the {@link ResourceRequest}s
*/
public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
for (ResourceRequest request : resourceAsks) {
Priority priority = request.getPriority();
// TODO: Extend for Node/Rack locality. We only handle ANY requests now
if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
continue;
}
if (request.getNumContainers() == 0) {
continue;
}
Map<Resource, ResourceRequest> reqMap =
outstandingOpReqs.get(priority);
if (reqMap == null) {
reqMap = new HashMap<>();
outstandingOpReqs.put(priority, reqMap);
}
ResourceRequest resourceRequest = reqMap.get(request.getCapability());
if (resourceRequest == null) {
resourceRequest = request;
reqMap.put(request.getCapability(), request);
} else {
resourceRequest.setNumContainers(
resourceRequest.getNumContainers() + request.getNumContainers());
}
if (ResourceRequest.isAnyLocation(request.getResourceName())) {
LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority
+ ", with capability = " + request.getCapability() + " ) : "
+ resourceRequest.getNumContainers());
}
}
}
/**
* This method matches a returned list of Container Allocations to any
* outstanding OPPORTUNISTIC ResourceRequest.
* @param capability Capability
* @param allocatedContainers Allocated Containers
*/
public void matchAllocationToOutstandingRequest(Resource capability,
List<Container> allocatedContainers) {
for (Container c : allocatedContainers) {
containersAllocated.add(c.getId());
Map<Resource, ResourceRequest> asks =
outstandingOpReqs.get(c.getPriority());
if (asks == null) {
continue;
}
ResourceRequest rr = asks.get(capability);
if (rr != null) {
rr.setNumContainers(rr.getNumContainers() - 1);
if (rr.getNumContainers() == 0) {
asks.remove(capability);
}
}
}
}
}
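
The merge behaviour of addToOutstandingReqs is easiest to see with two ANY asks sharing a priority and capability; per the YARN constraint noted in the javadoc they collapse into a single outstanding request. A minimal sketch (class name illustrative):

import java.util.Arrays;

import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;

public class OutstandingReqsDemo {
  public static void main(String[] args) {
    OpportunisticContainerContext ctx = new OpportunisticContainerContext();
    Priority pri = Priority.newInstance(1);
    Resource cap = Resource.newInstance(1024, 1);

    // Two ANY asks with the same (priority, capability) key...
    ResourceRequest ask1 =
        ResourceRequest.newInstance(pri, ResourceRequest.ANY, cap, 3);
    ResourceRequest ask2 =
        ResourceRequest.newInstance(pri, ResourceRequest.ANY, cap, 2);
    ctx.addToOutstandingReqs(Arrays.asList(ask1, ask2));

    // ...merge into one outstanding request for 5 containers.
    System.out.println(
        ctx.getOutstandingOpReqs().get(pri).get(cap).getNumContainers());
  }
}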

View File

@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Utility classes used for Scheduling.
*/
package org.apache.hadoop.yarn.server.scheduler;

View File

@@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
-import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;

View File

@@ -71,7 +71,7 @@ import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeLabel
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
-import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
@@ -327,7 +327,8 @@ public class NodeManager extends CompositeService
     addService(nodeHealthChecker);

     boolean isDistSchedulingEnabled =
-        conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
+        conf.getBoolean(YarnConfiguration.
+                OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED,
             YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT);

     this.context = createNMContext(containerTokenSecretManager,
@@ -361,8 +362,8 @@ public class NodeManager extends CompositeService
     ((NMContext) context).setWebServer(webServer);

     ((NMContext) context).setQueueableContainerAllocator(
-        new OpportunisticContainerAllocator(nodeStatusUpdater, context,
-            webServer.getPort()));
+        new OpportunisticContainerAllocator(
+            context.getContainerTokenSecretManager(), webServer.getPort()));

     dispatcher.register(ContainerManagerEventType.class, containerManager);
     dispatcher.register(NodeManagerEventType.class, this);

View File

@@ -62,7 +62,7 @@ public final class DefaultRequestInterceptor extends
     AbstractRequestInterceptor {
   private static final Logger LOG = LoggerFactory
       .getLogger(DefaultRequestInterceptor.class);
-  private DistributedSchedulingAMProtocol rmClient;
+  private ApplicationMasterProtocol rmClient;
   private UserGroupInformation user = null;

   @Override
@@ -76,15 +76,7 @@ public final class DefaultRequestInterceptor extends
       user.addToken(appContext.getAMRMToken());
       final Configuration conf = this.getConf();

-      rmClient = user.doAs(
-          new PrivilegedExceptionAction<DistributedSchedulingAMProtocol>() {
-            @Override
-            public DistributedSchedulingAMProtocol run() throws Exception {
-              setAMRMTokenService(conf);
-              return ServerRMProxy.createRMProxy(conf,
-                  DistributedSchedulingAMProtocol.class);
-            }
-          });
+      rmClient = createRMClient(appContext, conf);
     } catch (IOException e) {
       String message =
           "Error while creating of RM app master service proxy for attemptId:"
@@ -100,6 +92,32 @@ public final class DefaultRequestInterceptor extends
     }
   }

+  private ApplicationMasterProtocol createRMClient(
+      AMRMProxyApplicationContext appContext, final Configuration conf)
+      throws IOException, InterruptedException {
+    if (appContext.getNMCotext().isDistributedSchedulingEnabled()) {
+      return user.doAs(
+          new PrivilegedExceptionAction<DistributedSchedulingAMProtocol>() {
+            @Override
+            public DistributedSchedulingAMProtocol run() throws Exception {
+              setAMRMTokenService(conf);
+              return ServerRMProxy.createRMProxy(conf,
+                  DistributedSchedulingAMProtocol.class);
+            }
+          });
+    } else {
+      return user.doAs(
+          new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
+            @Override
+            public ApplicationMasterProtocol run() throws Exception {
+              setAMRMTokenService(conf);
+              return ClientRMProxy.createRMProxy(conf,
+                  ApplicationMasterProtocol.class);
+            }
+          });
+    }
+  }
+
   @Override
   public RegisterApplicationMasterResponse registerApplicationMaster(
       final RegisterApplicationMasterRequest request)
@@ -127,9 +145,15 @@ public final class DefaultRequestInterceptor extends
       registerApplicationMasterForDistributedScheduling
       (RegisterApplicationMasterRequest request) throws YarnException,
       IOException {
-    LOG.info("Forwarding registerApplicationMasterForDistributedScheduling" +
-        "request to the real YARN RM");
-    return rmClient.registerApplicationMasterForDistributedScheduling(request);
+    if (getApplicationContext().getNMCotext()
+        .isDistributedSchedulingEnabled()) {
+      LOG.info("Forwarding registerApplicationMasterForDistributedScheduling" +
+          "request to the real YARN RM");
+      return ((DistributedSchedulingAMProtocol)rmClient)
+          .registerApplicationMasterForDistributedScheduling(request);
+    } else {
+      throw new YarnException("Distributed Scheduling is not enabled !!");
+    }
   }

   @Override
@@ -140,13 +164,18 @@ public final class DefaultRequestInterceptor extends
       LOG.debug("Forwarding allocateForDistributedScheduling request" +
           "to the real YARN RM");
     }
-    DistributedSchedulingAllocateResponse allocateResponse =
-        rmClient.allocateForDistributedScheduling(request);
-    if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
-      updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken());
+    if (getApplicationContext().getNMCotext()
+        .isDistributedSchedulingEnabled()) {
+      DistributedSchedulingAllocateResponse allocateResponse =
+          ((DistributedSchedulingAMProtocol)rmClient)
+              .allocateForDistributedScheduling(request);
+      if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
+        updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken());
+      }
+      return allocateResponse;
+    } else {
+      throw new YarnException("Distributed Scheduling is not enabled !!");
     }
-    return allocateResponse;
   }

   @Override
@Override @Override

View File

@@ -32,34 +32,23 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyApplicationContext;
 import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;

 /**
  * <p>The DistributedScheduler runs on the NodeManager and is modeled as an
@@ -76,74 +65,49 @@ import java.util.TreeMap;
  */
 public final class DistributedScheduler extends AbstractRequestInterceptor {

-  static class PartitionedResourceRequests {
-    private List<ResourceRequest> guaranteed = new ArrayList<>();
-    private List<ResourceRequest> opportunistic = new ArrayList<>();
-
-    public List<ResourceRequest> getGuaranteed() {
-      return guaranteed;
-    }
-
-    public List<ResourceRequest> getOpportunistic() {
-      return opportunistic;
-    }
-  }
-
-  static class DistributedSchedulerParams {
-    Resource maxResource;
-    Resource minResource;
-    Resource incrementResource;
-    int containerTokenExpiryInterval;
-  }
-
   private static final Logger LOG = LoggerFactory
       .getLogger(DistributedScheduler.class);

   private final static RecordFactory RECORD_FACTORY =
       RecordFactoryProvider.getRecordFactory(null);

-  // Currently just used to keep track of allocated containers.
-  // Can be used for reporting stats later.
-  private Set<ContainerId> containersAllocated = new HashSet<>();
-
-  private DistributedSchedulerParams appParams =
-      new DistributedSchedulerParams();
-  private final OpportunisticContainerAllocator.ContainerIdCounter
-      containerIdCounter =
-      new OpportunisticContainerAllocator.ContainerIdCounter();
-  private Map<String, NodeId> nodeList = new LinkedHashMap<>();
-
-  // Mapping of NodeId to NodeTokens. Populated either from RM response or
-  // generated locally if required.
-  private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
-  final Set<String> blacklist = new HashSet<>();
-
-  // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority,
-  // Resource Name (Host/rack/any) and capability. This mapping is required
-  // to match a received Container to an outstanding OPPORTUNISTIC
-  // ResourceRequest (ask).
-  final TreeMap<Priority, Map<Resource, ResourceRequest>>
-      outstandingOpReqs = new TreeMap<>();
+  private OpportunisticContainerContext oppContainerContext =
+      new OpportunisticContainerContext();

   private ApplicationAttemptId applicationAttemptId;
   private OpportunisticContainerAllocator containerAllocator;
   private NMTokenSecretManagerInNM nmSecretManager;
   private String appSubmitter;
+  private long rmIdentifier;

-  public void init(AMRMProxyApplicationContext appContext) {
-    super.init(appContext);
-    initLocal(appContext.getApplicationAttemptId(),
-        appContext.getNMCotext().getContainerAllocator(),
-        appContext.getNMCotext().getNMTokenSecretManager(),
-        appContext.getUser());
+  public void init(AMRMProxyApplicationContext applicationContext) {
+    super.init(applicationContext);
+    initLocal(applicationContext.getNMCotext().getNodeStatusUpdater()
+            .getRMIdentifier(),
+        applicationContext.getApplicationAttemptId(),
+        applicationContext.getNMCotext().getContainerAllocator(),
+        applicationContext.getNMCotext().getNMTokenSecretManager(),
+        applicationContext.getUser());
   }

   @VisibleForTesting
-  void initLocal(ApplicationAttemptId applicationAttemptId,
-      OpportunisticContainerAllocator containerAllocator,
+  void initLocal(long rmId, ApplicationAttemptId appAttemptId,
+      OpportunisticContainerAllocator oppContainerAllocator,
       NMTokenSecretManagerInNM nmSecretManager, String appSubmitter) {
-    this.applicationAttemptId = applicationAttemptId;
-    this.containerAllocator = containerAllocator;
+    this.rmIdentifier = rmId;
+    this.applicationAttemptId = appAttemptId;
+    this.containerAllocator = oppContainerAllocator;
     this.nmSecretManager = nmSecretManager;
     this.appSubmitter = appSubmitter;
+
+    // Overrides the Generator to decrement container id.
+    this.oppContainerContext.setContainerIdGenerator(
+        new OpportunisticContainerAllocator.ContainerIdGenerator() {
+          @Override
+          public long generateContainerId() {
+            return this.containerIdCounter.decrementAndGet();
+          }
+        });
   }

   /**
@@ -202,7 +166,8 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
     if (allocatedContainers.size() > 0) {
       response.getAllocatedContainers().addAll(allocatedContainers);
       for (Container alloc : allocatedContainers) {
-        if (!nodeTokens.containsKey(alloc.getNodeId())) {
+        if (!oppContainerContext.getNodeTokens().containsKey(
+            alloc.getNodeId())) {
           newTokens.add(nmSecretManager.generateNMToken(appSubmitter, alloc));
         }
       }
@@ -212,115 +177,34 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
     }
   }

-  private PartitionedResourceRequests partitionAskList(List<ResourceRequest>
-      askList) {
-    PartitionedResourceRequests partitionedRequests =
-        new PartitionedResourceRequests();
-    for (ResourceRequest rr : askList) {
-      if (rr.getExecutionTypeRequest().getExecutionType() ==
-          ExecutionType.OPPORTUNISTIC) {
-        partitionedRequests.getOpportunistic().add(rr);
-      } else {
-        partitionedRequests.getGuaranteed().add(rr);
-      }
-    }
-    return partitionedRequests;
-  }
-
   private void updateParameters(
       RegisterDistributedSchedulingAMResponse registerResponse) {
-    appParams.minResource = registerResponse.getMinContainerResource();
-    appParams.maxResource = registerResponse.getMaxContainerResource();
-    appParams.incrementResource =
-        registerResponse.getIncrContainerResource();
-    if (appParams.incrementResource == null) {
-      appParams.incrementResource = appParams.minResource;
+    oppContainerContext.getAppParams().setMinResource(
+        registerResponse.getMinContainerResource());
+    oppContainerContext.getAppParams().setMaxResource(
+        registerResponse.getMaxContainerResource());
+    oppContainerContext.getAppParams().setIncrementResource(
+        registerResponse.getIncrContainerResource());
+    if (oppContainerContext.getAppParams().getIncrementResource() == null) {
+      oppContainerContext.getAppParams().setIncrementResource(
+          oppContainerContext.getAppParams().getMinResource());
     }
-    appParams.containerTokenExpiryInterval = registerResponse
-        .getContainerTokenExpiryInterval();
+    oppContainerContext.getAppParams().setContainerTokenExpiryInterval(
+        registerResponse.getContainerTokenExpiryInterval());

-    containerIdCounter
+    oppContainerContext.getContainerIdGenerator()
         .resetContainerIdCounter(registerResponse.getContainerIdStart());
     setNodeList(registerResponse.getNodesForScheduling());
   }

-  /**
-   * Takes a list of ResourceRequests (asks), extracts the key information viz.
-   * (Priority, ResourceName, Capability) and adds to the outstanding
-   * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce
-   * the current YARN constraint that only a single ResourceRequest can exist at
-   * a give Priority and Capability.
-   *
-   * @param resourceAsks the list with the {@link ResourceRequest}s
-   */
-  public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
-    for (ResourceRequest request : resourceAsks) {
-      Priority priority = request.getPriority();
-
-      // TODO: Extend for Node/Rack locality. We only handle ANY requests now
-      if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
-        continue;
-      }
-
-      if (request.getNumContainers() == 0) {
-        continue;
-      }
-
-      Map<Resource, ResourceRequest> reqMap =
-          this.outstandingOpReqs.get(priority);
-      if (reqMap == null) {
-        reqMap = new HashMap<>();
-        this.outstandingOpReqs.put(priority, reqMap);
-      }
-
-      ResourceRequest resourceRequest = reqMap.get(request.getCapability());
-      if (resourceRequest == null) {
-        resourceRequest = request;
-        reqMap.put(request.getCapability(), request);
-      } else {
-        resourceRequest.setNumContainers(
-            resourceRequest.getNumContainers() + request.getNumContainers());
-      }
-      if (ResourceRequest.isAnyLocation(request.getResourceName())) {
-        LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority
-            + ", with capability = " + request.getCapability() + " ) : "
-            + resourceRequest.getNumContainers());
-      }
-    }
-  }
-
-  /**
-   * This method matches a returned list of Container Allocations to any
-   * outstanding OPPORTUNISTIC ResourceRequest.
-   */
-  private void matchAllocationToOutstandingRequest(Resource capability,
-      List<Container> allocatedContainers) {
-    for (Container c : allocatedContainers) {
-      containersAllocated.add(c.getId());
-      Map<Resource, ResourceRequest> asks =
-          outstandingOpReqs.get(c.getPriority());
-
-      if (asks == null)
-        continue;
-
-      ResourceRequest rr = asks.get(capability);
-      if (rr != null) {
-        rr.setNumContainers(rr.getNumContainers() - 1);
-        if (rr.getNumContainers() == 0) {
-          asks.remove(capability);
-        }
-      }
-    }
-  }
-
   private void setNodeList(List<NodeId> nodeList) {
-    this.nodeList.clear();
+    oppContainerContext.getNodeMap().clear();
     addToNodeList(nodeList);
   }

   private void addToNodeList(List<NodeId> nodes) {
     for (NodeId n : nodes) {
-      this.nodeList.put(n.getHost(), n);
+      oppContainerContext.getNodeMap().put(n.getHost(), n);
     }
   }

@@ -345,52 +229,13 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
       LOG.debug("Forwarding allocate request to the" +
           "Distributed Scheduler Service on YARN RM");
     }
-    // Partition requests into GUARANTEED and OPPORTUNISTIC reqs
-    PartitionedResourceRequests partitionedAsks =
-        partitionAskList(request.getAllocateRequest().getAskList());
-
-    List<ContainerId> releasedContainers =
-        request.getAllocateRequest().getReleaseList();
-    int numReleasedContainers = releasedContainers.size();
-    if (numReleasedContainers > 0) {
-      LOG.info("AttemptID: " + applicationAttemptId + " released: "
-          + numReleasedContainers);
-      containersAllocated.removeAll(releasedContainers);
-    }
-
-    // Also, update black list
-    ResourceBlacklistRequest rbr =
-        request.getAllocateRequest().getResourceBlacklistRequest();
-    if (rbr != null) {
-      blacklist.removeAll(rbr.getBlacklistRemovals());
-      blacklist.addAll(rbr.getBlacklistAdditions());
-    }
-
-    // Add OPPORTUNISTIC reqs to the outstanding reqs
-    addToOutstandingReqs(partitionedAsks.getOpportunistic());
-
-    List<Container> allocatedContainers = new ArrayList<>();
-    for (Priority priority : outstandingOpReqs.descendingKeySet()) {
-      // Allocated containers :
-      //  Key = Requested Capability,
-      //  Value = List of Containers of given Cap (The actual container size
-      //          might be different than what is requested.. which is why
-      //          we need the requested capability (key) to match against
-      //          the outstanding reqs)
-      Map<Resource, List<Container>> allocated =
-          containerAllocator.allocate(this.appParams, containerIdCounter,
-              outstandingOpReqs.get(priority).values(), blacklist,
-              applicationAttemptId, nodeList, appSubmitter);
-      for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
-        matchAllocationToOutstandingRequest(e.getKey(), e.getValue());
-        allocatedContainers.addAll(e.getValue());
-      }
-    }
-
+    List<Container> allocatedContainers =
+        containerAllocator.allocateContainers(
+            request.getAllocateRequest(), applicationAttemptId,
+            oppContainerContext, rmIdentifier, appSubmitter);
     request.setAllocatedContainers(allocatedContainers);

-    // Send all the GUARANTEED Reqs to RM
-    request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed());
     DistributedSchedulingAllocateResponse dsResp =
         getNextInterceptor().allocateForDistributedScheduling(request);

@@ -398,7 +243,7 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
     setNodeList(dsResp.getNodesForScheduling());
     List<NMToken> nmTokens = dsResp.getAllocateResponse().getNMTokens();
     for (NMToken nmToken : nmTokens) {
-      nodeTokens.put(nmToken.getNodeId(), nmToken);
+      oppContainerContext.getNodeTokens().put(nmToken.getNodeId(), nmToken);
     }

     List<ContainerStatus> completedContainers =
@@ -407,7 +252,8 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
     // Only account for opportunistic containers
     for (ContainerStatus cs : completedContainers) {
       if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
-        containersAllocated.remove(cs.getContainerId());
+        oppContainerContext.getContainersAllocated()
+            .remove(cs.getContainerId());
       }
     }

@@ -417,9 +263,9 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
         dsResp.getAllocateResponse(), nmTokens, allocatedContainers);
     if (LOG.isDebugEnabled()) {
-      LOG.debug(
-          "Number of opportunistic containers currently allocated by" +
-              "application: " + containersAllocated.size());
+      LOG.debug("Number of opportunistic containers currently" +
+          "allocated by application: " + oppContainerContext
+          .getContainersAllocated().size());
     }
     return dsResp;
   }
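
The ContainerIdGenerator override shown in initLocal is the one behavioural subtlety here: the RM supplies a containerIdStart at registration and NM-side ids then count downward, which (on my reading of the code; the commit does not state the rationale) keeps them clear of ids the RM itself hands out. The pattern in isolation, assuming only the classes added above:

import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.ContainerIdGenerator;

public class DescendingIdDemo {
  public static void main(String[] args) {
    // Same override as DistributedScheduler.initLocal(): ids count down
    // from the start value handed out by the RM at registration.
    ContainerIdGenerator gen = new ContainerIdGenerator() {
      @Override
      public long generateContainerId() {
        return this.containerIdCounter.decrementAndGet();
      }
    };
    gen.resetContainerIdCounter(1000);
    System.out.println(gen.generateContainerId()); // 999
    System.out.println(gen.generateContainerId()); // 998
  }
}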

View File

@@ -1,190 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.scheduler;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler.DistributedSchedulerParams;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

import java.net.InetSocketAddress;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;

/**
* <p>
* The OpportunisticContainerAllocator allocates containers on a given list of
* nodes, after modifying the container sizes to respect the limits set by the
* ResourceManager. It tries to distribute the containers as evenly as possible.
* It also uses the <code>NMTokenSecretManagerInNM</code> to generate the
* required NM tokens for the allocated containers.
* </p>
*/
public class OpportunisticContainerAllocator {
  private static final Log LOG =
      LogFactory.getLog(OpportunisticContainerAllocator.class);
  private static final ResourceCalculator RESOURCE_CALCULATOR =
      new DominantResourceCalculator();

  static class ContainerIdCounter {
    final AtomicLong containerIdCounter = new AtomicLong(1);

    void resetContainerIdCounter(long containerIdStart) {
      this.containerIdCounter.set(containerIdStart);
    }

    long generateContainerId() {
      return this.containerIdCounter.decrementAndGet();
    }
  }

  private final NodeStatusUpdater nodeStatusUpdater;
  private final Context context;
  private int webpagePort;

  public OpportunisticContainerAllocator(NodeStatusUpdater nodeStatusUpdater,
      Context context, int webpagePort) {
    this.nodeStatusUpdater = nodeStatusUpdater;
    this.context = context;
    this.webpagePort = webpagePort;
  }
  public Map<Resource, List<Container>> allocate(
      DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
      Collection<ResourceRequest> resourceAsks, Set<String> blacklist,
      ApplicationAttemptId appAttId, Map<String, NodeId> allNodes,
      String userName) throws YarnException {
    Map<Resource, List<Container>> containers = new HashMap<>();
    for (ResourceRequest anyAsk : resourceAsks) {
      allocateOpportunisticContainers(appParams, idCounter, blacklist,
          appAttId, allNodes, userName, containers, anyAsk);
      LOG.info("Opportunistic allocation requested for ["
          + "priority=" + anyAsk.getPriority()
          + ", num_containers=" + anyAsk.getNumContainers()
          + ", capability=" + anyAsk.getCapability() + "]"
          + " allocated = " + containers.get(anyAsk.getCapability()).size());
    }
    return containers;
  }
  private void allocateOpportunisticContainers(
      DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
      Set<String> blacklist, ApplicationAttemptId id,
      Map<String, NodeId> allNodes, String userName,
      Map<Resource, List<Container>> containers, ResourceRequest anyAsk)
      throws YarnException {
    int toAllocate = anyAsk.getNumContainers()
        - (containers.isEmpty() ? 0 :
            containers.get(anyAsk.getCapability()).size());

    List<NodeId> nodesForScheduling = new ArrayList<>();
    for (Entry<String, NodeId> nodeEntry : allNodes.entrySet()) {
      // Do not use blacklisted nodes for scheduling.
      if (blacklist.contains(nodeEntry.getKey())) {
        continue;
      }
      nodesForScheduling.add(nodeEntry.getValue());
    }
    int numAllocated = 0;
    int nextNodeToSchedule = 0;
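    // Round-robin placement over the eligible nodes: the index is advanced
    // before first use and wrapped by the modulo below, so allocation starts
    // from the second node whenever more than one node is available.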
    for (int numCont = 0; numCont < toAllocate; numCont++) {
      nextNodeToSchedule++;
      nextNodeToSchedule %= nodesForScheduling.size();
      NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule);
      Container container = buildContainer(appParams, idCounter, anyAsk, id,
          userName, nodeId);
      List<Container> cList = containers.get(anyAsk.getCapability());
      if (cList == null) {
        cList = new ArrayList<>();
        containers.put(anyAsk.getCapability(), cList);
      }
      cList.add(container);
      numAllocated++;
      LOG.info("Allocated [" + container.getId() + "] as opportunistic.");
    }
    LOG.info("Allocated " + numAllocated + " opportunistic containers.");
  }
  private Container buildContainer(DistributedSchedulerParams appParams,
      ContainerIdCounter idCounter, ResourceRequest rr, ApplicationAttemptId id,
      String userName, NodeId nodeId) throws YarnException {
    ContainerId cId =
        ContainerId.newContainerId(id, idCounter.generateContainerId());

    // Normalize the resource asks (similar to what the RM scheduler does
    // before accepting an ask)
    Resource capability = normalizeCapability(appParams, rr);

    long currTime = System.currentTimeMillis();
    ContainerTokenIdentifier containerTokenIdentifier =
        new ContainerTokenIdentifier(
            cId, nodeId.getHost() + ":" + nodeId.getPort(), userName,
            capability, currTime + appParams.containerTokenExpiryInterval,
            context.getContainerTokenSecretManager().getCurrentKey().getKeyId(),
            nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime,
            null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
            ExecutionType.OPPORTUNISTIC);
    byte[] pwd =
        context.getContainerTokenSecretManager().createPassword(
            containerTokenIdentifier);
    Token containerToken = newContainerToken(nodeId, pwd,
        containerTokenIdentifier);
    Container container = BuilderUtils.newContainer(
        cId, nodeId, nodeId.getHost() + ":" + webpagePort,
        capability, rr.getPriority(), containerToken,
        containerTokenIdentifier.getExecutionType(),
        rr.getAllocationRequestId());
    return container;
  }
  private Resource normalizeCapability(DistributedSchedulerParams appParams,
      ResourceRequest ask) {
    return Resources.normalize(RESOURCE_CALCULATOR,
        ask.getCapability(), appParams.minResource, appParams.maxResource,
        appParams.incrementResource);
  }
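
  // Worked example (illustrative values, matching the 512 MB minimum named
  // in YarnConfiguration): with min=512 MB, increment=512 MB and max=2048 MB,
  // an ask of 700 MB is rounded up to 1024 MB (the next increment multiple),
  // while an ask of 4096 MB is capped at the 2048 MB maximum.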
  public static Token newContainerToken(NodeId nodeId, byte[] password,
      ContainerTokenIdentifier tokenIdentifier) {
    // RPC layer client expects ip:port as service for tokens
    InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(),
        nodeId.getPort());
    // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
    Token containerToken = Token.newInstance(tokenIdentifier.getBytes(),
        ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
            .buildTokenService(addr).toString());
    return containerToken;
  }
}
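
The removed allocator above is not gone; it is pulled out into org.apache.hadoop.yarn.server.scheduler (see the import changes below) so the same code can serve both distributed and centralized opportunistic allocation. A self-contained sketch of its container-id scheme follows, assuming ResourceManager.EPOCH_BIT_SHIFT is 40; the concrete values are illustrative.

import java.util.concurrent.atomic.AtomicLong;

public final class IdCounterDemo {
  public static void main(String[] args) {
    // Mirrors ContainerIdCounter: ids are handed out counting *down* from an
    // RM-supplied start (the RM epoch shifted left by EPOCH_BIT_SHIFT), so
    // locally minted ids stay clear of the RM's own ascending container ids.
    AtomicLong counter = new AtomicLong(1);
    long epoch = 3;             // RM restart generation (illustrative)
    counter.set(epoch << 40);   // resetContainerIdCounter(containerIdStart)
    System.out.println(counter.decrementAndGet()); // first locally minted id
    System.out.println(counter.decrementAndGet()); // next id, one lower
  }
}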


@ -67,7 +67,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
-import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;


@ -38,11 +38,12 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAl
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
@ -189,7 +190,6 @@ public class TestDistributedScheduler {
      DistributedScheduler distributedScheduler) {
    NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class);
    Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l);
-   Context context = Mockito.mock(Context.class);
    NMContainerTokenSecretManager nmContainerTokenSecretManager = new
        NMContainerTokenSecretManager(conf);
    MasterKey mKey = new MasterKey() {
@ -207,15 +207,13 @@ public class TestDistributedScheduler {
      public void setBytes(ByteBuffer bytes) {}
    };
    nmContainerTokenSecretManager.setMasterKey(mKey);
-   Mockito.when(context.getContainerTokenSecretManager()).thenReturn
-       (nmContainerTokenSecretManager);
    OpportunisticContainerAllocator containerAllocator =
-       new OpportunisticContainerAllocator(nodeStatusUpdater, context, 7777);
+       new OpportunisticContainerAllocator(nmContainerTokenSecretManager, 77);
    NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
        new NMTokenSecretManagerInNM();
    nmTokenSecretManagerInNM.setMasterKey(mKey);
-   distributedScheduler.initLocal(
+   distributedScheduler.initLocal(1234,
        ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
        containerAllocator, nmTokenSecretManagerInNM, "test");


@ -73,18 +73,19 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
- * The DistributedSchedulingAMService is started instead of the
+ * The OpportunisticContainerAllocatorAMService is started instead of the
 * ApplicationMasterService if distributed scheduling is enabled for the YARN
 * cluster.
 * It extends the functionality of the ApplicationMasterService by servicing
 * clients (AMs and AMRMProxy request interceptors) that understand the
 * DistributedSchedulingProtocol.
 */
-public class DistributedSchedulingAMService extends ApplicationMasterService
-    implements DistributedSchedulingAMProtocol, EventHandler<SchedulerEvent> {
+public class OpportunisticContainerAllocatorAMService
+    extends ApplicationMasterService implements DistributedSchedulingAMProtocol,
+    EventHandler<SchedulerEvent> {

  private static final Log LOG =
-     LogFactory.getLog(DistributedSchedulingAMService.class);
+     LogFactory.getLog(OpportunisticContainerAllocatorAMService.class);
  private final NodeQueueLoadMonitor nodeMonitor;
@ -94,12 +95,13 @@ public class DistributedSchedulingAMService extends ApplicationMasterService
      new ConcurrentHashMap<>();
  private final int k;

- public DistributedSchedulingAMService(RMContext rmContext,
+ public OpportunisticContainerAllocatorAMService(RMContext rmContext,
      YarnScheduler scheduler) {
-   super(DistributedSchedulingAMService.class.getName(), rmContext, scheduler);
+   super(OpportunisticContainerAllocatorAMService.class.getName(),
+       rmContext, scheduler);
    this.k = rmContext.getYarnConfiguration().getInt(
-       YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED,
-       YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT);
+       YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED,
+       YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED_DEFAULT);
    long nodeSortInterval = rmContext.getYarnConfiguration().getLong(
        YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
        YarnConfiguration.
@ -149,6 +151,7 @@ public class DistributedSchedulingAMService extends ApplicationMasterService
  @Override
  public Server getServer(YarnRPC rpc, Configuration serverConf,
      InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
+   if (YarnConfiguration.isDistSchedulingEnabled(serverConf)) {
      Server server = rpc.getServer(DistributedSchedulingAMProtocol.class, this,
          addr, serverConf, secretManager,
          serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
@ -162,6 +165,8 @@ public class DistributedSchedulingAMService extends ApplicationMasterService
          new ApplicationMasterProtocolPBServiceImpl(this)));
      return server;
    }
+   return super.getServer(rpc, serverConf, addr, secretManager);
+ }

  @Override
  public RegisterApplicationMasterResponse registerApplicationMaster
@ -196,40 +201,41 @@ public class DistributedSchedulingAMService extends ApplicationMasterService
    dsResp.setMinContainerResource(
        Resource.newInstance(
            getConfig().getInt(
-               YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB,
+               YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB,
                YarnConfiguration.
-                   DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT),
+                   OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT),
            getConfig().getInt(
-               YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES,
-               YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT)
+               YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES,
+               YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT)
        )
    );
    dsResp.setMaxContainerResource(
        Resource.newInstance(
            getConfig().getInt(
-               YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB,
-               YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT),
+               YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB,
+               YarnConfiguration
+                   .OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT),
            getConfig().getInt(
-               YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES,
-               YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT)
+               YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES,
+               YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT)
        )
    );
    dsResp.setIncrContainerResource(
        Resource.newInstance(
            getConfig().getInt(
-               YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB,
+               YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB,
                YarnConfiguration.
-                   DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT),
+                   OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT),
            getConfig().getInt(
-               YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES,
-               YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT)
+               YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES,
+               YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT)
        )
    );
    dsResp.setContainerTokenExpiryInterval(
        getConfig().getInt(
-           YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS,
+           YarnConfiguration.OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS,
            YarnConfiguration.
-               DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT));
+               OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT));
    dsResp.setContainerIdStart(
        this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
@ -349,8 +355,8 @@ public class DistributedSchedulingAMService extends ApplicationMasterService
        break;
      // <-- IGNORED EVENTS : END -->
    default:
-     LOG.error("Unknown event arrived at DistributedSchedulingAMService: "
-         + event.toString());
+     LOG.error("Unknown event arrived at" +
+         "OpportunisticContainerAllocatorAMService: " + event.toString());
    }
  }


@ -103,7 +103,6 @@ import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.WebApps.Builder;
@ -1140,24 +1139,27 @@ public class ResourceManager extends CompositeService implements Recoverable {
  }

  protected ApplicationMasterService createApplicationMasterService() {
-   if (this.rmContext.getYarnConfiguration().getBoolean(
-       YarnConfiguration.DIST_SCHEDULING_ENABLED,
-       YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
-     DistributedSchedulingAMService distributedSchedulingService = new
-         DistributedSchedulingAMService(this.rmContext, scheduler);
-     EventDispatcher distSchedulerEventDispatcher =
-         new EventDispatcher(distributedSchedulingService,
-             DistributedSchedulingAMService.class.getName());
-     // Add an event dispatcher for the DistributedSchedulingAMService
-     // to handle node updates/additions and removals.
+   Configuration config = this.rmContext.getYarnConfiguration();
+   if (YarnConfiguration.isOpportunisticContainerAllocationEnabled(config)
+       || YarnConfiguration.isDistSchedulingEnabled(config)) {
+     OpportunisticContainerAllocatorAMService
+         oppContainerAllocatingAMService =
+             new OpportunisticContainerAllocatorAMService(this.rmContext,
+                 scheduler);
+     EventDispatcher oppContainerAllocEventDispatcher =
+         new EventDispatcher(oppContainerAllocatingAMService,
+             OpportunisticContainerAllocatorAMService.class.getName());
+     // Add an event dispatcher for the
+     // OpportunisticContainerAllocatorAMService to handle node
+     // updates/additions and removals.
      // Since the SchedulerEvent is currently a super set of theses,
      // we register interest for it..
-     addService(distSchedulerEventDispatcher);
+     addService(oppContainerAllocEventDispatcher);
      rmDispatcher.register(SchedulerEventType.class,
-         distSchedulerEventDispatcher);
+         oppContainerAllocEventDispatcher);
      this.rmContext.setContainerQueueLimitCalculator(
-         distributedSchedulingService.getNodeManagerQueueLimitCalculator());
-     return distributedSchedulingService;
+         oppContainerAllocatingAMService.getNodeManagerQueueLimitCalculator());
+     return oppContainerAllocatingAMService;
    }
    return new ApplicationMasterService(this.rmContext, scheduler);
  }
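
// With this change the RM stands up the OpportunisticContainerAllocatorAMService
// when either opportunistic container allocation or distributed scheduling is
// enabled; only when both flags are off does it fall back to the plain
// ApplicationMasterService.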


@ -820,9 +820,10 @@ public class MockRM extends ResourceManager {
  @Override
  protected ApplicationMasterService createApplicationMasterService() {
    if (this.rmContext.getYarnConfiguration().getBoolean(
-       YarnConfiguration.DIST_SCHEDULING_ENABLED,
-       YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
-     return new DistributedSchedulingAMService(getRMContext(), scheduler) {
+       YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED,
+       YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT)) {
+     return new OpportunisticContainerAllocatorAMService(getRMContext(),
+         scheduler) {
        @Override
        protected void serviceStart() {
          // override to not start rpc handler


@ -71,12 +71,12 @@ import java.util.Arrays;
import java.util.List;

/**
- * Test cases for {@link DistributedSchedulingAMService}.
+ * Test cases for {@link OpportunisticContainerAllocatorAMService}.
 */
-public class TestDistributedSchedulingAMService {
+public class TestOpportunisticContainerAllocatorAMService {

- // Test if the DistributedSchedulingAMService can handle both DSProtocol as
- // well as AMProtocol clients
+ // Test if the OpportunisticContainerAllocatorAMService can handle both
+ // DSProtocol as well as AMProtocol clients
  @Test
  public void testRPCWrapping() throws Exception {
    Configuration conf = new Configuration();
@ -111,8 +111,9 @@ public class TestDistributedSchedulingAMService {
            Resource.newInstance(1, 2), 1, true, "exp",
            ExecutionTypeRequest.newInstance(
                ExecutionType.OPPORTUNISTIC, true))));
-   DistributedSchedulingAMService service =
+   OpportunisticContainerAllocatorAMService service =
        createService(factory, rmContext, c);
+   conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
    Server server = service.getServer(rpc, conf, addr, null);
    server.start();
@ -195,9 +196,10 @@ public class TestDistributedSchedulingAMService {
        false, dsfinishResp.getIsUnregistered());
  }

- private DistributedSchedulingAMService createService(final RecordFactory
-     factory, final RMContext rmContext, final Container c) {
-   return new DistributedSchedulingAMService(rmContext, null) {
+ private OpportunisticContainerAllocatorAMService createService(
+     final RecordFactory factory, final RMContext rmContext,
+     final Container c) {
+   return new OpportunisticContainerAllocatorAMService(rmContext, null) {
      @Override
      public RegisterApplicationMasterResponse registerApplicationMaster(
          RegisterApplicationMasterRequest request) throws