HADOOP-8227. Allow RPC to limit ephemeral port range. (bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1327127 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-04-17 15:04:20 +00:00
parent a98ba41092
commit e8eed2f62d
10 changed files with 320 additions and 27 deletions

View File

@ -449,6 +449,8 @@ Release 0.23.3 - UNRELEASED
HADOOP-8286. Simplify getting a socket address from conf (Daryn Sharp via HADOOP-8286. Simplify getting a socket address from conf (Daryn Sharp via
bobby) bobby)
HADOOP-8227. Allow RPC to limit ephemeral port range. (bobby)
Release 0.23.2 - UNRELEASED Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -964,12 +964,58 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
* bound may be omitted meaning all values up to or over. So the string * bound may be omitted meaning all values up to or over. So the string
* above means 2, 3, 5, and 7, 8, 9, ... * above means 2, 3, 5, and 7, 8, 9, ...
*/ */
public static class IntegerRanges { public static class IntegerRanges implements Iterable<Integer>{
private static class Range { private static class Range {
int start; int start;
int end; int end;
} }
private static class RangeNumberIterator implements Iterator<Integer> {
Iterator<Range> internal;
int at;
int end;
public RangeNumberIterator(List<Range> ranges) {
if (ranges != null) {
internal = ranges.iterator();
}
at = -1;
end = -2;
}
@Override
public boolean hasNext() {
if (at <= end) {
return true;
} else if (internal != null){
return internal.hasNext();
}
return false;
}
@Override
public Integer next() {
if (at <= end) {
at++;
return at - 1;
} else if (internal != null){
Range found = internal.next();
if (found != null) {
at = found.start;
end = found.end;
at++;
return at - 1;
}
}
return null;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
List<Range> ranges = new ArrayList<Range>(); List<Range> ranges = new ArrayList<Range>();
public IntegerRanges() { public IntegerRanges() {
@ -1027,6 +1073,13 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
return false; return false;
} }
/**
* @return true if there are no values in this range, else false.
*/
public boolean isEmpty() {
return ranges == null || ranges.isEmpty();
}
@Override @Override
public String toString() { public String toString() {
StringBuilder result = new StringBuilder(); StringBuilder result = new StringBuilder();
@ -1043,6 +1096,12 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
} }
return result.toString(); return result.toString();
} }
@Override
public Iterator<Integer> iterator() {
return new RangeNumberIterator(ranges);
}
} }
/** /**

View File

@ -319,10 +319,12 @@ public class ProtobufRpcEngine implements RpcEngine {
public RPC.Server getServer(Class<?> protocol, Object protocolImpl, public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
String bindAddress, int port, int numHandlers, int numReaders, String bindAddress, int port, int numHandlers, int numReaders,
int queueSizePerHandler, boolean verbose, Configuration conf, int queueSizePerHandler, boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager) SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig)
throws IOException { throws IOException {
return new Server(protocol, protocolImpl, conf, bindAddress, port, return new Server(protocol, protocolImpl, conf, bindAddress, port,
numHandlers, numReaders, queueSizePerHandler, verbose, secretManager); numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
portRangeConfig);
} }
public static class Server extends RPC.Server { public static class Server extends RPC.Server {
@ -336,15 +338,18 @@ public class ProtobufRpcEngine implements RpcEngine {
* @param port the port to listen for connections on * @param port the port to listen for connections on
* @param numHandlers the number of method handler threads to run * @param numHandlers the number of method handler threads to run
* @param verbose whether each call should be logged * @param verbose whether each call should be logged
* @param portRangeConfig A config parameter that can be used to restrict
* the range of ports used when port is 0 (an ephemeral port)
*/ */
public Server(Class<?> protocolClass, Object protocolImpl, public Server(Class<?> protocolClass, Object protocolImpl,
Configuration conf, String bindAddress, int port, int numHandlers, Configuration conf, String bindAddress, int port, int numHandlers,
int numReaders, int queueSizePerHandler, boolean verbose, int numReaders, int queueSizePerHandler, boolean verbose,
SecretManager<? extends TokenIdentifier> secretManager) SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig)
throws IOException { throws IOException {
super(bindAddress, port, null, numHandlers, super(bindAddress, port, null, numHandlers,
numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl
.getClass().getName()), secretManager); .getClass().getName()), secretManager, portRangeConfig);
this.verbose = verbose; this.verbose = verbose;
registerProtocolAndImpl(RpcKind.RPC_PROTOCOL_BUFFER, protocolClass, registerProtocolAndImpl(RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
protocolImpl); protocolImpl);

View File

@ -654,7 +654,8 @@ public class RPC {
final boolean verbose, Configuration conf) final boolean verbose, Configuration conf)
throws IOException { throws IOException {
return getServer(instance.getClass(), // use impl class for protocol return getServer(instance.getClass(), // use impl class for protocol
instance, bindAddress, port, numHandlers, false, conf, null); instance, bindAddress, port, numHandlers, false, conf, null,
null);
} }
/** Construct a server for a protocol implementation instance. */ /** Construct a server for a protocol implementation instance. */
@ -662,7 +663,8 @@ public class RPC {
Object instance, String bindAddress, Object instance, String bindAddress,
int port, Configuration conf) int port, Configuration conf)
throws IOException { throws IOException {
return getServer(protocol, instance, bindAddress, port, 1, false, conf, null); return getServer(protocol, instance, bindAddress, port, 1, false, conf, null,
null);
} }
/** Construct a server for a protocol implementation instance. /** Construct a server for a protocol implementation instance.
@ -676,7 +678,7 @@ public class RPC {
throws IOException { throws IOException {
return getServer(protocol, instance, bindAddress, port, numHandlers, verbose, return getServer(protocol, instance, bindAddress, port, numHandlers, verbose,
conf, null); conf, null, null);
} }
/** Construct a server for a protocol implementation instance. */ /** Construct a server for a protocol implementation instance. */
@ -686,10 +688,20 @@ public class RPC {
boolean verbose, Configuration conf, boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager) SecretManager<? extends TokenIdentifier> secretManager)
throws IOException { throws IOException {
return getServer(protocol, instance, bindAddress, port, numHandlers, verbose,
conf, secretManager, null);
}
public static Server getServer(Class<?> protocol,
Object instance, String bindAddress, int port,
int numHandlers,
boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig)
throws IOException {
return getProtocolEngine(protocol, conf) return getProtocolEngine(protocol, conf)
.getServer(protocol, instance, bindAddress, port, numHandlers, -1, -1, .getServer(protocol, instance, bindAddress, port, numHandlers, -1, -1,
verbose, conf, secretManager); verbose, conf, secretManager, portRangeConfig);
} }
/** Construct a server for a protocol implementation instance. */ /** Construct a server for a protocol implementation instance. */
@ -704,7 +716,8 @@ public class RPC {
return getProtocolEngine(protocol, conf) return getProtocolEngine(protocol, conf)
.getServer(protocol, instance, bindAddress, port, numHandlers, .getServer(protocol, instance, bindAddress, port, numHandlers,
numReaders, queueSizePerHandler, verbose, conf, secretManager); numReaders, queueSizePerHandler, verbose, conf, secretManager,
null);
} }
/** An RPC Server. */ /** An RPC Server. */
@ -855,9 +868,10 @@ public class RPC {
Class<? extends Writable> paramClass, int handlerCount, Class<? extends Writable> paramClass, int handlerCount,
int numReaders, int queueSizePerHandler, int numReaders, int queueSizePerHandler,
Configuration conf, String serverName, Configuration conf, String serverName,
SecretManager<? extends TokenIdentifier> secretManager) throws IOException { SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig) throws IOException {
super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler, super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler,
conf, serverName, secretManager); conf, serverName, secretManager, portRangeConfig);
initProtocolMetaInfo(conf); initProtocolMetaInfo(conf);
} }

