HBASE-25670 Backport HBASE-25665 to branch-1 (#3059)
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
060e9e7cb1
commit
d07ef8fbcb
|
@ -573,7 +573,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
|
|||
if (this.rpcClient.metrics != null) {
|
||||
this.rpcClient.metrics.incrNsLookups();
|
||||
}
|
||||
isa = Address.toSocketAddress(addr);
|
||||
isa = addr.toSocketAddress();
|
||||
if (isa.isUnresolved()) {
|
||||
if (this.rpcClient.metrics != null) {
|
||||
this.rpcClient.metrics.incrNsLookupsFailed();
|
||||
|
@ -609,7 +609,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
|
|||
if (this.rpcClient.metrics != null) {
|
||||
this.rpcClient.metrics.incrNsLookups();
|
||||
}
|
||||
isa = Address.toSocketAddress(addr);
|
||||
isa = addr.toSocketAddress();
|
||||
if (isa.isUnresolved()) {
|
||||
if (this.rpcClient.metrics != null) {
|
||||
this.rpcClient.metrics.incrNsLookupsFailed();
|
||||
|
|
|
@ -60,7 +60,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
|
||||
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
|
||||
|
@ -255,7 +254,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
|
|||
if (this.rpcClient.metrics != null) {
|
||||
this.rpcClient.metrics.incrNsLookups();
|
||||
}
|
||||
InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
|
||||
InetSocketAddress remoteAddr = remoteId.getAddress().toSocketAddress();
|
||||
if (remoteAddr.isUnresolved()) {
|
||||
if (this.rpcClient.metrics != null) {
|
||||
this.rpcClient.metrics.incrNsLookupsFailed();
|
||||
|
|
|
@ -54,7 +54,6 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
|
||||
import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
|
||||
import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
|
||||
|
@ -227,7 +226,7 @@ class NettyRpcConnection extends RpcConnection {
|
|||
if (this.rpcClient.metrics != null) {
|
||||
this.rpcClient.metrics.incrNsLookups();
|
||||
}
|
||||
InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
|
||||
InetSocketAddress remoteAddr = remoteId.getAddress().toSocketAddress();
|
||||
if (remoteAddr.isUnresolved()) {
|
||||
if (this.rpcClient.metrics != null) {
|
||||
this.rpcClient.metrics.incrNsLookupsFailed();
|
||||
|
|
|
@ -24,6 +24,7 @@ import io.netty.util.TimerTask;
|
|||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -35,7 +36,6 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.MetricsConnection;
|
||||
import org.apache.hadoop.hbase.codec.Codec;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
|
||||
|
@ -121,7 +121,7 @@ abstract class RpcConnection {
|
|||
if (metrics != null) {
|
||||
metrics.incrNsLookups();
|
||||
}
|
||||
InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
|
||||
InetSocketAddress remoteAddr = remoteId.getAddress().toSocketAddress();
|
||||
if (remoteAddr.isUnresolved()) {
|
||||
if (metrics != null) {
|
||||
metrics.incrNsLookupsFailed();
|
||||
|
@ -129,7 +129,7 @@ abstract class RpcConnection {
|
|||
throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
|
||||
}
|
||||
serverPrincipal = SecurityUtil.getServerPrincipal(conf.get(serverKey),
|
||||
remoteAddr.getAddress().getCanonicalHostName().toLowerCase());
|
||||
getHostnameForServerPrincipal(conf, remoteAddr.getAddress()));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("RPC Server Kerberos principal name for service=" + remoteId.getServiceName()
|
||||
+ " is " + serverPrincipal);
|
||||
|
@ -159,6 +159,30 @@ abstract class RpcConnection {
|
|||
conf.getInt(HConstants.HBASE_MINTIME_BEFORE_FORCE_RELOGIN, 10 * 60 * 1000);
|
||||
}
|
||||
|
||||
private static boolean useCanonicalHostname(Configuration conf) {
|
||||
return !conf.getBoolean(
|
||||
HConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS,
|
||||
HConstants.DEFAULT_UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS);
|
||||
}
|
||||
|
||||
public static String getHostnameForServerPrincipal(Configuration conf, InetAddress addr) {
|
||||
final String hostname;
|
||||
|
||||
if (useCanonicalHostname(conf)) {
|
||||
hostname = addr.getCanonicalHostName();
|
||||
if (hostname.equals(addr.getHostAddress())) {
|
||||
LOG.warn("Canonical hostname for SASL principal is the same with IP address: "
|
||||
+ hostname + ", " + addr.getHostName() + ". Check DNS configuration or consider "
|
||||
+ HConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS
|
||||
+ "=true");
|
||||
}
|
||||
} else {
|
||||
hostname = addr.getHostName();
|
||||
}
|
||||
|
||||
return hostname.toLowerCase();
|
||||
}
|
||||
|
||||
private UserInformation getUserInfo(UserGroupInformation ugi) {
|
||||
if (ugi == null || authMethod == AuthMethod.DIGEST) {
|
||||
// Don't send user for token auth
|
||||
|
|
|
@ -1438,6 +1438,18 @@ public final class HConstants {
|
|||
"hbase.regionserver.slowlog.systable.enabled";
|
||||
public static final boolean DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY = false;
|
||||
|
||||
/**
|
||||
* This config is for experts: don't set its value unless you really know what you are doing.
|
||||
* When set to true, HBase client using SASL Kerberos will skip reverse DNS lookup and use
|
||||
* provided hostname of the destination for the principal instead.
|
||||
* See https://issues.apache.org/jira/browse/HBASE-25665 for more details.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
|
||||
public static final String UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS =
|
||||
"hbase.unsafe.client.kerberos.hostname.disable.reversedns";
|
||||
public static final boolean DEFAULT_UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS =
|
||||
false;
|
||||
|
||||
private HConstants() {
|
||||
// Can't be instantiated with this ctor.
|
||||
}
|
||||
|
|
|
@ -53,8 +53,8 @@ public class Address implements Comparable<Address> {
|
|||
return Address.fromParts(addr.getHostString(), addr.getPort());
|
||||
}
|
||||
|
||||
public static InetSocketAddress toSocketAddress(Address addr) {
|
||||
return new InetSocketAddress(addr.getHostName(), addr.getPort());
|
||||
public InetSocketAddress toSocketAddress() {
|
||||
return new InetSocketAddress(getHostName(), getPort());
|
||||
}
|
||||
|
||||
public static InetSocketAddress[] toSocketAddress(Address[] addrs) {
|
||||
|
@ -63,7 +63,7 @@ public class Address implements Comparable<Address> {
|
|||
}
|
||||
InetSocketAddress[] result = new InetSocketAddress[addrs.length];
|
||||
for (int i = 0; i < addrs.length; i++) {
|
||||
result[i] = toSocketAddress(addrs[i]);
|
||||
result[i] = addrs[i].toSocketAddress();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -1075,6 +1075,14 @@ possible configurations would overwhelm and obscure the important.
|
|||
be used as a temporary measure while converting clients over to secure authentication. It
|
||||
MUST BE DISABLED for secure operation.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.unsafe.client.kerberos.hostname.disable.reversedns</name>
|
||||
<value>false</value>
|
||||
<description>This config is for experts: don't set its value unless you really know what you are doing.
|
||||
When set to true, HBase client using SASL Kerberos will skip reverse DNS lookup and use provided
|
||||
hostname of the destination for the principal instead. See https://issues.apache.org/jira/browse/HBASE-25665
|
||||
for more details.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.coprocessor.enabled</name>
|
||||
<value>true</value>
|
||||
|
|
|
@ -26,6 +26,10 @@ import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConf
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotSame;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.protobuf.BlockingService;
|
||||
|
@ -33,11 +37,16 @@ import com.google.protobuf.ServiceException;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import javax.security.sasl.SaslException;
|
||||
|
||||
|
@ -46,7 +55,9 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.ConnectionId;
|
||||
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
|
||||
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||
|
@ -55,6 +66,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
|
|||
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
|
||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
import org.apache.hadoop.hbase.testclassification.SecurityTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.minikdc.MiniKdc;
|
||||
|
@ -73,6 +85,8 @@ import org.junit.runners.Parameterized;
|
|||
import org.junit.runners.Parameterized.Parameter;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
@Category({ SecurityTests.class, SmallTests.class })
|
||||
|
@ -105,6 +119,12 @@ public class TestSecureIPC {
|
|||
@Parameter
|
||||
public String rpcClientImpl;
|
||||
|
||||
private Callable<RpcClient> rpcClientFactory = new Callable<RpcClient>() {
|
||||
@Override public RpcClient call() {
|
||||
return RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString());
|
||||
}
|
||||
};
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
|
||||
|
@ -143,6 +163,92 @@ public class TestSecureIPC {
|
|||
callRpcService(User.create(ugi2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRpcCallWithEnabledKerberosSaslAuth_CanonicalHostname() throws Exception {
|
||||
UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
|
||||
|
||||
// check that the login user is okay:
|
||||
assertSame(ugi2, ugi);
|
||||
assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
|
||||
assertEquals(krbPrincipal, ugi.getUserName());
|
||||
|
||||
clientConf.setBoolean(
|
||||
HConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS, false);
|
||||
clientConf.set(HBaseKerberosUtils.KRB_PRINCIPAL, "hbase/_HOST@" + KDC.getRealm());
|
||||
|
||||
callRpcService(User.create(ugi2), new CanonicalHostnameTestingRpcClientFactory("localhost"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRpcCallWithEnabledKerberosSaslAuth_NoCanonicalHostname() throws Exception {
|
||||
UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
|
||||
|
||||
// check that the login user is okay:
|
||||
assertSame(ugi2, ugi);
|
||||
assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
|
||||
assertEquals(krbPrincipal, ugi.getUserName());
|
||||
|
||||
clientConf.setBoolean(
|
||||
HConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS, true);
|
||||
clientConf.set(HBaseKerberosUtils.KRB_PRINCIPAL, "hbase/_HOST@" + KDC.getRealm());
|
||||
|
||||
callRpcService(User.create(ugi2), new CanonicalHostnameTestingRpcClientFactory("127.0.0.1"));
|
||||
}
|
||||
|
||||
public class CanonicalHostnameTestingRpcClientFactory implements Callable<RpcClient> {
|
||||
private final String canonicalHostName;
|
||||
|
||||
public CanonicalHostnameTestingRpcClientFactory(String canonicalHostName) {
|
||||
this.canonicalHostName = canonicalHostName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RpcClient call() throws Exception {
|
||||
final RpcClient rpcClient = rpcClientFactory.call();
|
||||
|
||||
if (rpcClient instanceof AbstractRpcClient<?>) {
|
||||
final AbstractRpcClient<?> abstractRpcClient = (AbstractRpcClient<?>) rpcClient;
|
||||
final AbstractRpcClient<?> spiedRpcClient = spy(abstractRpcClient);
|
||||
|
||||
final Method createConnectionMethod =
|
||||
AbstractRpcClient.class.getDeclaredMethod("createConnection", ConnectionId.class);
|
||||
createConnectionMethod.setAccessible(true);
|
||||
|
||||
final Answer<Object> answer = new Answer<Object>() {
|
||||
@Override public Object answer(InvocationOnMock invocationOnMock)
|
||||
throws InvocationTargetException, IllegalAccessException {
|
||||
final ConnectionId remoteId = invocationOnMock.getArgumentAt(0, ConnectionId.class);
|
||||
|
||||
final InetSocketAddress serverAddr = remoteId.getAddress().toSocketAddress();
|
||||
try {
|
||||
final Field canonicalHostNameField =
|
||||
InetAddress.class.getDeclaredField("canonicalHostName");
|
||||
canonicalHostNameField.setAccessible(true);
|
||||
canonicalHostNameField.set(serverAddr.getAddress(), canonicalHostName);
|
||||
} catch (NoSuchFieldException | IllegalAccessException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
final Address address = spy(remoteId.getAddress());
|
||||
doReturn(serverAddr).when(address).toSocketAddress();
|
||||
|
||||
ConnectionId mockedRemoteId =
|
||||
new ConnectionId(remoteId.getTicket(), remoteId.getServiceName(), address);
|
||||
|
||||
return createConnectionMethod.invoke(abstractRpcClient, mockedRemoteId);
|
||||
}
|
||||
};
|
||||
|
||||
createConnectionMethod.invoke(
|
||||
doAnswer(answer).when(spiedRpcClient), any(ConnectionId.class));
|
||||
return spiedRpcClient;
|
||||
} else {
|
||||
throw new UnsupportedOperationException(
|
||||
rpcClient.getClass() + " isn't supported for testing");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRpcFallbackToSimpleAuth() throws Exception {
|
||||
String clientUsername = "testuser";
|
||||
|
@ -204,11 +310,16 @@ public class TestSecureIPC {
|
|||
return UserGroupInformation.getLoginUser();
|
||||
}
|
||||
|
||||
private void callRpcService(User clientUser) throws Exception {
|
||||
callRpcService(clientUser, rpcClientFactory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets up a RPC Server and a Client. Does a RPC checks the result. If an exception is thrown from
|
||||
* the stub, this function will throw root cause of that exception.
|
||||
*/
|
||||
private void callRpcService(User clientUser) throws Exception {
|
||||
private void callRpcService(User clientUser, Callable<RpcClient> rpcClientFactory)
|
||||
throws Exception {
|
||||
SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
|
||||
Mockito.when(securityInfoMock.getServerPrincipal())
|
||||
.thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
|
||||
|
@ -221,8 +332,7 @@ public class TestSecureIPC {
|
|||
new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)),
|
||||
isa, serverConf, new FifoRpcScheduler(serverConf, 1));
|
||||
rpcServer.start();
|
||||
try (RpcClient rpcClient =
|
||||
RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) {
|
||||
try (RpcClient rpcClient = rpcClientFactory.call()) {
|
||||
BlockingInterface stub =
|
||||
newBlockingStub(rpcClient, rpcServer.getListenerAddress(), clientUser);
|
||||
TestThread th1 = new TestThread(stub);
|
||||
|
|
Loading…
Reference in New Issue