merge -r 1329693:1329694 from trunk. FIXES: MAPREDUCE-4079

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1329697 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2012-04-24 13:07:22 +00:00
parent 11e0a12672
commit f8f390b83d
8 changed files with 45 additions and 9 deletions

View File

@ -183,6 +183,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4161. create sockets consistently (Daryn Sharp via bobby) MAPREDUCE-4161. create sockets consistently (Daryn Sharp via bobby)
MAPREDUCE-4079. Allow MR AppMaster to limit ephemeral port range.
(bobby via tgraves)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -139,7 +139,8 @@ public class MRClientService extends AbstractService
rpc.getServer(MRClientProtocol.class, protocolHandler, address, rpc.getServer(MRClientProtocol.class, protocolHandler, address,
conf, secretManager, conf, secretManager,
conf.getInt(MRJobConfig.MR_AM_JOB_CLIENT_THREAD_COUNT, conf.getInt(MRJobConfig.MR_AM_JOB_CLIENT_THREAD_COUNT,
MRJobConfig.DEFAULT_MR_AM_JOB_CLIENT_THREAD_COUNT)); MRJobConfig.DEFAULT_MR_AM_JOB_CLIENT_THREAD_COUNT),
MRJobConfig.MR_AM_JOB_CLIENT_PORT_RANGE);
// Enable service authorization? // Enable service authorization?
if (conf.getBoolean( if (conf.getBoolean(

View File

@ -373,6 +373,13 @@ public interface MRJobConfig {
MR_AM_PREFIX + "job.client.thread-count"; MR_AM_PREFIX + "job.client.thread-count";
public static final int DEFAULT_MR_AM_JOB_CLIENT_THREAD_COUNT = 1; public static final int DEFAULT_MR_AM_JOB_CLIENT_THREAD_COUNT = 1;
/**
* Range of ports that the MapReduce AM can use when binding. Leave blank
* if you want all possible ports.
*/
public static final String MR_AM_JOB_CLIENT_PORT_RANGE =
MR_AM_PREFIX + "job.client.port-range";
/** Enable blacklisting of nodes in the job.*/ /** Enable blacklisting of nodes in the job.*/
public static final String MR_AM_JOB_NODE_BLACKLISTING_ENABLE = public static final String MR_AM_JOB_NODE_BLACKLISTING_ENABLE =
MR_AM_PREFIX + "job.node-blacklisting.enable"; MR_AM_PREFIX + "job.node-blacklisting.enable";

View File

@ -1236,6 +1236,14 @@
MR AppMaster from remote tasks</description> MR AppMaster from remote tasks</description>
</property> </property>
<property>
<name>yarn.app.mapreduce.am.job.client.port-range</name>
<value></value>
<description>Range of ports that the MapReduce AM can use when binding.
Leave blank if you want all possible ports.
For example 50000-50050,50100-50200</description>
</property>
<property> <property>
<name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name> <name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
<value>1000</value> <value>1000</value>

View File

@ -31,6 +31,6 @@ public interface RpcServerFactory {
public Server getServer(Class<?> protocol, Object instance, public Server getServer(Class<?> protocol, Object instance,
InetSocketAddress addr, Configuration conf, InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager, SecretManager<? extends TokenIdentifier> secretManager,
int numHandlers) int numHandlers, String portRangeConfig)
throws YarnException; throws YarnException;
} }

View File

@ -62,11 +62,20 @@ public class RpcServerFactoryPBImpl implements RpcServerFactory {
private RpcServerFactoryPBImpl() { private RpcServerFactoryPBImpl() {
} }
@Override
public Server getServer(Class<?> protocol, Object instance, public Server getServer(Class<?> protocol, Object instance,
InetSocketAddress addr, Configuration conf, InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager, int numHandlers) SecretManager<? extends TokenIdentifier> secretManager, int numHandlers)
throws YarnException { throws YarnException {
return getServer(protocol, instance, addr, conf, secretManager, numHandlers,
null);
}
@Override
public Server getServer(Class<?> protocol, Object instance,
InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager, int numHandlers,
String portRangeConfig)
throws YarnException {
Constructor<?> constructor = serviceCache.get(protocol); Constructor<?> constructor = serviceCache.get(protocol);
if (constructor == null) { if (constructor == null) {
@ -122,7 +131,7 @@ public class RpcServerFactoryPBImpl implements RpcServerFactory {
try { try {
return createServer(pbProtocol, addr, conf, secretManager, numHandlers, return createServer(pbProtocol, addr, conf, secretManager, numHandlers,
(BlockingService)method.invoke(null, service)); (BlockingService)method.invoke(null, service), portRangeConfig);
} catch (InvocationTargetException e) { } catch (InvocationTargetException e) {
throw new YarnException(e); throw new YarnException(e);
} catch (IllegalAccessException e) { } catch (IllegalAccessException e) {
@ -156,11 +165,11 @@ public class RpcServerFactoryPBImpl implements RpcServerFactory {
private Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf, private Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager, int numHandlers, SecretManager<? extends TokenIdentifier> secretManager, int numHandlers,
BlockingService blockingService) throws IOException { BlockingService blockingService, String portRangeConfig) throws IOException {
RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class); RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
RPC.Server server = RPC.getServer(pbProtocol, blockingService, RPC.Server server = RPC.getServer(pbProtocol, blockingService,
addr.getHostName(), addr.getPort(), numHandlers, false, conf, addr.getHostName(), addr.getPort(), numHandlers, false, conf,
secretManager); secretManager, portRangeConfig);
LOG.info("Adding protocol "+pbProtocol.getCanonicalName()+" to the server"); LOG.info("Adding protocol "+pbProtocol.getCanonicalName()+" to the server");
server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService); server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService);
return server; return server;

View File

@ -56,12 +56,12 @@ public class HadoopYarnProtoRPC extends YarnRPC {
public Server getServer(Class protocol, Object instance, public Server getServer(Class protocol, Object instance,
InetSocketAddress addr, Configuration conf, InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager, SecretManager<? extends TokenIdentifier> secretManager,
int numHandlers) { int numHandlers, String portRangeConfig) {
LOG.debug("Creating a HadoopYarnProtoRpc server for protocol " + protocol + LOG.debug("Creating a HadoopYarnProtoRpc server for protocol " + protocol +
" with " + numHandlers + " handlers"); " with " + numHandlers + " handlers");
return RpcFactoryProvider.getServerFactory(conf).getServer(protocol, return RpcFactoryProvider.getServerFactory(conf).getServer(protocol,
instance, addr, conf, secretManager, numHandlers); instance, addr, conf, secretManager, numHandlers, portRangeConfig);
} }

View File

@ -43,8 +43,16 @@ public abstract class YarnRPC {
public abstract Server getServer(Class protocol, Object instance, public abstract Server getServer(Class protocol, Object instance,
InetSocketAddress addr, Configuration conf, InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager, SecretManager<? extends TokenIdentifier> secretManager,
int numHandlers); int numHandlers, String portRangeConfig);
public Server getServer(Class protocol, Object instance,
InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager,
int numHandlers) {
return getServer(protocol, instance, addr, conf, secretManager, numHandlers,
null);
}
public static YarnRPC create(Configuration conf) { public static YarnRPC create(Configuration conf) {
LOG.debug("Creating YarnRPC for " + LOG.debug("Creating YarnRPC for " +
conf.get(YarnConfiguration.IPC_RPC_IMPL)); conf.get(YarnConfiguration.IPC_RPC_IMPL));