HBASE-8579 TestDelayedRpc falis from time to time
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1484589 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
190aac3613
commit
21735dde20
|
@ -69,7 +69,6 @@ import org.apache.hadoop.hbase.security.AuthMethod;
|
||||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
|
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
|
||||||
import org.apache.hadoop.hbase.security.SecurityInfo;
|
import org.apache.hadoop.hbase.security.SecurityInfo;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
|
|
||||||
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
|
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
|
|
|
@ -60,23 +60,23 @@ import com.google.protobuf.ServiceException;
|
||||||
@Category(MediumTests.class) // Fails sometimes with small tests
|
@Category(MediumTests.class) // Fails sometimes with small tests
|
||||||
public class TestDelayedRpc {
|
public class TestDelayedRpc {
|
||||||
private static final Log LOG = LogFactory.getLog(TestDelayedRpc.class);
|
private static final Log LOG = LogFactory.getLog(TestDelayedRpc.class);
|
||||||
|
|
||||||
public static RpcServerInterface rpcServer;
|
public static RpcServerInterface rpcServer;
|
||||||
|
|
||||||
public static final int UNDELAYED = 0;
|
public static final int UNDELAYED = 0;
|
||||||
public static final int DELAYED = 1;
|
public static final int DELAYED = 1;
|
||||||
|
private static final int RPC_CLIENT_TIMEOUT = 30000;
|
||||||
|
|
||||||
@Test
|
@Test (timeout=60000)
|
||||||
public void testDelayedRpcImmediateReturnValue() throws Exception {
|
public void testDelayedRpcImmediateReturnValue() throws Exception {
|
||||||
testDelayedRpc(false);
|
testDelayedRpc(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout=60000)
|
||||||
public void testDelayedRpcDelayedReturnValue() throws Exception {
|
public void testDelayedRpcDelayedReturnValue() throws Exception {
|
||||||
testDelayedRpc(true);
|
testDelayedRpc(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testDelayedRpc(boolean delayReturnValue) throws Exception {
|
private void testDelayedRpc(boolean delayReturnValue) throws Exception {
|
||||||
|
LOG.info("Running testDelayedRpc delayReturnValue=" + delayReturnValue);
|
||||||
Configuration conf = HBaseConfiguration.create();
|
Configuration conf = HBaseConfiguration.create();
|
||||||
InetSocketAddress isa = new InetSocketAddress("localhost", 0);
|
InetSocketAddress isa = new InetSocketAddress("localhost", 0);
|
||||||
TestDelayedImplementation instance = new TestDelayedImplementation(delayReturnValue);
|
TestDelayedImplementation instance = new TestDelayedImplementation(delayReturnValue);
|
||||||
|
@ -91,11 +91,13 @@ public class TestDelayedRpc {
|
||||||
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
|
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
|
||||||
new ServerName(rpcServer.getListenerAddress().getHostName(),
|
new ServerName(rpcServer.getListenerAddress().getHostName(),
|
||||||
rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
|
rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
|
||||||
User.getCurrent(), 1000);
|
User.getCurrent(), RPC_CLIENT_TIMEOUT);
|
||||||
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
|
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
|
||||||
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
|
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
|
||||||
List<Integer> results = new ArrayList<Integer>();
|
List<Integer> results = new ArrayList<Integer>();
|
||||||
|
// Setting true sets 'delayed' on the client.
|
||||||
TestThread th1 = new TestThread(stub, true, results);
|
TestThread th1 = new TestThread(stub, true, results);
|
||||||
|
// Setting 'false' means we will return UNDELAYED as response immediately.
|
||||||
TestThread th2 = new TestThread(stub, false, results);
|
TestThread th2 = new TestThread(stub, false, results);
|
||||||
TestThread th3 = new TestThread(stub, false, results);
|
TestThread th3 = new TestThread(stub, false, results);
|
||||||
th1.start();
|
th1.start();
|
||||||
|
@ -108,6 +110,7 @@ public class TestDelayedRpc {
|
||||||
th2.join();
|
th2.join();
|
||||||
th3.join();
|
th3.join();
|
||||||
|
|
||||||
|
// We should get the two undelayed responses first.
|
||||||
assertEquals(UNDELAYED, results.get(0).intValue());
|
assertEquals(UNDELAYED, results.get(0).intValue());
|
||||||
assertEquals(UNDELAYED, results.get(1).intValue());
|
assertEquals(UNDELAYED, results.get(1).intValue());
|
||||||
assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED : 0xDEADBEEF);
|
assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED : 0xDEADBEEF);
|
||||||
|
@ -142,7 +145,7 @@ public class TestDelayedRpc {
|
||||||
* Tests that we see a WARN message in the logs.
|
* Tests that we see a WARN message in the logs.
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test (timeout=60000)
|
||||||
public void testTooManyDelayedRpcs() throws Exception {
|
public void testTooManyDelayedRpcs() throws Exception {
|
||||||
Configuration conf = HBaseConfiguration.create();
|
Configuration conf = HBaseConfiguration.create();
|
||||||
final int MAX_DELAYED_RPC = 10;
|
final int MAX_DELAYED_RPC = 10;
|
||||||
|
@ -167,7 +170,7 @@ public class TestDelayedRpc {
|
||||||
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
|
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
|
||||||
new ServerName(rpcServer.getListenerAddress().getHostName(),
|
new ServerName(rpcServer.getListenerAddress().getHostName(),
|
||||||
rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
|
rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
|
||||||
User.getCurrent(), 1000);
|
User.getCurrent(), RPC_CLIENT_TIMEOUT);
|
||||||
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
|
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
|
||||||
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
|
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
|
||||||
Thread threads[] = new Thread[MAX_DELAYED_RPC + 1];
|
Thread threads[] = new Thread[MAX_DELAYED_RPC + 1];
|
||||||
|
@ -258,8 +261,8 @@ public class TestDelayedRpc {
|
||||||
public void run() {
|
public void run() {
|
||||||
Integer result;
|
Integer result;
|
||||||
try {
|
try {
|
||||||
result = new Integer(stub.test(null, TestArg.newBuilder().setDelay(delay).
|
result = new Integer(stub.test(null, TestArg.newBuilder().setDelay(delay).build()).
|
||||||
build()).getResponse());
|
getResponse());
|
||||||
} catch (ServiceException e) {
|
} catch (ServiceException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue