HADOOP-9691. Merge r1501615 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1505042 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2013-07-19 21:59:14 +00:00
parent 6e967c10f3
commit 7ac2c423cc
5 changed files with 94 additions and 4 deletions

View File

@ -174,6 +174,9 @@ Release 2.1.0-beta - 2013-07-02
HADOOP-9676. Make maximum RPC buffer size configurable (Colin Patrick HADOOP-9676. Make maximum RPC buffer size configurable (Colin Patrick
McCabe) McCabe)
HADOOP-9691. RPC clients can generate call ID using AtomicInteger instead of
synchronizing on the Client instance. (cnauroth)
HADOOP-9661. Allow metrics sources to be extended. (sandyr via tucu) HADOOP-9661. Allow metrics sources to be extended. (sandyr via tucu)
HADOOP-9370. Write FSWrapper class to wrap FileSystem and FileContext for HADOOP-9370. Write FSWrapper class to wrap FileSystem and FileContext for

View File

@ -45,6 +45,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import javax.net.SocketFactory; import javax.net.SocketFactory;
@ -105,7 +106,7 @@ public class Client {
new Hashtable<ConnectionId, Connection>(); new Hashtable<ConnectionId, Connection>();
private Class<? extends Writable> valueClass; // class of call values private Class<? extends Writable> valueClass; // class of call values
private int counter; // counter for call ids private final AtomicInteger counter = new AtomicInteger(); // call ID sequence
private AtomicBoolean running = new AtomicBoolean(true); // if client runs private AtomicBoolean running = new AtomicBoolean(true); // if client runs
final private Configuration conf; final private Configuration conf;
@ -271,9 +272,7 @@ public class Client {
protected Call(RPC.RpcKind rpcKind, Writable param) { protected Call(RPC.RpcKind rpcKind, Writable param) {
this.rpcKind = rpcKind; this.rpcKind = rpcKind;
this.rpcRequest = param; this.rpcRequest = param;
synchronized (Client.this) { this.id = nextCallId();
this.id = counter++;
}
} }
/** Indicate when the call is complete and the /** Indicate when the call is complete and the
@ -1623,4 +1622,18 @@ public class Client {
return serverPrincipal + "@" + address; return serverPrincipal + "@" + address;
} }
} }
/**
* Returns the next valid sequential call ID by incrementing an atomic counter
* and masking off the sign bit. Valid call IDs are non-negative integers in
* the range [ 0, 2^31 - 1 ]. Negative numbers are reserved for special
* purposes. The values can overflow back to 0 and be reused. Note that prior
* versions of the client did not mask off the sign bit, so a server may still
* see a negative call ID if it receives connections from an old client.
*
* @return int next valid call ID
*/
private int nextCallId() {
return counter.getAndIncrement() & 0x7FFFFFFF;
}
} }

View File

@ -31,6 +31,7 @@ public class RpcConstants {
public static final byte[] DUMMY_CLIENT_ID = new byte[0]; public static final byte[] DUMMY_CLIENT_ID = new byte[0];
public static final int INVALID_CALL_ID = -2;
/** /**
* The first four bytes of Hadoop RPC connections * The first four bytes of Hadoop RPC connections

View File

@ -268,6 +268,18 @@ public abstract class Server {
*/ */
private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>(); private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
/**
* Returns the currently active RPC call's sequential ID number. A negative
* call ID indicates an invalid value, such as if there is no currently active
* RPC call.
*
* @return int sequential ID number of currently active RPC call
*/
public static int getCallId() {
Call call = CurCall.get();
return call != null ? call.callId : RpcConstants.INVALID_CALL_ID;
}
/** Returns the remote side ip address when invoked inside an RPC /** Returns the remote side ip address when invoked inside an RPC
* Returns null incase of an error. * Returns null incase of an error.
*/ */

View File

@ -30,6 +30,9 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random; import java.util.Random;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.DataInput; import java.io.DataInput;
@ -83,6 +86,10 @@ public class TestIPC {
private static final File FD_DIR = new File("/proc/self/fd"); private static final File FD_DIR = new File("/proc/self/fd");
private static class TestServer extends Server { private static class TestServer extends Server {
// Tests can set callListener to run a piece of code each time the server
// receives a call. This code executes on the server thread, so it has
// visibility of that thread's thread-local storage.
private Runnable callListener;
private boolean sleep; private boolean sleep;
private Class<? extends Writable> responseClass; private Class<? extends Writable> responseClass;
@ -108,6 +115,9 @@ public class TestIPC {
Thread.sleep(RANDOM.nextInt(PING_INTERVAL) + MIN_SLEEP_TIME); Thread.sleep(RANDOM.nextInt(PING_INTERVAL) + MIN_SLEEP_TIME);
} catch (InterruptedException e) {} } catch (InterruptedException e) {}
} }
if (callListener != null) {
callListener.run();
}
if (responseClass != null) { if (responseClass != null) {
try { try {
return responseClass.newInstance(); return responseClass.newInstance();
@ -645,6 +655,57 @@ public class TestIPC {
assertRetriesOnSocketTimeouts(conf, 4); assertRetriesOnSocketTimeouts(conf, 4);
} }
/**
* Tests that client generates a unique sequential call ID for each RPC call,
* even if multiple threads are using the same client.
*/
@Test
public void testUniqueSequentialCallIds() throws Exception {
int serverThreads = 10, callerCount = 100, perCallerCallCount = 100;
TestServer server = new TestServer(serverThreads, false);
// Attach a listener that tracks every call ID received by the server. This
// list must be synchronized, because multiple server threads will add to it.
final List<Integer> callIds = Collections.synchronizedList(
new ArrayList<Integer>());
server.callListener = new Runnable() {
@Override
public void run() {
callIds.add(Server.getCallId());
}
};
Client client = new Client(LongWritable.class, conf);
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
SerialCaller[] callers = new SerialCaller[callerCount];
for (int i = 0; i < callerCount; ++i) {
callers[i] = new SerialCaller(client, addr, perCallerCallCount);
callers[i].start();
}
for (int i = 0; i < callerCount; ++i) {
callers[i].join();
assertFalse(callers[i].failed);
}
} finally {
client.stop();
server.stop();
}
int expectedCallCount = callerCount * perCallerCallCount;
assertEquals(expectedCallCount, callIds.size());
// It is not guaranteed that the server executes requests in sequential order
// of client call ID, so we must sort the call IDs before checking that it
// contains every expected value.
Collections.sort(callIds);
for (int i = 0; i < expectedCallCount; ++i) {
assertEquals(i, callIds.get(i).intValue());
}
}
private void assertRetriesOnSocketTimeouts(Configuration conf, private void assertRetriesOnSocketTimeouts(Configuration conf,
int maxTimeoutRetries) throws IOException, InterruptedException { int maxTimeoutRetries) throws IOException, InterruptedException {
SocketFactory mockFactory = Mockito.mock(SocketFactory.class); SocketFactory mockFactory = Mockito.mock(SocketFactory.class);