HBASE-27283 Use readTO instead of hard coded RpcClient.DEFAULT_SOCKET_TIMEOUT_READ when creating ReadTimeoutHandler in NettyRpcConnection (#4685)
Signed-off-by: Xin Sun <ddupgs@gmail.com Signed-off-by: GeorryHuang <huangzhuoyue@apache.org>
This commit is contained in:
parent
48f1107a2d
commit
5919b30b6d
|
@ -240,7 +240,7 @@ class NettyRpcConnection extends RpcConnection {
|
||||||
// because of the different configuration in client side and server side
|
// because of the different configuration in client side and server side
|
||||||
final String readTimeoutHandlerName = "ReadTimeout";
|
final String readTimeoutHandlerName = "ReadTimeout";
|
||||||
p.addBefore(BufferCallBeforeInitHandler.NAME, readTimeoutHandlerName,
|
p.addBefore(BufferCallBeforeInitHandler.NAME, readTimeoutHandlerName,
|
||||||
new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS))
|
new ReadTimeoutHandler(rpcClient.readTO, TimeUnit.MILLISECONDS))
|
||||||
.addBefore(BufferCallBeforeInitHandler.NAME, null, chHandler);
|
.addBefore(BufferCallBeforeInitHandler.NAME, null, chHandler);
|
||||||
NettyFutureUtils.addListener(connectionHeaderPromise, new FutureListener<Boolean>() {
|
NettyFutureUtils.addListener(connectionHeaderPromise, new FutureListener<Boolean>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -249,7 +249,7 @@ class NettyRpcConnection extends RpcConnection {
|
||||||
ChannelPipeline p = ch.pipeline();
|
ChannelPipeline p = ch.pipeline();
|
||||||
p.remove(readTimeoutHandlerName);
|
p.remove(readTimeoutHandlerName);
|
||||||
p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
|
p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
|
||||||
// don't send connection header, NettyHbaseRpcConnectionHeaderHandler
|
// don't send connection header, NettyHBaseRpcConnectionHeaderHandler
|
||||||
// sent it already
|
// sent it already
|
||||||
established(ch);
|
established(ch);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -21,11 +21,12 @@ import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
|
||||||
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
|
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
|
||||||
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
|
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
|
||||||
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
|
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
|
||||||
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
|
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.setSecuredConfiguration;
|
||||||
import static org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders.SELECTOR_KEY;
|
import static org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders.SELECTOR_KEY;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotSame;
|
import static org.junit.Assert.assertNotSame;
|
||||||
import static org.junit.Assert.assertSame;
|
import static org.junit.Assert.assertSame;
|
||||||
|
import static org.junit.Assert.assertThrows;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
@ -73,10 +74,8 @@ import org.junit.AfterClass;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Rule;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.junit.rules.ExpectedException;
|
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
import org.junit.runners.Parameterized.Parameter;
|
import org.junit.runners.Parameterized.Parameter;
|
||||||
|
@ -107,14 +106,11 @@ public class TestSecureIPC {
|
||||||
private static String HOST = "localhost";
|
private static String HOST = "localhost";
|
||||||
private static String PRINCIPAL;
|
private static String PRINCIPAL;
|
||||||
|
|
||||||
String krbKeytab;
|
private String krbKeytab;
|
||||||
String krbPrincipal;
|
private String krbPrincipal;
|
||||||
UserGroupInformation ugi;
|
private UserGroupInformation ugi;
|
||||||
Configuration clientConf;
|
private Configuration clientConf;
|
||||||
Configuration serverConf;
|
private Configuration serverConf;
|
||||||
|
|
||||||
@Rule
|
|
||||||
public ExpectedException exception = ExpectedException.none();
|
|
||||||
|
|
||||||
@Parameters(name = "{index}: rpcClientImpl={0}, rpcServerImpl={1}")
|
@Parameters(name = "{index}: rpcClientImpl={0}, rpcServerImpl={1}")
|
||||||
public static Collection<Object[]> parameters() {
|
public static Collection<Object[]> parameters() {
|
||||||
|
@ -143,6 +139,9 @@ public class TestSecureIPC {
|
||||||
PRINCIPAL = "hbase/" + HOST;
|
PRINCIPAL = "hbase/" + HOST;
|
||||||
KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
|
KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
|
||||||
HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
|
HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
|
||||||
|
// set a smaller timeout and retry to speed up tests
|
||||||
|
TEST_UTIL.getConfiguration().setInt(RpcClient.SOCKET_TIMEOUT_READ, 2000);
|
||||||
|
TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxretries", 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
|
@ -158,9 +157,11 @@ public class TestSecureIPC {
|
||||||
krbKeytab = getKeytabFileForTesting();
|
krbKeytab = getKeytabFileForTesting();
|
||||||
krbPrincipal = getPrincipalForTesting();
|
krbPrincipal = getPrincipalForTesting();
|
||||||
ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
|
ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
|
||||||
clientConf = getSecuredConfiguration();
|
clientConf = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
|
setSecuredConfiguration(clientConf);
|
||||||
clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl);
|
clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl);
|
||||||
serverConf = getSecuredConfiguration();
|
serverConf = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
|
setSecuredConfiguration(serverConf);
|
||||||
serverConf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
|
serverConf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -303,7 +304,7 @@ public class TestSecureIPC {
|
||||||
callRpcService(User.create(clientUgi));
|
callRpcService(User.create(clientUgi));
|
||||||
}
|
}
|
||||||
|
|
||||||
void setRpcProtection(String clientProtection, String serverProtection) {
|
private void setRpcProtection(String clientProtection, String serverProtection) {
|
||||||
clientConf.set("hbase.rpc.protection", clientProtection);
|
clientConf.set("hbase.rpc.protection", clientProtection);
|
||||||
serverConf.set("hbase.rpc.protection", serverProtection);
|
serverConf.set("hbase.rpc.protection", serverProtection);
|
||||||
}
|
}
|
||||||
|
@ -331,10 +332,9 @@ public class TestSecureIPC {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSaslNoCommonQop() throws Exception {
|
public void testSaslNoCommonQop() throws Exception {
|
||||||
exception.expect(SaslException.class);
|
|
||||||
exception.expectMessage("No common protection layer between client and server");
|
|
||||||
setRpcProtection("integrity", "privacy");
|
setRpcProtection("integrity", "privacy");
|
||||||
callRpcService(User.create(ugi));
|
SaslException se = assertThrows(SaslException.class, () -> callRpcService(User.create(ugi)));
|
||||||
|
assertEquals("No common protection layer between client and server", se.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue