YARN-5113. Refactoring and other clean-up for distributed scheduling. (Konstantinos Karanasos via asuresh)

(cherry picked from commit e5766b1dbe)
This commit is contained in:
Arun Suresh 2016-07-31 10:18:01 -07:00
parent 96d5607e7a
commit f197378f81
31 changed files with 834 additions and 658 deletions

View File

@ -296,53 +296,60 @@ public class YarnConfiguration extends Configuration {
/** ACL used in case none is found. Allows nothing. */
public static final String DEFAULT_YARN_APP_ACL = " ";
/** Is Distributed Scheduling Enabled. */
/** Setting that controls whether distributed scheduling is enabled or not. */
public static final String DIST_SCHEDULING_ENABLED =
YARN_PREFIX + "distributed-scheduling.enabled";
public static final boolean DIST_SCHEDULING_ENABLED_DEFAULT = false;
/** Mininum allocatable container memory for Distributed Scheduling. */
public static final String DIST_SCHEDULING_MIN_MEMORY =
YARN_PREFIX + "distributed-scheduling.min-memory";
public static final int DIST_SCHEDULING_MIN_MEMORY_DEFAULT = 512;
/** Minimum memory (in MB) used for allocating a container through distributed
* scheduling. */
public static final String DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB =
YARN_PREFIX + "distributed-scheduling.min-container-memory-mb";
public static final int DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT = 512;
/** Mininum allocatable container vcores for Distributed Scheduling. */
public static final String DIST_SCHEDULING_MIN_VCORES =
YARN_PREFIX + "distributed-scheduling.min-vcores";
public static final int DIST_SCHEDULING_MIN_VCORES_DEFAULT = 1;
/** Minimum virtual CPU cores used for allocating a container through
* distributed scheduling. */
public static final String DIST_SCHEDULING_MIN_CONTAINER_VCORES =
YARN_PREFIX + "distributed-scheduling.min-container-vcores";
public static final int DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT = 1;
/** Maximum allocatable container memory for Distributed Scheduling. */
public static final String DIST_SCHEDULING_MAX_MEMORY =
YARN_PREFIX + "distributed-scheduling.max-memory";
public static final int DIST_SCHEDULING_MAX_MEMORY_DEFAULT = 2048;
/** Maximum memory (in MB) used for allocating a container through distributed
* scheduling. */
public static final String DIST_SCHEDULING_MAX_MEMORY_MB =
YARN_PREFIX + "distributed-scheduling.max-container-memory-mb";
public static final int DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT = 2048;
/** Maximum allocatable container vcores for Distributed Scheduling. */
public static final String DIST_SCHEDULING_MAX_VCORES =
YARN_PREFIX + "distributed-scheduling.max-vcores";
public static final int DIST_SCHEDULING_MAX_VCORES_DEFAULT = 4;
/** Maximum virtual CPU cores used for allocating a container through
* distributed scheduling. */
public static final String DIST_SCHEDULING_MAX_CONTAINER_VCORES =
YARN_PREFIX + "distributed-scheduling.max-container-vcores";
public static final int DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT = 4;
/** Incremental allocatable container memory for Distributed Scheduling. */
public static final String DIST_SCHEDULING_INCR_MEMORY =
YARN_PREFIX + "distributed-scheduling.incr-memory";
public static final int DIST_SCHEDULING_INCR_MEMORY_DEFAULT = 512;
/** Incremental memory (in MB) used for allocating a container through
* distributed scheduling. */
public static final String DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB =
YARN_PREFIX + "distributed-scheduling.incr-container-memory-mb";
public static final int DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT =
512;
/** Incremental allocatable container vcores for Distributed Scheduling. */
public static final String DIST_SCHEDULING_INCR_VCORES =
/** Incremental virtual CPU cores used for allocating a container through
* distributed scheduling. */
public static final String DIST_SCHEDULING_INCR_CONTAINER_VCORES =
YARN_PREFIX + "distributed-scheduling.incr-vcores";
public static final int DIST_SCHEDULING_INCR_VCORES_DEFAULT = 1;
public static final int DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT = 1;
/** Container token expiry for container allocated via Distributed
* Scheduling. */
/** Container token expiry for container allocated via distributed
* scheduling. */
public static final String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS =
YARN_PREFIX + "distributed-scheduling.container-token-expiry";
YARN_PREFIX + "distributed-scheduling.container-token-expiry-ms";
public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT =
600000;
/** K least loaded nodes to be provided to the LocalScheduler of a
* NodeManager for Distributed Scheduling. */
public static final String DIST_SCHEDULING_TOP_K =
YARN_PREFIX + "distributed-scheduling.top-k";
public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10;
/** Number of nodes to be used by the LocalScheduler of a NodeManager for
* dispatching containers during distributed scheduling. */
public static final String DIST_SCHEDULING_NODES_NUMBER_USED =
YARN_PREFIX + "distributed-scheduling.nodes-used";
public static final int DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT = 10;
/** Frequency for computing least loaded NMs. */
public static final String NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS =
@ -350,7 +357,7 @@ public class YarnConfiguration extends Configuration {
public static final long
NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT = 1000;
/** Comparator for determining Node Load for Distributed Scheduling. */
/** Comparator for determining node load for Distributed Scheduling. */
public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR =
YARN_PREFIX + "nm-container-queuing.load-comparator";
public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT =
@ -373,13 +380,13 @@ public class YarnConfiguration extends Configuration {
YARN_PREFIX + "nm-container-queuing.max-queue-length";
public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT = 10;
/** Min wait time of container queue at NodeManager. */
/** Min queue wait time for a container at a NodeManager. */
public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS =
YARN_PREFIX + "nm-container-queuing.min-queue-wait-time-ms";
public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT =
1;
/** Max wait time of container queue at NodeManager. */
/** Max queue wait time for a container queue at a NodeManager. */
public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS =
YARN_PREFIX + "nm-container-queuing.max-queue-wait-time-ms";
public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT =
@ -1653,17 +1660,21 @@ public class YarnConfiguration extends Configuration {
public static final String YARN_APPLICATION_CLASSPATH = YARN_PREFIX
+ "application.classpath";
/** The setting that controls whether AMRMProxy is enabled or not. */
public static final String AMRM_PROXY_ENABLED = NM_PREFIX
+ "amrmproxy.enable";
+ "amrmproxy.enabled";
public static final boolean DEFAULT_AMRM_PROXY_ENABLED = false;
public static final String AMRM_PROXY_ADDRESS = NM_PREFIX
+ "amrmproxy.address";
public static final int DEFAULT_AMRM_PROXY_PORT = 8048;
public static final String DEFAULT_AMRM_PROXY_ADDRESS = "0.0.0.0:"
+ DEFAULT_AMRM_PROXY_PORT;
public static final String AMRM_PROXY_CLIENT_THREAD_COUNT = NM_PREFIX
+ "amrmproxy.client.thread-count";
public static final int DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT = 25;
public static final String AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
NM_PREFIX + "amrmproxy.interceptor-class.pipeline";
public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =

View File

@ -121,41 +121,6 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
configurationPrefixToSkipCompare
.add(YarnConfiguration.NM_CPU_RESOURCE_ENABLED);
// Ignore Distributed Scheduling Related Configurations.
// Since it is still a "work in progress" feature
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_ENABLED);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_INCR_VCORES);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_MAX_VCORES);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_MIN_VCORES);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_TOP_K);
configurationPrefixToSkipCompare
.add(YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS);
configurationPrefixToSkipCompare
.add(YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR);
configurationPrefixToSkipCompare
.add(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH);
configurationPrefixToSkipCompare
.add(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH);
configurationPrefixToSkipCompare
.add(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS);
configurationPrefixToSkipCompare
.add(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS);
configurationPrefixToSkipCompare
.add(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV);
// Set by container-executor.cfg
configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR);

View File

