diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 26033d3c1e1..8aa0419adf4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -296,53 +296,60 @@ public class YarnConfiguration extends Configuration {
/** ACL used in case none is found. Allows nothing. */
public static final String DEFAULT_YARN_APP_ACL = " ";
- /** Is Distributed Scheduling Enabled. */
+ /** Setting that controls whether distributed scheduling is enabled or not. */
public static final String DIST_SCHEDULING_ENABLED =
YARN_PREFIX + "distributed-scheduling.enabled";
public static final boolean DIST_SCHEDULING_ENABLED_DEFAULT = false;
- /** Mininum allocatable container memory for Distributed Scheduling. */
- public static final String DIST_SCHEDULING_MIN_MEMORY =
- YARN_PREFIX + "distributed-scheduling.min-memory";
- public static final int DIST_SCHEDULING_MIN_MEMORY_DEFAULT = 512;
+ /** Minimum memory (in MB) used for allocating a container through distributed
+ * scheduling. */
+ public static final String DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB =
+ YARN_PREFIX + "distributed-scheduling.min-container-memory-mb";
+ public static final int DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT = 512;
- /** Mininum allocatable container vcores for Distributed Scheduling. */
- public static final String DIST_SCHEDULING_MIN_VCORES =
- YARN_PREFIX + "distributed-scheduling.min-vcores";
- public static final int DIST_SCHEDULING_MIN_VCORES_DEFAULT = 1;
+ /** Minimum virtual CPU cores used for allocating a container through
+ * distributed scheduling. */
+ public static final String DIST_SCHEDULING_MIN_CONTAINER_VCORES =
+ YARN_PREFIX + "distributed-scheduling.min-container-vcores";
+ public static final int DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT = 1;
- /** Maximum allocatable container memory for Distributed Scheduling. */
- public static final String DIST_SCHEDULING_MAX_MEMORY =
- YARN_PREFIX + "distributed-scheduling.max-memory";
- public static final int DIST_SCHEDULING_MAX_MEMORY_DEFAULT = 2048;
+ /** Maximum memory (in MB) used for allocating a container through distributed
+ * scheduling. */
+ public static final String DIST_SCHEDULING_MAX_MEMORY_MB =
+ YARN_PREFIX + "distributed-scheduling.max-container-memory-mb";
+ public static final int DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT = 2048;
- /** Maximum allocatable container vcores for Distributed Scheduling. */
- public static final String DIST_SCHEDULING_MAX_VCORES =
- YARN_PREFIX + "distributed-scheduling.max-vcores";
- public static final int DIST_SCHEDULING_MAX_VCORES_DEFAULT = 4;
+ /** Maximum virtual CPU cores used for allocating a container through
+ * distributed scheduling. */
+ public static final String DIST_SCHEDULING_MAX_CONTAINER_VCORES =
+ YARN_PREFIX + "distributed-scheduling.max-container-vcores";
+ public static final int DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT = 4;
- /** Incremental allocatable container memory for Distributed Scheduling. */
- public static final String DIST_SCHEDULING_INCR_MEMORY =
- YARN_PREFIX + "distributed-scheduling.incr-memory";
- public static final int DIST_SCHEDULING_INCR_MEMORY_DEFAULT = 512;
+ /** Incremental memory (in MB) used for allocating a container through
+ * distributed scheduling. */
+ public static final String DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB =
+ YARN_PREFIX + "distributed-scheduling.incr-container-memory-mb";
+ public static final int DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT =
+ 512;
- /** Incremental allocatable container vcores for Distributed Scheduling. */
- public static final String DIST_SCHEDULING_INCR_VCORES =
+ /** Incremental virtual CPU cores used for allocating a container through
+ * distributed scheduling. */
+ public static final String DIST_SCHEDULING_INCR_CONTAINER_VCORES =
YARN_PREFIX + "distributed-scheduling.incr-vcores";
- public static final int DIST_SCHEDULING_INCR_VCORES_DEFAULT = 1;
+ public static final int DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT = 1;
- /** Container token expiry for container allocated via Distributed
- * Scheduling. */
+ /** Container token expiry for container allocated via distributed
+ * scheduling. */
public static final String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS =
- YARN_PREFIX + "distributed-scheduling.container-token-expiry";
+ YARN_PREFIX + "distributed-scheduling.container-token-expiry-ms";
public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT =
600000;
- /** K least loaded nodes to be provided to the LocalScheduler of a
- * NodeManager for Distributed Scheduling. */
- public static final String DIST_SCHEDULING_TOP_K =
- YARN_PREFIX + "distributed-scheduling.top-k";
- public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10;
+ /** Number of nodes to be used by the DistributedScheduler of a NodeManager
+ * for dispatching containers during distributed scheduling. */
+ public static final String DIST_SCHEDULING_NODES_NUMBER_USED =
+ YARN_PREFIX + "distributed-scheduling.nodes-used";
+ public static final int DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT = 10;
/** Frequency for computing least loaded NMs. */
public static final String NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS =
@@ -350,7 +357,7 @@ public class YarnConfiguration extends Configuration {
public static final long
NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT = 1000;
- /** Comparator for determining Node Load for Distributed Scheduling. */
+ /** Comparator for determining node load for Distributed Scheduling. */
public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR =
YARN_PREFIX + "nm-container-queuing.load-comparator";
public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT =
@@ -373,13 +380,13 @@ public class YarnConfiguration extends Configuration {
YARN_PREFIX + "nm-container-queuing.max-queue-length";
public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT = 10;
- /** Min wait time of container queue at NodeManager. */
+ /** Min queue wait time for a container at a NodeManager. */
public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS =
YARN_PREFIX + "nm-container-queuing.min-queue-wait-time-ms";
public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT =
1;
- /** Max wait time of container queue at NodeManager. */
+ /** Max queue wait time for a container at a NodeManager. */
public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS =
YARN_PREFIX + "nm-container-queuing.max-queue-wait-time-ms";
public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT =
@@ -1653,17 +1660,21 @@ public class YarnConfiguration extends Configuration {
public static final String YARN_APPLICATION_CLASSPATH = YARN_PREFIX
+ "application.classpath";
+ /** The setting that controls whether AMRMProxy is enabled or not. */
public static final String AMRM_PROXY_ENABLED = NM_PREFIX
- + "amrmproxy.enable";
+ + "amrmproxy.enabled";
public static final boolean DEFAULT_AMRM_PROXY_ENABLED = false;
+
public static final String AMRM_PROXY_ADDRESS = NM_PREFIX
+ "amrmproxy.address";
public static final int DEFAULT_AMRM_PROXY_PORT = 8048;
public static final String DEFAULT_AMRM_PROXY_ADDRESS = "0.0.0.0:"
+ DEFAULT_AMRM_PROXY_PORT;
+
public static final String AMRM_PROXY_CLIENT_THREAD_COUNT = NM_PREFIX
+ "amrmproxy.client.thread-count";
public static final int DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT = 25;
+
public static final String AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
NM_PREFIX + "amrmproxy.interceptor-class.pipeline";
public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
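For context, the renamed limit keys above are consumed in pairs to build a Resource. A minimal usage sketch (not part of this patch; the helper class is hypothetical, the constants are the ones introduced above):

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    // Hypothetical helper (illustration only): build the minimum container
    // Resource from the renamed distributed-scheduling keys.
    public class DistSchedulingLimits {
      public static Resource minContainerResource(YarnConfiguration conf) {
        return Resource.newInstance(
            conf.getInt(
                YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB,
                YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT),
            conf.getInt(
                YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES,
                YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT));
      }
    }

This mirrors how DistributedSchedulingAMService populates the register response later in this patch.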
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index 2c45b87205f..f6f03e2df11 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -121,41 +121,6 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
configurationPrefixToSkipCompare
.add(YarnConfiguration.NM_CPU_RESOURCE_ENABLED);
- // Ignore Distributed Scheduling Related Configurations.
- // Since it is still a "work in progress" feature
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_ENABLED);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_INCR_VCORES);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_MAX_VCORES);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_MIN_VCORES);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_TOP_K);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV);
-
// Set by container-executor.cfg
configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
index f35a9b41dd9..7245bc60f31 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
@@ -82,9 +82,9 @@ import static org.mockito.Mockito.when;
/**
* Validates End2End Distributed Scheduling flow which includes the AM
* specifying OPPORTUNISTIC containers in its resource requests,
- * the AMRMProxyService on the NM, the LocalScheduler RequestInterceptor on
- * the NM and the DistributedSchedulingProtocol used by the framework to talk
- * to the DistributedSchedulingService running on the RM.
+ * the AMRMProxyService on the NM, the DistributedScheduler RequestInterceptor
+ * on the NM and the DistributedSchedulingProtocol used by the framework to talk
+ * to the DistributedSchedulingAMService running on the RM.
*/
public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 0eade8ee8e6..9b16d05c075 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2635,10 +2635,10 @@
    <description>
-    Enable/Disable AMRMProxyService in the node manager. This service is used to intercept
-    calls from the application masters to the resource manager.
+    Enable/Disable AMRMProxyService in the node manager. This service is used to
+    intercept calls from the application masters to the resource manager.
    </description>
-   <name>yarn.nodemanager.amrmproxy.enable</name>
+   <name>yarn.nodemanager.amrmproxy.enabled</name>
    <value>false</value>
@@ -2660,13 +2660,149 @@
    <description>
-    The comma separated list of class names that implement the RequestInterceptor interface. This is used by the
-    AMRMProxyService to create the request processing pipeline for applications.
+    The comma separated list of class names that implement the
+    RequestInterceptor interface. This is used by the AMRMProxyService to create
+    the request processing pipeline for applications.
    </description>
    <name>yarn.nodemanager.amrmproxy.interceptor-class.pipeline</name>
    <value>org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor</value>
  </property>
+
+  <property>
+    <description>
+      Setting that controls whether distributed scheduling is enabled.
+    </description>
+    <name>yarn.distributed-scheduling.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>
+      Minimum memory (in MB) used for allocating a container through
+      distributed scheduling.
+    </description>
+    <name>yarn.distributed-scheduling.min-container-memory-mb</name>
+    <value>512</value>
+  </property>
+
+  <property>
+    <description>
+      Minimum virtual CPU cores used for allocating a container through
+      distributed scheduling.
+    </description>
+    <name>yarn.distributed-scheduling.min-container-vcores</name>
+    <value>1</value>
+  </property>
+
+  <property>
+    <description>
+      Maximum memory (in MB) used for allocating a container through
+      distributed scheduling.
+    </description>
+    <name>yarn.distributed-scheduling.max-container-memory-mb</name>
+    <value>2048</value>
+  </property>
+
+  <property>
+    <description>
+      Maximum virtual CPU cores used for allocating a container through
+      distributed scheduling.
+    </description>
+    <name>yarn.distributed-scheduling.max-container-vcores</name>
+    <value>4</value>
+  </property>
+
+  <property>
+    <description>
+      Incremental memory (in MB) used for allocating a container through
+      distributed scheduling.
+    </description>
+    <name>yarn.distributed-scheduling.incr-container-memory-mb</name>
+    <value>512</value>
+  </property>
+
+  <property>
+    <description>
+      Incremental virtual CPU cores used for allocating a container through
+      distributed scheduling.
+    </description>
+    <name>yarn.distributed-scheduling.incr-vcores</name>
+    <value>1</value>
+  </property>
+
+  <property>
+    <description>
+      Container token expiry for container allocated via distributed scheduling.
+    </description>
+    <name>yarn.distributed-scheduling.container-token-expiry-ms</name>
+    <value>600000</value>
+  </property>
+
+  <property>
+    <description>
+      Number of nodes to be used by the DistributedScheduler of a NodeManager
+      for dispatching containers during distributed scheduling.
+    </description>
+    <name>yarn.distributed-scheduling.nodes-used</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <description>
+      Frequency for computing least loaded NMs.
+    </description>
+    <name>yarn.nm-container-queuing.sorting-nodes-interval-ms</name>
+    <value>1000</value>
+  </property>
+
+  <property>
+    <description>
+      Comparator for determining node load for Distributed Scheduling.
+    </description>
+    <name>yarn.nm-container-queuing.load-comparator</name>
+    <value>QUEUE_LENGTH</value>
+  </property>
+
+  <property>
+    <description>
+      Value of standard deviation used for calculation of queue limit thresholds.
+    </description>
+    <name>yarn.nm-container-queuing.queue-limit-stdev</name>
+    <value>1.0f</value>
+  </property>
+
+  <property>
+    <description>
+      Min length of container queue at NodeManager.
+    </description>
+    <name>yarn.nm-container-queuing.min-queue-length</name>
+    <value>1</value>
+  </property>
+
+  <property>
+    <description>
+      Max length of container queue at NodeManager.
+    </description>
+    <name>yarn.nm-container-queuing.max-queue-length</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <description>
+      Min queue wait time for a container at a NodeManager.
+    </description>
+    <name>yarn.nm-container-queuing.min-queue-wait-time-ms</name>
+    <value>1</value>
+  </property>
+
+  <property>
+    <description>
+      Max queue wait time for a container at a NodeManager.
+    </description>
+    <name>yarn.nm-container-queuing.max-queue-wait-time-ms</name>
+    <value>10</value>
+  </property>
Error filename pattern, to identify the file in the container's
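
Operators override these defaults in yarn-site.xml; programmatically, the equivalent is a couple of setters on the configuration. A minimal sketch (illustration only; the chosen values are arbitrary):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    // Hypothetical snippet (not part of this patch): turn on distributed
    // scheduling and raise the per-container memory ceiling to 4 GB.
    public class EnableDistributedScheduling {
      public static Configuration newConf() {
        Configuration conf = new YarnConfiguration();
        conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
        conf.setInt(YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB, 4096);
        return conf;
      }
    }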
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index 8a170005c68..cd131d18b1c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -144,7 +144,7 @@
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
 * <ul>
 * <li>Intercept ApplicationMasterProtocol calls and unwrap the
* response objects to extract instructions from the
- * ClusterManager running on the ResourceManager to aid in making
- * Scheduling scheduling decisions
+ * ClusterMonitor running on the ResourceManager to aid in making
+ * distributed scheduling decisions.
*
 * <li>Call the OpportunisticContainerAllocator to allocate
- * containers for the opportunistic resource outstandingOpReqs
+ * containers for the outstanding OPPORTUNISTIC container requests.
 * </ul>
*/
-public final class LocalScheduler extends AbstractRequestInterceptor {
+public final class DistributedScheduler extends AbstractRequestInterceptor {
static class PartitionedResourceRequests {
private List<ResourceRequest> guaranteed = new ArrayList<>();
@@ -93,7 +87,7 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
}
}
- static class DistSchedulerParams {
+ static class DistributedSchedulerParams {
Resource maxResource;
Resource minResource;
Resource incrementResource;
@@ -101,18 +95,20 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
}
private static final Logger LOG = LoggerFactory
- .getLogger(LocalScheduler.class);
+ .getLogger(DistributedScheduler.class);
private final static RecordFactory RECORD_FACTORY =
RecordFactoryProvider.getRecordFactory(null);
- // Currently just used to keep track of allocated Containers
- // Can be used for reporting stats later
+ // Currently just used to keep track of allocated containers.
+ // Can be used for reporting stats later.
private Set<ContainerId> containersAllocated = new HashSet<>();
- private DistSchedulerParams appParams = new DistSchedulerParams();
- private final OpportunisticContainerAllocator.ContainerIdCounter containerIdCounter =
- new OpportunisticContainerAllocator.ContainerIdCounter();
+ private DistributedSchedulerParams appParams =
+ new DistributedSchedulerParams();
+ private final OpportunisticContainerAllocator.ContainerIdCounter
+ containerIdCounter =
+ new OpportunisticContainerAllocator.ContainerIdCounter();
private Map<String, NodeId> nodeList = new LinkedHashMap<>();
// Mapping of NodeId to NodeTokens. Populated either from RM response or
@@ -123,7 +119,7 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
// This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority,
// Resource Name (Host/rack/any) and capability. This mapping is required
// to match a received Container to an outstanding OPPORTUNISTIC
- // ResourceRequests (ask)
+ // ResourceRequest (ask).
final TreeMap<Priority, Map<Resource, ResourceRequest>>
outstandingOpReqs = new TreeMap<>();
@@ -158,8 +154,8 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
* @param request
* registration request
* @return Allocate Response
- * @throws YarnException
- * @throws IOException
+ * @throws YarnException YarnException
+ * @throws IOException IOException
*/
@Override
public RegisterApplicationMasterResponse registerApplicationMaster
@@ -177,14 +173,14 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
* @param request
* allocation request
* @return Allocate Response
- * @throws YarnException
- * @throws IOException
+ * @throws YarnException YarnException
+ * @throws IOException IOException
*/
@Override
public AllocateResponse allocate(AllocateRequest request) throws
YarnException, IOException {
- DistSchedAllocateRequest distRequest =
- RECORD_FACTORY.newRecordInstance(DistSchedAllocateRequest.class);
+ DistributedSchedulingAllocateRequest distRequest = RECORD_FACTORY
+ .newRecordInstance(DistributedSchedulingAllocateRequest.class);
distRequest.setAllocateRequest(request);
return allocateForDistributedScheduling(distRequest).getAllocateResponse();
}
@@ -199,9 +195,6 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
/**
* Check if we already have a NMToken. if Not, generate the Token and
* add it to the response
- * @param response
- * @param nmTokens
- * @param allocatedContainers
*/
private void updateResponseWithNMTokens(AllocateResponse response,
List<NMToken> nmTokens, List<Container> allocatedContainers) {
@@ -235,11 +228,11 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
}
private void updateParameters(
- DistSchedRegisterResponse registerResponse) {
- appParams.minResource = registerResponse.getMinAllocatableCapabilty();
- appParams.maxResource = registerResponse.getMaxAllocatableCapabilty();
+ RegisterDistributedSchedulingAMResponse registerResponse) {
+ appParams.minResource = registerResponse.getMinContainerResource();
+ appParams.maxResource = registerResponse.getMaxContainerResource();
appParams.incrementResource =
- registerResponse.getIncrAllocatableCapabilty();
+ registerResponse.getIncrContainerResource();
if (appParams.incrementResource == null) {
appParams.incrementResource = appParams.minResource;
}
@@ -253,11 +246,12 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
/**
* Takes a list of ResourceRequests (asks), extracts the key information viz.
- * (Priority, ResourceName, Capability) and adds it the outstanding
+ * (Priority, ResourceName, Capability) and adds it to the outstanding
* OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce
* the current YARN constraint that only a single ResourceRequest can exist at
- * a give Priority and Capability
- * @param resourceAsks
+ * a given Priority and Capability.
+ *
+ * @param resourceAsks the list with the {@link ResourceRequest}s
*/
public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
for (ResourceRequest request : resourceAsks) {
@@ -297,11 +291,9 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
/**
* This method matches a returned list of Container Allocations to any
- * outstanding OPPORTUNISTIC ResourceRequest
- * @param capability
- * @param allocatedContainers
+ * outstanding OPPORTUNISTIC ResourceRequest.
*/
- public void matchAllocationToOutstandingRequest(Resource capability,
+ private void matchAllocationToOutstandingRequest(Resource capability,
List<Container> allocatedContainers) {
for (Container c : allocatedContainers) {
containersAllocated.add(c.getId());
@@ -333,28 +325,29 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
}
@Override
- public DistSchedRegisterResponse
+ public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling(
RegisterApplicationMasterRequest request)
- throws YarnException, IOException {
+ throws YarnException, IOException {
LOG.info("Forwarding registration request to the" +
"Distributed Scheduler Service on YARN RM");
- DistSchedRegisterResponse dsResp = getNextInterceptor()
+ RegisterDistributedSchedulingAMResponse dsResp = getNextInterceptor()
.registerApplicationMasterForDistributedScheduling(request);
updateParameters(dsResp);
return dsResp;
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling(
- DistSchedAllocateRequest request) throws YarnException, IOException {
+ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
+ DistributedSchedulingAllocateRequest request)
+ throws YarnException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Forwarding allocate request to the" +
"Distributed Scheduler Service on YARN RM");
}
// Partition requests into GUARANTEED and OPPORTUNISTIC reqs
- PartitionedResourceRequests partitionedAsks = partitionAskList(
- request.getAllocateRequest().getAskList());
+ PartitionedResourceRequests partitionedAsks =
+ partitionAskList(request.getAllocateRequest().getAskList());
List<ContainerId> releasedContainers =
request.getAllocateRequest().getReleaseList();
@@ -393,11 +386,12 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
allocatedContainers.addAll(e.getValue());
}
}
+
request.setAllocatedContainers(allocatedContainers);
// Send all the GUARANTEED Reqs to RM
request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed());
- DistSchedAllocateResponse dsResp =
+ DistributedSchedulingAllocateResponse dsResp =
getNextInterceptor().allocateForDistributedScheduling(request);
// Update host to nodeId mapping
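
The allocate() path above hinges on partitionAskList splitting the AM's asks by execution type before anything is forwarded. A simplified, self-contained sketch of that split (the class below is illustrative, not the patch's PartitionedResourceRequests):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.yarn.api.records.ExecutionType;
    import org.apache.hadoop.yarn.api.records.ResourceRequest;

    // Illustrative sketch: OPPORTUNISTIC asks are satisfied locally by the
    // DistributedScheduler; everything else is forwarded to the RM.
    final class AskPartitioner {
      final List<ResourceRequest> guaranteed = new ArrayList<>();
      final List<ResourceRequest> opportunistic = new ArrayList<>();

      void partition(List<ResourceRequest> asks) {
        for (ResourceRequest rr : asks) {
          if (rr.getExecutionTypeRequest() != null
              && rr.getExecutionTypeRequest().getExecutionType()
                  == ExecutionType.OPPORTUNISTIC) {
            opportunistic.add(rr);
          } else {
            guaranteed.add(rr);
          }
        }
      }
    }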
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
index 22a6a2425a5..ce5bda080d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
-import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler.DistSchedulerParams;
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler.DistributedSchedulerParams;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -37,15 +37,17 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import java.net.InetSocketAddress;
import java.util.*;
+import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;
/**
- * <p>
- * The OpportunisticContainerAllocator allocates containers on a given list
- * of Nodes after it modifies the container sizes to within allowable limits
- * specified by the ClusterManager running on the RM. It tries to
- * distribute the containers as evenly as possible. It also uses the
- * NMTokenSecretManagerInNM to generate the required NM tokens for
- * the allocated containers
+ * <p>
+ * The OpportunisticContainerAllocator allocates containers on a given list of
+ * nodes, after modifying the container sizes to respect the limits set by the
+ * ResourceManager. It tries to distribute the containers as evenly as possible.
+ * It also uses the NMTokenSecretManagerInNM to generate the
+ * required NM tokens for the allocated containers.
+ * </p>
*/
public class OpportunisticContainerAllocator {
@@ -78,15 +80,15 @@ public class OpportunisticContainerAllocator {
this.webpagePort = webpagePort;
}
- public Map<Resource, List<Container>> allocate(DistSchedulerParams appParams,
- ContainerIdCounter idCounter, Collection<ResourceRequest> resourceAsks,
- Set<String> blacklist, ApplicationAttemptId appAttId,
- Map<String, NodeId> allNodes, String userName) throws YarnException {
+ public Map<Resource, List<Container>> allocate(
+ DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
+ Collection<ResourceRequest> resourceAsks, Set<String> blacklist,
+ ApplicationAttemptId appAttId, Map<String, NodeId> allNodes,
+ String userName) throws YarnException {
Map<Resource, List<Container>> containers = new HashMap<>();
- Set<String> nodesAllocated = new HashSet<>();
for (ResourceRequest anyAsk : resourceAsks) {
allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId,
- allNodes, userName, containers, nodesAllocated, anyAsk);
+ allNodes, userName, containers, anyAsk);
LOG.info("Opportunistic allocation requested for ["
+ "priority=" + anyAsk.getPriority()
+ ", num_containers=" + anyAsk.getNumContainers()
@@ -96,30 +98,30 @@ public class OpportunisticContainerAllocator {
return containers;
}
- private void allocateOpportunisticContainers(DistSchedulerParams appParams,
- ContainerIdCounter idCounter, Set<String> blacklist,
- ApplicationAttemptId id, Map<String, NodeId> allNodes, String userName,
- Map<Resource, List<Container>> containers, Set<String> nodesAllocated,
- ResourceRequest anyAsk) throws YarnException {
+ private void allocateOpportunisticContainers(
+ DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
+ Set<String> blacklist, ApplicationAttemptId id,
+ Map<String, NodeId> allNodes, String userName,
+ Map<Resource, List<Container>> containers, ResourceRequest anyAsk)
+ throws YarnException {
int toAllocate = anyAsk.getNumContainers()
- - (containers.isEmpty() ?
- 0 : containers.get(anyAsk.getCapability()).size());
+ - (containers.isEmpty() ? 0 :
+ containers.get(anyAsk.getCapability()).size());
- List<String> topKNodesLeft = new ArrayList<>();
- for (String s : allNodes.keySet()) {
- // Bias away from whatever we have already allocated and respect blacklist
- if (nodesAllocated.contains(s) || blacklist.contains(s)) {
+ List<NodeId> nodesForScheduling = new ArrayList<>();
+ for (Entry<String, NodeId> nodeEntry : allNodes.entrySet()) {
+ // Do not use blacklisted nodes for scheduling.
+ if (blacklist.contains(nodeEntry.getKey())) {
+ // Do not use blacklisted nodes for scheduling.
+ if (blacklist.contains(nodeEntry.getKey())) {
continue;
}
- topKNodesLeft.add(s);
+ nodesForScheduling.add(nodeEntry.getValue());
}
int numAllocated = 0;
- int nextNodeToAllocate = 0;
+ int nextNodeToSchedule = 0;
for (int numCont = 0; numCont < toAllocate; numCont++) {
- String topNode = topKNodesLeft.get(nextNodeToAllocate);
- nextNodeToAllocate++;
- nextNodeToAllocate %= topKNodesLeft.size();
- NodeId nodeId = allNodes.get(topNode);
+ nextNodeToSchedule++;
+ nextNodeToSchedule %= nodesForScheduling.size();
+ NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule);
Container container = buildContainer(appParams, idCounter, anyAsk, id,
userName, nodeId);
List<Container> cList = containers.get(anyAsk.getCapability());
@@ -134,7 +136,7 @@ public class OpportunisticContainerAllocator {
LOG.info("Allocated " + numAllocated + " opportunistic containers.");
}
- private Container buildContainer(DistSchedulerParams appParams,
+ private Container buildContainer(DistributedSchedulerParams appParams,
ContainerIdCounter idCounter, ResourceRequest rr, ApplicationAttemptId id,
String userName, NodeId nodeId) throws YarnException {
ContainerId cId =
@@ -165,7 +167,7 @@ public class OpportunisticContainerAllocator {
return container;
}
- private Resource normalizeCapability(DistSchedulerParams appParams,
+ private Resource normalizeCapability(DistributedSchedulerParams appParams,
ResourceRequest ask) {
return Resources.normalize(RESOURCE_CALCULATOR,
ask.getCapability(), appParams.minResource, appParams.maxResource,
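
The rewritten loop above replaces the "bias away from nodes already used" heuristic with a plain round-robin over all non-blacklisted nodes. A standalone sketch of that placement policy (simplified types; note it mirrors the patch's pre-increment, so the cycle starts at the second candidate):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    // Illustrative sketch: cycle through non-blacklisted nodes, one
    // container per step, wrapping around with modulo arithmetic.
    final class RoundRobinPlacer {
      static List<String> place(Map<String, String> hostToNode,
          Set<String> blacklist, int toAllocate) {
        List<String> candidates = new ArrayList<>();
        for (Map.Entry<String, String> e : hostToNode.entrySet()) {
          if (!blacklist.contains(e.getKey())) {
            candidates.add(e.getValue());
          }
        }
        List<String> placements = new ArrayList<>();
        if (candidates.isEmpty()) {
          return placements;
        }
        int next = 0;
        for (int i = 0; i < toAllocate; i++) {
          next = (next + 1) % candidates.size(); // pre-increment, as in patch
          placements.add(candidates.get(next));
        }
        return placements;
      }
    }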
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
similarity index 75%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
index 8de849ba930..b093b3b6764 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
@@ -19,34 +19,30 @@
package org.apache.hadoop.yarn.server.nodemanager.scheduler;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords
- .RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords
- .RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
-import org.apache.hadoop.yarn.server.nodemanager.security
- .NMContainerTokenSecretManager;
-import org.apache.hadoop.yarn.server.nodemanager.security
- .NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
@@ -63,27 +59,30 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
-public class TestLocalScheduler {
+/**
+ * Test cases for {@link DistributedScheduler}.
+ */
+public class TestDistributedScheduler {
@Test
- public void testLocalScheduler() throws Exception {
+ public void testDistributedScheduler() throws Exception {
Configuration conf = new Configuration();
- LocalScheduler localScheduler = new LocalScheduler();
+ DistributedScheduler distributedScheduler = new DistributedScheduler();
- RequestInterceptor finalReqIntcptr = setup(conf, localScheduler);
+ RequestInterceptor finalReqIntcptr = setup(conf, distributedScheduler);
- registerAM(localScheduler, finalReqIntcptr, Arrays.asList(
+ registerAM(distributedScheduler, finalReqIntcptr, Arrays.asList(
NodeId.newInstance("a", 1), NodeId.newInstance("b", 2)));
- final AtomicBoolean flipFlag = new AtomicBoolean(false);
+ final AtomicBoolean flipFlag = new AtomicBoolean(true);
Mockito.when(
finalReqIntcptr.allocateForDistributedScheduling(
- Mockito.any(DistSchedAllocateRequest.class)))
- .thenAnswer(new Answer<DistSchedAllocateResponse>() {
+ Mockito.any(DistributedSchedulingAllocateRequest.class)))
+ .thenAnswer(new Answer<DistributedSchedulingAllocateResponse>() {
@Override
- public DistSchedAllocateResponse answer(InvocationOnMock
- invocationOnMock) throws Throwable {
+ public DistributedSchedulingAllocateResponse answer(
+ InvocationOnMock invocationOnMock) throws Throwable {
flipFlag.set(!flipFlag.get());
if (flipFlag.get()) {
return createAllocateResponse(Arrays.asList(
@@ -101,15 +100,15 @@ public class TestLocalScheduler {
ResourceRequest opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 4, "*");
+
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
// Verify 4 containers were allocated
AllocateResponse allocateResponse =
- localScheduler.allocate(allocateRequest);
+ distributedScheduler.allocate(allocateRequest);
Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size());
- // Verify equal distribution on hosts a and b
- // And None on c and d
+ // Verify equal distribution on hosts a and b, and none on c or d
Map<NodeId, List<ContainerId>> allocs = mapAllocs(allocateResponse, 4);
Assert.assertEquals(2, allocs.get(NodeId.newInstance("a", 1)).size());
Assert.assertEquals(2, allocs.get(NodeId.newInstance("b", 2)).size());
@@ -123,18 +122,18 @@ public class TestLocalScheduler {
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
// Verify 6 containers were allocated
- allocateResponse = localScheduler.allocate(allocateRequest);
+ allocateResponse = distributedScheduler.allocate(allocateRequest);
Assert.assertEquals(6, allocateResponse.getAllocatedContainers().size());
- // Verify New containers are equally distribution on hosts c and d
- // And None on a and b
+ // Verify new containers are equally distributed on hosts c and d,
+ // and none on a or b
allocs = mapAllocs(allocateResponse, 6);
Assert.assertEquals(3, allocs.get(NodeId.newInstance("c", 3)).size());
Assert.assertEquals(3, allocs.get(NodeId.newInstance("d", 4)).size());
Assert.assertNull(allocs.get(NodeId.newInstance("a", 1)));
Assert.assertNull(allocs.get(NodeId.newInstance("b", 2)));
- // Ensure the LocalScheduler respects the list order..
+ // Ensure the DistributedScheduler respects the list order.
// The first request should be allocated to "d" since it is ranked higher
// The second request should be allocated to "c" since the ranking is
// flipped on every allocate response.
@@ -142,7 +141,7 @@ public class TestLocalScheduler {
opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
- allocateResponse = localScheduler.allocate(allocateRequest);
+ allocateResponse = distributedScheduler.allocate(allocateRequest);
allocs = mapAllocs(allocateResponse, 1);
Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size());
@@ -150,7 +149,7 @@ public class TestLocalScheduler {
opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
- allocateResponse = localScheduler.allocate(allocateRequest);
+ allocateResponse = distributedScheduler.allocate(allocateRequest);
allocs = mapAllocs(allocateResponse, 1);
Assert.assertEquals(1, allocs.get(NodeId.newInstance("c", 3)).size());
@@ -158,22 +157,23 @@ public class TestLocalScheduler {
opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
- allocateResponse = localScheduler.allocate(allocateRequest);
+ allocateResponse = distributedScheduler.allocate(allocateRequest);
allocs = mapAllocs(allocateResponse, 1);
Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size());
}
- private void registerAM(LocalScheduler localScheduler, RequestInterceptor
- finalReqIntcptr, List<NodeId> nodeList) throws Exception {
- DistSchedRegisterResponse distSchedRegisterResponse =
- Records.newRecord(DistSchedRegisterResponse.class);
+ private void registerAM(DistributedScheduler distributedScheduler,
+ RequestInterceptor finalReqIntcptr, List<NodeId> nodeList)
+ throws Exception {
+ RegisterDistributedSchedulingAMResponse distSchedRegisterResponse =
+ Records.newRecord(RegisterDistributedSchedulingAMResponse.class);
distSchedRegisterResponse.setRegisterResponse(
Records.newRecord(RegisterApplicationMasterResponse.class));
distSchedRegisterResponse.setContainerTokenExpiryInterval(12345);
distSchedRegisterResponse.setContainerIdStart(0);
- distSchedRegisterResponse.setMaxAllocatableCapabilty(
+ distSchedRegisterResponse.setMaxContainerResource(
Resource.newInstance(1024, 4));
- distSchedRegisterResponse.setMinAllocatableCapabilty(
+ distSchedRegisterResponse.setMinContainerResource(
Resource.newInstance(512, 2));
distSchedRegisterResponse.setNodesForScheduling(nodeList);
Mockito.when(
@@ -181,12 +181,12 @@ public class TestLocalScheduler {
Mockito.any(RegisterApplicationMasterRequest.class)))
.thenReturn(distSchedRegisterResponse);
- localScheduler.registerApplicationMaster(
+ distributedScheduler.registerApplicationMaster(
Records.newRecord(RegisterApplicationMasterRequest.class));
}
- private RequestInterceptor setup(Configuration conf, LocalScheduler
- localScheduler) {
+ private RequestInterceptor setup(Configuration conf,
+ DistributedScheduler distributedScheduler) {
NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class);
Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l);
Context context = Mockito.mock(Context.class);
@@ -215,12 +215,12 @@ public class TestLocalScheduler {
NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
new NMTokenSecretManagerInNM();
nmTokenSecretManagerInNM.setMasterKey(mKey);
- localScheduler.initLocal(
+ distributedScheduler.initLocal(
ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
containerAllocator, nmTokenSecretManagerInNM, "test");
RequestInterceptor finalReqIntcptr = Mockito.mock(RequestInterceptor.class);
- localScheduler.setNextInterceptor(finalReqIntcptr);
+ distributedScheduler.setNextInterceptor(finalReqIntcptr);
return finalReqIntcptr;
}
@@ -237,17 +237,18 @@ public class TestLocalScheduler {
return opportunisticReq;
}
- private DistSchedAllocateResponse createAllocateResponse(List<NodeId> nodes) {
- DistSchedAllocateResponse distSchedAllocateResponse = Records.newRecord
- (DistSchedAllocateResponse.class);
- distSchedAllocateResponse.setAllocateResponse(
- Records.newRecord(AllocateResponse.class));
+ private DistributedSchedulingAllocateResponse createAllocateResponse(
+ List<NodeId> nodes) {
+ DistributedSchedulingAllocateResponse distSchedAllocateResponse =
+ Records.newRecord(DistributedSchedulingAllocateResponse.class);
+ distSchedAllocateResponse
+ .setAllocateResponse(Records.newRecord(AllocateResponse.class));
distSchedAllocateResponse.setNodesForScheduling(nodes);
return distSchedAllocateResponse;
}
- private Map<NodeId, List<ContainerId>> mapAllocs(AllocateResponse
- allocateResponse, int expectedSize) throws Exception {
+ private Map<NodeId, List<ContainerId>> mapAllocs(
+ AllocateResponse allocateResponse, int expectedSize) throws Exception {
Assert.assertEquals(expectedSize,
allocateResponse.getAllocatedContainers().size());
Map<NodeId, List<ContainerId>> allocs = new HashMap<>();
@@ -266,5 +267,4 @@ public class TestLocalScheduler {
}
return allocs;
}
-
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java
similarity index 81%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java
index 5aabddc04fb..843ac099ec8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java
@@ -27,15 +27,15 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
@@ -73,18 +73,18 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
- * The DistributedSchedulingService is started instead of the
- * ApplicationMasterService if DistributedScheduling is enabled for the YARN
+ * The DistributedSchedulingAMService is started instead of the
+ * ApplicationMasterService if distributed scheduling is enabled for the YARN
* cluster.
* It extends the functionality of the ApplicationMasterService by servicing
* clients (AMs and AMRMProxy request interceptors) that understand the
* DistributedSchedulingProtocol.
*/
-public class DistributedSchedulingService extends ApplicationMasterService
- implements DistributedSchedulerProtocol, EventHandler<SchedulerEvent> {
+public class DistributedSchedulingAMService extends ApplicationMasterService
+ implements DistributedSchedulingAMProtocol, EventHandler<SchedulerEvent> {
private static final Log LOG =
- LogFactory.getLog(DistributedSchedulingService.class);
+ LogFactory.getLog(DistributedSchedulingAMService.class);
private final NodeQueueLoadMonitor nodeMonitor;
@@ -94,12 +94,12 @@ public class DistributedSchedulingService extends ApplicationMasterService
new ConcurrentHashMap<>();
private final int k;
- public DistributedSchedulingService(RMContext rmContext,
- YarnScheduler scheduler) {
- super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
+ public DistributedSchedulingAMService(RMContext rmContext,
+ YarnScheduler scheduler) {
+ super(DistributedSchedulingAMService.class.getName(), rmContext, scheduler);
this.k = rmContext.getYarnConfiguration().getInt(
- YarnConfiguration.DIST_SCHEDULING_TOP_K,
- YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT);
+ YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED,
+ YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT);
long nodeSortInterval = rmContext.getYarnConfiguration().getLong(
YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
YarnConfiguration.
@@ -149,7 +149,7 @@ public class DistributedSchedulingService extends ApplicationMasterService
@Override
public Server getServer(YarnRPC rpc, Configuration serverConf,
InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
- Server server = rpc.getServer(DistributedSchedulerProtocol.class, this,
+ Server server = rpc.getServer(DistributedSchedulingAMProtocol.class, this,
addr, serverConf, secretManager,
serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
@@ -184,43 +184,45 @@ public class DistributedSchedulingService extends ApplicationMasterService
}
@Override
- public DistSchedRegisterResponse
+ public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling(
RegisterApplicationMasterRequest request) throws YarnException,
IOException {
RegisterApplicationMasterResponse response =
registerApplicationMaster(request);
- DistSchedRegisterResponse dsResp = recordFactory
- .newRecordInstance(DistSchedRegisterResponse.class);
+ RegisterDistributedSchedulingAMResponse dsResp = recordFactory
+ .newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
dsResp.setRegisterResponse(response);
- dsResp.setMinAllocatableCapabilty(
+ dsResp.setMinContainerResource(
Resource.newInstance(
getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY,
- YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY_DEFAULT),
+ YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB,
+ YarnConfiguration.
+ DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT),
getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_MIN_VCORES,
- YarnConfiguration.DIST_SCHEDULING_MIN_VCORES_DEFAULT)
+ YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES,
+ YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT)
)
);
- dsResp.setMaxAllocatableCapabilty(
+ dsResp.setMaxContainerResource(
Resource.newInstance(
getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY,
- YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_DEFAULT),
+ YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB,
+ YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT),
getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_MAX_VCORES,
- YarnConfiguration.DIST_SCHEDULING_MAX_VCORES_DEFAULT)
+ YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES,
+ YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT)
)
);
- dsResp.setIncrAllocatableCapabilty(
+ dsResp.setIncrContainerResource(
Resource.newInstance(
getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY,
- YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY_DEFAULT),
+ YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB,
+ YarnConfiguration.
+ DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT),
getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_INCR_VCORES,
- YarnConfiguration.DIST_SCHEDULING_INCR_VCORES_DEFAULT)
+ YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES,
+ YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT)
)
);
dsResp.setContainerTokenExpiryInterval(
@@ -238,8 +240,9 @@ public class DistributedSchedulingService extends ApplicationMasterService
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling(
- DistSchedAllocateRequest request) throws YarnException, IOException {
+ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
+ DistributedSchedulingAllocateRequest request)
+ throws YarnException, IOException {
List<Container> distAllocContainers = request.getAllocatedContainers();
for (Container container : distAllocContainers) {
// Create RMContainer
@@ -255,8 +258,8 @@ public class DistributedSchedulingService extends ApplicationMasterService
RMContainerEventType.LAUNCHED));
}
AllocateResponse response = allocate(request.getAllocateRequest());
- DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance
- (DistSchedAllocateResponse.class);
+ DistributedSchedulingAllocateResponse dsResp = recordFactory
+ .newRecordInstance(DistributedSchedulingAllocateResponse.class);
dsResp.setAllocateResponse(response);
dsResp.setNodesForScheduling(
this.nodeMonitor.selectLeastLoadedNodes(this.k));
@@ -264,7 +267,7 @@ public class DistributedSchedulingService extends ApplicationMasterService
}
private void addToMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
- String rackName, NodeId nodeId) {
+ String rackName, NodeId nodeId) {
if (rackName != null) {
mapping.putIfAbsent(rackName, new HashSet<NodeId>());
Set<NodeId> nodeIds = mapping.get(rackName);
@@ -275,7 +278,7 @@ public class DistributedSchedulingService extends ApplicationMasterService
}
private void removeFromMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
- String rackName, NodeId nodeId) {
+ String rackName, NodeId nodeId) {
if (rackName != null) {
Set<NodeId> nodeIds = mapping.get(rackName);
synchronized (nodeIds) {
@@ -346,7 +349,7 @@ public class DistributedSchedulingService extends ApplicationMasterService
break;
// <-- IGNORED EVENTS : END -->
default:
- LOG.error("Unknown event arrived at DistributedSchedulingService: "
+ LOG.error("Unknown event arrived at DistributedSchedulingAMService: "
+ event.toString());
}
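
addToMapping and removeFromMapping above rely on a putIfAbsent-then-synchronize idiom: the outer ConcurrentHashMap handles concurrent insertion of rack keys, but each inner HashSet still needs its own lock for mutation. A self-contained sketch of the same idiom (simplified to String ids):

    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative sketch of the rack-to-nodes index maintained by the
    // DistributedSchedulingAMService event handler.
    final class RackIndex {
      private final ConcurrentHashMap<String, Set<String>> mapping =
          new ConcurrentHashMap<>();

      void add(String rackName, String nodeId) {
        if (rackName == null) {
          return;
        }
        mapping.putIfAbsent(rackName, new HashSet<String>());
        Set<String> nodeIds = mapping.get(rackName);
        // HashSet is not thread-safe; guard mutation on the set itself.
        synchronized (nodeIds) {
          nodeIds.add(nodeId);
        }
      }
    }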
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index e36d96b0bb7..d55c1c0d655 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -1143,12 +1143,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
if (this.rmContext.getYarnConfiguration().getBoolean(
YarnConfiguration.DIST_SCHEDULING_ENABLED,
YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
- DistributedSchedulingService distributedSchedulingService = new
- DistributedSchedulingService(this.rmContext, scheduler);
+ DistributedSchedulingAMService distributedSchedulingService = new
+ DistributedSchedulingAMService(this.rmContext, scheduler);
EventDispatcher distSchedulerEventDispatcher =
new EventDispatcher(distributedSchedulingService,
- DistributedSchedulingService.class.getName());
- // Add an event dispoatcher for the DistributedSchedulingService
+ DistributedSchedulingAMService.class.getName());
+ // Add an event dispatcher for the DistributedSchedulingAMService
// to handle node updates/additions and removals.
// Since the SchedulerEvent is currently a super set of theses,
// we register interest for it..
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 376466430af..c6773458ab6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -93,8 +94,8 @@ public class AppSchedulingInfo {
this.queue = queue;
this.user = user;
this.activeUsersManager = activeUsersManager;
- this.containerIdCounter =
- new AtomicLong(epoch << EPOCH_BIT_SHIFT);
+ this.containerIdCounter = new AtomicLong(
+ epoch << ResourceManager.EPOCH_BIT_SHIFT);
this.appResourceUsage = appResourceUsage;
}
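
The change above qualifies EPOCH_BIT_SHIFT with ResourceManager, where the constant lives; the counter seeds its high bits with the RM restart epoch so container ids from different epochs cannot collide. A worked example, assuming EPOCH_BIT_SHIFT is 40 (an assumption about the constant's value, not stated in this patch):

    // Worked example (assumption: ResourceManager.EPOCH_BIT_SHIFT == 40).
    public class EpochSeedExample {
      public static void main(String[] args) {
        long epoch = 2L;                  // RM has restarted twice
        long seed = epoch << 40;          // 2 * 2^40 = 2199023255552
        long firstContainerId = seed + 1; // first incrementAndGet() result
        System.out.println(seed + " " + firstContainerId);
        // Low 40 bits count containers; high bits carry the restart epoch,
        // so ids minted before and after an RM restart cannot collide.
      }
    }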
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index dcdc9348bda..7d1b3c33b9e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -822,7 +822,7 @@ public class MockRM extends ResourceManager {
if (this.rmContext.getYarnConfiguration().getBoolean(
YarnConfiguration.DIST_SCHEDULING_ENABLED,
YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
- return new DistributedSchedulingService(getRMContext(), scheduler) {
+ return new DistributedSchedulingAMService(getRMContext(), scheduler) {
@Override
protected void serviceStart() {
// override to not start rpc handler
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java
similarity index 75%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java
index 4716bab830c..0213a94bd63 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java
@@ -25,16 +25,11 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .AllocateResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .FinishApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .FinishApplicationMasterResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .RegisterApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -44,17 +39,16 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -63,12 +57,10 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
- .DistSchedAllocateResponsePBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
- .DistSchedRegisterResponsePBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
- .AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.junit.Assert;
import org.junit.Test;
@@ -78,9 +70,12 @@ import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
-public class TestDistributedSchedulingService {
+/**
+ * Test cases for {@link DistributedSchedulingAMService}.
+ */
+public class TestDistributedSchedulingAMService {
- // Test if the DistributedSchedulingService can handle both DSProtocol as
+ // Test if the DistributedSchedulingAMService can handle both DSProtocol as
// well as AMProtocol clients
@Test
public void testRPCWrapping() throws Exception {
@@ -116,7 +111,8 @@ public class TestDistributedSchedulingService {
Resource.newInstance(1, 2), 1, true, "exp",
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true))));
- DistributedSchedulingService service = createService(factory, rmContext, c);
+ DistributedSchedulingAMService service =
+ createService(factory, rmContext, c);
Server server = service.getServer(rpc, conf, addr, null);
server.start();
@@ -126,7 +122,7 @@ public class TestDistributedSchedulingService {
ProtobufRpcEngine.class);
ApplicationMasterProtocolPB ampProxy =
RPC.getProxy(ApplicationMasterProtocolPB
- .class, 1, NetUtils.getConnectAddress(server), conf);
+ .class, 1, NetUtils.getConnectAddress(server), conf);
RegisterApplicationMasterResponse regResp =
new RegisterApplicationMasterResponsePBImpl(
ampProxy.registerApplicationMaster(null,
@@ -156,34 +152,34 @@ public class TestDistributedSchedulingService {
- // Verify that the DistrubutedSchedulingService can handle the
+ // Verify that the DistributedSchedulingAMService can handle the
- // DistributedSchedulerProtocol clients as well
- RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
+ // DistributedSchedulingAMProtocol clients as well
+ RPC.setProtocolEngine(conf, DistributedSchedulingAMProtocolPB.class,
ProtobufRpcEngine.class);
- DistributedSchedulerProtocolPB dsProxy =
- RPC.getProxy(DistributedSchedulerProtocolPB
+ DistributedSchedulingAMProtocolPB dsProxy =
+ RPC.getProxy(DistributedSchedulingAMProtocolPB
.class, 1, NetUtils.getConnectAddress(server), conf);
- DistSchedRegisterResponse dsRegResp =
- new DistSchedRegisterResponsePBImpl(
+ RegisterDistributedSchedulingAMResponse dsRegResp =
+ new RegisterDistributedSchedulingAMResponsePBImpl(
dsProxy.registerApplicationMasterForDistributedScheduling(null,
((RegisterApplicationMasterRequestPBImpl)factory
.newRecordInstance(RegisterApplicationMasterRequest.class))
.getProto()));
- Assert.assertEquals(54321l, dsRegResp.getContainerIdStart());
+ Assert.assertEquals(54321L, dsRegResp.getContainerIdStart());
Assert.assertEquals(4,
- dsRegResp.getMaxAllocatableCapabilty().getVirtualCores());
+ dsRegResp.getMaxContainerResource().getVirtualCores());
Assert.assertEquals(1024,
- dsRegResp.getMinAllocatableCapabilty().getMemorySize());
+ dsRegResp.getMinContainerResource().getMemorySize());
Assert.assertEquals(2,
- dsRegResp.getIncrAllocatableCapabilty().getVirtualCores());
+ dsRegResp.getIncrContainerResource().getVirtualCores());
- DistSchedAllocateRequestPBImpl distAllReq =
- (DistSchedAllocateRequestPBImpl)factory.newRecordInstance(
- DistSchedAllocateRequest.class);
+ DistributedSchedulingAllocateRequestPBImpl distAllReq =
+ (DistributedSchedulingAllocateRequestPBImpl)factory.newRecordInstance(
+ DistributedSchedulingAllocateRequest.class);
distAllReq.setAllocateRequest(allReq);
distAllReq.setAllocatedContainers(Arrays.asList(c));
- DistSchedAllocateResponse dsAllocResp =
- new DistSchedAllocateResponsePBImpl(
+ DistributedSchedulingAllocateResponse dsAllocResp =
+ new DistributedSchedulingAllocateResponsePBImpl(
dsProxy.allocateForDistributedScheduling(null,
distAllReq.getProto()));
Assert.assertEquals(
@@ -199,9 +195,9 @@ public class TestDistributedSchedulingService {
false, dsfinishResp.getIsUnregistered());
}
- private DistributedSchedulingService createService(final RecordFactory
+ private DistributedSchedulingAMService createService(final RecordFactory
factory, final RMContext rmContext, final Container c) {
- return new DistributedSchedulingService(rmContext, null) {
+ return new DistributedSchedulingAMService(rmContext, null) {
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) throws
@@ -235,22 +231,24 @@ public class TestDistributedSchedulingService {
}
@Override
- public DistSchedRegisterResponse
+ public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling(
- RegisterApplicationMasterRequest request) throws
- YarnException, IOException {
- DistSchedRegisterResponse resp = factory.newRecordInstance(
- DistSchedRegisterResponse.class);
+ RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
+ RegisterDistributedSchedulingAMResponse resp = factory
+ .newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
resp.setContainerIdStart(54321L);
- resp.setMaxAllocatableCapabilty(Resource.newInstance(4096, 4));
- resp.setMinAllocatableCapabilty(Resource.newInstance(1024, 1));
- resp.setIncrAllocatableCapabilty(Resource.newInstance(2048, 2));
+ resp.setMaxContainerResource(Resource.newInstance(4096, 4));
+ resp.setMinContainerResource(Resource.newInstance(1024, 1));
+ resp.setIncrContainerResource(Resource.newInstance(2048, 2));
return resp;
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling(
- DistSchedAllocateRequest request) throws YarnException, IOException {
+ public DistributedSchedulingAllocateResponse
+ allocateForDistributedScheduling(
+ DistributedSchedulingAllocateRequest request)
+ throws YarnException, IOException {
List askList =
request.getAllocateRequest().getAskList();
List allocatedContainers = request.getAllocatedContainers();
@@ -260,8 +258,8 @@ public class TestDistributedSchedulingService {
Assert.assertEquals(1, askList.size());
Assert.assertTrue(askList.get(0)
.getExecutionTypeRequest().getEnforceExecutionType());
- DistSchedAllocateResponse resp =
- factory.newRecordInstance(DistSchedAllocateResponse.class);
+ DistributedSchedulingAllocateResponse resp = factory
+ .newRecordInstance(DistributedSchedulingAllocateResponse.class);
resp.setNodesForScheduling(
Arrays.asList(NodeId.newInstance("h1", 1234)));
return resp;
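
For reference, a condensed caller-side sketch of the renamed allocate records against a service like the mock above (variable names follow the test; the surrounding test method is assumed to declare throws Exception):

    // Build the distributed-scheduling allocate request from a plain AM
    // AllocateRequest plus the containers the local scheduler already
    // allocated, then read back the candidate nodes the RM piggybacks
    // on the response.
    DistributedSchedulingAllocateRequest req = factory.newRecordInstance(
        DistributedSchedulingAllocateRequest.class);
    req.setAllocateRequest(allReq);
    req.setAllocatedContainers(Arrays.asList(c));
    DistributedSchedulingAllocateResponse resp =
        service.allocateForDistributedScheduling(req);
    List<NodeId> nodesForScheduling = resp.getNodesForScheduling();
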