HADOOP-12605. Fix intermittent failure of TestIPC.testIpcWithReaderQueuing (iwasakims)

(cherry picked from commit 9eec6cbedc)
This commit is contained in:
Masatake Iwasaki 2016-01-20 05:00:02 +09:00
parent 9512866740
commit 54df2a4352
2 changed files with 15 additions and 6 deletions

View File

@ -35,6 +35,9 @@ Release 2.9.0 - UNRELEASED
BUG FIXES BUG FIXES
HADOOP-12605. Fix intermittent failure of TestIPC.testIpcWithReaderQueuing
(iwasakims)
HADOOP-12655. TestHttpServer.testBindAddress bind port range is wider HADOOP-12655. TestHttpServer.testBindAddress bind port range is wider
than expected. (Wei-Chiu Chuang via stevel) than expected. (Wei-Chiu Chuang via stevel)

View File

@ -27,6 +27,8 @@ import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.DataInput; import java.io.DataInput;
@ -71,6 +73,7 @@ import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.Server.Call;
import org.apache.hadoop.ipc.Server.Connection; import org.apache.hadoop.ipc.Server.Connection;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.ConnectTimeoutException;
@ -84,6 +87,7 @@ import org.junit.Assume;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
@ -703,6 +707,7 @@ public class TestIPC {
// goal is to jam a handler with a connection, fill the callq with // goal is to jam a handler with a connection, fill the callq with
// connections, in turn jamming the readers - then flood the server and // connections, in turn jamming the readers - then flood the server and
// ensure that the listener blocks when the reader connection queues fill // ensure that the listener blocks when the reader connection queues fill
@SuppressWarnings("unchecked")
private void checkBlocking(int readers, int readerQ, int callQ) throws Exception { private void checkBlocking(int readers, int readerQ, int callQ) throws Exception {
int handlers = 1; // makes it easier int handlers = 1; // makes it easier
@ -722,6 +727,9 @@ public class TestIPC {
// start server // start server
final TestServerQueue server = final TestServerQueue server =
new TestServerQueue(clients, readers, callQ, handlers, conf); new TestServerQueue(clients, readers, callQ, handlers, conf);
CallQueueManager<Call> spy = spy(
(CallQueueManager<Call>)Whitebox.getInternalState(server, "callQueue"));
Whitebox.setInternalState(server, "callQueue", spy);
final InetSocketAddress addr = NetUtils.getConnectAddress(server); final InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start(); server.start();
@ -757,12 +765,10 @@ public class TestIPC {
if (i==0) { if (i==0) {
// let first reader block in a call // let first reader block in a call
server.firstCallLatch.await(); server.firstCallLatch.await();
} else if (i <= callQ) {
// let subsequent readers jam the callq, will happen immediately
while (server.getCallQueueLen() != i) {
Thread.sleep(1);
} }
} // additional threads block the readers trying to add to the callq // wait until reader put a call to callQueue, to make sure all readers
// are blocking on the queue after initialClients threads are started.
verify(spy, timeout(100).times(i + 1)).put(Mockito.<Call>anyObject());
} }
try { try {