@ -82,9 +82,9 @@ import static org.mockito.Mockito.when;
/**
* Validates End2End Distributed Scheduling flow which includes the AM
* specifying OPPORTUNISTIC containers in its resource requests,
* the AMRMProxyService on the NM, the LocalScheduler RequestInterceptor on
* the NM and the DistributedSchedulingProtocol used by the framework to talk
* to the DistributedSchedulingService running on the RM.
* the AMRMProxyService on the NM, the DistributedScheduler RequestInterceptor
* on the NM and the DistributedSchedulingProtocol used by the framework to talk
* to the DistributedSchedulingAMService running on the RM.
*/
public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {

View File

@ -2635,10 +2635,10 @@
<property>
<description>
Enable/Disable AMRMProxyService in the node manager. This service is used to intercept
calls from the application masters to the resource manager.
Enable/Disable AMRMProxyService in the node manager. This service is used to
intercept calls from the application masters to the resource manager.
</description>
<name>yarn.nodemanager.amrmproxy.enable</name>
<name>yarn.nodemanager.amrmproxy.enabled</name>
<value>false</value>
</property>
@ -2660,13 +2660,149 @@
<property>
<description>
The comma separated list of class names that implement the RequestInterceptor interface. This is used by the
AMRMProxyService to create the request processing pipeline for applications.
The comma separated list of class names that implement the
RequestInterceptor interface. This is used by the AMRMProxyService to create
the request processing pipeline for applications.
</description>
<name>yarn.nodemanager.amrmproxy.interceptor-class.pipeline</name>
<value>org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor</value>
</property>
<property>
<description>
Setting that controls whether distributed scheduling is enabled.
</description>
<name>yarn.distributed-scheduling.enabled</name>
<value>false</value>
</property>
<property>
<description>
Minimum memory (in MB) used for allocating a container through distributed
scheduling.
</description>
<name>yarn.distributed-scheduling.min-container-memory-mb</name>
<value>512</value>
</property>
<property>
<description>
Minimum virtual CPU cores used for allocating a container through
distributed scheduling.
</description>
<name>yarn.distributed-scheduling.min-container-vcores</name>
<value>1</value>
</property>
<property>
<description>
Maximum memory (in MB) used for allocating a container through distributed
scheduling.
</description>
<name>yarn.distributed-scheduling.max-container-memory-mb</name>
<value>2048</value>
</property>
<property>
<description>
Maximum virtual CPU cores used for allocating a container through
distributed scheduling.
</description>
<name>yarn.distributed-scheduling.max-container-vcores</name>
<value>4</value>
</property>
<property>
<description>
Incremental memory (in MB) used for allocating a container through
distributed scheduling.
</description>
<name>yarn.distributed-scheduling.incr-container-memory-mb</name>
<value>512</value>
</property>
<property>
<description>
Incremental virtual CPU cores used for allocating a container through
distributed scheduling.
</description>
<name>yarn.distributed-scheduling.incr-vcores</name>
<value>1</value>
</property>
<property>
<description>
Container token expiry for container allocated via distributed scheduling.
</description>
<name>yarn.distributed-scheduling.container-token-expiry-ms</name>
<value>600000</value>
</property>
<property>
<description>
Number of nodes to be used by the LocalScheduler of a NodeManager for
dispatching containers during distributed scheduling.
</description>
<name>yarn.distributed-scheduling.nodes-used</name>
<value>10</value>
</property>
<property>
<description>
Frequency for computing least loaded NMs.
</description>
<name>yarn.nm-container-queuing.sorting-nodes-interval-ms</name>
<value>1000</value>
</property>
<property>
<description>
Comparator for determining node load for Distributed Scheduling.
</description>
<name>yarn.nm-container-queuing.load-comparator</name>
<value>QUEUE_LENGTH</value>
</property>
<property>
<description>
Value of standard deviation used for calculation of queue limit thresholds.
</description>
<name>yarn.nm-container-queuing.queue-limit-stdev</name>
<value>1.0f</value>
</property>
<property>
<description>
Min length of container queue at NodeManager.
</description>
<name>yarn.nm-container-queuing.min-queue-length</name>
<value>1</value>
</property>
<property>
<description>
Max length of container queue at NodeManager.
</description>
<name>yarn.nm-container-queuing.max-queue-length</name>
<value>10</value>
</property>
<property>
<description>
Min queue wait time for a container at a NodeManager.
</description>
<name>yarn.nm-container-queuing.min-queue-wait-time-ms</name>
<value>1</value>
</property>
<property>
<description>
Max queue wait time for a container queue at a NodeManager.
</description>
<name>yarn.nm-container-queuing.max-queue-wait-time-ms</name>
<value>10</value>
</property>
<property>
<description>
Error filename pattern, to identify the file in the container's

View File

@ -144,7 +144,7 @@
<source>
<directory>${basedir}/src/main/proto</directory>
<includes>
<include>distributed_scheduler_protocol.proto</include>
<include>distributed_scheduling_am_protocol.proto</include>
<include>yarn_server_common_protos.proto</include>
<include>yarn_server_common_service_protos.proto</include>
<include>yarn_server_common_service_protos.proto</include>

View File

@ -1,78 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import java.io.IOException;
/**
* <p>This protocol extends the <code>ApplicationMasterProtocol</code>. It is
* used by the <code>LocalScheduler</code> running on the NodeManager to wrap
* the request / response objects of the <code>registerApplicationMaster</code>
* and <code>allocate</code> methods of the protocol with addition information
* required to perform Distributed Scheduling.
* </p>
*/
public interface DistributedSchedulerProtocol
extends ApplicationMasterProtocol {
/**
* <p> Extends the <code>registerApplicationMaster</code> to wrap the response
* with additional metadata.</p>
*
* @param request ApplicationMaster registration request
* @return A <code>DistSchedRegisterResponse</code> that contains a standard
* AM registration response along with additional information required
* for Distributed Scheduling
* @throws YarnException
* @throws IOException
*/
@Public
@Unstable
@Idempotent
DistSchedRegisterResponse registerApplicationMasterForDistributedScheduling(
RegisterApplicationMasterRequest request)
throws YarnException, IOException;
/**
* <p> Extends the <code>allocate</code> to wrap the response with additional
* metadata.</p>
*
* @param request ApplicationMaster allocate request
* @return A <code>DistSchedAllocateResponse</code> that contains a standard
* AM allocate response along with additional information required
* for Distributed Scheduling
* @throws YarnException
* @throws IOException
*/
@Public
@Unstable
@Idempotent
DistSchedAllocateResponse allocateForDistributedScheduling(
DistSchedAllocateRequest request) throws YarnException, IOException;
}

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import java.io.IOException;
/**
* <p>
* This protocol extends the <code>ApplicationMasterProtocol</code>. It is used
* by the <code>DistributedScheduler</code> running on the NodeManager to wrap
* the request / response objects of the <code>registerApplicationMaster</code>
* and <code>allocate</code> methods of the protocol with additional information
* required to perform distributed scheduling.
* </p>
*/
public interface DistributedSchedulingAMProtocol
extends ApplicationMasterProtocol {
/**
* <p>
* Extends the <code>registerApplicationMaster</code> to wrap the response
* with additional metadata.
* </p>
*
* @param request
* ApplicationMaster registration request
* @return A <code>RegisterDistributedSchedulingAMResponse</code> that
* contains a standard AM registration response along with additional
* information required for distributed scheduling
* @throws YarnException YarnException
* @throws IOException IOException
*/
@Public
@Unstable
@Idempotent
RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling(
RegisterApplicationMasterRequest request)
throws YarnException, IOException;
/**
* <p>
* Extends the <code>allocate</code> to wrap the response with additional
* metadata.
* </p>
*
* @param request
* ApplicationMaster allocate request
* @return A <code>DistributedSchedulingAllocateResponse</code> that contains
* a standard AM allocate response along with additional information
* required for distributed scheduling
* @throws YarnException YarnException
* @throws IOException IOException
*/
@Public
@Unstable
@Idempotent
DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
DistributedSchedulingAllocateRequest request)
throws YarnException, IOException;
}

View File

@ -23,14 +23,15 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
import org.apache.hadoop.yarn.proto.DistributedSchedulerProtocol.DistributedSchedulerProtocolService;
import org.apache.hadoop.yarn.proto.DistributedSchedulingAMProtocol.DistributedSchedulingAMProtocolService;
@Private
@Unstable
@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB",
@ProtocolInfo(protocolName =
"org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB",
protocolVersion = 1)
public interface DistributedSchedulerProtocolPB extends
DistributedSchedulerProtocolService.BlockingInterface,
public interface DistributedSchedulingAMProtocolPB extends
DistributedSchedulingAMProtocolService.BlockingInterface,
ApplicationMasterProtocolService.BlockingInterface,
ApplicationMasterProtocolPB {
ApplicationMasterProtocolPB {
}

View File

@ -78,7 +78,7 @@ public class ServerRMProxy<T> extends RMProxy<T> {
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
} else if (protocol == DistributedSchedulerProtocol.class) {
} else if (protocol == DistributedSchedulingAMProtocol.class) {
return conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);

View File

@ -23,51 +23,48 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.FinishApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.FinishApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
public class DistributedSchedulerProtocolPBClientImpl implements
DistributedSchedulerProtocol, Closeable {
/**
* Implementation of {@link DistributedSchedulingAMProtocol}, used when
* distributed scheduling is enabled.
*/
public class DistributedSchedulingAMProtocolPBClientImpl implements
DistributedSchedulingAMProtocol, Closeable {
private DistributedSchedulerProtocolPB proxy;
private DistributedSchedulingAMProtocolPB proxy;
public DistributedSchedulerProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr,
Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
public DistributedSchedulingAMProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, DistributedSchedulingAMProtocolPB.class,
ProtobufRpcEngine.class);
proxy = RPC.getProxy(DistributedSchedulerProtocolPB.class, clientVersion,
proxy = RPC.getProxy(DistributedSchedulingAMProtocolPB.class, clientVersion,
addr, conf);
}
@ -79,14 +76,14 @@ public class DistributedSchedulerProtocolPBClientImpl implements
}
@Override
public DistSchedRegisterResponse
public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling(
RegisterApplicationMasterRequest request)
throws YarnException, IOException {
RegisterApplicationMasterRequest request)
throws YarnException, IOException {
YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto =
((RegisterApplicationMasterRequestPBImpl) request).getProto();
try {
return new DistSchedRegisterResponsePBImpl(
return new RegisterDistributedSchedulingAMResponsePBImpl(
proxy.registerApplicationMasterForDistributedScheduling(
null, requestProto));
} catch (ServiceException e) {
@ -96,12 +93,14 @@ public class DistributedSchedulerProtocolPBClientImpl implements
}
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling(
DistSchedAllocateRequest request) throws YarnException, IOException {
YarnServerCommonServiceProtos.DistSchedAllocateRequestProto requestProto =
((DistSchedAllocateRequestPBImpl) request).getProto();
public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
DistributedSchedulingAllocateRequest request)
throws YarnException, IOException {
YarnServerCommonServiceProtos.DistributedSchedulingAllocateRequestProto
requestProto =
((DistributedSchedulingAllocateRequestPBImpl) request).getProto();
try {
return new DistSchedAllocateResponsePBImpl(
return new DistributedSchedulingAllocateResponsePBImpl(
proxy.allocateForDistributedScheduling(null, requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
@ -110,9 +109,9 @@ public class DistributedSchedulerProtocolPBClientImpl implements
}
@Override
public RegisterApplicationMasterResponse registerApplicationMaster
(RegisterApplicationMasterRequest request) throws YarnException,
IOException {
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request)
throws YarnException, IOException {
YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto =
((RegisterApplicationMasterRequestPBImpl) request).getProto();
try {
@ -125,9 +124,9 @@ public class DistributedSchedulerProtocolPBClientImpl implements
}
@Override
public FinishApplicationMasterResponse finishApplicationMaster
(FinishApplicationMasterRequest request) throws YarnException,
IOException {
public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request)
throws YarnException, IOException {
YarnServiceProtos.FinishApplicationMasterRequestProto requestProto =
((FinishApplicationMasterRequestPBImpl) request).getProto();
try {
@ -140,8 +139,8 @@ public class DistributedSchedulerProtocolPBClientImpl implements
}
@Override
public AllocateResponse allocate(AllocateRequest request) throws
YarnException, IOException {
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException {
YarnServiceProtos.AllocateRequestProto requestProto =
((AllocateRequestPBImpl) request).getProto();
try {

View File

@ -20,24 +20,21 @@ package org.apache.hadoop.yarn.server.api.impl.pb.service;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords
.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.FinishApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.FinishApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -47,27 +44,32 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterR
import java.io.IOException;
public class DistributedSchedulerProtocolPBServiceImpl implements
DistributedSchedulerProtocolPB {
/**
* Implementation of {@link DistributedSchedulingAMProtocolPB}.
*/
public class DistributedSchedulingAMProtocolPBServiceImpl implements
DistributedSchedulingAMProtocolPB {
private DistributedSchedulerProtocol real;
private DistributedSchedulingAMProtocol real;
public DistributedSchedulerProtocolPBServiceImpl(
DistributedSchedulerProtocol impl) {
public DistributedSchedulingAMProtocolPBServiceImpl(
DistributedSchedulingAMProtocol impl) {
this.real = impl;
}
@Override
public YarnServerCommonServiceProtos.DistSchedRegisterResponseProto
registerApplicationMasterForDistributedScheduling(RpcController controller,
RegisterApplicationMasterRequestProto proto) throws
ServiceException {
public YarnServerCommonServiceProtos.
RegisterDistributedSchedulingAMResponseProto
registerApplicationMasterForDistributedScheduling(
RpcController controller, RegisterApplicationMasterRequestProto proto)
throws ServiceException {
RegisterApplicationMasterRequestPBImpl request = new
RegisterApplicationMasterRequestPBImpl(proto);
try {
DistSchedRegisterResponse response =
RegisterDistributedSchedulingAMResponse response =
real.registerApplicationMasterForDistributedScheduling(request);
return ((DistSchedRegisterResponsePBImpl) response).getProto();
return ((RegisterDistributedSchedulingAMResponsePBImpl) response)
.getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
@ -76,16 +78,19 @@ public class DistributedSchedulerProtocolPBServiceImpl implements
}
@Override
public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto
allocateForDistributedScheduling(RpcController controller,
YarnServerCommonServiceProtos.DistSchedAllocateRequestProto proto)
public YarnServerCommonServiceProtos.
DistributedSchedulingAllocateResponseProto
allocateForDistributedScheduling(RpcController controller,
YarnServerCommonServiceProtos.
DistributedSchedulingAllocateRequestProto proto)
throws ServiceException {
DistSchedAllocateRequestPBImpl request =
new DistSchedAllocateRequestPBImpl(proto);
DistributedSchedulingAllocateRequestPBImpl request =
new DistributedSchedulingAllocateRequestPBImpl(proto);
try {
DistSchedAllocateResponse response = real
DistributedSchedulingAllocateResponse response = real
.allocateForDistributedScheduling(request);
return ((DistSchedAllocateResponsePBImpl) response).getProto();
return ((DistributedSchedulingAllocateResponsePBImpl) response)
.getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {

View File

@ -26,12 +26,14 @@ import org.apache.hadoop.yarn.api.records.Container;
import java.util.List;
/**
* Request for a distributed scheduler to notify allocation of containers to
* the Resource Manager.
* Object used by the Application Master when distributed scheduling is enabled,
* in order to forward the {@link AllocateRequest} for GUARANTEED containers to
* the Resource Manager, and to notify the Resource Manager about the allocation
* of OPPORTUNISTIC containers through the Distributed Scheduler.
*/
@Public
@Evolving
public abstract class DistSchedAllocateRequest {
public abstract class DistributedSchedulingAllocateRequest {
/**
* Get the underlying <code>AllocateRequest</code> object.

View File

@ -26,16 +26,24 @@ import org.apache.hadoop.yarn.util.Records;
import java.util.List;
/**
* This is the response of the Resource Manager to the
* {@link DistributedSchedulingAllocateRequest}, when distributed scheduling is
* enabled. It includes the {@link AllocateResponse} for the GUARANTEED
* containers allocated by the Resource Manager. Moreover, it includes a list
* with the nodes that can be used by the Distributed Scheduler when allocating
* containers.
*/
@Public
@Unstable
public abstract class DistSchedAllocateResponse {
public abstract class DistributedSchedulingAllocateResponse {
@Public
@Unstable
public static DistSchedAllocateResponse newInstance(AllocateResponse
allResp) {
DistSchedAllocateResponse response =
Records.newRecord(DistSchedAllocateResponse.class);
public static DistributedSchedulingAllocateResponse newInstance(
AllocateResponse allResp) {
DistributedSchedulingAllocateResponse response =
Records.newRecord(DistributedSchedulingAllocateResponse.class);
response.setAllocateResponse(allResp);
return response;
}

View File

@ -20,24 +20,30 @@ package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords
.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;
import java.util.List;
/**
* This is the response to registering an Application Master when distributed
* scheduling is enabled. Apart from the
* {@link RegisterApplicationMasterResponse}, it includes various parameters
* to be used during distributed scheduling, such as the min and max resources
* that can be requested by containers.
*/
@Public
@Unstable
public abstract class DistSchedRegisterResponse {
public abstract class RegisterDistributedSchedulingAMResponse {
@Public
@Unstable
public static DistSchedRegisterResponse newInstance
public static RegisterDistributedSchedulingAMResponse newInstance
(RegisterApplicationMasterResponse regAMResp) {
DistSchedRegisterResponse response =
Records.newRecord(DistSchedRegisterResponse.class);
RegisterDistributedSchedulingAMResponse response =
Records.newRecord(RegisterDistributedSchedulingAMResponse.class);
response.setRegisterResponse(regAMResp);
return response;
}
@ -53,27 +59,27 @@ public abstract class DistSchedRegisterResponse {
@Public
@Unstable
public abstract void setMinAllocatableCapabilty(Resource minResource);
public abstract void setMinContainerResource(Resource minResource);
@Public
@Unstable
public abstract Resource getMinAllocatableCapabilty();
public abstract Resource getMinContainerResource();
@Public
@Unstable
public abstract void setMaxAllocatableCapabilty(Resource maxResource);
public abstract void setMaxContainerResource(Resource maxResource);
@Public
@Unstable
public abstract Resource getMaxAllocatableCapabilty();
public abstract Resource getMaxContainerResource();
@Public
@Unstable
public abstract void setIncrAllocatableCapabilty(Resource maxResource);
public abstract void setIncrContainerResource(Resource maxResource);
@Public
@Unstable
public abstract Resource getIncrAllocatableCapabilty();
public abstract Resource getIncrContainerResource();
@Public
@Unstable

View File

@ -26,39 +26,40 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistSchedAllocateRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistSchedAllocateRequestProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistributedSchedulingAllocateRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistributedSchedulingAllocateRequestProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import java.util.Iterator;
import java.util.List;
/**
* Implementation of {@link DistSchedAllocateRequest} for a distributed
* scheduler to notify about the allocation of containers to the Resource
* Manager.
* Implementation of {@link DistributedSchedulingAllocateRequest}.
*/
public class DistSchedAllocateRequestPBImpl extends DistSchedAllocateRequest {
private DistSchedAllocateRequestProto.Builder builder = null;
public class DistributedSchedulingAllocateRequestPBImpl
extends DistributedSchedulingAllocateRequest {
private DistributedSchedulingAllocateRequestProto.Builder builder = null;
private boolean viaProto = false;
private DistSchedAllocateRequestProto proto;
private DistributedSchedulingAllocateRequestProto proto;
private AllocateRequest allocateRequest;
private List<Container> containers;
public DistSchedAllocateRequestPBImpl() {
builder = DistSchedAllocateRequestProto.newBuilder();
public DistributedSchedulingAllocateRequestPBImpl() {
builder = DistributedSchedulingAllocateRequestProto.newBuilder();
}
public DistSchedAllocateRequestPBImpl(DistSchedAllocateRequestProto proto) {
public DistributedSchedulingAllocateRequestPBImpl(
DistributedSchedulingAllocateRequestProto proto) {
this.proto = proto;
this.viaProto = true;
}
@Override
public AllocateRequest getAllocateRequest() {
DistSchedAllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
DistributedSchedulingAllocateRequestProtoOrBuilder p =
viaProto ? proto : builder;
if (this.allocateRequest != null) {
return this.allocateRequest;
}
@ -88,7 +89,8 @@ public class DistSchedAllocateRequestPBImpl extends DistSchedAllocateRequest {
}
private void initAllocatedContainers() {
DistSchedAllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
DistributedSchedulingAllocateRequestProtoOrBuilder p =
viaProto ? proto : builder;
List<ContainerProto> list = p.getAllocatedContainersList();
this.containers = new ArrayList<Container>();
for (ContainerProto c : list) {
@ -110,7 +112,7 @@ public class DistSchedAllocateRequestPBImpl extends DistSchedAllocateRequest {
this.containers.addAll(pContainers);
}
public DistSchedAllocateRequestProto getProto() {
public DistributedSchedulingAllocateRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
@ -119,7 +121,7 @@ public class DistSchedAllocateRequestPBImpl extends DistSchedAllocateRequest {
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = DistSchedAllocateRequestProto.newBuilder(proto);
builder = DistributedSchedulingAllocateRequestProto.newBuilder(proto);
}
viaProto = false;
}

View File

@ -20,41 +20,47 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.AllocateResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.server.api.protocolrecords
.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class DistSchedAllocateResponsePBImpl extends DistSchedAllocateResponse {
/**
* Implementation of {@link DistributedSchedulingAllocateResponse}.
*/
public class DistributedSchedulingAllocateResponsePBImpl extends
DistributedSchedulingAllocateResponse {
YarnServerCommonServiceProtos.DistSchedAllocateResponseProto proto =
YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.getDefaultInstance();
YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.Builder builder = null;
YarnServerCommonServiceProtos.DistributedSchedulingAllocateResponseProto
proto = YarnServerCommonServiceProtos.
DistributedSchedulingAllocateResponseProto.getDefaultInstance();
YarnServerCommonServiceProtos.DistributedSchedulingAllocateResponseProto.
Builder builder = null;
boolean viaProto = false;
private AllocateResponse allocateResponse;
private List<NodeId> nodesForScheduling;
public DistSchedAllocateResponsePBImpl() {
builder = YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.newBuilder();
public DistributedSchedulingAllocateResponsePBImpl() {
builder = YarnServerCommonServiceProtos.
DistributedSchedulingAllocateResponseProto.newBuilder();
}
public DistSchedAllocateResponsePBImpl(YarnServerCommonServiceProtos.DistSchedAllocateResponseProto proto) {
public DistributedSchedulingAllocateResponsePBImpl(
YarnServerCommonServiceProtos.
DistributedSchedulingAllocateResponseProto proto) {
this.proto = proto;
viaProto = true;
}
public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto getProto() {
public YarnServerCommonServiceProtos.
DistributedSchedulingAllocateResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
@ -63,7 +69,8 @@ public class DistSchedAllocateResponsePBImpl extends DistSchedAllocateResponse {
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.newBuilder(proto);
builder = YarnServerCommonServiceProtos.
DistributedSchedulingAllocateResponseProto.newBuilder(proto);
}
viaProto = false;
}
@ -79,19 +86,20 @@ public class DistSchedAllocateResponsePBImpl extends DistSchedAllocateResponse {
private synchronized void mergeLocalToBuilder() {
if (this.nodesForScheduling != null) {
builder.clearNodesForScheduling();
Iterable<YarnProtos.NodeIdProto> iterable =
getNodeIdProtoIterable(this.nodesForScheduling);
Iterable<YarnProtos.NodeIdProto> iterable = getNodeIdProtoIterable(
this.nodesForScheduling);
builder.addAllNodesForScheduling(iterable);
}
if (this.allocateResponse != null) {
builder.setAllocateResponse(
((AllocateResponsePBImpl)this.allocateResponse).getProto());
((AllocateResponsePBImpl) this.allocateResponse).getProto());
}
}
@Override
public void setAllocateResponse(AllocateResponse response) {
maybeInitBuilder();
if(allocateResponse == null) {
if (allocateResponse == null) {
builder.clearAllocateResponse();
}
this.allocateResponse = response;
@ -103,14 +111,14 @@ public class DistSchedAllocateResponsePBImpl extends DistSchedAllocateResponse {
return this.allocateResponse;
}
YarnServerCommonServiceProtos.DistSchedAllocateResponseProtoOrBuilder p =
viaProto ? proto : builder;
YarnServerCommonServiceProtos.
DistributedSchedulingAllocateResponseProtoOrBuilder p =
viaProto ? proto : builder;
if (!p.hasAllocateResponse()) {
return null;
}
this.allocateResponse =
new AllocateResponsePBImpl(p.getAllocateResponse());
this.allocateResponse = new AllocateResponsePBImpl(p.getAllocateResponse());
return this.allocateResponse;
}
@ -138,8 +146,9 @@ public class DistSchedAllocateResponsePBImpl extends DistSchedAllocateResponse {
}
private synchronized void initLocalNodesForSchedulingList() {
YarnServerCommonServiceProtos.DistSchedAllocateResponseProtoOrBuilder p =
viaProto ? proto : builder;
YarnServerCommonServiceProtos.
DistributedSchedulingAllocateResponseProtoOrBuilder p =
viaProto ? proto : builder;
List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList();
nodesForScheduling = new ArrayList<>();
if (list != null) {
@ -148,6 +157,7 @@ public class DistSchedAllocateResponsePBImpl extends DistSchedAllocateResponse {
}
}
}
private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable(
final List<NodeId> nodeList) {
maybeInitBuilder();

View File

@ -22,45 +22,54 @@ import com.google.protobuf.TextFormat;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
import org.apache.hadoop.yarn.server.api.protocolrecords
.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class DistSchedRegisterResponsePBImpl extends DistSchedRegisterResponse {
/**
* Implementation of {@link RegisterDistributedSchedulingAMResponse}.
*/
public class RegisterDistributedSchedulingAMResponsePBImpl extends
RegisterDistributedSchedulingAMResponse {
YarnServerCommonServiceProtos.DistSchedRegisterResponseProto proto =
YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.getDefaultInstance();
YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.Builder builder = null;
YarnServerCommonServiceProtos.RegisterDistributedSchedulingAMResponseProto
proto =
YarnServerCommonServiceProtos.
RegisterDistributedSchedulingAMResponseProto
.getDefaultInstance();
YarnServerCommonServiceProtos.RegisterDistributedSchedulingAMResponseProto.
Builder builder = null;
boolean viaProto = false;
private Resource maxAllocatableCapability;
private Resource minAllocatableCapability;
private Resource incrAllocatableCapability;
private Resource maxContainerResource;
private Resource minContainerResource;
private Resource incrContainerResource;
private List<NodeId> nodesForScheduling;
private RegisterApplicationMasterResponse registerApplicationMasterResponse;
public DistSchedRegisterResponsePBImpl() {
builder = YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.newBuilder();
public RegisterDistributedSchedulingAMResponsePBImpl() {
builder = YarnServerCommonServiceProtos.
RegisterDistributedSchedulingAMResponseProto.newBuilder();
}
public DistSchedRegisterResponsePBImpl(YarnServerCommonServiceProtos.DistSchedRegisterResponseProto proto) {
public RegisterDistributedSchedulingAMResponsePBImpl(
YarnServerCommonServiceProtos.
RegisterDistributedSchedulingAMResponseProto proto) {
this.proto = proto;
viaProto = true;
}
public YarnServerCommonServiceProtos.DistSchedRegisterResponseProto getProto() {
public YarnServerCommonServiceProtos.
RegisterDistributedSchedulingAMResponseProto
getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
@ -69,7 +78,8 @@ public class DistSchedRegisterResponsePBImpl extends DistSchedRegisterResponse {
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.newBuilder(proto);
builder = YarnServerCommonServiceProtos.
RegisterDistributedSchedulingAMResponseProto.newBuilder(proto);
}
viaProto = false;
}
@ -85,21 +95,21 @@ public class DistSchedRegisterResponsePBImpl extends DistSchedRegisterResponse {
private synchronized void mergeLocalToBuilder() {
if (this.nodesForScheduling != null) {
builder.clearNodesForScheduling();
Iterable<YarnProtos.NodeIdProto> iterable =
getNodeIdProtoIterable(this.nodesForScheduling);
Iterable<YarnProtos.NodeIdProto> iterable = getNodeIdProtoIterable(
this.nodesForScheduling);
builder.addAllNodesForScheduling(iterable);
}
if (this.maxAllocatableCapability != null) {
builder.setMaxAllocCapability(
ProtoUtils.convertToProtoFormat(this.maxAllocatableCapability));
if (this.maxContainerResource != null) {
builder.setMaxContainerResource(ProtoUtils.convertToProtoFormat(
this.maxContainerResource));
}
if (this.minAllocatableCapability != null) {
builder.setMinAllocCapability(
ProtoUtils.convertToProtoFormat(this.minAllocatableCapability));
if (this.minContainerResource != null) {
builder.setMinContainerResource(ProtoUtils.convertToProtoFormat(
this.minContainerResource));
}
if (this.incrAllocatableCapability != null) {
builder.setIncrAllocCapability(
ProtoUtils.convertToProtoFormat(this.incrAllocatableCapability));
if (this.incrContainerResource != null) {
builder.setIncrContainerResource(
ProtoUtils.convertToProtoFormat(this.incrContainerResource));
}
if (this.registerApplicationMasterResponse != null) {
builder.setRegisterResponse(
@ -111,7 +121,7 @@ public class DistSchedRegisterResponsePBImpl extends DistSchedRegisterResponse {
@Override
public void setRegisterResponse(RegisterApplicationMasterResponse resp) {
maybeInitBuilder();
if(registerApplicationMasterResponse == null) {
if (registerApplicationMasterResponse == null) {
builder.clearRegisterResponse();
}
this.registerApplicationMasterResponse = resp;
@ -123,7 +133,9 @@ public class DistSchedRegisterResponsePBImpl extends DistSchedRegisterResponse {
return this.registerApplicationMasterResponse;
}
YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
YarnServerCommonServiceProtos.
RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
viaProto ? proto : builder;
if (!p.hasRegisterResponse()) {
return null;
}
@ -134,78 +146,84 @@ public class DistSchedRegisterResponsePBImpl extends DistSchedRegisterResponse {
}
@Override
public void setMaxAllocatableCapabilty(Resource maxResource) {
public void setMaxContainerResource(Resource maxResource) {
maybeInitBuilder();
if(maxAllocatableCapability == null) {
builder.clearMaxAllocCapability();
if (maxContainerResource == null) {
builder.clearMaxContainerResource();
}
this.maxAllocatableCapability = maxResource;
this.maxContainerResource = maxResource;
}
@Override
public Resource getMaxAllocatableCapabilty() {
if (this.maxAllocatableCapability != null) {
return this.maxAllocatableCapability;
public Resource getMaxContainerResource() {
if (this.maxContainerResource != null) {
return this.maxContainerResource;
}
YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasMaxAllocCapability()) {
YarnServerCommonServiceProtos.
RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
viaProto ? proto : builder;
if (!p.hasMaxContainerResource()) {
return null;
}
this.maxAllocatableCapability =
ProtoUtils.convertFromProtoFormat(p.getMaxAllocCapability());
return this.maxAllocatableCapability;
this.maxContainerResource = ProtoUtils.convertFromProtoFormat(p
.getMaxContainerResource());
return this.maxContainerResource;
}
@Override
public void setMinAllocatableCapabilty(Resource minResource) {
public void setMinContainerResource(Resource minResource) {
maybeInitBuilder();
if(minAllocatableCapability == null) {
builder.clearMinAllocCapability();
if (minContainerResource == null) {
builder.clearMinContainerResource();
}
this.minAllocatableCapability = minResource;
this.minContainerResource = minResource;
}
@Override
public Resource getMinAllocatableCapabilty() {
if (this.minAllocatableCapability != null) {
return this.minAllocatableCapability;
public Resource getMinContainerResource() {
if (this.minContainerResource != null) {
return this.minContainerResource;
}
YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasMinAllocCapability()) {
YarnServerCommonServiceProtos.
RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
viaProto ? proto : builder;
if (!p.hasMinContainerResource()) {
return null;
}
this.minAllocatableCapability =
ProtoUtils.convertFromProtoFormat(p.getMinAllocCapability());
return this.minAllocatableCapability;
this.minContainerResource = ProtoUtils.convertFromProtoFormat(p
.getMinContainerResource());
return this.minContainerResource;
}
@Override
public void setIncrAllocatableCapabilty(Resource incrResource) {
public void setIncrContainerResource(Resource incrResource) {
maybeInitBuilder();
if(incrAllocatableCapability == null) {
builder.clearIncrAllocCapability();
if (incrContainerResource == null) {
builder.clearIncrContainerResource();
}
this.incrAllocatableCapability = incrResource;
this.incrContainerResource = incrResource;
}
@Override
public Resource getIncrAllocatableCapabilty() {
if (this.incrAllocatableCapability != null) {
return this.incrAllocatableCapability;
public Resource getIncrContainerResource() {
if (this.incrContainerResource != null) {
return this.incrContainerResource;
}
YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasIncrAllocCapability()) {
YarnServerCommonServiceProtos.
RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
viaProto ? proto : builder;
if (!p.hasIncrContainerResource()) {
return null;
}
this.incrAllocatableCapability =
ProtoUtils.convertFromProtoFormat(p.getIncrAllocCapability());
return this.incrAllocatableCapability;
this.incrContainerResource = ProtoUtils.convertFromProtoFormat(p
.getIncrContainerResource());
return this.incrContainerResource;
}
@Override
@ -216,7 +234,9 @@ public class DistSchedRegisterResponsePBImpl extends DistSchedRegisterResponse {
@Override
public int getContainerTokenExpiryInterval() {
YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
YarnServerCommonServiceProtos.
RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
viaProto ? proto : builder;
if (!p.hasContainerTokenExpiryInterval()) {
return 0;
}
@ -231,14 +251,15 @@ public class DistSchedRegisterResponsePBImpl extends DistSchedRegisterResponse {
@Override
public long getContainerIdStart() {
YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
YarnServerCommonServiceProtos.
RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
viaProto ? proto : builder;
if (!p.hasContainerIdStart()) {
return 0;
}
return p.getContainerIdStart();
}
@Override
public void setNodesForScheduling(List<NodeId> nodesForScheduling) {
maybeInitBuilder();
@ -263,7 +284,9 @@ public class DistSchedRegisterResponsePBImpl extends DistSchedRegisterResponse {
}
private synchronized void initLocalNodesForSchedulingList() {
YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
YarnServerCommonServiceProtos.
RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
viaProto ? proto : builder;
List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList();
nodesForScheduling = new ArrayList<>();
if (list != null) {
@ -272,6 +295,7 @@ public class DistSchedRegisterResponsePBImpl extends DistSchedRegisterResponse {
}
}
}
private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable(
final List<NodeId> nodeList) {
maybeInitBuilder();

View File

@ -16,6 +16,7 @@
* limitations under the License.
*/
/**
* These .proto interfaces are public and stable.
* Please see http://wiki.apache.org/hadoop/Compatibility
@ -23,7 +24,7 @@
*/
option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "DistributedSchedulerProtocol";
option java_outer_classname = "DistributedSchedulingAMProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;
@ -31,9 +32,8 @@ package hadoop.yarn;
import "yarn_service_protos.proto";
import "yarn_server_common_service_protos.proto";
service DistributedSchedulerProtocolService {
rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (DistSchedRegisterResponseProto);
service DistributedSchedulingAMProtocolService {
rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (RegisterDistributedSchedulingAMResponseProto);
rpc allocateForDistributedScheduling (DistributedSchedulingAllocateRequestProto) returns (DistributedSchedulingAllocateResponseProto);
rpc finishApplicationMaster (FinishApplicationMasterRequestProto) returns (FinishApplicationMasterResponseProto);
rpc allocateForDistributedScheduling (DistSchedAllocateRequestProto) returns (DistSchedAllocateResponseProto);
}

View File

@ -26,22 +26,22 @@ import "yarn_protos.proto";
import "yarn_server_common_protos.proto";
import "yarn_service_protos.proto";
message DistSchedRegisterResponseProto {
message RegisterDistributedSchedulingAMResponseProto {
optional RegisterApplicationMasterResponseProto register_response = 1;
optional ResourceProto max_alloc_capability = 2;
optional ResourceProto min_alloc_capability = 3;
optional ResourceProto incr_alloc_capability = 4;
optional ResourceProto max_container_resource = 2;
optional ResourceProto min_container_resource = 3;
optional ResourceProto incr_container_resource = 4;
optional int32 container_token_expiry_interval = 5;
optional int64 container_id_start = 6;
repeated NodeIdProto nodes_for_scheduling = 7;
}
message DistSchedAllocateResponseProto {
message DistributedSchedulingAllocateResponseProto {
optional AllocateResponseProto allocate_response = 1;
repeated NodeIdProto nodes_for_scheduling = 2;
}
message DistSchedAllocateRequestProto {
message DistributedSchedulingAllocateRequestProto {
optional AllocateRequestProto allocate_request = 1;
repeated ContainerProto allocated_containers = 2;
}

View File

@ -65,7 +65,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler;
import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
@ -467,10 +467,9 @@ public class AMRMProxyService extends AbstractService implements
interceptorClassNames.add(item.trim());
}
// Make sure LocalScheduler is present at the beginning
// of the chain..
// Make sure DistributedScheduler is present at the beginning of the chain.
if (this.nmContext.isDistributedSchedulingEnabled()) {
interceptorClassNames.add(0, LocalScheduler.class.getName());
interceptorClassNames.add(0, DistributedScheduler.class.getName());
}
return interceptorClassNames;

View File

@ -21,12 +21,11 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import org.apache.hadoop.conf.Configuration;
import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.api.protocolrecords
.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import java.io.IOException;
@ -118,8 +117,9 @@ public abstract class AbstractRequestInterceptor implements
* @throws IOException
*/
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling(
DistSchedAllocateRequest request) throws YarnException, IOException {
public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
DistributedSchedulingAllocateRequest request)
throws YarnException, IOException {
return (this.nextInterceptor != null) ?
this.nextInterceptor.allocateForDistributedScheduling(request) : null;
}
@ -134,10 +134,10 @@ public abstract class AbstractRequestInterceptor implements
* @throws IOException
*/
@Override
public DistSchedRegisterResponse
public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling(
RegisterApplicationMasterRequest request)
throws YarnException, IOException {
RegisterApplicationMasterRequest request)
throws YarnException, IOException {
return (this.nextInterceptor != null) ? this.nextInterceptor
.registerApplicationMasterForDistributedScheduling(request) : null;
}

View File

@ -43,12 +43,11 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
import org.apache.hadoop.yarn.server.api.protocolrecords
.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -63,7 +62,7 @@ public final class DefaultRequestInterceptor extends
AbstractRequestInterceptor {
private static final Logger LOG = LoggerFactory
.getLogger(DefaultRequestInterceptor.class);
private DistributedSchedulerProtocol rmClient;
private DistributedSchedulingAMProtocol rmClient;
private UserGroupInformation user = null;
@Override
@ -77,13 +76,13 @@ public final class DefaultRequestInterceptor extends
user.addToken(appContext.getAMRMToken());
final Configuration conf = this.getConf();
rmClient =
user.doAs(new PrivilegedExceptionAction<DistributedSchedulerProtocol>() {
rmClient = user.doAs(
new PrivilegedExceptionAction<DistributedSchedulingAMProtocol>() {
@Override
public DistributedSchedulerProtocol run() throws Exception {
public DistributedSchedulingAMProtocol run() throws Exception {
setAMRMTokenService(conf);
return ServerRMProxy.createRMProxy(conf,
DistributedSchedulerProtocol.class);
DistributedSchedulingAMProtocol.class);
}
});
} catch (IOException e) {
@ -124,7 +123,7 @@ public final class DefaultRequestInterceptor extends
}
@Override
public DistSchedRegisterResponse
public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling
(RegisterApplicationMasterRequest request) throws YarnException,
IOException {
@ -134,13 +133,14 @@ public final class DefaultRequestInterceptor extends
}
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling(
DistSchedAllocateRequest request) throws YarnException, IOException {
public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
DistributedSchedulingAllocateRequest request)
throws YarnException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Forwarding allocateForDistributedScheduling request" +
"to the real YARN RM");
}
DistSchedAllocateResponse allocateResponse =
DistributedSchedulingAllocateResponse allocateResponse =
rmClient.allocateForDistributedScheduling(request);
if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken());
@ -180,10 +180,10 @@ public final class DefaultRequestInterceptor extends
@VisibleForTesting
public void setRMClient(final ApplicationMasterProtocol rmClient) {
if (rmClient instanceof DistributedSchedulerProtocol) {
this.rmClient = (DistributedSchedulerProtocol)rmClient;
if (rmClient instanceof DistributedSchedulingAMProtocol) {
this.rmClient = (DistributedSchedulingAMProtocol)rmClient;
} else {
this.rmClient = new DistributedSchedulerProtocol() {
this.rmClient = new DistributedSchedulingAMProtocol() {
@Override
public RegisterApplicationMasterResponse registerApplicationMaster
(RegisterApplicationMasterRequest request) throws YarnException,
@ -205,7 +205,7 @@ public final class DefaultRequestInterceptor extends
}
@Override
public DistSchedRegisterResponse
public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling
(RegisterApplicationMasterRequest request) throws YarnException,
IOException {
@ -213,8 +213,9 @@ public final class DefaultRequestInterceptor extends
}
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling(
DistSchedAllocateRequest request)
public DistributedSchedulingAllocateResponse
allocateForDistributedScheduling(
DistributedSchedulingAllocateRequest request)
throws YarnException, IOException {
throw new IOException("Not Supported !!");
}

View File

@ -19,14 +19,14 @@
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
/**
* Defines the contract to be implemented by the request intercepter classes,
* that can be used to intercept and inspect messages sent from the application
* master to the resource manager.
*/
public interface RequestInterceptor extends DistributedSchedulerProtocol,
public interface RequestInterceptor extends DistributedSchedulingAMProtocol,
Configurable {
/**
* This method is called for initializing the intercepter. This is guaranteed

View File

@ -23,17 +23,13 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords
.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords
.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords
.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords
.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -46,14 +42,12 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy
.AMRMProxyApplicationContext;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyApplicationContext;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor;
import org.apache.hadoop.yarn.server.nodemanager.security
.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -68,19 +62,19 @@ import java.util.Set;
import java.util.TreeMap;
/**
* <p>The LocalScheduler runs on the NodeManager and is modelled as an
* <p>The DistributedScheduler runs on the NodeManager and is modeled as an
* <code>AMRMProxy</code> request interceptor. It is responsible for the
* following :</p>
* following:</p>
* <ul>
* <li>Intercept <code>ApplicationMasterProtocol</code> calls and unwrap the
* response objects to extract instructions from the
* <code>ClusterManager</code> running on the ResourceManager to aid in making
* Scheduling scheduling decisions</li>
* <code>ClusterMonitor</code> running on the ResourceManager to aid in making
* distributed scheduling decisions.</li>
* <li>Call the <code>OpportunisticContainerAllocator</code> to allocate
* containers for the opportunistic resource outstandingOpReqs</li>
* containers for the outstanding OPPORTUNISTIC container requests.</li>
* </ul>
*/
public final class LocalScheduler extends AbstractRequestInterceptor {
public final class DistributedScheduler extends AbstractRequestInterceptor {
static class PartitionedResourceRequests {
private List<ResourceRequest> guaranteed = new ArrayList<>();
@ -93,7 +87,7 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
}
}
static class DistSchedulerParams {
static class DistributedSchedulerParams {
Resource maxResource;
Resource minResource;
Resource incrementResource;
@ -101,18 +95,20 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
}
private static final Logger LOG = LoggerFactory
.getLogger(LocalScheduler.class);
.getLogger(DistributedScheduler.class);
private final static RecordFactory RECORD_FACTORY =
RecordFactoryProvider.getRecordFactory(null);
// Currently just used to keep track of allocated Containers
// Can be used for reporting stats later
// Currently just used to keep track of allocated containers.
// Can be used for reporting stats later.
private Set<ContainerId> containersAllocated = new HashSet<>();
private DistSchedulerParams appParams = new DistSchedulerParams();
private final OpportunisticContainerAllocator.ContainerIdCounter containerIdCounter =
new OpportunisticContainerAllocator.ContainerIdCounter();
private DistributedSchedulerParams appParams =
new DistributedSchedulerParams();
private final OpportunisticContainerAllocator.ContainerIdCounter
containerIdCounter =
new OpportunisticContainerAllocator.ContainerIdCounter();
private Map<String, NodeId> nodeList = new LinkedHashMap<>();
// Mapping of NodeId to NodeTokens. Populated either from RM response or
@ -123,7 +119,7 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
// This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority,
// Resource Name (Host/rack/any) and capability. This mapping is required
// to match a received Container to an outstanding OPPORTUNISTIC
// ResourceRequests (ask)
// ResourceRequest (ask).
final TreeMap<Priority, Map<Resource, ResourceRequest>>
outstandingOpReqs = new TreeMap<>();
@ -158,8 +154,8 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
* @param request
* registration request
* @return Allocate Response
* @throws YarnException
* @throws IOException
* @throws YarnException YarnException
* @throws IOException IOException
*/
@Override
public RegisterApplicationMasterResponse registerApplicationMaster
@ -177,14 +173,14 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
* @param request
* allocation request
* @return Allocate Response
* @throws YarnException
* @throws IOException
* @throws YarnException YarnException
* @throws IOException IOException
*/
@Override
public AllocateResponse allocate(AllocateRequest request) throws
YarnException, IOException {
DistSchedAllocateRequest distRequest =
RECORD_FACTORY.newRecordInstance(DistSchedAllocateRequest.class);
DistributedSchedulingAllocateRequest distRequest = RECORD_FACTORY
.newRecordInstance(DistributedSchedulingAllocateRequest.class);
distRequest.setAllocateRequest(request);
return allocateForDistributedScheduling(distRequest).getAllocateResponse();
}
@ -199,9 +195,6 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
/**
* Check if we already have a NMToken. if Not, generate the Token and
* add it to the response
* @param response
* @param nmTokens
* @param allocatedContainers
*/
private void updateResponseWithNMTokens(AllocateResponse response,
List<NMToken> nmTokens, List<Container> allocatedContainers) {
@ -235,11 +228,11 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
}
private void updateParameters(
DistSchedRegisterResponse registerResponse) {
appParams.minResource = registerResponse.getMinAllocatableCapabilty();
appParams.maxResource = registerResponse.getMaxAllocatableCapabilty();
RegisterDistributedSchedulingAMResponse registerResponse) {
appParams.minResource = registerResponse.getMinContainerResource();
appParams.maxResource = registerResponse.getMaxContainerResource();
appParams.incrementResource =
registerResponse.getIncrAllocatableCapabilty();
registerResponse.getIncrContainerResource();
if (appParams.incrementResource == null) {
appParams.incrementResource = appParams.minResource;
}
@ -253,11 +246,12 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
/**
* Takes a list of ResourceRequests (asks), extracts the key information viz.
* (Priority, ResourceName, Capability) and adds it the outstanding
* (Priority, ResourceName, Capability) and adds to the outstanding
* OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce
* the current YARN constraint that only a single ResourceRequest can exist at
* a give Priority and Capability
* @param resourceAsks
* a give Priority and Capability.
*
* @param resourceAsks the list with the {@link ResourceRequest}s
*/
public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
for (ResourceRequest request : resourceAsks) {
@ -297,11 +291,9 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
/**
* This method matches a returned list of Container Allocations to any
* outstanding OPPORTUNISTIC ResourceRequest
* @param capability
* @param allocatedContainers
* outstanding OPPORTUNISTIC ResourceRequest.
*/
public void matchAllocationToOutstandingRequest(Resource capability,
private void matchAllocationToOutstandingRequest(Resource capability,
List<Container> allocatedContainers) {
for (Container c : allocatedContainers) {
containersAllocated.add(c.getId());
@ -333,28 +325,29 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
}
@Override
public DistSchedRegisterResponse
public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling(
RegisterApplicationMasterRequest request)
throws YarnException, IOException {
throws YarnException, IOException {
LOG.info("Forwarding registration request to the" +
"Distributed Scheduler Service on YARN RM");
DistSchedRegisterResponse dsResp = getNextInterceptor()
RegisterDistributedSchedulingAMResponse dsResp = getNextInterceptor()
.registerApplicationMasterForDistributedScheduling(request);
updateParameters(dsResp);
return dsResp;
}
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling(
DistSchedAllocateRequest request) throws YarnException, IOException {
public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
DistributedSchedulingAllocateRequest request)
throws YarnException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Forwarding allocate request to the" +
"Distributed Scheduler Service on YARN RM");
}
// Partition requests into GUARANTEED and OPPORTUNISTIC reqs
PartitionedResourceRequests partitionedAsks = partitionAskList(
request.getAllocateRequest().getAskList());
PartitionedResourceRequests partitionedAsks =
partitionAskList(request.getAllocateRequest().getAskList());
List<ContainerId> releasedContainers =
request.getAllocateRequest().getReleaseList();
@ -393,11 +386,12 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
allocatedContainers.addAll(e.getValue());
}
}
request.setAllocatedContainers(allocatedContainers);
// Send all the GUARANTEED Reqs to RM
request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed());
DistSchedAllocateResponse dsResp =
DistributedSchedulingAllocateResponse dsResp =
getNextInterceptor().allocateForDistributedScheduling(request);
// Update host to nodeId mapping

View File

@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler.DistSchedulerParams;
import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler.DistributedSchedulerParams;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@ -37,15 +37,17 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;
/**
* <p>The OpportunisticContainerAllocator allocates containers on a given list
* of Nodes after it modifies the container sizes to within allowable limits
* specified by the <code>ClusterManager</code> running on the RM. It tries to
* distribute the containers as evenly as possible. It also uses the
* <code>NMTokenSecretManagerInNM</code> to generate the required NM tokens for
* the allocated containers</p>
* <p>
* The OpportunisticContainerAllocator allocates containers on a given list of
* nodes, after modifying the container sizes to respect the limits set by the
* ResourceManager. It tries to distribute the containers as evenly as possible.
* It also uses the <code>NMTokenSecretManagerInNM</code> to generate the
* required NM tokens for the allocated containers.
* </p>
*/
public class OpportunisticContainerAllocator {
@ -78,15 +80,15 @@ public class OpportunisticContainerAllocator {
this.webpagePort = webpagePort;
}
public Map<Resource, List<Container>> allocate(DistSchedulerParams appParams,
ContainerIdCounter idCounter, Collection<ResourceRequest> resourceAsks,
Set<String> blacklist, ApplicationAttemptId appAttId,
Map<String, NodeId> allNodes, String userName) throws YarnException {
public Map<Resource, List<Container>> allocate(
DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
Collection<ResourceRequest> resourceAsks, Set<String> blacklist,
ApplicationAttemptId appAttId, Map<String, NodeId> allNodes,
String userName) throws YarnException {
Map<Resource, List<Container>> containers = new HashMap<>();
Set<String> nodesAllocated = new HashSet<>();
for (ResourceRequest anyAsk : resourceAsks) {
allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId,
allNodes, userName, containers, nodesAllocated, anyAsk);
allNodes, userName, containers, anyAsk);
LOG.info("Opportunistic allocation requested for ["
+ "priority=" + anyAsk.getPriority()
+ ", num_containers=" + anyAsk.getNumContainers()
@ -96,30 +98,30 @@ public class OpportunisticContainerAllocator {
return containers;
}
private void allocateOpportunisticContainers(DistSchedulerParams appParams,
ContainerIdCounter idCounter, Set<String> blacklist,
ApplicationAttemptId id, Map<String, NodeId> allNodes, String userName,
Map<Resource, List<Container>> containers, Set<String> nodesAllocated,
ResourceRequest anyAsk) throws YarnException {
private void allocateOpportunisticContainers(
DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
Set<String> blacklist, ApplicationAttemptId id,
Map<String, NodeId> allNodes, String userName,
Map<Resource, List<Container>> containers, ResourceRequest anyAsk)
throws YarnException {
int toAllocate = anyAsk.getNumContainers()
- (containers.isEmpty() ?
0 : containers.get(anyAsk.getCapability()).size());
- (containers.isEmpty() ? 0 :
containers.get(anyAsk.getCapability()).size());
List<String> topKNodesLeft = new ArrayList<>();
for (String s : allNodes.keySet()) {
// Bias away from whatever we have already allocated and respect blacklist
if (nodesAllocated.contains(s) || blacklist.contains(s)) {
List<NodeId> nodesForScheduling = new ArrayList<>();
for (Entry<String, NodeId> nodeEntry : allNodes.entrySet()) {
// Do not use blacklisted nodes for scheduling.
if (blacklist.contains(nodeEntry.getKey())) {
continue;
}
topKNodesLeft.add(s);
nodesForScheduling.add(nodeEntry.getValue());
}
int numAllocated = 0;
int nextNodeToAllocate = 0;
int nextNodeToSchedule = 0;
for (int numCont = 0; numCont < toAllocate; numCont++) {
String topNode = topKNodesLeft.get(nextNodeToAllocate);
nextNodeToAllocate++;
nextNodeToAllocate %= topKNodesLeft.size();
NodeId nodeId = allNodes.get(topNode);
nextNodeToSchedule++;
nextNodeToSchedule %= nodesForScheduling.size();
NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule);
Container container = buildContainer(appParams, idCounter, anyAsk, id,
userName, nodeId);
List<Container> cList = containers.get(anyAsk.getCapability());
@ -134,7 +136,7 @@ public class OpportunisticContainerAllocator {
LOG.info("Allocated " + numAllocated + " opportunistic containers.");
}
private Container buildContainer(DistSchedulerParams appParams,
private Container buildContainer(DistributedSchedulerParams appParams,
ContainerIdCounter idCounter, ResourceRequest rr, ApplicationAttemptId id,
String userName, NodeId nodeId) throws YarnException {
ContainerId cId =
@ -165,7 +167,7 @@ public class OpportunisticContainerAllocator {
return container;
}
private Resource normalizeCapability(DistSchedulerParams appParams,
private Resource normalizeCapability(DistributedSchedulerParams appParams,
ResourceRequest ask) {
return Resources.normalize(RESOURCE_CALCULATOR,
ask.getCapability(), appParams.minResource, appParams.maxResource,

View File

@ -19,34 +19,30 @@
package org.apache.hadoop.yarn.server.nodemanager.scheduler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords
.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords
.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
import org.apache.hadoop.yarn.server.nodemanager.security
.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security
.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
@ -63,27 +59,30 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
public class TestLocalScheduler {
/**
* Test cases for {@link DistributedScheduler}.
*/
public class TestDistributedScheduler {
@Test
public void testLocalScheduler() throws Exception {
public void testDistributedScheduler() throws Exception {
Configuration conf = new Configuration();
LocalScheduler localScheduler = new LocalScheduler();
DistributedScheduler distributedScheduler = new DistributedScheduler();
RequestInterceptor finalReqIntcptr = setup(conf, localScheduler);
RequestInterceptor finalReqIntcptr = setup(conf, distributedScheduler);
registerAM(localScheduler, finalReqIntcptr, Arrays.asList(
registerAM(distributedScheduler, finalReqIntcptr, Arrays.asList(
NodeId.newInstance("a", 1), NodeId.newInstance("b", 2)));
final AtomicBoolean flipFlag = new AtomicBoolean(false);
final AtomicBoolean flipFlag = new AtomicBoolean(true);
Mockito.when(
finalReqIntcptr.allocateForDistributedScheduling(
Mockito.any(DistSchedAllocateRequest.class)))
.thenAnswer(new Answer<DistSchedAllocateResponse>() {
Mockito.any(DistributedSchedulingAllocateRequest.class)))
.thenAnswer(new Answer<DistributedSchedulingAllocateResponse>() {
@Override
public DistSchedAllocateResponse answer(InvocationOnMock
invocationOnMock) throws Throwable {
public DistributedSchedulingAllocateResponse answer(
InvocationOnMock invocationOnMock) throws Throwable {
flipFlag.set(!flipFlag.get());
if (flipFlag.get()) {
return createAllocateResponse(Arrays.asList(
@ -101,15 +100,15 @@ public class TestLocalScheduler {
ResourceRequest opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 4, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
// Verify 4 containers were allocated
AllocateResponse allocateResponse =
localScheduler.allocate(allocateRequest);
distributedScheduler.allocate(allocateRequest);
Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size());
// Verify equal distribution on hosts a and b
// And None on c and d
// Verify equal distribution on hosts a and b, and none on c or d
Map<NodeId, List<ContainerId>> allocs = mapAllocs(allocateResponse, 4);
Assert.assertEquals(2, allocs.get(NodeId.newInstance("a", 1)).size());
Assert.assertEquals(2, allocs.get(NodeId.newInstance("b", 2)).size());
@ -123,18 +122,18 @@ public class TestLocalScheduler {
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
// Verify 6 containers were allocated
allocateResponse = localScheduler.allocate(allocateRequest);
allocateResponse = distributedScheduler.allocate(allocateRequest);
Assert.assertEquals(6, allocateResponse.getAllocatedContainers().size());
// Verify New containers are equally distribution on hosts c and d
// And None on a and b
// Verify new containers are equally distribution on hosts c and d,
// and none on a or b
allocs = mapAllocs(allocateResponse, 6);
Assert.assertEquals(3, allocs.get(NodeId.newInstance("c", 3)).size());
Assert.assertEquals(3, allocs.get(NodeId.newInstance("d", 4)).size());
Assert.assertNull(allocs.get(NodeId.newInstance("a", 1)));
Assert.assertNull(allocs.get(NodeId.newInstance("b", 2)));
// Ensure the LocalScheduler respects the list order..
// Ensure the DistributedScheduler respects the list order..
// The first request should be allocated to "d" since it is ranked higher
// The second request should be allocated to "c" since the ranking is
// flipped on every allocate response.
@ -142,7 +141,7 @@ public class TestLocalScheduler {
opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
allocateResponse = localScheduler.allocate(allocateRequest);
allocateResponse = distributedScheduler.allocate(allocateRequest);
allocs = mapAllocs(allocateResponse, 1);
Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size());
@ -150,7 +149,7 @@ public class TestLocalScheduler {
opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
allocateResponse = localScheduler.allocate(allocateRequest);
allocateResponse = distributedScheduler.allocate(allocateRequest);
allocs = mapAllocs(allocateResponse, 1);
Assert.assertEquals(1, allocs.get(NodeId.newInstance("c", 3)).size());
@ -158,22 +157,23 @@ public class TestLocalScheduler {
opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
allocateResponse = localScheduler.allocate(allocateRequest);
allocateResponse = distributedScheduler.allocate(allocateRequest);
allocs = mapAllocs(allocateResponse, 1);
Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size());
}
private void registerAM(LocalScheduler localScheduler, RequestInterceptor
finalReqIntcptr, List<NodeId> nodeList) throws Exception {
DistSchedRegisterResponse distSchedRegisterResponse =
Records.newRecord(DistSchedRegisterResponse.class);
private void registerAM(DistributedScheduler distributedScheduler,
RequestInterceptor finalReqIntcptr, List<NodeId> nodeList)
throws Exception {
RegisterDistributedSchedulingAMResponse distSchedRegisterResponse =
Records.newRecord(RegisterDistributedSchedulingAMResponse.class);
distSchedRegisterResponse.setRegisterResponse(
Records.newRecord(RegisterApplicationMasterResponse.class));
distSchedRegisterResponse.setContainerTokenExpiryInterval(12345);
distSchedRegisterResponse.setContainerIdStart(0);
distSchedRegisterResponse.setMaxAllocatableCapabilty(
distSchedRegisterResponse.setMaxContainerResource(
Resource.newInstance(1024, 4));
distSchedRegisterResponse.setMinAllocatableCapabilty(
distSchedRegisterResponse.setMinContainerResource(
Resource.newInstance(512, 2));
distSchedRegisterResponse.setNodesForScheduling(nodeList);
Mockito.when(
@ -181,12 +181,12 @@ public class TestLocalScheduler {
Mockito.any(RegisterApplicationMasterRequest.class)))
.thenReturn(distSchedRegisterResponse);
localScheduler.registerApplicationMaster(
distributedScheduler.registerApplicationMaster(
Records.newRecord(RegisterApplicationMasterRequest.class));
}
private RequestInterceptor setup(Configuration conf, LocalScheduler
localScheduler) {
private RequestInterceptor setup(Configuration conf,
DistributedScheduler distributedScheduler) {
NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class);
Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l);
Context context = Mockito.mock(Context.class);
@ -215,12 +215,12 @@ public class TestLocalScheduler {
NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
new NMTokenSecretManagerInNM();
nmTokenSecretManagerInNM.setMasterKey(mKey);
localScheduler.initLocal(
distributedScheduler.initLocal(
ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
containerAllocator, nmTokenSecretManagerInNM, "test");
RequestInterceptor finalReqIntcptr = Mockito.mock(RequestInterceptor.class);
localScheduler.setNextInterceptor(finalReqIntcptr);
distributedScheduler.setNextInterceptor(finalReqIntcptr);
return finalReqIntcptr;
}
@ -237,17 +237,18 @@ public class TestLocalScheduler {
return opportunisticReq;
}
private DistSchedAllocateResponse createAllocateResponse(List<NodeId> nodes) {
DistSchedAllocateResponse distSchedAllocateResponse = Records.newRecord
(DistSchedAllocateResponse.class);
distSchedAllocateResponse.setAllocateResponse(
Records.newRecord(AllocateResponse.class));
private DistributedSchedulingAllocateResponse createAllocateResponse(
List<NodeId> nodes) {
DistributedSchedulingAllocateResponse distSchedAllocateResponse =
Records.newRecord(DistributedSchedulingAllocateResponse.class);
distSchedAllocateResponse
.setAllocateResponse(Records.newRecord(AllocateResponse.class));
distSchedAllocateResponse.setNodesForScheduling(nodes);
return distSchedAllocateResponse;
}
private Map<NodeId, List<ContainerId>> mapAllocs(AllocateResponse
allocateResponse, int expectedSize) throws Exception {
private Map<NodeId, List<ContainerId>> mapAllocs(
AllocateResponse allocateResponse, int expectedSize) throws Exception {
Assert.assertEquals(expectedSize,
allocateResponse.getAllocatedContainers().size());
Map<NodeId, List<ContainerId>> allocs = new HashMap<>();
@ -266,5 +267,4 @@ public class TestLocalScheduler {
}
return allocs;
}
}

View File

@ -27,15 +27,15 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
@ -73,18 +73,18 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* The DistributedSchedulingService is started instead of the
* ApplicationMasterService if DistributedScheduling is enabled for the YARN
* The DistributedSchedulingAMService is started instead of the
* ApplicationMasterService if distributed scheduling is enabled for the YARN
* cluster.
* It extends the functionality of the ApplicationMasterService by servicing
* clients (AMs and AMRMProxy request interceptors) that understand the
* DistributedSchedulingProtocol.
*/
public class DistributedSchedulingService extends ApplicationMasterService
implements DistributedSchedulerProtocol, EventHandler<SchedulerEvent> {
public class DistributedSchedulingAMService extends ApplicationMasterService
implements DistributedSchedulingAMProtocol, EventHandler<SchedulerEvent> {
private static final Log LOG =
LogFactory.getLog(DistributedSchedulingService.class);
LogFactory.getLog(DistributedSchedulingAMService.class);
private final NodeQueueLoadMonitor nodeMonitor;
@ -94,12 +94,12 @@ public class DistributedSchedulingService extends ApplicationMasterService
new ConcurrentHashMap<>();
private final int k;
public DistributedSchedulingService(RMContext rmContext,
YarnScheduler scheduler) {
super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
public DistributedSchedulingAMService(RMContext rmContext,
YarnScheduler scheduler) {
super(DistributedSchedulingAMService.class.getName(), rmContext, scheduler);
this.k = rmContext.getYarnConfiguration().getInt(
YarnConfiguration.DIST_SCHEDULING_TOP_K,
YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT);
YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED,
YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT);
long nodeSortInterval = rmContext.getYarnConfiguration().getLong(
YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
YarnConfiguration.
@ -149,7 +149,7 @@ public class DistributedSchedulingService extends ApplicationMasterService
@Override
public Server getServer(YarnRPC rpc, Configuration serverConf,
InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
Server server = rpc.getServer(DistributedSchedulerProtocol.class, this,
Server server = rpc.getServer(DistributedSchedulingAMProtocol.class, this,
addr, serverConf, secretManager,
serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
@ -184,43 +184,45 @@ public class DistributedSchedulingService extends ApplicationMasterService
}
@Override
public DistSchedRegisterResponse
public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling(
RegisterApplicationMasterRequest request) throws YarnException,
IOException {
RegisterApplicationMasterResponse response =
registerApplicationMaster(request);
DistSchedRegisterResponse dsResp = recordFactory
.newRecordInstance(DistSchedRegisterResponse.class);
RegisterDistributedSchedulingAMResponse dsResp = recordFactory
.newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
dsResp.setRegisterResponse(response);
dsResp.setMinAllocatableCapabilty(
dsResp.setMinContainerResource(
Resource.newInstance(
getConfig().getInt(
YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY,
YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY_DEFAULT),
YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB,
YarnConfiguration.
DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT),
getConfig().getInt(
YarnConfiguration.DIST_SCHEDULING_MIN_VCORES,
YarnConfiguration.DIST_SCHEDULING_MIN_VCORES_DEFAULT)
YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES,
YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT)
)
);
dsResp.setMaxAllocatableCapabilty(
dsResp.setMaxContainerResource(
Resource.newInstance(
getConfig().getInt(
YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY,
YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_DEFAULT),
YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB,
YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT),
getConfig().getInt(
YarnConfiguration.DIST_SCHEDULING_MAX_VCORES,
YarnConfiguration.DIST_SCHEDULING_MAX_VCORES_DEFAULT)
YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES,
YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT)
)
);
dsResp.setIncrAllocatableCapabilty(
dsResp.setIncrContainerResource(
Resource.newInstance(
getConfig().getInt(
YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY,
YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY_DEFAULT),
YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB,
YarnConfiguration.
DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT),
getConfig().getInt(
YarnConfiguration.DIST_SCHEDULING_INCR_VCORES,
YarnConfiguration.DIST_SCHEDULING_INCR_VCORES_DEFAULT)
YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES,
YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT)
)
);
dsResp.setContainerTokenExpiryInterval(
@ -238,8 +240,9 @@ public class DistributedSchedulingService extends ApplicationMasterService
}
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling(
DistSchedAllocateRequest request) throws YarnException, IOException {
public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
DistributedSchedulingAllocateRequest request)
throws YarnException, IOException {
List<Container> distAllocContainers = request.getAllocatedContainers();
for (Container container : distAllocContainers) {
// Create RMContainer
@ -255,8 +258,8 @@ public class DistributedSchedulingService extends ApplicationMasterService
RMContainerEventType.LAUNCHED));
}
AllocateResponse response = allocate(request.getAllocateRequest());
DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance
(DistSchedAllocateResponse.class);
DistributedSchedulingAllocateResponse dsResp = recordFactory
.newRecordInstance(DistributedSchedulingAllocateResponse.class);
dsResp.setAllocateResponse(response);
dsResp.setNodesForScheduling(
this.nodeMonitor.selectLeastLoadedNodes(this.k));
@ -264,7 +267,7 @@ public class DistributedSchedulingService extends ApplicationMasterService
}
private void addToMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
String rackName, NodeId nodeId) {
String rackName, NodeId nodeId) {
if (rackName != null) {
mapping.putIfAbsent(rackName, new HashSet<NodeId>());
Set<NodeId> nodeIds = mapping.get(rackName);
@ -275,7 +278,7 @@ public class DistributedSchedulingService extends ApplicationMasterService
}
private void removeFromMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
String rackName, NodeId nodeId) {
String rackName, NodeId nodeId) {
if (rackName != null) {
Set<NodeId> nodeIds = mapping.get(rackName);
synchronized (nodeIds) {
@ -346,7 +349,7 @@ public class DistributedSchedulingService extends ApplicationMasterService
break;
// <-- IGNORED EVENTS : END -->
default:
LOG.error("Unknown event arrived at DistributedSchedulingService: "
LOG.error("Unknown event arrived at DistributedSchedulingAMService: "
+ event.toString());
}

View File

@ -1143,12 +1143,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
if (this.rmContext.getYarnConfiguration().getBoolean(
YarnConfiguration.DIST_SCHEDULING_ENABLED,
YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
DistributedSchedulingService distributedSchedulingService = new
DistributedSchedulingService(this.rmContext, scheduler);
DistributedSchedulingAMService distributedSchedulingService = new
DistributedSchedulingAMService(this.rmContext, scheduler);
EventDispatcher distSchedulerEventDispatcher =
new EventDispatcher(distributedSchedulingService,
DistributedSchedulingService.class.getName());
// Add an event dispoatcher for the DistributedSchedulingService
DistributedSchedulingAMService.class.getName());
// Add an event dispatcher for the DistributedSchedulingAMService
// to handle node updates/additions and removals.
// Since the SchedulerEvent is currently a super set of theses,
// we register interest for it..

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@ -93,8 +94,8 @@ public class AppSchedulingInfo {
this.queue = queue;
this.user = user;
this.activeUsersManager = activeUsersManager;
this.containerIdCounter =
new AtomicLong(epoch << EPOCH_BIT_SHIFT);
this.containerIdCounter = new AtomicLong(
epoch << ResourceManager.EPOCH_BIT_SHIFT);
this.appResourceUsage = appResourceUsage;
}

View File

@ -822,7 +822,7 @@ public class MockRM extends ResourceManager {
if (this.rmContext.getYarnConfiguration().getBoolean(
YarnConfiguration.DIST_SCHEDULING_ENABLED,
YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
return new DistributedSchedulingService(getRMContext(), scheduler) {
return new DistributedSchedulingAMService(getRMContext(), scheduler) {
@Override
protected void serviceStart() {
// override to not start rpc handler

View File

@ -25,16 +25,11 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.AllocateResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.FinishApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.FinishApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@ -44,17 +39,16 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -63,12 +57,10 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
.DistSchedAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
.DistSchedRegisterResponsePBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.junit.Assert;
import org.junit.Test;
@ -78,9 +70,12 @@ import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
public class TestDistributedSchedulingService {
/**
* Test cases for {@link DistributedSchedulingAMService}.
*/
public class TestDistributedSchedulingAMService {
// Test if the DistributedSchedulingService can handle both DSProtocol as
// Test if the DistributedSchedulingAMService can handle both DSProtocol as
// well as AMProtocol clients
@Test
public void testRPCWrapping() throws Exception {
@ -116,7 +111,8 @@ public class TestDistributedSchedulingService {
Resource.newInstance(1, 2), 1, true, "exp",
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true))));
DistributedSchedulingService service = createService(factory, rmContext, c);
DistributedSchedulingAMService service =
createService(factory, rmContext, c);
Server server = service.getServer(rpc, conf, addr, null);
server.start();
@ -126,7 +122,7 @@ public class TestDistributedSchedulingService {
ProtobufRpcEngine.class);
ApplicationMasterProtocolPB ampProxy =
RPC.getProxy(ApplicationMasterProtocolPB
.class, 1, NetUtils.getConnectAddress(server), conf);
.class, 1, NetUtils.getConnectAddress(server), conf);
RegisterApplicationMasterResponse regResp =
new RegisterApplicationMasterResponsePBImpl(
ampProxy.registerApplicationMaster(null,
@ -156,34 +152,34 @@ public class TestDistributedSchedulingService {
// Verify that the DistrubutedSchedulingService can handle the
// DistributedSchedulerProtocol clients as well
RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
// DistributedSchedulingAMProtocol clients as well
RPC.setProtocolEngine(conf, DistributedSchedulingAMProtocolPB.class,
ProtobufRpcEngine.class);
DistributedSchedulerProtocolPB dsProxy =
RPC.getProxy(DistributedSchedulerProtocolPB
DistributedSchedulingAMProtocolPB dsProxy =
RPC.getProxy(DistributedSchedulingAMProtocolPB
.class, 1, NetUtils.getConnectAddress(server), conf);
DistSchedRegisterResponse dsRegResp =
new DistSchedRegisterResponsePBImpl(
RegisterDistributedSchedulingAMResponse dsRegResp =
new RegisterDistributedSchedulingAMResponsePBImpl(
dsProxy.registerApplicationMasterForDistributedScheduling(null,
((RegisterApplicationMasterRequestPBImpl)factory
.newRecordInstance(RegisterApplicationMasterRequest.class))
.getProto()));
Assert.assertEquals(54321l, dsRegResp.getContainerIdStart());
Assert.assertEquals(4,
dsRegResp.getMaxAllocatableCapabilty().getVirtualCores());
dsRegResp.getMaxContainerResource().getVirtualCores());
Assert.assertEquals(1024,
dsRegResp.getMinAllocatableCapabilty().getMemorySize());
dsRegResp.getMinContainerResource().getMemorySize());
Assert.assertEquals(2,
dsRegResp.getIncrAllocatableCapabilty().getVirtualCores());
dsRegResp.getIncrContainerResource().getVirtualCores());
DistSchedAllocateRequestPBImpl distAllReq =
(DistSchedAllocateRequestPBImpl)factory.newRecordInstance(
DistSchedAllocateRequest.class);
DistributedSchedulingAllocateRequestPBImpl distAllReq =
(DistributedSchedulingAllocateRequestPBImpl)factory.newRecordInstance(
DistributedSchedulingAllocateRequest.class);
distAllReq.setAllocateRequest(allReq);
distAllReq.setAllocatedContainers(Arrays.asList(c));
DistSchedAllocateResponse dsAllocResp =
new DistSchedAllocateResponsePBImpl(
DistributedSchedulingAllocateResponse dsAllocResp =
new DistributedSchedulingAllocateResponsePBImpl(
dsProxy.allocateForDistributedScheduling(null,
distAllReq.getProto()));
Assert.assertEquals(
@ -199,9 +195,9 @@ public class TestDistributedSchedulingService {
false, dsfinishResp.getIsUnregistered());
}
private DistributedSchedulingService createService(final RecordFactory
private DistributedSchedulingAMService createService(final RecordFactory
factory, final RMContext rmContext, final Container c) {
return new DistributedSchedulingService(rmContext, null) {
return new DistributedSchedulingAMService(rmContext, null) {
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) throws
@ -235,22 +231,24 @@ public class TestDistributedSchedulingService {
}
@Override
public DistSchedRegisterResponse
public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling(
RegisterApplicationMasterRequest request) throws
YarnException, IOException {
DistSchedRegisterResponse resp = factory.newRecordInstance(
DistSchedRegisterResponse.class);
RegisterApplicationMasterRequest request)
throws YarnException, IOException {
RegisterDistributedSchedulingAMResponse resp = factory
.newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
resp.setContainerIdStart(54321L);
resp.setMaxAllocatableCapabilty(Resource.newInstance(4096, 4));
resp.setMinAllocatableCapabilty(Resource.newInstance(1024, 1));
resp.setIncrAllocatableCapabilty(Resource.newInstance(2048, 2));
resp.setMaxContainerResource(Resource.newInstance(4096, 4));
resp.setMinContainerResource(Resource.newInstance(1024, 1));
resp.setIncrContainerResource(Resource.newInstance(2048, 2));
return resp;
}
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling(
DistSchedAllocateRequest request) throws YarnException, IOException {
public DistributedSchedulingAllocateResponse
allocateForDistributedScheduling(
DistributedSchedulingAllocateRequest request)
throws YarnException, IOException {
List<ResourceRequest> askList =
request.getAllocateRequest().getAskList();
List<Container> allocatedContainers = request.getAllocatedContainers();
@ -260,8 +258,8 @@ public class TestDistributedSchedulingService {
Assert.assertEquals(1, askList.size());
Assert.assertTrue(askList.get(0)
.getExecutionTypeRequest().getEnforceExecutionType());
DistSchedAllocateResponse resp =
factory.newRecordInstance(DistSchedAllocateResponse.class);
DistributedSchedulingAllocateResponse resp = factory
.newRecordInstance(DistributedSchedulingAllocateResponse.class);
resp.setNodesForScheduling(
Arrays.asList(NodeId.newInstance("h1", 1234)));
return resp;