From e8eed2f62d30e0bf2f915ee3ad6b9c9f6d2d97cb Mon Sep 17 00:00:00 2001 From: Robert Joseph Evans Date: Tue, 17 Apr 2012 15:04:20 +0000 Subject: [PATCH] HADOOP-8227. Allow RPC to limit ephemeral port range. (bobby) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1327127 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 2 + .../org/apache/hadoop/conf/Configuration.java | 61 ++++++++- .../apache/hadoop/ipc/ProtobufRpcEngine.java | 13 +- .../main/java/org/apache/hadoop/ipc/RPC.java | 30 +++-- .../java/org/apache/hadoop/ipc/RpcEngine.java | 22 +++- .../java/org/apache/hadoop/ipc/Server.java | 46 ++++++- .../apache/hadoop/ipc/WritableRpcEngine.java | 19 +-- .../apache/hadoop/conf/TestConfiguration.java | 33 +++++ .../java/org/apache/hadoop/ipc/TestRPC.java | 3 +- .../org/apache/hadoop/ipc/TestServer.java | 118 ++++++++++++++++++ 10 files changed, 320 insertions(+), 27 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestServer.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 9be76d829d0..35acb90ca3b 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -449,6 +449,8 @@ Release 0.23.3 - UNRELEASED HADOOP-8286. Simplify getting a socket address from conf (Daryn Sharp via bobby) + HADOOP-8227. Allow RPC to limit ephemeral port range. (bobby) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java index bf0960dc212..aa738f5ddec 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java @@ -964,11 +964,57 @@ public class Configuration implements Iterable>, * bound may be omitted meaning all values up to or over. So the string * above means 2, 3, 5, and 7, 8, 9, ... */ - public static class IntegerRanges { + public static class IntegerRanges implements Iterable{ private static class Range { int start; int end; } + + private static class RangeNumberIterator implements Iterator { + Iterator internal; + int at; + int end; + + public RangeNumberIterator(List ranges) { + if (ranges != null) { + internal = ranges.iterator(); + } + at = -1; + end = -2; + } + + @Override + public boolean hasNext() { + if (at <= end) { + return true; + } else if (internal != null){ + return internal.hasNext(); + } + return false; + } + + @Override + public Integer next() { + if (at <= end) { + at++; + return at - 1; + } else if (internal != null){ + Range found = internal.next(); + if (found != null) { + at = found.start; + end = found.end; + at++; + return at - 1; + } + } + return null; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; List ranges = new ArrayList(); @@ -1027,6 +1073,13 @@ public class Configuration implements Iterable>, return false; } + /** + * @return true if there are no values in this range, else false. + */ + public boolean isEmpty() { + return ranges == null || ranges.isEmpty(); + } + @Override public String toString() { StringBuilder result = new StringBuilder(); @@ -1043,6 +1096,12 @@ public class Configuration implements Iterable>, } return result.toString(); } + + @Override + public Iterator iterator() { + return new RangeNumberIterator(ranges); + } + } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 556f7101a4e..befc8f70e03 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -319,10 +319,12 @@ public class ProtobufRpcEngine implements RpcEngine { public RPC.Server getServer(Class protocol, Object protocolImpl, String bindAddress, int port, int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf, - SecretManager secretManager) + SecretManager secretManager, + String portRangeConfig) throws IOException { return new Server(protocol, protocolImpl, conf, bindAddress, port, - numHandlers, numReaders, queueSizePerHandler, verbose, secretManager); + numHandlers, numReaders, queueSizePerHandler, verbose, secretManager, + portRangeConfig); } public static class Server extends RPC.Server { @@ -336,15 +338,18 @@ public class ProtobufRpcEngine implements RpcEngine { * @param port the port to listen for connections on * @param numHandlers the number of method handler threads to run * @param verbose whether each call should be logged + * @param portRangeConfig A config parameter that can be used to restrict + * the range of ports used when port is 0 (an ephemeral port) */ public Server(Class protocolClass, Object protocolImpl, Configuration conf, String bindAddress, int port, int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, - SecretManager secretManager) + SecretManager secretManager, + String portRangeConfig) throws IOException { super(bindAddress, port, null, numHandlers, numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl - .getClass().getName()), secretManager); + .getClass().getName()), secretManager, portRangeConfig); this.verbose = verbose; registerProtocolAndImpl(RpcKind.RPC_PROTOCOL_BUFFER, protocolClass, protocolImpl); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index 0c848bb40d9..d0f268ec5d2 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -654,7 +654,8 @@ public class RPC { final boolean verbose, Configuration conf) throws IOException { return getServer(instance.getClass(), // use impl class for protocol - instance, bindAddress, port, numHandlers, false, conf, null); + instance, bindAddress, port, numHandlers, false, conf, null, + null); } /** Construct a server for a protocol implementation instance. */ @@ -662,7 +663,8 @@ public class RPC { Object instance, String bindAddress, int port, Configuration conf) throws IOException { - return getServer(protocol, instance, bindAddress, port, 1, false, conf, null); + return getServer(protocol, instance, bindAddress, port, 1, false, conf, null, + null); } /** Construct a server for a protocol implementation instance. @@ -676,7 +678,7 @@ public class RPC { throws IOException { return getServer(protocol, instance, bindAddress, port, numHandlers, verbose, - conf, null); + conf, null, null); } /** Construct a server for a protocol implementation instance. */ @@ -686,10 +688,20 @@ public class RPC { boolean verbose, Configuration conf, SecretManager secretManager) throws IOException { - + return getServer(protocol, instance, bindAddress, port, numHandlers, verbose, + conf, secretManager, null); + } + + public static Server getServer(Class protocol, + Object instance, String bindAddress, int port, + int numHandlers, + boolean verbose, Configuration conf, + SecretManager secretManager, + String portRangeConfig) + throws IOException { return getProtocolEngine(protocol, conf) .getServer(protocol, instance, bindAddress, port, numHandlers, -1, -1, - verbose, conf, secretManager); + verbose, conf, secretManager, portRangeConfig); } /** Construct a server for a protocol implementation instance. */ @@ -704,7 +716,8 @@ public class RPC { return getProtocolEngine(protocol, conf) .getServer(protocol, instance, bindAddress, port, numHandlers, - numReaders, queueSizePerHandler, verbose, conf, secretManager); + numReaders, queueSizePerHandler, verbose, conf, secretManager, + null); } /** An RPC Server. */ @@ -855,9 +868,10 @@ public class RPC { Class paramClass, int handlerCount, int numReaders, int queueSizePerHandler, Configuration conf, String serverName, - SecretManager secretManager) throws IOException { + SecretManager secretManager, + String portRangeConfig) throws IOException { super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler, - conf, serverName, secretManager); + conf, serverName, secretManager, portRangeConfig); initProtocolMetaInfo(conf); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java index 0fc7d60bd32..09980da452c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java @@ -47,12 +47,30 @@ public interface RpcEngine { UserGroupInformation ticket, Configuration conf) throws IOException, InterruptedException; - /** Construct a server for a protocol implementation instance. */ + /** + * Construct a server for a protocol implementation instance. + * + * @param protocol the class of protocol to use + * @param instance the instance of protocol whose methods will be called + * @param conf the configuration to use + * @param bindAddress the address to bind on to listen for connection + * @param port the port to listen for connections on + * @param numHandlers the number of method handler threads to run + * @param numReaders the number of reader threads to run + * @param queueSizePerHandler the size of the queue per hander thread + * @param verbose whether each call should be logged + * @param secretManager The secret manager to use to validate incoming requests. + * @param portRangeConfig A config parameter that can be used to restrict + * the range of ports used when port is 0 (an ephemeral port) + * @return The Server instance + * @throws IOException on any error + */ RPC.Server getServer(Class protocol, Object instance, String bindAddress, int port, int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf, - SecretManager secretManager + SecretManager secretManager, + String portRangeConfig ) throws IOException; /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index f11224c1d2d..d9ac47eb663 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -63,6 +63,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configuration.IntegerRanges; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.BytesWritable; @@ -291,6 +292,7 @@ public abstract class Server { protected RpcDetailedMetrics rpcDetailedMetrics; private Configuration conf; + private String portRangeConfig = null; private SecretManager secretManager; private ServiceAuthorizationManager serviceAuthorizationManager = new ServiceAuthorizationManager(); @@ -323,8 +325,33 @@ public abstract class Server { */ public static void bind(ServerSocket socket, InetSocketAddress address, int backlog) throws IOException { + bind(socket, address, backlog, null, null); + } + + public static void bind(ServerSocket socket, InetSocketAddress address, + int backlog, Configuration conf, String rangeConf) throws IOException { try { - socket.bind(address, backlog); + IntegerRanges range = null; + if (rangeConf != null) { + range = conf.getRange(rangeConf, ""); + } + if (range == null || range.isEmpty() || (address.getPort() != 0)) { + socket.bind(address, backlog); + } else { + for (Integer port : range) { + if (socket.isBound()) break; + try { + InetSocketAddress temp = new InetSocketAddress(address.getAddress(), + port); + socket.bind(temp, backlog); + } catch(BindException e) { + //Ignored + } + } + if (!socket.isBound()) { + throw new BindException("Could not find a free port in "+range); + } + } } catch (SocketException e) { throw NetUtils.wrapException(null, 0, @@ -424,7 +451,7 @@ public abstract class Server { acceptChannel.configureBlocking(false); // Bind the server socket to the local host and port - bind(acceptChannel.socket(), address, backlogLength); + bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig); port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port // create a selector; selector= Selector.open(); @@ -1725,7 +1752,16 @@ public abstract class Server { throws IOException { this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer - .toString(port), null); + .toString(port), null, null); + } + + protected Server(String bindAddress, int port, + Class rpcRequestClass, int handlerCount, + int numReaders, int queueSizePerHandler, Configuration conf, + String serverName, SecretManager secretManager) + throws IOException { + this(bindAddress, port, rpcRequestClass, handlerCount, numReaders, + queueSizePerHandler, conf, serverName, secretManager, null); } /** @@ -1745,10 +1781,12 @@ public abstract class Server { protected Server(String bindAddress, int port, Class rpcRequestClass, int handlerCount, int numReaders, int queueSizePerHandler, Configuration conf, - String serverName, SecretManager secretManager) + String serverName, SecretManager secretManager, + String portRangeConfig) throws IOException { this.bindAddress = bindAddress; this.conf = conf; + this.portRangeConfig = portRangeConfig; this.port = port; this.rpcRequestClass = rpcRequestClass; this.handlerCount = handlerCount; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index fc0da0cf90d..e4cd9b9e08f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -299,16 +299,19 @@ public class WritableRpcEngine implements RpcEngine { } } - /** Construct a server for a protocol implementation instance listening on a + /* Construct a server for a protocol implementation instance listening on a * port and address. */ + @Override public RPC.Server getServer(Class protocolClass, Object protocolImpl, String bindAddress, int port, int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf, - SecretManager secretManager) + SecretManager secretManager, + String portRangeConfig) throws IOException { return new Server(protocolClass, protocolImpl, conf, bindAddress, port, - numHandlers, numReaders, queueSizePerHandler, verbose, secretManager); + numHandlers, numReaders, queueSizePerHandler, verbose, secretManager, + portRangeConfig); } @@ -341,7 +344,7 @@ public class WritableRpcEngine implements RpcEngine { Configuration conf, String bindAddress, int port) throws IOException { this(protocolClass, protocolImpl, conf, bindAddress, port, 1, -1, -1, - false, null); + false, null, null); } /** @@ -363,7 +366,7 @@ public class WritableRpcEngine implements RpcEngine { throws IOException { this(null, protocolImpl, conf, bindAddress, port, numHandlers, numReaders, queueSizePerHandler, verbose, - secretManager); + secretManager, null); } @@ -381,11 +384,13 @@ public class WritableRpcEngine implements RpcEngine { public Server(Class protocolClass, Object protocolImpl, Configuration conf, String bindAddress, int port, int numHandlers, int numReaders, int queueSizePerHandler, - boolean verbose, SecretManager secretManager) + boolean verbose, SecretManager secretManager, + String portRangeConfig) throws IOException { super(bindAddress, port, null, numHandlers, numReaders, queueSizePerHandler, conf, - classNameBase(protocolImpl.getClass().getName()), secretManager); + classNameBase(protocolImpl.getClass().getName()), secretManager, + portRangeConfig); this.verbose = verbose; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java index 4f1ec878bbe..c48a25de183 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java @@ -25,16 +25,20 @@ import java.io.IOException; import java.io.StringWriter; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.regex.Pattern; import junit.framework.TestCase; import static org.junit.Assert.assertArrayEquals; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration.IntegerRanges; import org.apache.hadoop.fs.Path; import org.apache.hadoop.net.NetUtils; import org.codehaus.jackson.map.ObjectMapper; @@ -362,6 +366,35 @@ public class TestConfiguration extends TestCase { assertEquals(true, range.isIncluded(34)); assertEquals(true, range.isIncluded(100000000)); } + + public void testGetRangeIterator() throws Exception { + Configuration config = new Configuration(false); + IntegerRanges ranges = config.getRange("Test", ""); + assertFalse("Empty range has values", ranges.iterator().hasNext()); + ranges = config.getRange("Test", "5"); + Set expected = new HashSet(Arrays.asList(5)); + Set found = new HashSet(); + for(Integer i: ranges) { + found.add(i); + } + assertEquals(expected, found); + + ranges = config.getRange("Test", "5-10,13-14"); + expected = new HashSet(Arrays.asList(5,6,7,8,9,10,13,14)); + found = new HashSet(); + for(Integer i: ranges) { + found.add(i); + } + assertEquals(expected, found); + + ranges = config.getRange("Test", "8-12, 5- 7"); + expected = new HashSet(Arrays.asList(5,6,7,8,9,10,11,12)); + found = new HashSet(); + for(Integer i: ranges) { + found.add(i); + } + assertEquals(expected, found); + } public void testHexValues() throws IOException{ out=new BufferedWriter(new FileWriter(CONFIG)); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index f22cd614100..56b2b2487ba 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -260,7 +260,8 @@ public class TestRPC { public org.apache.hadoop.ipc.RPC.Server getServer(Class protocol, Object instance, String bindAddress, int port, int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf, - SecretManager secretManager) throws IOException { + SecretManager secretManager, + String portRangeConfig) throws IOException { return null; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestServer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestServer.java new file mode 100644 index 00000000000..db0d2ccc15a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestServer.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import static org.junit.Assert.*; + +import java.net.BindException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +/** + * This is intended to be a set of unit tests for the + * org.apache.hadoop.ipc.Server class. + */ +public class TestServer { + + @Test + public void testBind() throws Exception { + Configuration conf = new Configuration(); + ServerSocket socket = new ServerSocket(); + InetSocketAddress address = new InetSocketAddress("0.0.0.0",0); + socket.bind(address); + try { + int min = socket.getLocalPort(); + int max = min + 100; + conf.set("TestRange", min+"-"+max); + + + ServerSocket socket2 = new ServerSocket(); + InetSocketAddress address2 = new InetSocketAddress("0.0.0.0", 0); + Server.bind(socket2, address2, 10, conf, "TestRange"); + try { + assertTrue(socket2.isBound()); + assertTrue(socket2.getLocalPort() > min); + assertTrue(socket2.getLocalPort() <= max); + } finally { + socket2.close(); + } + } finally { + socket.close(); + } + } + + @Test + public void testBindSimple() throws Exception { + ServerSocket socket = new ServerSocket(); + InetSocketAddress address = new InetSocketAddress("0.0.0.0",0); + Server.bind(socket, address, 10); + try { + assertTrue(socket.isBound()); + } finally { + socket.close(); + } + } + + @Test + public void testEmptyConfig() throws Exception { + Configuration conf = new Configuration(); + conf.set("TestRange", ""); + + + ServerSocket socket = new ServerSocket(); + InetSocketAddress address = new InetSocketAddress("0.0.0.0", 0); + try { + Server.bind(socket, address, 10, conf, "TestRange"); + assertTrue(socket.isBound()); + } finally { + socket.close(); + } + } + + + @Test + public void testBindError() throws Exception { + Configuration conf = new Configuration(); + ServerSocket socket = new ServerSocket(); + InetSocketAddress address = new InetSocketAddress("0.0.0.0",0); + socket.bind(address); + try { + int min = socket.getLocalPort(); + conf.set("TestRange", min+"-"+min); + + + ServerSocket socket2 = new ServerSocket(); + InetSocketAddress address2 = new InetSocketAddress("0.0.0.0", 0); + boolean caught = false; + try { + Server.bind(socket2, address2, 10, conf, "TestRange"); + } catch (BindException e) { + caught = true; + } finally { + socket2.close(); + } + assertTrue("Failed to catch the expected bind exception",caught); + } finally { + socket.close(); + } + } +}