HADOOP-9916. Fix race in ipc.Client retry. (Binglin Chang via llu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1519974 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Luke Lu 2013-09-04 10:34:56 +00:00
parent 5f4abaa3dc
commit 6ebdbd4a0a
3 changed files with 36 additions and 32 deletions

View File

@ -127,6 +127,8 @@ Release 2.1.1-beta - UNRELEASED
BUG FIXES BUG FIXES
HADOOP-9916. Fix race in ipc.Client retry. (Binglin Chang via llu)
HADOOP-9768. chown and chgrp reject users and groups with spaces on platforms HADOOP-9768. chown and chgrp reject users and groups with spaces on platforms
where spaces are otherwise acceptable. (cnauroth) where spaces are otherwise acceptable. (cnauroth)

View File

@ -1063,8 +1063,8 @@ public class Client {
if (status == RpcStatusProto.SUCCESS) { if (status == RpcStatusProto.SUCCESS) {
Writable value = ReflectionUtils.newInstance(valueClass, conf); Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // read value value.readFields(in); // read value
call.setRpcResponse(value);
calls.remove(callId); calls.remove(callId);
call.setRpcResponse(value);
// verify that length was correct // verify that length was correct
// only for ProtobufEngine where len can be verified easily // only for ProtobufEngine where len can be verified easily
@ -1098,8 +1098,8 @@ public class Client {
new RemoteException(exceptionClassName, errorMsg) : new RemoteException(exceptionClassName, errorMsg) :
new RemoteException(exceptionClassName, errorMsg, erCode)); new RemoteException(exceptionClassName, errorMsg, erCode));
if (status == RpcStatusProto.ERROR) { if (status == RpcStatusProto.ERROR) {
call.setException(re);
calls.remove(callId); calls.remove(callId);
call.setException(re);
} else if (status == RpcStatusProto.FATAL) { } else if (status == RpcStatusProto.FATAL) {
// Close the connection // Close the connection
markClosed(re); markClosed(re);
@ -1166,8 +1166,8 @@ public class Client {
Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator() ; Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator() ;
while (itor.hasNext()) { while (itor.hasNext()) {
Call c = itor.next().getValue(); Call c = itor.next().getValue();
itor.remove();
c.setException(closeException); // local exception c.setException(closeException); // local exception
itor.remove();
} }
} }
} }

View File

@ -216,13 +216,13 @@ public class TestIPC {
} }
} }
@Test @Test(timeout=60000)
public void testSerial() throws IOException, InterruptedException { public void testSerial() throws IOException, InterruptedException {
testSerial(3, false, 2, 5, 100); internalTestSerial(3, false, 2, 5, 100);
testSerial(3, true, 2, 5, 10); internalTestSerial(3, true, 2, 5, 10);
} }
public void testSerial(int handlerCount, boolean handlerSleep, public void internalTestSerial(int handlerCount, boolean handlerSleep,
int clientCount, int callerCount, int callCount) int clientCount, int callerCount, int callCount)
throws IOException, InterruptedException { throws IOException, InterruptedException {
Server server = new TestServer(handlerCount, handlerSleep); Server server = new TestServer(handlerCount, handlerSleep);
@ -249,7 +249,7 @@ public class TestIPC {
server.stop(); server.stop();
} }
@Test @Test(timeout=60000)
public void testStandAloneClient() throws IOException { public void testStandAloneClient() throws IOException {
Client client = new Client(LongWritable.class, conf); Client client = new Client(LongWritable.class, conf);
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10); InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
@ -383,7 +383,7 @@ public class TestIPC {
} }
} }
@Test @Test(timeout=60000)
public void testIOEOnClientWriteParam() throws Exception { public void testIOEOnClientWriteParam() throws Exception {
doErrorTest(IOEOnWriteWritable.class, doErrorTest(IOEOnWriteWritable.class,
LongWritable.class, LongWritable.class,
@ -391,7 +391,7 @@ public class TestIPC {
LongWritable.class); LongWritable.class);
} }
@Test @Test(timeout=60000)
public void testRTEOnClientWriteParam() throws Exception { public void testRTEOnClientWriteParam() throws Exception {
doErrorTest(RTEOnWriteWritable.class, doErrorTest(RTEOnWriteWritable.class,
LongWritable.class, LongWritable.class,
@ -399,7 +399,7 @@ public class TestIPC {
LongWritable.class); LongWritable.class);
} }
@Test @Test(timeout=60000)
public void testIOEOnServerReadParam() throws Exception { public void testIOEOnServerReadParam() throws Exception {
doErrorTest(LongWritable.class, doErrorTest(LongWritable.class,
IOEOnReadWritable.class, IOEOnReadWritable.class,
@ -407,7 +407,7 @@ public class TestIPC {
LongWritable.class); LongWritable.class);
} }
@Test @Test(timeout=60000)
public void testRTEOnServerReadParam() throws Exception { public void testRTEOnServerReadParam() throws Exception {
doErrorTest(LongWritable.class, doErrorTest(LongWritable.class,
RTEOnReadWritable.class, RTEOnReadWritable.class,
@ -416,7 +416,7 @@ public class TestIPC {
} }
@Test @Test(timeout=60000)
public void testIOEOnServerWriteResponse() throws Exception { public void testIOEOnServerWriteResponse() throws Exception {
doErrorTest(LongWritable.class, doErrorTest(LongWritable.class,
LongWritable.class, LongWritable.class,
@ -424,7 +424,7 @@ public class TestIPC {
LongWritable.class); LongWritable.class);
} }
@Test @Test(timeout=60000)
public void testRTEOnServerWriteResponse() throws Exception { public void testRTEOnServerWriteResponse() throws Exception {
doErrorTest(LongWritable.class, doErrorTest(LongWritable.class,
LongWritable.class, LongWritable.class,
@ -432,7 +432,7 @@ public class TestIPC {
LongWritable.class); LongWritable.class);
} }
@Test @Test(timeout=60000)
public void testIOEOnClientReadResponse() throws Exception { public void testIOEOnClientReadResponse() throws Exception {
doErrorTest(LongWritable.class, doErrorTest(LongWritable.class,
LongWritable.class, LongWritable.class,
@ -440,7 +440,7 @@ public class TestIPC {
IOEOnReadWritable.class); IOEOnReadWritable.class);
} }
@Test @Test(timeout=60000)
public void testRTEOnClientReadResponse() throws Exception { public void testRTEOnClientReadResponse() throws Exception {
doErrorTest(LongWritable.class, doErrorTest(LongWritable.class,
LongWritable.class, LongWritable.class,
@ -453,7 +453,7 @@ public class TestIPC {
* that a ping should have been sent. This is a reproducer for a * that a ping should have been sent. This is a reproducer for a
* deadlock seen in one iteration of HADOOP-6762. * deadlock seen in one iteration of HADOOP-6762.
*/ */
@Test @Test(timeout=60000)
public void testIOEOnWriteAfterPingClient() throws Exception { public void testIOEOnWriteAfterPingClient() throws Exception {
// start server // start server
Client.setPingInterval(conf, 100); Client.setPingInterval(conf, 100);
@ -481,7 +481,7 @@ public class TestIPC {
* Test that, if the socket factory throws an IOE, it properly propagates * Test that, if the socket factory throws an IOE, it properly propagates
* to the client. * to the client.
*/ */
@Test @Test(timeout=60000)
public void testSocketFactoryException() throws IOException { public void testSocketFactoryException() throws IOException {
SocketFactory mockFactory = mock(SocketFactory.class); SocketFactory mockFactory = mock(SocketFactory.class);
doThrow(new IOException("Injected fault")).when(mockFactory).createSocket(); doThrow(new IOException("Injected fault")).when(mockFactory).createSocket();
@ -503,7 +503,7 @@ public class TestIPC {
* failure is handled properly. This is a regression test for * failure is handled properly. This is a regression test for
* HADOOP-7428. * HADOOP-7428.
*/ */
@Test @Test(timeout=60000)
public void testRTEDuringConnectionSetup() throws IOException { public void testRTEDuringConnectionSetup() throws IOException {
// Set up a socket factory which returns sockets which // Set up a socket factory which returns sockets which
// throw an RTE when setSoTimeout is called. // throw an RTE when setSoTimeout is called.
@ -544,7 +544,7 @@ public class TestIPC {
} }
} }
@Test @Test(timeout=60000)
public void testIpcTimeout() throws IOException { public void testIpcTimeout() throws IOException {
// start server // start server
Server server = new TestServer(1, true); Server server = new TestServer(1, true);
@ -566,7 +566,7 @@ public class TestIPC {
addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf); addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf);
} }
@Test @Test(timeout=60000)
public void testIpcConnectTimeout() throws IOException { public void testIpcConnectTimeout() throws IOException {
// start server // start server
Server server = new TestServer(1, true); Server server = new TestServer(1, true);
@ -670,31 +670,31 @@ public class TestIPC {
return FD_DIR.list().length; return FD_DIR.list().length;
} }
@Test @Test(timeout=60000)
public void testIpcFromHadoop_0_18_13() throws IOException { public void testIpcFromHadoop_0_18_13() throws IOException {
doIpcVersionTest(NetworkTraces.HADOOP_0_18_3_RPC_DUMP, doIpcVersionTest(NetworkTraces.HADOOP_0_18_3_RPC_DUMP,
NetworkTraces.RESPONSE_TO_HADOOP_0_18_3_RPC); NetworkTraces.RESPONSE_TO_HADOOP_0_18_3_RPC);
} }
@Test @Test(timeout=60000)
public void testIpcFromHadoop0_20_3() throws IOException { public void testIpcFromHadoop0_20_3() throws IOException {
doIpcVersionTest(NetworkTraces.HADOOP_0_20_3_RPC_DUMP, doIpcVersionTest(NetworkTraces.HADOOP_0_20_3_RPC_DUMP,
NetworkTraces.RESPONSE_TO_HADOOP_0_20_3_RPC); NetworkTraces.RESPONSE_TO_HADOOP_0_20_3_RPC);
} }
@Test @Test(timeout=60000)
public void testIpcFromHadoop0_21_0() throws IOException { public void testIpcFromHadoop0_21_0() throws IOException {
doIpcVersionTest(NetworkTraces.HADOOP_0_21_0_RPC_DUMP, doIpcVersionTest(NetworkTraces.HADOOP_0_21_0_RPC_DUMP,
NetworkTraces.RESPONSE_TO_HADOOP_0_21_0_RPC); NetworkTraces.RESPONSE_TO_HADOOP_0_21_0_RPC);
} }
@Test @Test(timeout=60000)
public void testHttpGetResponse() throws IOException { public void testHttpGetResponse() throws IOException {
doIpcVersionTest("GET / HTTP/1.0\r\n\r\n".getBytes(), doIpcVersionTest("GET / HTTP/1.0\r\n\r\n".getBytes(),
Server.RECEIVED_HTTP_REQ_RESPONSE.getBytes()); Server.RECEIVED_HTTP_REQ_RESPONSE.getBytes());
} }
@Test @Test(timeout=60000)
public void testConnectionRetriesOnSocketTimeoutExceptions() throws IOException { public void testConnectionRetriesOnSocketTimeoutExceptions() throws IOException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
// set max retries to 0 // set max retries to 0
@ -720,7 +720,7 @@ public class TestIPC {
* (1) the rpc server uses the call id/retry provided by the rpc client, and * (1) the rpc server uses the call id/retry provided by the rpc client, and
* (2) the rpc client receives the same call id/retry from the rpc server. * (2) the rpc client receives the same call id/retry from the rpc server.
*/ */
@Test @Test(timeout=60000)
public void testCallIdAndRetry() throws IOException { public void testCallIdAndRetry() throws IOException {
final CallInfo info = new CallInfo(); final CallInfo info = new CallInfo();
@ -772,7 +772,7 @@ public class TestIPC {
/** /**
* Test the retry count while used in a retry proxy. * Test the retry count while used in a retry proxy.
*/ */
@Test @Test(timeout=60000)
public void testRetryProxy() throws IOException { public void testRetryProxy() throws IOException {
final Client client = new Client(LongWritable.class, conf); final Client client = new Client(LongWritable.class, conf);
@ -785,7 +785,9 @@ public class TestIPC {
} }
}; };
final int totalRetry = 256; // try more times, so it is easier to find race condition bug
// 10000 times runs about 6s on a core i7 machine
final int totalRetry = 10000;
DummyProtocol proxy = (DummyProtocol) Proxy.newProxyInstance( DummyProtocol proxy = (DummyProtocol) Proxy.newProxyInstance(
DummyProtocol.class.getClassLoader(), DummyProtocol.class.getClassLoader(),
new Class[] { DummyProtocol.class }, new TestInvocationHandler(client, new Class[] { DummyProtocol.class }, new TestInvocationHandler(client,
@ -807,7 +809,7 @@ public class TestIPC {
/** /**
* Test if the rpc server gets the default retry count (0) from client. * Test if the rpc server gets the default retry count (0) from client.
*/ */
@Test @Test(timeout=60000)
public void testInitialCallRetryCount() throws IOException { public void testInitialCallRetryCount() throws IOException {
// Override client to store the call id // Override client to store the call id
final Client client = new Client(LongWritable.class, conf); final Client client = new Client(LongWritable.class, conf);
@ -838,7 +840,7 @@ public class TestIPC {
/** /**
* Test if the rpc server gets the retry count from client. * Test if the rpc server gets the retry count from client.
*/ */
@Test @Test(timeout=60000)
public void testCallRetryCount() throws IOException { public void testCallRetryCount() throws IOException {
final int retryCount = 255; final int retryCount = 255;
// Override client to store the call id // Override client to store the call id
@ -873,7 +875,7 @@ public class TestIPC {
* even if multiple threads are using the same client. * even if multiple threads are using the same client.
* @throws InterruptedException * @throws InterruptedException
*/ */
@Test @Test(timeout=60000)
public void testUniqueSequentialCallIds() public void testUniqueSequentialCallIds()
throws IOException, InterruptedException { throws IOException, InterruptedException {
int serverThreads = 10, callerCount = 100, perCallerCallCount = 100; int serverThreads = 10, callerCount = 100, perCallerCallCount = 100;