HADOOP-12605. Fix intermittent failure of TestIPC.testIpcWithReaderQueuing (iwasakims)
(cherry picked from commit 9eec6cbedc
)
This commit is contained in:
parent
7aa95bcdd5
commit
a631247c2a
|
@ -27,6 +27,8 @@ import static org.mockito.Matchers.anyInt;
|
|||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.timeout;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataInput;
|
||||
|
@ -72,6 +74,7 @@ import org.apache.hadoop.io.retry.RetryPolicies;
|
|||
import org.apache.hadoop.io.retry.RetryProxy;
|
||||
import org.apache.hadoop.ipc.Client.ConnectionId;
|
||||
import org.apache.hadoop.ipc.RPC.RpcKind;
|
||||
import org.apache.hadoop.ipc.Server.Call;
|
||||
import org.apache.hadoop.ipc.Server.Connection;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
|
||||
import org.apache.hadoop.net.ConnectTimeoutException;
|
||||
|
@ -88,6 +91,7 @@ import org.junit.Assume;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
|
@ -742,6 +746,7 @@ public class TestIPC {
|
|||
// goal is to jam a handler with a connection, fill the callq with
|
||||
// connections, in turn jamming the readers - then flood the server and
|
||||
// ensure that the listener blocks when the reader connection queues fill
|
||||
@SuppressWarnings("unchecked")
|
||||
private void checkBlocking(int readers, int readerQ, int callQ) throws Exception {
|
||||
int handlers = 1; // makes it easier
|
||||
|
||||
|
@ -761,6 +766,9 @@ public class TestIPC {
|
|||
// start server
|
||||
final TestServerQueue server =
|
||||
new TestServerQueue(clients, readers, callQ, handlers, conf);
|
||||
CallQueueManager<Call> spy = spy(
|
||||
(CallQueueManager<Call>)Whitebox.getInternalState(server, "callQueue"));
|
||||
Whitebox.setInternalState(server, "callQueue", spy);
|
||||
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||
server.start();
|
||||
|
||||
|
@ -796,12 +804,10 @@ public class TestIPC {
|
|||
if (i==0) {
|
||||
// let first reader block in a call
|
||||
server.firstCallLatch.await();
|
||||
} else if (i <= callQ) {
|
||||
// let subsequent readers jam the callq, will happen immediately
|
||||
while (server.getCallQueueLen() != i) {
|
||||
Thread.sleep(1);
|
||||
}
|
||||
} // additional threads block the readers trying to add to the callq
|
||||
// wait until reader put a call to callQueue, to make sure all readers
|
||||
// are blocking on the queue after initialClients threads are started.
|
||||
verify(spy, timeout(100).times(i + 1)).put(Mockito.<Call>anyObject());
|
||||
}
|
||||
|
||||
try {
|
||||
|
|
Loading…
Reference in New Issue