View File

@ -47,12 +47,30 @@ public interface RpcEngine {
UserGroupInformation ticket, Configuration conf) UserGroupInformation ticket, Configuration conf)
throws IOException, InterruptedException; throws IOException, InterruptedException;
/** Construct a server for a protocol implementation instance. */ /**
* Construct a server for a protocol implementation instance.
*
* @param protocol the class of protocol to use
* @param instance the instance of protocol whose methods will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
* @param numHandlers the number of method handler threads to run
* @param numReaders the number of reader threads to run
* @param queueSizePerHandler the size of the queue per hander thread
* @param verbose whether each call should be logged
* @param secretManager The secret manager to use to validate incoming requests.
* @param portRangeConfig A config parameter that can be used to restrict
* the range of ports used when port is 0 (an ephemeral port)
* @return The Server instance
* @throws IOException on any error
*/
RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress, RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress,
int port, int numHandlers, int numReaders, int port, int numHandlers, int numReaders,
int queueSizePerHandler, boolean verbose, int queueSizePerHandler, boolean verbose,
Configuration conf, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig
) throws IOException; ) throws IOException;
/** /**

View File

@ -63,6 +63,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.BytesWritable;
@ -291,6 +292,7 @@ public abstract class Server {
protected RpcDetailedMetrics rpcDetailedMetrics; protected RpcDetailedMetrics rpcDetailedMetrics;
private Configuration conf; private Configuration conf;
private String portRangeConfig = null;
private SecretManager<TokenIdentifier> secretManager; private SecretManager<TokenIdentifier> secretManager;
private ServiceAuthorizationManager serviceAuthorizationManager = new ServiceAuthorizationManager(); private ServiceAuthorizationManager serviceAuthorizationManager = new ServiceAuthorizationManager();
@ -323,8 +325,33 @@ public abstract class Server {
*/ */
public static void bind(ServerSocket socket, InetSocketAddress address, public static void bind(ServerSocket socket, InetSocketAddress address,
int backlog) throws IOException { int backlog) throws IOException {
bind(socket, address, backlog, null, null);
}
public static void bind(ServerSocket socket, InetSocketAddress address,
int backlog, Configuration conf, String rangeConf) throws IOException {
try { try {
IntegerRanges range = null;
if (rangeConf != null) {
range = conf.getRange(rangeConf, "");
}
if (range == null || range.isEmpty() || (address.getPort() != 0)) {
socket.bind(address, backlog); socket.bind(address, backlog);
} else {
for (Integer port : range) {
if (socket.isBound()) break;
try {
InetSocketAddress temp = new InetSocketAddress(address.getAddress(),
port);
socket.bind(temp, backlog);
} catch(BindException e) {
//Ignored
}
}
if (!socket.isBound()) {
throw new BindException("Could not find a free port in "+range);
}
}
} catch (SocketException e) { } catch (SocketException e) {
throw NetUtils.wrapException(null, throw NetUtils.wrapException(null,
0, 0,
@ -424,7 +451,7 @@ public abstract class Server {
acceptChannel.configureBlocking(false); acceptChannel.configureBlocking(false);
// Bind the server socket to the local host and port // Bind the server socket to the local host and port
bind(acceptChannel.socket(), address, backlogLength); bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
// create a selector; // create a selector;
selector= Selector.open(); selector= Selector.open();
@ -1725,7 +1752,16 @@ public abstract class Server {
throws IOException throws IOException
{ {
this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer
.toString(port), null); .toString(port), null, null);
}
protected Server(String bindAddress, int port,
Class<? extends Writable> rpcRequestClass, int handlerCount,
int numReaders, int queueSizePerHandler, Configuration conf,
String serverName, SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
this(bindAddress, port, rpcRequestClass, handlerCount, numReaders,
queueSizePerHandler, conf, serverName, secretManager, null);
} }
/** /**
@ -1745,10 +1781,12 @@ public abstract class Server {
protected Server(String bindAddress, int port, protected Server(String bindAddress, int port,
Class<? extends Writable> rpcRequestClass, int handlerCount, Class<? extends Writable> rpcRequestClass, int handlerCount,
int numReaders, int queueSizePerHandler, Configuration conf, int numReaders, int queueSizePerHandler, Configuration conf,
String serverName, SecretManager<? extends TokenIdentifier> secretManager) String serverName, SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig)
throws IOException { throws IOException {
this.bindAddress = bindAddress; this.bindAddress = bindAddress;
this.conf = conf; this.conf = conf;
this.portRangeConfig = portRangeConfig;
this.port = port; this.port = port;
this.rpcRequestClass = rpcRequestClass; this.rpcRequestClass = rpcRequestClass;
this.handlerCount = handlerCount; this.handlerCount = handlerCount;

View File

@ -299,16 +299,19 @@ public class WritableRpcEngine implements RpcEngine {
} }
} }
/** Construct a server for a protocol implementation instance listening on a /* Construct a server for a protocol implementation instance listening on a
* port and address. */ * port and address. */
@Override
public RPC.Server getServer(Class<?> protocolClass, public RPC.Server getServer(Class<?> protocolClass,
Object protocolImpl, String bindAddress, int port, Object protocolImpl, String bindAddress, int port,
int numHandlers, int numReaders, int queueSizePerHandler, int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, Configuration conf, boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager) SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig)
throws IOException { throws IOException {
return new Server(protocolClass, protocolImpl, conf, bindAddress, port, return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
numHandlers, numReaders, queueSizePerHandler, verbose, secretManager); numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
portRangeConfig);
} }
@ -341,7 +344,7 @@ public class WritableRpcEngine implements RpcEngine {
Configuration conf, String bindAddress, int port) Configuration conf, String bindAddress, int port)
throws IOException { throws IOException {
this(protocolClass, protocolImpl, conf, bindAddress, port, 1, -1, -1, this(protocolClass, protocolImpl, conf, bindAddress, port, 1, -1, -1,
false, null); false, null, null);
} }
/** /**
@ -363,7 +366,7 @@ public class WritableRpcEngine implements RpcEngine {
throws IOException { throws IOException {
this(null, protocolImpl, conf, bindAddress, port, this(null, protocolImpl, conf, bindAddress, port,
numHandlers, numReaders, queueSizePerHandler, verbose, numHandlers, numReaders, queueSizePerHandler, verbose,
secretManager); secretManager, null);
} }
@ -381,11 +384,13 @@ public class WritableRpcEngine implements RpcEngine {
public Server(Class<?> protocolClass, Object protocolImpl, public Server(Class<?> protocolClass, Object protocolImpl,
Configuration conf, String bindAddress, int port, Configuration conf, String bindAddress, int port,
int numHandlers, int numReaders, int queueSizePerHandler, int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) boolean verbose, SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig)
throws IOException { throws IOException {
super(bindAddress, port, null, numHandlers, numReaders, super(bindAddress, port, null, numHandlers, numReaders,
queueSizePerHandler, conf, queueSizePerHandler, conf,
classNameBase(protocolImpl.getClass().getName()), secretManager); classNameBase(protocolImpl.getClass().getName()), secretManager,
portRangeConfig);
this.verbose = verbose; this.verbose = verbose;

View File

@ -25,16 +25,20 @@ import java.io.IOException;
import java.io.StringWriter; import java.io.StringWriter;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import junit.framework.TestCase; import junit.framework.TestCase;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper;
@ -363,6 +367,35 @@ public class TestConfiguration extends TestCase {
assertEquals(true, range.isIncluded(100000000)); assertEquals(true, range.isIncluded(100000000));
} }
public void testGetRangeIterator() throws Exception {
Configuration config = new Configuration(false);
IntegerRanges ranges = config.getRange("Test", "");
assertFalse("Empty range has values", ranges.iterator().hasNext());
ranges = config.getRange("Test", "5");
Set<Integer> expected = new HashSet<Integer>(Arrays.asList(5));
Set<Integer> found = new HashSet<Integer>();
for(Integer i: ranges) {
found.add(i);
}
assertEquals(expected, found);
ranges = config.getRange("Test", "5-10,13-14");
expected = new HashSet<Integer>(Arrays.asList(5,6,7,8,9,10,13,14));
found = new HashSet<Integer>();
for(Integer i: ranges) {
found.add(i);
}
assertEquals(expected, found);
ranges = config.getRange("Test", "8-12, 5- 7");
expected = new HashSet<Integer>(Arrays.asList(5,6,7,8,9,10,11,12));
found = new HashSet<Integer>();
for(Integer i: ranges) {
found.add(i);
}
assertEquals(expected, found);
}
public void testHexValues() throws IOException{ public void testHexValues() throws IOException{
out=new BufferedWriter(new FileWriter(CONFIG)); out=new BufferedWriter(new FileWriter(CONFIG));
startConfig(); startConfig();

View File

@ -260,7 +260,8 @@ public class TestRPC {
public org.apache.hadoop.ipc.RPC.Server getServer(Class<?> protocol, public org.apache.hadoop.ipc.RPC.Server getServer(Class<?> protocol,
Object instance, String bindAddress, int port, int numHandlers, Object instance, String bindAddress, int port, int numHandlers,
int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf, int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager) throws IOException { SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig) throws IOException {
return null; return null;
} }

View File

@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import static org.junit.Assert.*;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
/**
* This is intended to be a set of unit tests for the
* org.apache.hadoop.ipc.Server class.
*/
public class TestServer {
@Test
public void testBind() throws Exception {
Configuration conf = new Configuration();
ServerSocket socket = new ServerSocket();
InetSocketAddress address = new InetSocketAddress("0.0.0.0",0);
socket.bind(address);
try {
int min = socket.getLocalPort();
int max = min + 100;
conf.set("TestRange", min+"-"+max);
ServerSocket socket2 = new ServerSocket();
InetSocketAddress address2 = new InetSocketAddress("0.0.0.0", 0);
Server.bind(socket2, address2, 10, conf, "TestRange");
try {
assertTrue(socket2.isBound());
assertTrue(socket2.getLocalPort() > min);
assertTrue(socket2.getLocalPort() <= max);
} finally {
socket2.close();
}
} finally {
socket.close();
}
}
@Test
public void testBindSimple() throws Exception {
ServerSocket socket = new ServerSocket();
InetSocketAddress address = new InetSocketAddress("0.0.0.0",0);
Server.bind(socket, address, 10);
try {
assertTrue(socket.isBound());
} finally {
socket.close();
}
}
@Test
public void testEmptyConfig() throws Exception {
Configuration conf = new Configuration();
conf.set("TestRange", "");
ServerSocket socket = new ServerSocket();
InetSocketAddress address = new InetSocketAddress("0.0.0.0", 0);
try {
Server.bind(socket, address, 10, conf, "TestRange");
assertTrue(socket.isBound());
} finally {
socket.close();
}
}
@Test
public void testBindError() throws Exception {
Configuration conf = new Configuration();
ServerSocket socket = new ServerSocket();
InetSocketAddress address = new InetSocketAddress("0.0.0.0",0);
socket.bind(address);
try {
int min = socket.getLocalPort();
conf.set("TestRange", min+"-"+min);
ServerSocket socket2 = new ServerSocket();
InetSocketAddress address2 = new InetSocketAddress("0.0.0.0", 0);
boolean caught = false;
try {
Server.bind(socket2, address2, 10, conf, "TestRange");
} catch (BindException e) {
caught = true;
} finally {
socket2.close();
}
assertTrue("Failed to catch the expected bind exception",caught);
} finally {
socket.close();
}
}
}