diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index eaecbbb1894..f9388e4006c 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -174,6 +174,9 @@ Release 2.1.0-beta - 2013-07-02 HADOOP-9676. Make maximum RPC buffer size configurable (Colin Patrick McCabe) + HADOOP-9691. RPC clients can generate call ID using AtomicInteger instead of + synchronizing on the Client instance. (cnauroth) + HADOOP-9661. Allow metrics sources to be extended. (sandyr via tucu) HADOOP-9370. Write FSWrapper class to wrap FileSystem and FileContext for diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 5db2d2177a2..80929be981d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -45,6 +45,7 @@ import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.net.SocketFactory; @@ -105,7 +106,7 @@ public class Client { new Hashtable(); private Class valueClass; // class of call values - private int counter; // counter for call ids + private final AtomicInteger counter = new AtomicInteger(); // call ID sequence private AtomicBoolean running = new AtomicBoolean(true); // if client runs final private Configuration conf; @@ -271,9 +272,7 @@ public class Client { protected Call(RPC.RpcKind rpcKind, Writable param) { this.rpcKind = rpcKind; this.rpcRequest = param; - synchronized (Client.this) { - this.id = counter++; - } + this.id = nextCallId(); } /** Indicate when the call is complete and the @@ -1623,4 +1622,18 @@ public class Client { return serverPrincipal + "@" + address; } } + + /** + * Returns the next valid sequential call ID by incrementing an atomic counter + * and masking off the sign bit. Valid call IDs are non-negative integers in + * the range [ 0, 2^31 - 1 ]. Negative numbers are reserved for special + * purposes. The values can overflow back to 0 and be reused. Note that prior + * versions of the client did not mask off the sign bit, so a server may still + * see a negative call ID if it receives connections from an old client. + * + * @return int next valid call ID + */ + private int nextCallId() { + return counter.getAndIncrement() & 0x7FFFFFFF; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java index 3f03ade2a7a..6fd4ac75063 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java @@ -31,6 +31,7 @@ public class RpcConstants { public static final byte[] DUMMY_CLIENT_ID = new byte[0]; + public static final int INVALID_CALL_ID = -2; /** * The first four bytes of Hadoop RPC connections diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 9889ad037a6..8e46349ced6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -268,6 +268,18 @@ public abstract class Server { */ private static final ThreadLocal CurCall = new ThreadLocal(); + /** + * Returns the currently active RPC call's sequential ID number. A negative + * call ID indicates an invalid value, such as if there is no currently active + * RPC call. + * + * @return int sequential ID number of currently active RPC call + */ + public static int getCallId() { + Call call = CurCall.get(); + return call != null ? call.callId : RpcConstants.INVALID_CALL_ID; + } + /** Returns the remote side ip address when invoked inside an RPC * Returns null incase of an error. */ diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index d30e9145e5b..3e24c38da55 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -30,6 +30,9 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.NetUtils; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Random; import java.io.ByteArrayOutputStream; import java.io.DataInput; @@ -83,6 +86,10 @@ public class TestIPC { private static final File FD_DIR = new File("/proc/self/fd"); private static class TestServer extends Server { + // Tests can set callListener to run a piece of code each time the server + // receives a call. This code executes on the server thread, so it has + // visibility of that thread's thread-local storage. + private Runnable callListener; private boolean sleep; private Class responseClass; @@ -108,6 +115,9 @@ public class TestIPC { Thread.sleep(RANDOM.nextInt(PING_INTERVAL) + MIN_SLEEP_TIME); } catch (InterruptedException e) {} } + if (callListener != null) { + callListener.run(); + } if (responseClass != null) { try { return responseClass.newInstance(); @@ -645,6 +655,57 @@ public class TestIPC { assertRetriesOnSocketTimeouts(conf, 4); } + /** + * Tests that client generates a unique sequential call ID for each RPC call, + * even if multiple threads are using the same client. + */ + @Test + public void testUniqueSequentialCallIds() throws Exception { + int serverThreads = 10, callerCount = 100, perCallerCallCount = 100; + TestServer server = new TestServer(serverThreads, false); + + // Attach a listener that tracks every call ID received by the server. This + // list must be synchronized, because multiple server threads will add to it. + final List callIds = Collections.synchronizedList( + new ArrayList()); + server.callListener = new Runnable() { + @Override + public void run() { + callIds.add(Server.getCallId()); + } + }; + + Client client = new Client(LongWritable.class, conf); + + try { + InetSocketAddress addr = NetUtils.getConnectAddress(server); + server.start(); + SerialCaller[] callers = new SerialCaller[callerCount]; + for (int i = 0; i < callerCount; ++i) { + callers[i] = new SerialCaller(client, addr, perCallerCallCount); + callers[i].start(); + } + for (int i = 0; i < callerCount; ++i) { + callers[i].join(); + assertFalse(callers[i].failed); + } + } finally { + client.stop(); + server.stop(); + } + + int expectedCallCount = callerCount * perCallerCallCount; + assertEquals(expectedCallCount, callIds.size()); + + // It is not guaranteed that the server executes requests in sequential order + // of client call ID, so we must sort the call IDs before checking that it + // contains every expected value. + Collections.sort(callIds); + for (int i = 0; i < expectedCallCount; ++i) { + assertEquals(i, callIds.get(i).intValue()); + } + } + private void assertRetriesOnSocketTimeouts(Configuration conf, int maxTimeoutRetries) throws IOException, InterruptedException { SocketFactory mockFactory = Mockito.mock(SocketFactory.class);