diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 3d2e840031a..b3bd02e97d4 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -309,6 +309,8 @@ Release 2.8.0 - UNRELEASED HADOOP-12460. Add overwrite option for 'get' shell command (Jagadesh Kiran N via vinayakumarb) + HADOOP-10406. TestIPC.testIpcWithReaderQueuing may fail. (Xiao Chen via wang) + OPTIMIZATIONS HADOOP-11785. Reduce the number of listStatus operation in distcp diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 4e2e2f13f13..d6c065890ab 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -47,11 +47,13 @@ import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.net.SocketFactory; +import com.google.common.base.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; @@ -73,6 +75,7 @@ import org.apache.hadoop.ipc.Server.Connection; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.log4j.Level; @@ -707,9 +710,9 @@ public class TestIPC { conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, readerQ); // send in enough clients to block up the handlers, callq, and readers - int initialClients = readers + callQ + handlers; + final int initialClients = readers + callQ + handlers; // max connections we should ever end up accepting at once - int maxAccept = initialClients + readers*readerQ + 1; // 1 = listener + final int maxAccept = initialClients + readers*readerQ + 1; // 1 = listener // stress it with 2X the max int clients = maxAccept*2; @@ -762,12 +765,18 @@ public class TestIPC { } // additional threads block the readers trying to add to the callq } - // wait till everything is slotted, should happen immediately - Thread.sleep(10); - if (server.getNumOpenConnections() < initialClients) { - LOG.info("(initial clients) need:"+initialClients+" connections have:"+server.getNumOpenConnections()); - Thread.sleep(100); + try { + // wait till everything is slotted, should happen immediately + GenericTestUtils.waitFor(new Supplier() { + @Override public Boolean get() { + return server.getNumOpenConnections() >= initialClients; + } + }, 100, 3000); + } catch (TimeoutException e) { + fail("timed out while waiting for connections to open."); } + LOG.info("(initial clients) need:"+initialClients + +" connections have:"+server.getNumOpenConnections()); LOG.info("ipc layer should be blocked"); assertEquals(callQ, server.getCallQueueLen()); assertEquals(initialClients, server.getNumOpenConnections()); @@ -778,10 +787,18 @@ public class TestIPC { threads[i].start(); } Thread.sleep(10); - if (server.getNumOpenConnections() < maxAccept) { - LOG.info("(max clients) need:"+maxAccept+" connections have:"+server.getNumOpenConnections()); - Thread.sleep(100); + + try { + GenericTestUtils.waitFor(new Supplier() { + @Override public Boolean get() { + return server.getNumOpenConnections() >= maxAccept; + } + }, 100, 3000); + } catch (TimeoutException e) { + fail("timed out while waiting for connections to open until maxAccept."); } + LOG.info("(max clients) need:"+maxAccept + +" connections have:"+server.getNumOpenConnections()); // check a few times to make sure we didn't go over for (int i=0; i<4; i++) { assertEquals(maxAccept, server.getNumOpenConnections());