HBASE-27279 Make SslHandler work with SaslWrapHandler/SaslUnwrapHandler (#4705)

Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
Reviewed-by: Andor Molnár <andor@cloudera.com>
Duo Zhang 2022-08-16 21:05:42 +08:00 committed by GitHub
parent eaa47c5cd4
commit 2b9d36869f
15 changed files with 815 additions and 718 deletions
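The gist of the change: the SASL wrap/unwrap handlers used to be installed with ChannelPipeline.addFirst(), which could leave them sitting in front of the SslHandler when TLS is enabled, so SASL wrapping and TLS encryption ran in the wrong order. The commit instead anchors them with addAfter/addBefore relative to named handlers, keeping them behind the SslHandler. The stand-alone sketch below is only an illustration of why the position matters; it uses the plain io.netty packages (HBase ships the shaded org.apache.hbase.thirdparty.io.netty equivalents) and stand-in handler names. Netty dispatches outbound writes from the tail of the pipeline toward the head, so a wrap handler placed after the SslHandler wraps the payload before TLS encrypts it.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;

public class OutboundOrderSketch {
  // Minimal outbound handler that only reports when a write passes through it.
  static final class Tracer extends ChannelOutboundHandlerAdapter {
    private final String name;
    Tracer(String name) {
      this.name = name;
    }
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
      System.out.println("outbound write passes through: " + name);
      ctx.write(msg, promise); // forward toward the head of the pipeline
    }
  }

  public static void main(String[] args) {
    // Head-to-tail order: "ssl" then "saslWrap" (the ordering this commit ensures).
    // Outbound data is handled tail-first, so it is SASL-wrapped before it is TLS-encrypted;
    // this prints "saslWrap" first, then "ssl".
    EmbeddedChannel ch = new EmbeddedChannel(new Tracer("ssl"), new Tracer("saslWrap"));
    ch.writeOutbound("rpc response bytes");
    ch.finishAndReleaseAll();
  }
}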

View File

@@ -188,4 +188,13 @@ public final class HBaseKerberosUtils {
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFileLocation);
return ugi;
}
public static UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
throws Exception {
Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
return UserGroupInformation.getLoginUser();
}
}

View File

@@ -221,7 +221,8 @@ class NettyRpcConnection extends RpcConnection {
return;
}
ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, new SaslChallengeDecoder())
.addBefore(BufferCallBeforeInitHandler.NAME, null, saslHandler);
.addBefore(BufferCallBeforeInitHandler.NAME, NettyHBaseSaslRpcClientHandler.HANDLER_NAME,
saslHandler);
NettyFutureUtils.addListener(saslPromise, new FutureListener<Boolean>() {
@Override

View File

@@ -45,16 +45,16 @@ public class NettyHBaseSaslRpcClient extends AbstractHBaseSaslRpcClient {
super(conf, provider, token, serverAddr, securityInfo, fallbackAllowed, rpcProtection);
}
public void setupSaslHandler(ChannelPipeline p) {
public void setupSaslHandler(ChannelPipeline p, String addAfter) {
String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
LOG.trace("SASL client context established. Negotiated QoP {}", qop);
if (qop == null || "auth".equalsIgnoreCase(qop)) {
return;
}
// add wrap and unwrap handlers to pipeline.
p.addFirst(new SaslWrapHandler(saslClient::wrap),
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
new SaslUnwrapHandler(saslClient::unwrap));
p.addAfter(addAfter, null, new SaslUnwrapHandler(saslClient::unwrap))
.addAfter(addAfter, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
.addAfter(addAfter, null, new SaslWrapHandler(saslClient::wrap));
}
public String getSaslQOP() {
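Note on the addAfter() chain above: each call inserts its handler immediately after the same anchor (HANDLER_NAME, i.e. "SaslRpcClientHandler"), so the handlers are listed in reverse of the order they end up in. The resulting pipeline reads wrap, then frame decoder, then unwrap right after the anchor, and all three stay behind any SslHandler added earlier. A hedged stand-alone sketch of that insertion behaviour, using the plain io.netty packages and no-op stand-in handlers rather than the real HBase classes:

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.embedded.EmbeddedChannel;

public class AddAfterOrderSketch {
  // Stand-in for the real handlers; only the names and positions matter here.
  static final class Noop extends ChannelHandlerAdapter {
  }

  public static void main(String[] args) {
    EmbeddedChannel ch = new EmbeddedChannel();
    ChannelPipeline p = ch.pipeline();
    p.addLast("ssl", new Noop());                  // stands in for SslHandler
    p.addLast("SaslRpcClientHandler", new Noop()); // the named anchor (HANDLER_NAME)

    // Same pattern as setupSaslHandler: insert unwrap, then frame decoder, then wrap,
    // all right after the anchor, yielding wrap -> frameDecoder -> unwrap in the pipeline.
    p.addAfter("SaslRpcClientHandler", "saslUnwrap", new Noop())
      .addAfter("SaslRpcClientHandler", "saslFrameDecoder", new Noop())
      .addAfter("SaslRpcClientHandler", "saslWrap", new Noop());

    // Relative order is now: ssl, SaslRpcClientHandler, saslWrap, saslFrameDecoder, saslUnwrap
    // (names() may also list Netty's internal tail context).
    System.out.println(p.names());
    ch.finishAndReleaseAll();
  }
}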

View File

@@ -47,6 +47,8 @@ public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<
private static final Logger LOG = LoggerFactory.getLogger(NettyHBaseSaslRpcClientHandler.class);
public static final String HANDLER_NAME = "SaslRpcClientHandler";
private final Promise<Boolean> saslPromise;
private final UserGroupInformation ugi;
@@ -93,7 +95,7 @@ public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<
LOG.trace("SASL negotiation for {} is complete", provider.getSaslAuthMethod().getName());
}
ChannelPipeline p = ctx.pipeline();
saslRpcClient.setupSaslHandler(p);
saslRpcClient.setupSaslHandler(p, HANDLER_NAME);
p.remove(SaslChallengeDecoder.class);
p.remove(this);

View File

@@ -89,11 +89,11 @@ class NettyHBaseSaslRpcServerHandler extends SimpleChannelInboundHandler<ByteBuf
boolean useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
ChannelPipeline p = ctx.pipeline();
if (useWrap) {
p.addFirst(new SaslWrapHandler(saslServer::wrap));
p.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
p.addBefore(DECODER_NAME, null, new SaslWrapHandler(saslServer::wrap)).addLast(
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
new SaslUnwrapHandler(saslServer::unwrap));
}
conn.setupDecoder();
conn.setupHandler();
p.remove(this);
p.remove(DECODER_NAME);
}
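Both the client and server pipelines put LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4) in front of SaslUnwrapHandler, on the assumption that every wrapped SASL token arrives with a 4-byte length prefix at offset 0; the decoder strips those 4 bytes so the unwrap handler only sees the token itself. A self-contained sketch of that framing, again with the plain io.netty packages and a made-up token:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.nio.charset.StandardCharsets;

public class SaslFrameSketch {
  public static void main(String[] args) {
    // Same constructor arguments as in the pipeline setup above:
    // max frame size, length field at offset 0, 4 bytes long, no adjustment, strip 4 bytes.
    EmbeddedChannel ch =
      new EmbeddedChannel(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));

    byte[] token = "wrapped-sasl-token".getBytes(StandardCharsets.UTF_8);
    ByteBuf in = Unpooled.buffer();
    in.writeInt(token.length); // 4-byte big-endian length prefix (assumed to match the wrap side)
    in.writeBytes(token);

    ch.writeInbound(in);
    ByteBuf frame = ch.readInbound();
    // The prefix is gone; only the wrapped token remains for the unwrap handler.
    System.out.println(frame.toString(StandardCharsets.UTF_8));
    frame.release();
    ch.finishAndReleaseAll();
  }
}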

View File

@@ -117,9 +117,8 @@ public class NettyRpcServer extends RpcServer {
if (conf.getBoolean(HBASE_SERVER_NETTY_TLS_ENABLED, false)) {
initSSL(pipeline, conf.getBoolean(HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, true));
}
pipeline.addLast(NettyRpcServerPreambleHandler.DECODER_NAME, preambleDecoder);
pipeline.addLast(createNettyRpcServerPreambleHandler(),
new NettyRpcServerResponseEncoder(metrics));
pipeline.addLast(NettyRpcServerPreambleHandler.DECODER_NAME, preambleDecoder)
.addLast(createNettyRpcServerPreambleHandler());
}
});
try {

View File

@@ -61,7 +61,7 @@ class NettyRpcServerPreambleHandler extends SimpleChannelInboundHandler<ByteBuf>
p.addLast(NettyHBaseSaslRpcServerHandler.DECODER_NAME, decoder);
p.addLast(new NettyHBaseSaslRpcServerHandler(rpcServer, conn));
} else {
conn.setupDecoder();
conn.setupHandler();
}
// add first and then remove, so the single decode decoder will pass the remaining bytes to the
// handler above.

View File

@@ -33,7 +33,6 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescrip
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
@@ -70,10 +69,11 @@ class NettyServerRpcConnection extends ServerRpcConnection {
this.remotePort = inetSocketAddress.getPort();
}
void setupDecoder() {
ChannelPipeline p = channel.pipeline();
p.addLast("frameDecoder", new NettyRpcFrameDecoder(rpcServer.maxRequestSize, this));
p.addLast("decoder", new NettyRpcServerRequestDecoder(rpcServer.metrics, this));
void setupHandler() {
channel.pipeline()
.addLast("frameDecoder", new NettyRpcFrameDecoder(rpcServer.maxRequestSize, this))
.addLast("decoder", new NettyRpcServerRequestDecoder(rpcServer.metrics, this))
.addLast("encoder", new NettyRpcServerResponseEncoder(rpcServer.metrics));
}
void process(ByteBuf buf) throws IOException, InterruptedException {

View File

@@ -0,0 +1,390 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.loginKerberosPrincipal;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.setSecuredConfiguration;
import static org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders.SELECTOR_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.security.provider.AuthenticationProviderSelector;
import org.apache.hadoop.hbase.security.provider.BuiltInProviderSelector;
import org.apache.hadoop.hbase.security.provider.SaslAuthMethod;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.junit.Test;
import org.mockito.Mockito;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
public class AbstractTestSecureIPC {
protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
protected static final File KEYTAB_FILE =
new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
protected static MiniKdc KDC;
protected static String HOST = "localhost";
protected static String PRINCIPAL;
protected String krbKeytab;
protected String krbPrincipal;
protected UserGroupInformation ugi;
protected Configuration clientConf;
protected Configuration serverConf;
protected static void initKDCAndConf() throws Exception {
KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
PRINCIPAL = "hbase/" + HOST;
KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
// set a smaller timeout and retry to speed up tests
TEST_UTIL.getConfiguration().setInt(RpcClient.SOCKET_TIMEOUT_READ, 2000);
TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxretries", 1);
}
protected static void stopKDC() throws InterruptedException {
if (KDC != null) {
KDC.stop();
}
}
protected final void setUpPrincipalAndConf() throws Exception {
krbKeytab = getKeytabFileForTesting();
krbPrincipal = getPrincipalForTesting();
ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
clientConf = new Configuration(TEST_UTIL.getConfiguration());
setSecuredConfiguration(clientConf);
serverConf = new Configuration(TEST_UTIL.getConfiguration());
setSecuredConfiguration(serverConf);
}
@Test
public void testRpcCallWithEnabledKerberosSaslAuth() throws Exception {
UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
// check that the login user is okay:
assertSame(ugi2, ugi);
assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
assertEquals(krbPrincipal, ugi.getUserName());
callRpcService(User.create(ugi2));
}
@Test
public void testRpcCallWithEnabledKerberosSaslAuthCanonicalHostname() throws Exception {
UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
// check that the login user is okay:
assertSame(ugi2, ugi);
assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
assertEquals(krbPrincipal, ugi.getUserName());
enableCanonicalHostnameTesting(clientConf, "localhost");
clientConf.setBoolean(
SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS, false);
clientConf.set(HBaseKerberosUtils.KRB_PRINCIPAL, "hbase/_HOST@" + KDC.getRealm());
callRpcService(User.create(ugi2));
}
@Test
public void testRpcCallWithEnabledKerberosSaslAuthNoCanonicalHostname() throws Exception {
UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
// check that the login user is okay:
assertSame(ugi2, ugi);
assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
assertEquals(krbPrincipal, ugi.getUserName());
enableCanonicalHostnameTesting(clientConf, "127.0.0.1");
clientConf
.setBoolean(SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS, true);
clientConf.set(HBaseKerberosUtils.KRB_PRINCIPAL, "hbase/_HOST@" + KDC.getRealm());
callRpcService(User.create(ugi2));
}
private static void enableCanonicalHostnameTesting(Configuration conf, String canonicalHostname) {
conf.setClass(SELECTOR_KEY, CanonicalHostnameTestingAuthenticationProviderSelector.class,
AuthenticationProviderSelector.class);
conf.set(CanonicalHostnameTestingAuthenticationProviderSelector.CANONICAL_HOST_NAME_KEY,
canonicalHostname);
}
public static class CanonicalHostnameTestingAuthenticationProviderSelector
extends BuiltInProviderSelector {
private static final String CANONICAL_HOST_NAME_KEY =
"CanonicalHostnameTestingAuthenticationProviderSelector.canonicalHostName";
@Override
public Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>>
selectProvider(String clusterId, User user) {
final Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> pair =
super.selectProvider(clusterId, user);
pair.setFirst(createCanonicalHostNameTestingProvider(pair.getFirst()));
return pair;
}
SaslClientAuthenticationProvider
createCanonicalHostNameTestingProvider(SaslClientAuthenticationProvider delegate) {
return new SaslClientAuthenticationProvider() {
@Override
public SaslClient createClient(Configuration conf, InetAddress serverAddr,
SecurityInfo securityInfo, Token<? extends TokenIdentifier> token,
boolean fallbackAllowed, Map<String, String> saslProps) throws IOException {
final String s = conf.get(CANONICAL_HOST_NAME_KEY);
if (s != null) {
try {
final Field canonicalHostName =
InetAddress.class.getDeclaredField("canonicalHostName");
canonicalHostName.setAccessible(true);
canonicalHostName.set(serverAddr, s);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
return delegate.createClient(conf, serverAddr, securityInfo, token, fallbackAllowed,
saslProps);
}
@Override
public UserInformation getUserInfo(User user) {
return delegate.getUserInfo(user);
}
@Override
public UserGroupInformation getRealUser(User ugi) {
return delegate.getRealUser(ugi);
}
@Override
public boolean canRetry() {
return delegate.canRetry();
}
@Override
public void relogin() throws IOException {
delegate.relogin();
}
@Override
public SaslAuthMethod getSaslAuthMethod() {
return delegate.getSaslAuthMethod();
}
@Override
public String getTokenKind() {
return delegate.getTokenKind();
}
};
}
}
@Test
public void testRpcFallbackToSimpleAuth() throws Exception {
String clientUsername = "testuser";
UserGroupInformation clientUgi =
UserGroupInformation.createUserForTesting(clientUsername, new String[] { clientUsername });
// check that the client user is insecure
assertNotSame(ugi, clientUgi);
assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
assertEquals(clientUsername, clientUgi.getUserName());
clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, true);
callRpcService(User.create(clientUgi));
}
private void setRpcProtection(String clientProtection, String serverProtection) {
clientConf.set("hbase.rpc.protection", clientProtection);
serverConf.set("hbase.rpc.protection", serverProtection);
}
/**
* Test various combinations of Server and Client qops.
*/
@Test
public void testSaslWithCommonQop() throws Exception {
setRpcProtection("privacy,authentication", "authentication");
callRpcService(User.create(ugi));
setRpcProtection("authentication", "privacy,authentication");
callRpcService(User.create(ugi));
setRpcProtection("integrity,authentication", "privacy,authentication");
callRpcService(User.create(ugi));
setRpcProtection("integrity,authentication", "integrity,authentication");
callRpcService(User.create(ugi));
setRpcProtection("privacy,authentication", "privacy,authentication");
callRpcService(User.create(ugi));
}
@Test
public void testSaslNoCommonQop() throws Exception {
setRpcProtection("integrity", "privacy");
SaslException se = assertThrows(SaslException.class, () -> callRpcService(User.create(ugi)));
assertEquals("No common protection layer between client and server", se.getMessage());
}
/**
* Test sasl encryption with Crypto AES.
*/
@Test
public void testSaslWithCryptoAES() throws Exception {
setRpcProtection("privacy", "privacy");
setCryptoAES("true", "true");
callRpcService(User.create(ugi));
}
/**
* Test various combinations of Server and Client configuration for Crypto AES.
*/
@Test
public void testDifferentConfWithCryptoAES() throws Exception {
setRpcProtection("privacy", "privacy");
setCryptoAES("false", "true");
callRpcService(User.create(ugi));
setCryptoAES("true", "false");
try {
callRpcService(User.create(ugi));
fail("The exception should be thrown out for the rpc timeout.");
} catch (Exception e) {
// ignore the expected exception
}
}
private void setCryptoAES(String clientCryptoAES, String serverCryptoAES) {
clientConf.set("hbase.rpc.crypto.encryption.aes.enabled", clientCryptoAES);
serverConf.set("hbase.rpc.crypto.encryption.aes.enabled", serverCryptoAES);
}
/**
* Sets up an RPC Server and a Client, makes an RPC call, and checks the result. If an exception is
* thrown from the stub, this function will throw the root cause of that exception.
*/
private void callRpcService(User clientUser) throws Exception {
SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
Mockito.when(securityInfoMock.getServerPrincipal())
.thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
InetSocketAddress isa = new InetSocketAddress(HOST, 0);
RpcServerInterface rpcServer = RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC",
Lists
.newArrayList(new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)),
isa, serverConf, new FifoRpcScheduler(serverConf, 1));
rpcServer.start();
try (RpcClient rpcClient =
RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) {
BlockingInterface stub =
newBlockingStub(rpcClient, rpcServer.getListenerAddress(), clientUser);
TestThread th1 = new TestThread(stub);
final Throwable exception[] = new Throwable[1];
Collections.synchronizedList(new ArrayList<Throwable>());
Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread th, Throwable ex) {
exception[0] = ex;
}
};
th1.setUncaughtExceptionHandler(exceptionHandler);
th1.start();
th1.join();
if (exception[0] != null) {
// throw root cause.
while (exception[0].getCause() != null) {
exception[0] = exception[0].getCause();
}
throw (Exception) exception[0];
}
} finally {
rpcServer.stop();
}
}
public static class TestThread extends Thread {
private final BlockingInterface stub;
public TestThread(BlockingInterface stub) {
this.stub = stub;
}
@Override
public void run() {
try {
int[] messageSize = new int[] { 100, 1000, 10000 };
for (int i = 0; i < messageSize.length; i++) {
String input = RandomStringUtils.random(messageSize[i]);
String result =
stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(input).build())
.getMessage();
assertEquals(input, result);
}
} catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
throw new RuntimeException(e);
}
}
}
}

View File

@@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThrows;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.Security;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType;
import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
import org.apache.hadoop.hbase.io.crypto.tls.X509TestContext;
import org.apache.hadoop.hbase.io.crypto.tls.X509TestContextProvider;
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runners.Parameterized;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
public abstract class AbstractTestTlsRejectPlainText {
protected static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();
protected static File DIR;
protected static X509TestContextProvider PROVIDER;
@Parameterized.Parameter(0)
public X509KeyType caKeyType;
@Parameterized.Parameter(1)
public X509KeyType certKeyType;
@Parameterized.Parameter(2)
public String keyPassword;
private X509TestContext x509TestContext;
protected RpcServer rpcServer;
protected RpcClient rpcClient;
@Parameterized.Parameters(name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}")
public static List<Object[]> data() {
List<Object[]> params = new ArrayList<>();
for (X509KeyType caKeyType : X509KeyType.values()) {
for (X509KeyType certKeyType : X509KeyType.values()) {
for (String keyPassword : new String[] { "", "pa$$w0rd" }) {
params.add(new Object[] { caKeyType, certKeyType, keyPassword });
}
}
}
return params;
}
protected static void initialize() throws IOException {
Security.addProvider(new BouncyCastleProvider());
DIR =
new File(UTIL.getDataTestDir(AbstractTestTlsRejectPlainText.class.getSimpleName()).toString())
.getCanonicalFile();
FileUtils.forceMkdir(DIR);
Configuration conf = UTIL.getConfiguration();
conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, NettyRpcClient.class,
RpcClient.class);
conf.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, NettyRpcServer.class,
RpcServer.class);
conf.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_ENABLED, true);
conf.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, false);
conf.setBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, false);
PROVIDER = new X509TestContextProvider(conf, DIR);
}
protected static void cleanUp() {
Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
UTIL.cleanupTestDir();
}
@Before
public void setUp() throws Exception {
x509TestContext = PROVIDER.get(caKeyType, certKeyType, keyPassword);
x509TestContext.setConfigurations(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
Configuration conf = UTIL.getConfiguration();
rpcServer = new NettyRpcServer(null, "testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1), true);
rpcServer.start();
rpcClient = new NettyRpcClient(conf);
}
@After
public void tearDown() throws IOException {
if (rpcServer != null) {
rpcServer.stop();
}
Closeables.close(rpcClient, true);
x509TestContext.clearConfigurations();
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_OCSP);
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_CLR);
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_PROTOCOL);
System.clearProperty("com.sun.net.ssl.checkRevocation");
System.clearProperty("com.sun.security.enableCRLDP");
Security.setProperty("ocsp.enable", Boolean.FALSE.toString());
Security.setProperty("com.sun.security.enableCRLDP", Boolean.FALSE.toString());
}
protected abstract BlockingInterface createStub() throws Exception;
@Test
public void testReject() throws Exception {
BlockingInterface stub = createStub();
ServiceException se = assertThrows(ServiceException.class,
() -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello world").build()));
assertThat(se.getCause(), instanceOf(ConnectionClosedException.class));
}
}

View File

@@ -17,147 +17,40 @@
*/
package org.apache.hadoop.hbase.security;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThrows;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.Security;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType;
import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
import org.apache.hadoop.hbase.io.crypto.tls.X509TestContext;
import org.apache.hadoop.hbase.io.crypto.tls.X509TestContextProvider;
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
@RunWith(Parameterized.class)
@Category({ RPCTests.class, MediumTests.class })
public class TestNettyTlsIPCRejectPlainText {
public class TestNettyTlsIPCRejectPlainText extends AbstractTestTlsRejectPlainText {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestNettyTlsIPCRejectPlainText.class);
private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();
private static File DIR;
private static X509TestContextProvider PROVIDER;
@Parameterized.Parameter(0)
public X509KeyType caKeyType;
@Parameterized.Parameter(1)
public X509KeyType certKeyType;
@Parameterized.Parameter(2)
public String keyPassword;
private X509TestContext x509TestContext;
private RpcServer rpcServer;
private RpcClient rpcClient;
@Parameterized.Parameters(name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}")
public static List<Object[]> data() {
List<Object[]> params = new ArrayList<>();
for (X509KeyType caKeyType : X509KeyType.values()) {
for (X509KeyType certKeyType : X509KeyType.values()) {
for (String keyPassword : new String[] { "", "pa$$w0rd" }) {
params.add(new Object[] { caKeyType, certKeyType, keyPassword });
}
}
}
return params;
}
@BeforeClass
public static void setUpBeforeClass() throws IOException {
Security.addProvider(new BouncyCastleProvider());
DIR = new File(UTIL.getDataTestDir(TestNettyTlsIPC.class.getSimpleName()).toString())
.getCanonicalFile();
FileUtils.forceMkdir(DIR);
Configuration conf = UTIL.getConfiguration();
conf.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_ENABLED, true);
conf.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, false);
conf.setBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, false);
PROVIDER = new X509TestContextProvider(conf, DIR);
initialize();
}
@AfterClass
public static void tearDownAfterClass() {
Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
UTIL.cleanupTestDir();
cleanUp();
}
@Before
public void setUp() throws IOException {
x509TestContext = PROVIDER.get(caKeyType, certKeyType, keyPassword);
x509TestContext.setConfigurations(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
Configuration conf = UTIL.getConfiguration();
rpcServer = new NettyRpcServer(null, "testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1), true);
rpcServer.start();
rpcClient = new NettyRpcClient(conf);
}
@After
public void tearDown() throws IOException {
if (rpcServer != null) {
rpcServer.stop();
}
Closeables.close(rpcClient, true);
x509TestContext.clearConfigurations();
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_OCSP);
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_CLR);
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_PROTOCOL);
System.clearProperty("com.sun.net.ssl.checkRevocation");
System.clearProperty("com.sun.security.enableCRLDP");
Security.setProperty("ocsp.enable", Boolean.FALSE.toString());
Security.setProperty("com.sun.security.enableCRLDP", Boolean.FALSE.toString());
}
@Test
public void testReject() throws IOException, ServiceException {
BlockingInterface stub = newBlockingStub(rpcClient, rpcServer.getListenerAddress());
ServiceException se = assertThrows(ServiceException.class,
() -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello world").build()));
assertThat(se.getCause(), instanceOf(ConnectionClosedException.class));
@Override
protected BlockingInterface createStub() throws Exception {
return TestProtobufRpcServiceImpl.newBlockingStub(rpcClient, rpcServer.getListenerAddress());
}
}

View File

@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security;
import java.io.File;
import java.security.Security;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType;
import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
import org.apache.hadoop.hbase.io.crypto.tls.X509TestContext;
import org.apache.hadoop.hbase.io.crypto.tls.X509TestContextProvider;
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.SecurityTests;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
@Category({ SecurityTests.class, LargeTests.class })
public class TestSaslTlsIPC extends AbstractTestSecureIPC {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSaslTlsIPC.class);
private static X509TestContextProvider PROVIDER;
@Parameterized.Parameter(0)
public X509KeyType caKeyType;
@Parameterized.Parameter(1)
public X509KeyType certKeyType;
@Parameterized.Parameter(2)
public String keyPassword;
@Parameterized.Parameter(3)
public boolean acceptPlainText;
@Parameterized.Parameter(4)
public boolean clientTlsEnabled;
private X509TestContext x509TestContext;
@Parameterized.Parameters(
name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}, acceptPlainText={3},"
+ " clientTlsEnabled={4}")
public static List<Object[]> data() {
List<Object[]> params = new ArrayList<>();
for (X509KeyType caKeyType : X509KeyType.values()) {
for (X509KeyType certKeyType : X509KeyType.values()) {
for (String keyPassword : new String[] { "", "pa$$w0rd" }) {
// do not accept plain text
params.add(new Object[] { caKeyType, certKeyType, keyPassword, false, true });
// support plain text and client enables tls
params.add(new Object[] { caKeyType, certKeyType, keyPassword, true, true });
// support plain text and client disables tls
params.add(new Object[] { caKeyType, certKeyType, keyPassword, true, false });
}
}
}
return params;
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Security.addProvider(new BouncyCastleProvider());
File dir = new File(TEST_UTIL.getDataTestDir(TestNettyTlsIPC.class.getSimpleName()).toString())
.getCanonicalFile();
FileUtils.forceMkdir(dir);
initKDCAndConf();
Configuration conf = TEST_UTIL.getConfiguration();
// server must enable tls
conf.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_ENABLED, true);
// only netty supports tls
conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, NettyRpcClient.class,
RpcClient.class);
conf.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, NettyRpcServer.class,
RpcServer.class);
PROVIDER = new X509TestContextProvider(conf, dir);
}
@AfterClass
public static void tearDownAfterClass() throws InterruptedException {
stopKDC();
Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
TEST_UTIL.cleanupTestDir();
}
@Before
public void setUp() throws Exception {
x509TestContext = PROVIDER.get(caKeyType, certKeyType, keyPassword);
x509TestContext.setConfigurations(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
Configuration conf = TEST_UTIL.getConfiguration();
conf.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, acceptPlainText);
conf.setBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, clientTlsEnabled);
setUpPrincipalAndConf();
}
@After
public void tearDown() {
x509TestContext.clearConfigurations();
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_OCSP);
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_CLR);
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_PROTOCOL);
System.clearProperty("com.sun.net.ssl.checkRevocation");
System.clearProperty("com.sun.security.enableCRLDP");
Security.setProperty("ocsp.enable", Boolean.FALSE.toString());
Security.setProperty("com.sun.security.enableCRLDP", Boolean.FALSE.toString());
}
}

View File

@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.loginKerberosPrincipal;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.setSecuredConfiguration;
import java.io.File;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.SecurityTests;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
@RunWith(Parameterized.class)
@Category({ SecurityTests.class, MediumTests.class })
public class TestSaslTlsIPCRejectPlainText extends AbstractTestTlsRejectPlainText {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSaslTlsIPCRejectPlainText.class);
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
private static final File KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath());
private static MiniKdc KDC;
private static String HOST = "localhost";
private static String PRINCIPAL;
private static UserGroupInformation UGI;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
initialize();
KDC = UTIL.setupMiniKdc(KEYTAB_FILE);
PRINCIPAL = "hbase/" + HOST;
KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
UGI = loginKerberosPrincipal(KEYTAB_FILE.getCanonicalPath(), PRINCIPAL);
setSecuredConfiguration(UTIL.getConfiguration());
SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
Mockito.when(securityInfoMock.getServerPrincipal())
.thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
}
@AfterClass
public static void tearDownAfterClass() {
if (KDC != null) {
KDC.stop();
}
cleanUp();
}
@Override
protected BlockingInterface createStub() throws Exception {
return TestProtobufRpcServiceImpl.newBlockingStub(rpcClient, rpcServer.getListenerAddress(),
User.create(UGI));
}
}

View File

@@ -17,101 +17,37 @@
*/
package org.apache.hadoop.hbase.security;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.setSecuredConfiguration;
import static org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders.SELECTOR_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
import org.apache.hadoop.hbase.security.provider.AuthenticationProviderSelector;
import org.apache.hadoop.hbase.security.provider.BuiltInProviderSelector;
import org.apache.hadoop.hbase.security.provider.SaslAuthMethod;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.SecurityTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mockito;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
@RunWith(Parameterized.class)
@Category({ SecurityTests.class, LargeTests.class })
public class TestSecureIPC {
public class TestSecureIPC extends AbstractTestSecureIPC {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSecureIPC.class);
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static final File KEYTAB_FILE =
new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
private static MiniKdc KDC;
private static String HOST = "localhost";
private static String PRINCIPAL;
private String krbKeytab;
private String krbPrincipal;
private UserGroupInformation ugi;
private Configuration clientConf;
private Configuration serverConf;
@Parameters(name = "{index}: rpcClientImpl={0}, rpcServerImpl={1}")
public static Collection<Object[]> parameters() {
List<Object[]> params = new ArrayList<>();
@@ -135,317 +71,19 @@ public class TestSecureIPC {
@BeforeClass
public static void setUp() throws Exception {
KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
PRINCIPAL = "hbase/" + HOST;
KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
// set a smaller timeout and retry to speed up tests
TEST_UTIL.getConfiguration().setInt(RpcClient.SOCKET_TIMEOUT_READ, 2000);
TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxretries", 1);
initKDCAndConf();
}
@AfterClass
public static void tearDown() throws IOException {
if (KDC != null) {
KDC.stop();
}
public static void tearDown() throws Exception {
stopKDC();
TEST_UTIL.cleanupTestDir();
}
@Before
public void setUpTest() throws Exception {
krbKeytab = getKeytabFileForTesting();
krbPrincipal = getPrincipalForTesting();
ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
clientConf = new Configuration(TEST_UTIL.getConfiguration());
setSecuredConfiguration(clientConf);
setUpPrincipalAndConf();
clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl);
serverConf = new Configuration(TEST_UTIL.getConfiguration());
setSecuredConfiguration(serverConf);
serverConf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
}
@Test
public void testRpcCallWithEnabledKerberosSaslAuth() throws Exception {
UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
// check that the login user is okay:
assertSame(ugi2, ugi);
assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
assertEquals(krbPrincipal, ugi.getUserName());
callRpcService(User.create(ugi2));
}
@Test
public void testRpcCallWithEnabledKerberosSaslAuth_CanonicalHostname() throws Exception {
UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
// check that the login user is okay:
assertSame(ugi2, ugi);
assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
assertEquals(krbPrincipal, ugi.getUserName());
enableCanonicalHostnameTesting(clientConf, "localhost");
clientConf.setBoolean(
SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS, false);
clientConf.set(HBaseKerberosUtils.KRB_PRINCIPAL, "hbase/_HOST@" + KDC.getRealm());
callRpcService(User.create(ugi2));
}
@Test
public void testRpcCallWithEnabledKerberosSaslAuth_NoCanonicalHostname() throws Exception {
UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
// check that the login user is okay:
assertSame(ugi2, ugi);
assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
assertEquals(krbPrincipal, ugi.getUserName());
enableCanonicalHostnameTesting(clientConf, "127.0.0.1");
clientConf
.setBoolean(SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS, true);
clientConf.set(HBaseKerberosUtils.KRB_PRINCIPAL, "hbase/_HOST@" + KDC.getRealm());
callRpcService(User.create(ugi2));
}
private static void enableCanonicalHostnameTesting(Configuration conf, String canonicalHostname) {
conf.setClass(SELECTOR_KEY, CanonicalHostnameTestingAuthenticationProviderSelector.class,
AuthenticationProviderSelector.class);
conf.set(CanonicalHostnameTestingAuthenticationProviderSelector.CANONICAL_HOST_NAME_KEY,
canonicalHostname);
}
public static class CanonicalHostnameTestingAuthenticationProviderSelector
extends BuiltInProviderSelector {
private static final String CANONICAL_HOST_NAME_KEY =
"CanonicalHostnameTestingAuthenticationProviderSelector.canonicalHostName";
@Override
public Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>>
selectProvider(String clusterId, User user) {
final Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> pair =
super.selectProvider(clusterId, user);
pair.setFirst(createCanonicalHostNameTestingProvider(pair.getFirst()));
return pair;
}
SaslClientAuthenticationProvider
createCanonicalHostNameTestingProvider(SaslClientAuthenticationProvider delegate) {
return new SaslClientAuthenticationProvider() {
@Override
public SaslClient createClient(Configuration conf, InetAddress serverAddr,
SecurityInfo securityInfo, Token<? extends TokenIdentifier> token,
boolean fallbackAllowed, Map<String, String> saslProps) throws IOException {
final String s = conf.get(CANONICAL_HOST_NAME_KEY);
if (s != null) {
try {
final Field canonicalHostName =
InetAddress.class.getDeclaredField("canonicalHostName");
canonicalHostName.setAccessible(true);
canonicalHostName.set(serverAddr, s);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
return delegate.createClient(conf, serverAddr, securityInfo, token, fallbackAllowed,
saslProps);
}
@Override
public UserInformation getUserInfo(User user) {
return delegate.getUserInfo(user);
}
@Override
public UserGroupInformation getRealUser(User ugi) {
return delegate.getRealUser(ugi);
}
@Override
public boolean canRetry() {
return delegate.canRetry();
}
@Override
public void relogin() throws IOException {
delegate.relogin();
}
@Override
public SaslAuthMethod getSaslAuthMethod() {
return delegate.getSaslAuthMethod();
}
@Override
public String getTokenKind() {
return delegate.getTokenKind();
}
};
}
}
@Test
public void testRpcFallbackToSimpleAuth() throws Exception {
String clientUsername = "testuser";
UserGroupInformation clientUgi =
UserGroupInformation.createUserForTesting(clientUsername, new String[] { clientUsername });
// check that the client user is insecure
assertNotSame(ugi, clientUgi);
assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
assertEquals(clientUsername, clientUgi.getUserName());
clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, true);
callRpcService(User.create(clientUgi));
}
private void setRpcProtection(String clientProtection, String serverProtection) {
clientConf.set("hbase.rpc.protection", clientProtection);
serverConf.set("hbase.rpc.protection", serverProtection);
}
/**
* Test various combinations of Server and Client qops.
*/
@Test
public void testSaslWithCommonQop() throws Exception {
setRpcProtection("privacy,authentication", "authentication");
callRpcService(User.create(ugi));
setRpcProtection("authentication", "privacy,authentication");
callRpcService(User.create(ugi));
setRpcProtection("integrity,authentication", "privacy,authentication");
callRpcService(User.create(ugi));
setRpcProtection("integrity,authentication", "integrity,authentication");
callRpcService(User.create(ugi));
setRpcProtection("privacy,authentication", "privacy,authentication");
callRpcService(User.create(ugi));
}
@Test
public void testSaslNoCommonQop() throws Exception {
setRpcProtection("integrity", "privacy");
SaslException se = assertThrows(SaslException.class, () -> callRpcService(User.create(ugi)));
assertEquals("No common protection layer between client and server", se.getMessage());
}
/**
* Test sasl encryption with Crypto AES.
*/
@Test
public void testSaslWithCryptoAES() throws Exception {
setRpcProtection("privacy", "privacy");
setCryptoAES("true", "true");
callRpcService(User.create(ugi));
}
/**
* Test various combinations of Server and Client configuration for Crypto AES.
*/
@Test
public void testDifferentConfWithCryptoAES() throws Exception {
setRpcProtection("privacy", "privacy");
setCryptoAES("false", "true");
callRpcService(User.create(ugi));
setCryptoAES("true", "false");
try {
callRpcService(User.create(ugi));
fail("The exception should be thrown out for the rpc timeout.");
} catch (Exception e) {
// ignore the expected exception
}
}
void setCryptoAES(String clientCryptoAES, String serverCryptoAES) {
clientConf.set("hbase.rpc.crypto.encryption.aes.enabled", clientCryptoAES);
serverConf.set("hbase.rpc.crypto.encryption.aes.enabled", serverCryptoAES);
}
private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
throws Exception {
Configuration cnf = new Configuration();
cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(cnf);
UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
return UserGroupInformation.getLoginUser();
}
/**
* Sets up an RPC Server and a Client, makes an RPC call, and checks the result. If an exception is
* thrown from the stub, this function will throw the root cause of that exception.
*/
private void callRpcService(User clientUser) throws Exception {
SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
Mockito.when(securityInfoMock.getServerPrincipal())
.thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
InetSocketAddress isa = new InetSocketAddress(HOST, 0);
RpcServerInterface rpcServer = RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC",
Lists
.newArrayList(new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)),
isa, serverConf, new FifoRpcScheduler(serverConf, 1));
rpcServer.start();
try (RpcClient rpcClient =
RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) {
BlockingInterface stub =
newBlockingStub(rpcClient, rpcServer.getListenerAddress(), clientUser);
TestThread th1 = new TestThread(stub);
final Throwable exception[] = new Throwable[1];
Collections.synchronizedList(new ArrayList<Throwable>());
Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread th, Throwable ex) {
exception[0] = ex;
}
};
th1.setUncaughtExceptionHandler(exceptionHandler);
th1.start();
th1.join();
if (exception[0] != null) {
// throw root cause.
while (exception[0].getCause() != null) {
exception[0] = exception[0].getCause();
}
throw (Exception) exception[0];
}
} finally {
rpcServer.stop();
}
}
public static class TestThread extends Thread {
private final BlockingInterface stub;
public TestThread(BlockingInterface stub) {
this.stub = stub;
}
@Override
public void run() {
try {
int[] messageSize = new int[] { 100, 1000, 10000 };
for (int i = 0; i < messageSize.length; i++) {
String input = RandomStringUtils.random(messageSize[i]);
String result =
stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(input).build())
.getMessage();
assertEquals(input, result);
}
} catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
throw new RuntimeException(e);
}
}
}
}

View File

@@ -1,219 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security;
import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED;
import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_ENABLED;
import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.setSecuredConfiguration;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.Security;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType;
import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
import org.apache.hadoop.hbase.io.crypto.tls.X509TestContext;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.SecurityTests;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos;
@Category({ SecurityTests.class, LargeTests.class })
public class TestTlsWithKerberos {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestTlsWithKerberos.class);
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static final File KEYTAB_FILE =
new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
private static MiniKdc KDC;
private static final String HOST = "localhost";
private static String PRINCIPAL;
private static final String RPC_CLIENT_IMPL = NettyRpcClient.class.getName();
private static final String RPC_SERVER_IMPL = NettyRpcServer.class.getName();
private String krbKeytab;
private String krbPrincipal;
private UserGroupInformation ugi;
private Configuration clientConf;
private Configuration serverConf;
private static X509TestContext x509TestContext;
@BeforeClass
public static void setUp() throws Exception {
Security.addProvider(new BouncyCastleProvider());
KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
PRINCIPAL = "hbase/" + HOST;
KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
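// Generate an RSA test certificate with JKS key and trust stores; setConfigurations() below is expected to record their locations and passwords in the Configuration shared by client and server.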
x509TestContext = X509TestContext.newBuilder()
.setTempDir(new File(TEST_UTIL.getDataTestDir().toUri().getPath()))
.setKeyStorePassword("Pa$$word").setKeyStoreKeyType(X509KeyType.RSA)
.setTrustStoreKeyType(X509KeyType.RSA).setTrustStorePassword("Pa$$word").build();
x509TestContext.setConfigurations(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
}
@AfterClass
public static void tearDown() throws IOException {
Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
if (KDC != null) {
KDC.stop();
}
TEST_UTIL.cleanupTestDir();
}
@Before
public void setUpTest() throws Exception {
krbKeytab = getKeytabFileForTesting();
krbPrincipal = getPrincipalForTesting();
ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
clientConf = HBaseConfiguration.create(x509TestContext.getConf());
setSecuredConfiguration(clientConf);
clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RPC_CLIENT_IMPL);
serverConf = HBaseConfiguration.create(x509TestContext.getConf());
setSecuredConfiguration(serverConf);
serverConf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, RPC_SERVER_IMPL);
}
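// Client and server both use TLS and the server does not accept plaintext; the call should succeed.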
@Test
public void testNoPlaintext() throws Exception {
setRpcProtection("authentication", "authentication");
setTLSEncryption(true, false, true);
callRpcService(User.create(ugi));
}
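// The server requires TLS while the client stays on plaintext, so the connection is expected to be closed.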
@Test
public void testRejectPlaintext() {
setRpcProtection("authentication", "authentication");
setTLSEncryption(true, false, false);
Assert.assertThrows(ConnectionClosedException.class, () -> callRpcService(User.create(ugi)));
}
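// The server allows a plaintext fallback, so the plaintext client can still complete the call.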
@Test
public void testAcceptPlaintext() throws Exception {
setRpcProtection("authentication", "authentication");
setTLSEncryption(true, true, false);
callRpcService(User.create(ugi));
}
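// server: enable TLS on the RPC server; acceptPlaintext: allow the server to fall back to plaintext for non-TLS clients; client: enable TLS on the RPC client.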
void setTLSEncryption(Boolean server, Boolean acceptPlaintext, Boolean client) {
serverConf.set(HBASE_SERVER_NETTY_TLS_ENABLED, server.toString());
serverConf.set(HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, acceptPlaintext.toString());
clientConf.set(HBASE_CLIENT_NETTY_TLS_ENABLED, client.toString());
}
void setRpcProtection(String clientProtection, String serverProtection) {
clientConf.set("hbase.rpc.protection", clientProtection);
serverConf.set("hbase.rpc.protection", serverProtection);
}
private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
throws Exception {
Configuration cnf = new Configuration();
cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(cnf);
UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
return UserGroupInformation.getLoginUser();
}
/**
* Sets up an RPC server and a client, performs an RPC, and checks the result. If an exception is
* thrown from the stub, this method throws the root cause of that exception.
*/
private void callRpcService(User clientUser) throws Exception {
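// Register a mocked SecurityInfo so the client can resolve the Kerberos server principal for the test service.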
SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
Mockito.when(securityInfoMock.getServerPrincipal())
.thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
InetSocketAddress isa = new InetSocketAddress(HOST, 0);
RpcServerInterface rpcServer = RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC",
Lists
.newArrayList(new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)),
isa, serverConf, new FifoRpcScheduler(serverConf, 1));
rpcServer.start();
try (RpcClient rpcClient =
RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) {
TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
newBlockingStub(rpcClient, rpcServer.getListenerAddress(), clientUser);
TestSecureIPC.TestThread th = new TestSecureIPC.TestThread(stub);
AtomicReference<Throwable> exception = new AtomicReference<>();
Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread th, Throwable ex) {
exception.set(ex);
}
};
th.setUncaughtExceptionHandler(exceptionHandler);
th.start();
th.join();
if (exception.get() != null) {
// throw root cause.
while (exception.get().getCause() != null) {
exception.set(exception.get().getCause());
}
throw (Exception) exception.get();
}
} finally {
rpcServer.stop();
}
}
}