HADOOP-10406. TestIPC.testIpcWithReaderQueuing may fail. Contributed by Xiao Chen.

This commit is contained in:
Andrew Wang 2015-10-21 15:21:01 -07:00
parent a24c6e8420
commit d1cdce77fa
2 changed files with 29 additions and 10 deletions

View File

@ -897,6 +897,8 @@ Release 2.8.0 - UNRELEASED
HADOOP-12460. Add overwrite option for 'get' shell command HADOOP-12460. Add overwrite option for 'get' shell command
(Jagadesh Kiran N via vinayakumarb) (Jagadesh Kiran N via vinayakumarb)
HADOOP-10406. TestIPC.testIpcWithReaderQueuing may fail. (Xiao Chen via wang)
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp HADOOP-11785. Reduce the number of listStatus operation in distcp

View File

@ -47,11 +47,13 @@ import java.util.Random;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.net.SocketFactory; import javax.net.SocketFactory;
import com.google.common.base.Supplier;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger; import org.apache.commons.logging.impl.Log4JLogger;
@ -73,6 +75,7 @@ import org.apache.hadoop.ipc.Server.Connection;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.log4j.Level; import org.apache.log4j.Level;
@ -707,9 +710,9 @@ public class TestIPC {
conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, readerQ); conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, readerQ);
// send in enough clients to block up the handlers, callq, and readers // send in enough clients to block up the handlers, callq, and readers
int initialClients = readers + callQ + handlers; final int initialClients = readers + callQ + handlers;
// max connections we should ever end up accepting at once // max connections we should ever end up accepting at once
int maxAccept = initialClients + readers*readerQ + 1; // 1 = listener final int maxAccept = initialClients + readers*readerQ + 1; // 1 = listener
// stress it with 2X the max // stress it with 2X the max
int clients = maxAccept*2; int clients = maxAccept*2;
@ -762,12 +765,18 @@ public class TestIPC {
} // additional threads block the readers trying to add to the callq } // additional threads block the readers trying to add to the callq
} }
try {
// wait till everything is slotted, should happen immediately // wait till everything is slotted, should happen immediately
Thread.sleep(10); GenericTestUtils.waitFor(new Supplier<Boolean>() {
if (server.getNumOpenConnections() < initialClients) { @Override public Boolean get() {
LOG.info("(initial clients) need:"+initialClients+" connections have:"+server.getNumOpenConnections()); return server.getNumOpenConnections() >= initialClients;
Thread.sleep(100);
} }
}, 100, 3000);
} catch (TimeoutException e) {
fail("timed out while waiting for connections to open.");
}
LOG.info("(initial clients) need:"+initialClients
+" connections have:"+server.getNumOpenConnections());
LOG.info("ipc layer should be blocked"); LOG.info("ipc layer should be blocked");
assertEquals(callQ, server.getCallQueueLen()); assertEquals(callQ, server.getCallQueueLen());
assertEquals(initialClients, server.getNumOpenConnections()); assertEquals(initialClients, server.getNumOpenConnections());
@ -778,10 +787,18 @@ public class TestIPC {
threads[i].start(); threads[i].start();
} }
Thread.sleep(10); Thread.sleep(10);
if (server.getNumOpenConnections() < maxAccept) {
LOG.info("(max clients) need:"+maxAccept+" connections have:"+server.getNumOpenConnections()); try {
Thread.sleep(100); GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override public Boolean get() {
return server.getNumOpenConnections() >= maxAccept;
} }
}, 100, 3000);
} catch (TimeoutException e) {
fail("timed out while waiting for connections to open until maxAccept.");
}
LOG.info("(max clients) need:"+maxAccept
+" connections have:"+server.getNumOpenConnections());
// check a few times to make sure we didn't go over // check a few times to make sure we didn't go over
for (int i=0; i<4; i++) { for (int i=0; i<4; i++) {
assertEquals(maxAccept, server.getNumOpenConnections()); assertEquals(maxAccept, server.getNumOpenConnections());