HBASE-27279 Make SslHandler work with SaslWrapHandler/SaslUnwrapHandler (#4705)
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org> Reviewed-by: Andor Molnár <andor@cloudera.com> (cherry picked from commit 2b9d36869ffe765256d4a7b6a74b95a8200678df) Conflicts: hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestNettyTlsIPCRejectPlainText.java hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestTlsWithKerberos.java
This commit is contained in:
parent
3acf920e1f
commit
c2dff398d3
@ -188,4 +188,13 @@ public final class HBaseKerberosUtils {
|
|||||||
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFileLocation);
|
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFileLocation);
|
||||||
return ugi;
|
return ugi;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
|
||||||
|
throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
|
||||||
|
UserGroupInformation.setConfiguration(conf);
|
||||||
|
UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
|
||||||
|
return UserGroupInformation.getLoginUser();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -222,7 +222,8 @@ class NettyRpcConnection extends RpcConnection {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, new SaslChallengeDecoder())
|
ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, new SaslChallengeDecoder())
|
||||||
.addBefore(BufferCallBeforeInitHandler.NAME, null, saslHandler);
|
.addBefore(BufferCallBeforeInitHandler.NAME, NettyHBaseSaslRpcClientHandler.HANDLER_NAME,
|
||||||
|
saslHandler);
|
||||||
NettyFutureUtils.addListener(saslPromise, new FutureListener<Boolean>() {
|
NettyFutureUtils.addListener(saslPromise, new FutureListener<Boolean>() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -45,16 +45,16 @@ public class NettyHBaseSaslRpcClient extends AbstractHBaseSaslRpcClient {
|
|||||||
super(conf, provider, token, serverAddr, securityInfo, fallbackAllowed, rpcProtection);
|
super(conf, provider, token, serverAddr, securityInfo, fallbackAllowed, rpcProtection);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setupSaslHandler(ChannelPipeline p) {
|
public void setupSaslHandler(ChannelPipeline p, String addAfter) {
|
||||||
String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
|
String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
|
||||||
LOG.trace("SASL client context established. Negotiated QoP {}", qop);
|
LOG.trace("SASL client context established. Negotiated QoP {}", qop);
|
||||||
if (qop == null || "auth".equalsIgnoreCase(qop)) {
|
if (qop == null || "auth".equalsIgnoreCase(qop)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// add wrap and unwrap handlers to pipeline.
|
// add wrap and unwrap handlers to pipeline.
|
||||||
p.addFirst(new SaslWrapHandler(saslClient::wrap),
|
p.addAfter(addAfter, null, new SaslUnwrapHandler(saslClient::unwrap))
|
||||||
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
|
.addAfter(addAfter, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
|
||||||
new SaslUnwrapHandler(saslClient::unwrap));
|
.addAfter(addAfter, null, new SaslWrapHandler(saslClient::wrap));
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getSaslQOP() {
|
public String getSaslQOP() {
|
||||||
|
@ -47,6 +47,8 @@ public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<
|
|||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(NettyHBaseSaslRpcClientHandler.class);
|
private static final Logger LOG = LoggerFactory.getLogger(NettyHBaseSaslRpcClientHandler.class);
|
||||||
|
|
||||||
|
public static final String HANDLER_NAME = "SaslRpcClientHandler";
|
||||||
|
|
||||||
private final Promise<Boolean> saslPromise;
|
private final Promise<Boolean> saslPromise;
|
||||||
|
|
||||||
private final UserGroupInformation ugi;
|
private final UserGroupInformation ugi;
|
||||||
@ -86,7 +88,7 @@ public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<
|
|||||||
}
|
}
|
||||||
|
|
||||||
ChannelPipeline p = ctx.pipeline();
|
ChannelPipeline p = ctx.pipeline();
|
||||||
saslRpcClient.setupSaslHandler(p);
|
saslRpcClient.setupSaslHandler(p, HANDLER_NAME);
|
||||||
p.remove(SaslChallengeDecoder.class);
|
p.remove(SaslChallengeDecoder.class);
|
||||||
p.remove(this);
|
p.remove(this);
|
||||||
|
|
||||||
|
@ -89,11 +89,11 @@ class NettyHBaseSaslRpcServerHandler extends SimpleChannelInboundHandler<ByteBuf
|
|||||||
boolean useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
|
boolean useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
|
||||||
ChannelPipeline p = ctx.pipeline();
|
ChannelPipeline p = ctx.pipeline();
|
||||||
if (useWrap) {
|
if (useWrap) {
|
||||||
p.addFirst(new SaslWrapHandler(saslServer::wrap));
|
p.addBefore(DECODER_NAME, null, new SaslWrapHandler(saslServer::wrap)).addLast(
|
||||||
p.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
|
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
|
||||||
new SaslUnwrapHandler(saslServer::unwrap));
|
new SaslUnwrapHandler(saslServer::unwrap));
|
||||||
}
|
}
|
||||||
conn.setupDecoder();
|
conn.setupHandler();
|
||||||
p.remove(this);
|
p.remove(this);
|
||||||
p.remove(DECODER_NAME);
|
p.remove(DECODER_NAME);
|
||||||
}
|
}
|
||||||
|
@ -124,9 +124,8 @@ public class NettyRpcServer extends RpcServer {
|
|||||||
if (conf.getBoolean(HBASE_SERVER_NETTY_TLS_ENABLED, false)) {
|
if (conf.getBoolean(HBASE_SERVER_NETTY_TLS_ENABLED, false)) {
|
||||||
initSSL(pipeline, conf.getBoolean(HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, true));
|
initSSL(pipeline, conf.getBoolean(HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, true));
|
||||||
}
|
}
|
||||||
pipeline.addLast(NettyRpcServerPreambleHandler.DECODER_NAME, preambleDecoder);
|
pipeline.addLast(NettyRpcServerPreambleHandler.DECODER_NAME, preambleDecoder)
|
||||||
pipeline.addLast(createNettyRpcServerPreambleHandler(),
|
.addLast(createNettyRpcServerPreambleHandler());
|
||||||
new NettyRpcServerResponseEncoder(metrics));
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
try {
|
try {
|
||||||
|
@ -61,7 +61,7 @@ class NettyRpcServerPreambleHandler extends SimpleChannelInboundHandler<ByteBuf>
|
|||||||
p.addLast(NettyHBaseSaslRpcServerHandler.DECODER_NAME, decoder);
|
p.addLast(NettyHBaseSaslRpcServerHandler.DECODER_NAME, decoder);
|
||||||
p.addLast(new NettyHBaseSaslRpcServerHandler(rpcServer, conn));
|
p.addLast(new NettyHBaseSaslRpcServerHandler(rpcServer, conn));
|
||||||
} else {
|
} else {
|
||||||
conn.setupDecoder();
|
conn.setupHandler();
|
||||||
}
|
}
|
||||||
// add first and then remove, so the single decode decoder will pass the remaining bytes to the
|
// add first and then remove, so the single decode decoder will pass the remaining bytes to the
|
||||||
// handler above.
|
// handler above.
|
||||||
|
@ -33,7 +33,6 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescrip
|
|||||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||||
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
|
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
|
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
|
||||||
|
|
||||||
@ -70,10 +69,11 @@ class NettyServerRpcConnection extends ServerRpcConnection {
|
|||||||
this.remotePort = inetSocketAddress.getPort();
|
this.remotePort = inetSocketAddress.getPort();
|
||||||
}
|
}
|
||||||
|
|
||||||
void setupDecoder() {
|
void setupHandler() {
|
||||||
ChannelPipeline p = channel.pipeline();
|
channel.pipeline()
|
||||||
p.addLast("frameDecoder", new NettyRpcFrameDecoder(rpcServer.maxRequestSize, this));
|
.addLast("frameDecoder", new NettyRpcFrameDecoder(rpcServer.maxRequestSize, this))
|
||||||
p.addLast("decoder", new NettyRpcServerRequestDecoder(rpcServer.metrics, this));
|
.addLast("decoder", new NettyRpcServerRequestDecoder(rpcServer.metrics, this))
|
||||||
|
.addLast("encoder", new NettyRpcServerResponseEncoder(rpcServer.metrics));
|
||||||
}
|
}
|
||||||
|
|
||||||
void process(ByteBuf buf) throws IOException, InterruptedException {
|
void process(ByteBuf buf) throws IOException, InterruptedException {
|
||||||
|
@ -0,0 +1,390 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.security;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
|
||||||
|
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
|
||||||
|
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
|
||||||
|
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
|
||||||
|
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.loginKerberosPrincipal;
|
||||||
|
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.setSecuredConfiguration;
|
||||||
|
import static org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders.SELECTOR_KEY;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotSame;
|
||||||
|
import static org.junit.Assert.assertSame;
|
||||||
|
import static org.junit.Assert.assertThrows;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.Field;
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Map;
|
||||||
|
import javax.security.sasl.SaslClient;
|
||||||
|
import javax.security.sasl.SaslException;
|
||||||
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||||
|
import org.apache.hadoop.hbase.security.provider.AuthenticationProviderSelector;
|
||||||
|
import org.apache.hadoop.hbase.security.provider.BuiltInProviderSelector;
|
||||||
|
import org.apache.hadoop.hbase.security.provider.SaslAuthMethod;
|
||||||
|
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
|
||||||
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
|
import org.apache.hadoop.minikdc.MiniKdc;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
|
||||||
|
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
|
||||||
|
|
||||||
|
public class AbstractTestSecureIPC {
|
||||||
|
|
||||||
|
protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
protected static final File KEYTAB_FILE =
|
||||||
|
new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
|
||||||
|
|
||||||
|
protected static MiniKdc KDC;
|
||||||
|
protected static String HOST = "localhost";
|
||||||
|
protected static String PRINCIPAL;
|
||||||
|
|
||||||
|
protected String krbKeytab;
|
||||||
|
protected String krbPrincipal;
|
||||||
|
protected UserGroupInformation ugi;
|
||||||
|
protected Configuration clientConf;
|
||||||
|
protected Configuration serverConf;
|
||||||
|
|
||||||
|
protected static void initKDCAndConf() throws Exception {
|
||||||
|
KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
|
||||||
|
PRINCIPAL = "hbase/" + HOST;
|
||||||
|
KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
|
||||||
|
HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
|
||||||
|
// set a smaller timeout and retry to speed up tests
|
||||||
|
TEST_UTIL.getConfiguration().setInt(RpcClient.SOCKET_TIMEOUT_READ, 2000);
|
||||||
|
TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxretries", 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static void stopKDC() throws InterruptedException {
|
||||||
|
if (KDC != null) {
|
||||||
|
KDC.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected final void setUpPrincipalAndConf() throws Exception {
|
||||||
|
krbKeytab = getKeytabFileForTesting();
|
||||||
|
krbPrincipal = getPrincipalForTesting();
|
||||||
|
ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
|
||||||
|
clientConf = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
|
setSecuredConfiguration(clientConf);
|
||||||
|
serverConf = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
|
setSecuredConfiguration(serverConf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRpcCallWithEnabledKerberosSaslAuth() throws Exception {
|
||||||
|
UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
|
||||||
|
|
||||||
|
// check that the login user is okay:
|
||||||
|
assertSame(ugi2, ugi);
|
||||||
|
assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
|
||||||
|
assertEquals(krbPrincipal, ugi.getUserName());
|
||||||
|
|
||||||
|
callRpcService(User.create(ugi2));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRpcCallWithEnabledKerberosSaslAuthCanonicalHostname() throws Exception {
|
||||||
|
UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
|
||||||
|
|
||||||
|
// check that the login user is okay:
|
||||||
|
assertSame(ugi2, ugi);
|
||||||
|
assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
|
||||||
|
assertEquals(krbPrincipal, ugi.getUserName());
|
||||||
|
|
||||||
|
enableCanonicalHostnameTesting(clientConf, "localhost");
|
||||||
|
clientConf.setBoolean(
|
||||||
|
SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS, false);
|
||||||
|
clientConf.set(HBaseKerberosUtils.KRB_PRINCIPAL, "hbase/_HOST@" + KDC.getRealm());
|
||||||
|
|
||||||
|
callRpcService(User.create(ugi2));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRpcCallWithEnabledKerberosSaslAuthNoCanonicalHostname() throws Exception {
|
||||||
|
UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
|
||||||
|
|
||||||
|
// check that the login user is okay:
|
||||||
|
assertSame(ugi2, ugi);
|
||||||
|
assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
|
||||||
|
assertEquals(krbPrincipal, ugi.getUserName());
|
||||||
|
|
||||||
|
enableCanonicalHostnameTesting(clientConf, "127.0.0.1");
|
||||||
|
clientConf
|
||||||
|
.setBoolean(SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS, true);
|
||||||
|
clientConf.set(HBaseKerberosUtils.KRB_PRINCIPAL, "hbase/_HOST@" + KDC.getRealm());
|
||||||
|
|
||||||
|
callRpcService(User.create(ugi2));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void enableCanonicalHostnameTesting(Configuration conf, String canonicalHostname) {
|
||||||
|
conf.setClass(SELECTOR_KEY, CanonicalHostnameTestingAuthenticationProviderSelector.class,
|
||||||
|
AuthenticationProviderSelector.class);
|
||||||
|
conf.set(CanonicalHostnameTestingAuthenticationProviderSelector.CANONICAL_HOST_NAME_KEY,
|
||||||
|
canonicalHostname);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class CanonicalHostnameTestingAuthenticationProviderSelector
|
||||||
|
extends BuiltInProviderSelector {
|
||||||
|
private static final String CANONICAL_HOST_NAME_KEY =
|
||||||
|
"CanonicalHostnameTestingAuthenticationProviderSelector.canonicalHostName";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>>
|
||||||
|
selectProvider(String clusterId, User user) {
|
||||||
|
final Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> pair =
|
||||||
|
super.selectProvider(clusterId, user);
|
||||||
|
pair.setFirst(createCanonicalHostNameTestingProvider(pair.getFirst()));
|
||||||
|
return pair;
|
||||||
|
}
|
||||||
|
|
||||||
|
SaslClientAuthenticationProvider
|
||||||
|
createCanonicalHostNameTestingProvider(SaslClientAuthenticationProvider delegate) {
|
||||||
|
return new SaslClientAuthenticationProvider() {
|
||||||
|
@Override
|
||||||
|
public SaslClient createClient(Configuration conf, InetAddress serverAddr,
|
||||||
|
SecurityInfo securityInfo, Token<? extends TokenIdentifier> token,
|
||||||
|
boolean fallbackAllowed, Map<String, String> saslProps) throws IOException {
|
||||||
|
final String s = conf.get(CANONICAL_HOST_NAME_KEY);
|
||||||
|
if (s != null) {
|
||||||
|
try {
|
||||||
|
final Field canonicalHostName =
|
||||||
|
InetAddress.class.getDeclaredField("canonicalHostName");
|
||||||
|
canonicalHostName.setAccessible(true);
|
||||||
|
canonicalHostName.set(serverAddr, s);
|
||||||
|
} catch (NoSuchFieldException | IllegalAccessException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return delegate.createClient(conf, serverAddr, securityInfo, token, fallbackAllowed,
|
||||||
|
saslProps);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public UserInformation getUserInfo(User user) {
|
||||||
|
return delegate.getUserInfo(user);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public UserGroupInformation getRealUser(User ugi) {
|
||||||
|
return delegate.getRealUser(ugi);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean canRetry() {
|
||||||
|
return delegate.canRetry();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void relogin() throws IOException {
|
||||||
|
delegate.relogin();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SaslAuthMethod getSaslAuthMethod() {
|
||||||
|
return delegate.getSaslAuthMethod();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getTokenKind() {
|
||||||
|
return delegate.getTokenKind();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRpcFallbackToSimpleAuth() throws Exception {
|
||||||
|
String clientUsername = "testuser";
|
||||||
|
UserGroupInformation clientUgi =
|
||||||
|
UserGroupInformation.createUserForTesting(clientUsername, new String[] { clientUsername });
|
||||||
|
|
||||||
|
// check that the client user is insecure
|
||||||
|
assertNotSame(ugi, clientUgi);
|
||||||
|
assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
|
||||||
|
assertEquals(clientUsername, clientUgi.getUserName());
|
||||||
|
|
||||||
|
clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
|
||||||
|
serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, true);
|
||||||
|
callRpcService(User.create(clientUgi));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setRpcProtection(String clientProtection, String serverProtection) {
|
||||||
|
clientConf.set("hbase.rpc.protection", clientProtection);
|
||||||
|
serverConf.set("hbase.rpc.protection", serverProtection);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test various combinations of Server and Client qops.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSaslWithCommonQop() throws Exception {
|
||||||
|
setRpcProtection("privacy,authentication", "authentication");
|
||||||
|
callRpcService(User.create(ugi));
|
||||||
|
|
||||||
|
setRpcProtection("authentication", "privacy,authentication");
|
||||||
|
callRpcService(User.create(ugi));
|
||||||
|
|
||||||
|
setRpcProtection("integrity,authentication", "privacy,authentication");
|
||||||
|
callRpcService(User.create(ugi));
|
||||||
|
|
||||||
|
setRpcProtection("integrity,authentication", "integrity,authentication");
|
||||||
|
callRpcService(User.create(ugi));
|
||||||
|
|
||||||
|
setRpcProtection("privacy,authentication", "privacy,authentication");
|
||||||
|
callRpcService(User.create(ugi));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSaslNoCommonQop() throws Exception {
|
||||||
|
setRpcProtection("integrity", "privacy");
|
||||||
|
SaslException se = assertThrows(SaslException.class, () -> callRpcService(User.create(ugi)));
|
||||||
|
assertEquals("No common protection layer between client and server", se.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test sasl encryption with Crypto AES.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSaslWithCryptoAES() throws Exception {
|
||||||
|
setRpcProtection("privacy", "privacy");
|
||||||
|
setCryptoAES("true", "true");
|
||||||
|
callRpcService(User.create(ugi));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test various combinations of Server and Client configuration for Crypto AES. n
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDifferentConfWithCryptoAES() throws Exception {
|
||||||
|
setRpcProtection("privacy", "privacy");
|
||||||
|
|
||||||
|
setCryptoAES("false", "true");
|
||||||
|
callRpcService(User.create(ugi));
|
||||||
|
|
||||||
|
setCryptoAES("true", "false");
|
||||||
|
try {
|
||||||
|
callRpcService(User.create(ugi));
|
||||||
|
fail("The exception should be thrown out for the rpc timeout.");
|
||||||
|
} catch (Exception e) {
|
||||||
|
// ignore the expected exception
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setCryptoAES(String clientCryptoAES, String serverCryptoAES) {
|
||||||
|
clientConf.set("hbase.rpc.crypto.encryption.aes.enabled", clientCryptoAES);
|
||||||
|
serverConf.set("hbase.rpc.crypto.encryption.aes.enabled", serverCryptoAES);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets up a RPC Server and a Client. Does a RPC checks the result. If an exception is thrown from
|
||||||
|
* the stub, this function will throw root cause of that exception.
|
||||||
|
*/
|
||||||
|
private void callRpcService(User clientUser) throws Exception {
|
||||||
|
SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
|
||||||
|
Mockito.when(securityInfoMock.getServerPrincipal())
|
||||||
|
.thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
|
||||||
|
SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
|
||||||
|
|
||||||
|
InetSocketAddress isa = new InetSocketAddress(HOST, 0);
|
||||||
|
|
||||||
|
RpcServerInterface rpcServer = RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC",
|
||||||
|
Lists
|
||||||
|
.newArrayList(new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)),
|
||||||
|
isa, serverConf, new FifoRpcScheduler(serverConf, 1));
|
||||||
|
rpcServer.start();
|
||||||
|
try (RpcClient rpcClient =
|
||||||
|
RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) {
|
||||||
|
BlockingInterface stub =
|
||||||
|
newBlockingStub(rpcClient, rpcServer.getListenerAddress(), clientUser);
|
||||||
|
TestThread th1 = new TestThread(stub);
|
||||||
|
final Throwable exception[] = new Throwable[1];
|
||||||
|
Collections.synchronizedList(new ArrayList<Throwable>());
|
||||||
|
Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
|
||||||
|
@Override
|
||||||
|
public void uncaughtException(Thread th, Throwable ex) {
|
||||||
|
exception[0] = ex;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
th1.setUncaughtExceptionHandler(exceptionHandler);
|
||||||
|
th1.start();
|
||||||
|
th1.join();
|
||||||
|
if (exception[0] != null) {
|
||||||
|
// throw root cause.
|
||||||
|
while (exception[0].getCause() != null) {
|
||||||
|
exception[0] = exception[0].getCause();
|
||||||
|
}
|
||||||
|
throw (Exception) exception[0];
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
rpcServer.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TestThread extends Thread {
|
||||||
|
private final BlockingInterface stub;
|
||||||
|
|
||||||
|
public TestThread(BlockingInterface stub) {
|
||||||
|
this.stub = stub;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
int[] messageSize = new int[] { 100, 1000, 10000 };
|
||||||
|
for (int i = 0; i < messageSize.length; i++) {
|
||||||
|
String input = RandomStringUtils.random(messageSize[i]);
|
||||||
|
String result =
|
||||||
|
stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(input).build())
|
||||||
|
.getMessage();
|
||||||
|
assertEquals(input, result);
|
||||||
|
}
|
||||||
|
} catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,155 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.security;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
|
import static org.junit.Assert.assertThrows;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.security.Security;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
|
||||||
|
import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType;
|
||||||
|
import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
|
||||||
|
import org.apache.hadoop.hbase.io.crypto.tls.X509TestContext;
|
||||||
|
import org.apache.hadoop.hbase.io.crypto.tls.X509TestContextProvider;
|
||||||
|
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
|
||||||
|
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
|
||||||
|
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
|
||||||
|
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
|
||||||
|
import org.bouncycastle.jce.provider.BouncyCastleProvider;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
|
||||||
|
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
|
||||||
|
|
||||||
|
public abstract class AbstractTestTlsRejectPlainText {
|
||||||
|
|
||||||
|
protected static HBaseCommonTestingUtility UTIL;
|
||||||
|
|
||||||
|
protected static File DIR;
|
||||||
|
|
||||||
|
protected static X509TestContextProvider PROVIDER;
|
||||||
|
|
||||||
|
@Parameterized.Parameter(0)
|
||||||
|
public X509KeyType caKeyType;
|
||||||
|
|
||||||
|
@Parameterized.Parameter(1)
|
||||||
|
public X509KeyType certKeyType;
|
||||||
|
|
||||||
|
@Parameterized.Parameter(2)
|
||||||
|
public String keyPassword;
|
||||||
|
|
||||||
|
private X509TestContext x509TestContext;
|
||||||
|
|
||||||
|
protected RpcServer rpcServer;
|
||||||
|
|
||||||
|
protected RpcClient rpcClient;
|
||||||
|
|
||||||
|
@Parameterized.Parameters(name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}")
|
||||||
|
public static List<Object[]> data() {
|
||||||
|
List<Object[]> params = new ArrayList<>();
|
||||||
|
for (X509KeyType caKeyType : X509KeyType.values()) {
|
||||||
|
for (X509KeyType certKeyType : X509KeyType.values()) {
|
||||||
|
for (String keyPassword : new String[] { "", "pa$$w0rd" }) {
|
||||||
|
params.add(new Object[] { caKeyType, certKeyType, keyPassword });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return params;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static void initialize() throws IOException {
|
||||||
|
Security.addProvider(new BouncyCastleProvider());
|
||||||
|
DIR =
|
||||||
|
new File(UTIL.getDataTestDir(AbstractTestTlsRejectPlainText.class.getSimpleName()).toString())
|
||||||
|
.getCanonicalFile();
|
||||||
|
FileUtils.forceMkdir(DIR);
|
||||||
|
Configuration conf = UTIL.getConfiguration();
|
||||||
|
conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, NettyRpcClient.class,
|
||||||
|
RpcClient.class);
|
||||||
|
conf.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, NettyRpcServer.class,
|
||||||
|
RpcServer.class);
|
||||||
|
conf.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_ENABLED, true);
|
||||||
|
conf.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, false);
|
||||||
|
conf.setBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, false);
|
||||||
|
PROVIDER = new X509TestContextProvider(conf, DIR);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static void cleanUp() {
|
||||||
|
Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
|
||||||
|
UTIL.cleanupTestDir();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
x509TestContext = PROVIDER.get(caKeyType, certKeyType, keyPassword);
|
||||||
|
x509TestContext.setConfigurations(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
|
||||||
|
Configuration conf = UTIL.getConfiguration();
|
||||||
|
rpcServer = new NettyRpcServer(null, "testRpcServer",
|
||||||
|
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
|
||||||
|
new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1), true);
|
||||||
|
rpcServer.start();
|
||||||
|
rpcClient = new NettyRpcClient(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws IOException {
|
||||||
|
if (rpcServer != null) {
|
||||||
|
rpcServer.stop();
|
||||||
|
}
|
||||||
|
Closeables.close(rpcClient, true);
|
||||||
|
x509TestContext.clearConfigurations();
|
||||||
|
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_OCSP);
|
||||||
|
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_CLR);
|
||||||
|
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_PROTOCOL);
|
||||||
|
System.clearProperty("com.sun.net.ssl.checkRevocation");
|
||||||
|
System.clearProperty("com.sun.security.enableCRLDP");
|
||||||
|
Security.setProperty("ocsp.enable", Boolean.FALSE.toString());
|
||||||
|
Security.setProperty("com.sun.security.enableCRLDP", Boolean.FALSE.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract BlockingInterface createStub() throws Exception;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReject() throws Exception {
|
||||||
|
BlockingInterface stub = createStub();
|
||||||
|
ServiceException se = assertThrows(ServiceException.class,
|
||||||
|
() -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello world").build()));
|
||||||
|
assertThat(se.getCause(), instanceOf(ConnectionClosedException.class));
|
||||||
|
}
|
||||||
|
}
|
@ -17,147 +17,42 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.security;
|
package org.apache.hadoop.hbase.security;
|
||||||
|
|
||||||
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
|
|
||||||
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
|
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
|
||||||
import static org.hamcrest.Matchers.instanceOf;
|
|
||||||
import static org.junit.Assert.assertThrows;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.security.Security;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import org.apache.commons.io.FileUtils;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||||
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
|
import org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl;
|
||||||
import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType;
|
|
||||||
import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
|
|
||||||
import org.apache.hadoop.hbase.io.crypto.tls.X509TestContext;
|
|
||||||
import org.apache.hadoop.hbase.io.crypto.tls.X509TestContextProvider;
|
|
||||||
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
|
|
||||||
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
|
|
||||||
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
|
|
||||||
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
|
|
||||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
|
||||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.RPCTests;
|
import org.apache.hadoop.hbase.testclassification.RPCTests;
|
||||||
import org.bouncycastle.jce.provider.BouncyCastleProvider;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
|
|
||||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
|
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
|
||||||
|
|
||||||
@RunWith(Parameterized.class)
|
@RunWith(Parameterized.class)
|
||||||
@Category({ RPCTests.class, MediumTests.class })
|
@Category({ RPCTests.class, MediumTests.class })
|
||||||
public class TestNettyTlsIPCRejectPlainText {
|
public class TestNettyTlsIPCRejectPlainText extends AbstractTestTlsRejectPlainText {
|
||||||
|
|
||||||
@ClassRule
|
@ClassRule
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
HBaseClassTestRule.forClass(TestNettyTlsIPCRejectPlainText.class);
|
HBaseClassTestRule.forClass(TestNettyTlsIPCRejectPlainText.class);
|
||||||
|
|
||||||
private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
|
|
||||||
|
|
||||||
private static File DIR;
|
|
||||||
|
|
||||||
private static X509TestContextProvider PROVIDER;
|
|
||||||
|
|
||||||
@Parameterized.Parameter(0)
|
|
||||||
public X509KeyType caKeyType;
|
|
||||||
|
|
||||||
@Parameterized.Parameter(1)
|
|
||||||
public X509KeyType certKeyType;
|
|
||||||
|
|
||||||
@Parameterized.Parameter(2)
|
|
||||||
public String keyPassword;
|
|
||||||
|
|
||||||
private X509TestContext x509TestContext;
|
|
||||||
|
|
||||||
private RpcServer rpcServer;
|
|
||||||
|
|
||||||
private RpcClient rpcClient;
|
|
||||||
|
|
||||||
@Parameterized.Parameters(name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}")
|
|
||||||
public static List<Object[]> data() {
|
|
||||||
List<Object[]> params = new ArrayList<>();
|
|
||||||
for (X509KeyType caKeyType : X509KeyType.values()) {
|
|
||||||
for (X509KeyType certKeyType : X509KeyType.values()) {
|
|
||||||
for (String keyPassword : new String[] { "", "pa$$w0rd" }) {
|
|
||||||
params.add(new Object[] { caKeyType, certKeyType, keyPassword });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return params;
|
|
||||||
}
|
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpBeforeClass() throws IOException {
|
public static void setUpBeforeClass() throws IOException {
|
||||||
Security.addProvider(new BouncyCastleProvider());
|
UTIL = new HBaseCommonTestingUtility();
|
||||||
DIR = new File(UTIL.getDataTestDir(TestNettyTlsIPC.class.getSimpleName()).toString())
|
initialize();
|
||||||
.getCanonicalFile();
|
|
||||||
FileUtils.forceMkdir(DIR);
|
|
||||||
Configuration conf = UTIL.getConfiguration();
|
|
||||||
conf.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_ENABLED, true);
|
|
||||||
conf.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, false);
|
|
||||||
conf.setBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, false);
|
|
||||||
PROVIDER = new X509TestContextProvider(conf, DIR);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDownAfterClass() {
|
public static void tearDownAfterClass() {
|
||||||
Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
|
cleanUp();
|
||||||
UTIL.cleanupTestDir();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Override
|
||||||
public void setUp() throws IOException {
|
protected BlockingInterface createStub() throws Exception {
|
||||||
x509TestContext = PROVIDER.get(caKeyType, certKeyType, keyPassword);
|
return TestProtobufRpcServiceImpl.newBlockingStub(rpcClient, rpcServer.getListenerAddress());
|
||||||
x509TestContext.setConfigurations(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
|
|
||||||
Configuration conf = UTIL.getConfiguration();
|
|
||||||
rpcServer = new NettyRpcServer(null, "testRpcServer",
|
|
||||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
|
|
||||||
new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1), true);
|
|
||||||
rpcServer.start();
|
|
||||||
rpcClient = new NettyRpcClient(conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void tearDown() throws IOException {
|
|
||||||
if (rpcServer != null) {
|
|
||||||
rpcServer.stop();
|
|
||||||
}
|
|
||||||
Closeables.close(rpcClient, true);
|
|
||||||
x509TestContext.clearConfigurations();
|
|
||||||
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_OCSP);
|
|
||||||
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_CLR);
|
|
||||||
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_PROTOCOL);
|
|
||||||
System.clearProperty("com.sun.net.ssl.checkRevocation");
|
|
||||||
System.clearProperty("com.sun.security.enableCRLDP");
|
|
||||||
Security.setProperty("ocsp.enable", Boolean.FALSE.toString());
|
|
||||||
Security.setProperty("com.sun.security.enableCRLDP", Boolean.FALSE.toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testReject() throws IOException, ServiceException {
|
|
||||||
BlockingInterface stub = newBlockingStub(rpcClient, rpcServer.getListenerAddress());
|
|
||||||
ServiceException se = assertThrows(ServiceException.class,
|
|
||||||
() -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello world").build()));
|
|
||||||
assertThat(se.getCause(), instanceOf(ConnectionClosedException.class));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,143 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.security;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.security.Security;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType;
|
||||||
|
import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
|
||||||
|
import org.apache.hadoop.hbase.io.crypto.tls.X509TestContext;
|
||||||
|
import org.apache.hadoop.hbase.io.crypto.tls.X509TestContextProvider;
|
||||||
|
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
|
||||||
|
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
|
||||||
|
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SecurityTests;
|
||||||
|
import org.bouncycastle.jce.provider.BouncyCastleProvider;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
@Category({ SecurityTests.class, LargeTests.class })
|
||||||
|
public class TestSaslTlsIPC extends AbstractTestSecureIPC {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestSaslTlsIPC.class);
|
||||||
|
|
||||||
|
private static X509TestContextProvider PROVIDER;
|
||||||
|
|
||||||
|
@Parameterized.Parameter(0)
|
||||||
|
public X509KeyType caKeyType;
|
||||||
|
|
||||||
|
@Parameterized.Parameter(1)
|
||||||
|
public X509KeyType certKeyType;
|
||||||
|
|
||||||
|
@Parameterized.Parameter(2)
|
||||||
|
public String keyPassword;
|
||||||
|
|
||||||
|
@Parameterized.Parameter(3)
|
||||||
|
public boolean acceptPlainText;
|
||||||
|
|
||||||
|
@Parameterized.Parameter(4)
|
||||||
|
public boolean clientTlsEnabled;
|
||||||
|
|
||||||
|
private X509TestContext x509TestContext;
|
||||||
|
|
||||||
|
@Parameterized.Parameters(
|
||||||
|
name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}, acceptPlainText={3},"
|
||||||
|
+ " clientTlsEnabled={4}")
|
||||||
|
public static List<Object[]> data() {
|
||||||
|
List<Object[]> params = new ArrayList<>();
|
||||||
|
for (X509KeyType caKeyType : X509KeyType.values()) {
|
||||||
|
for (X509KeyType certKeyType : X509KeyType.values()) {
|
||||||
|
for (String keyPassword : new String[] { "", "pa$$w0rd" }) {
|
||||||
|
// do not accept plain text
|
||||||
|
params.add(new Object[] { caKeyType, certKeyType, keyPassword, false, true });
|
||||||
|
// support plain text and client enables tls
|
||||||
|
params.add(new Object[] { caKeyType, certKeyType, keyPassword, true, true });
|
||||||
|
// support plain text and client disables tls
|
||||||
|
params.add(new Object[] { caKeyType, certKeyType, keyPassword, true, false });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return params;
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
Security.addProvider(new BouncyCastleProvider());
|
||||||
|
File dir = new File(TEST_UTIL.getDataTestDir(TestNettyTlsIPC.class.getSimpleName()).toString())
|
||||||
|
.getCanonicalFile();
|
||||||
|
FileUtils.forceMkdir(dir);
|
||||||
|
initKDCAndConf();
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
// server must enable tls
|
||||||
|
conf.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_ENABLED, true);
|
||||||
|
// only netty support tls
|
||||||
|
conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, NettyRpcClient.class,
|
||||||
|
RpcClient.class);
|
||||||
|
conf.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, NettyRpcServer.class,
|
||||||
|
RpcServer.class);
|
||||||
|
PROVIDER = new X509TestContextProvider(conf, dir);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws InterruptedException {
|
||||||
|
stopKDC();
|
||||||
|
Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
|
||||||
|
TEST_UTIL.cleanupTestDir();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
x509TestContext = PROVIDER.get(caKeyType, certKeyType, keyPassword);
|
||||||
|
x509TestContext.setConfigurations(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
conf.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, acceptPlainText);
|
||||||
|
conf.setBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, clientTlsEnabled);
|
||||||
|
setUpPrincipalAndConf();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
x509TestContext.clearConfigurations();
|
||||||
|
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_OCSP);
|
||||||
|
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_CLR);
|
||||||
|
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_PROTOCOL);
|
||||||
|
System.clearProperty("com.sun.net.ssl.checkRevocation");
|
||||||
|
System.clearProperty("com.sun.security.enableCRLDP");
|
||||||
|
Security.setProperty("ocsp.enable", Boolean.FALSE.toString());
|
||||||
|
Security.setProperty("com.sun.security.enableCRLDP", Boolean.FALSE.toString());
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,87 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.security;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.loginKerberosPrincipal;
|
||||||
|
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.setSecuredConfiguration;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SecurityTests;
|
||||||
|
import org.apache.hadoop.minikdc.MiniKdc;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
@Category({ SecurityTests.class, MediumTests.class })
|
||||||
|
public class TestSaslTlsIPCRejectPlainText extends AbstractTestTlsRejectPlainText {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestSaslTlsIPCRejectPlainText.class);
|
||||||
|
|
||||||
|
private static File KEYTAB_FILE;
|
||||||
|
|
||||||
|
private static MiniKdc KDC;
|
||||||
|
private static String HOST = "localhost";
|
||||||
|
private static String PRINCIPAL;
|
||||||
|
private static UserGroupInformation UGI;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
HBaseTestingUtility util = new HBaseTestingUtility();
|
||||||
|
UTIL = util;
|
||||||
|
initialize();
|
||||||
|
KEYTAB_FILE = new File(util.getDataTestDir("keytab").toUri().getPath());
|
||||||
|
KDC = util.setupMiniKdc(KEYTAB_FILE);
|
||||||
|
PRINCIPAL = "hbase/" + HOST;
|
||||||
|
KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
|
||||||
|
HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
|
||||||
|
UGI = loginKerberosPrincipal(KEYTAB_FILE.getCanonicalPath(), PRINCIPAL);
|
||||||
|
setSecuredConfiguration(util.getConfiguration());
|
||||||
|
SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
|
||||||
|
Mockito.when(securityInfoMock.getServerPrincipal())
|
||||||
|
.thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
|
||||||
|
SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() {
|
||||||
|
if (KDC != null) {
|
||||||
|
KDC.stop();
|
||||||
|
}
|
||||||
|
cleanUp();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected BlockingInterface createStub() throws Exception {
|
||||||
|
return TestProtobufRpcServiceImpl.newBlockingStub(rpcClient, rpcServer.getListenerAddress(),
|
||||||
|
User.create(UGI));
|
||||||
|
}
|
||||||
|
}
|
@ -17,101 +17,37 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.security;
|
package org.apache.hadoop.hbase.security;
|
||||||
|
|
||||||
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
|
|
||||||
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
|
|
||||||
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
|
|
||||||
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
|
|
||||||
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.setSecuredConfiguration;
|
|
||||||
import static org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders.SELECTOR_KEY;
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertNotSame;
|
|
||||||
import static org.junit.Assert.assertSame;
|
|
||||||
import static org.junit.Assert.assertThrows;
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.lang.reflect.Field;
|
|
||||||
import java.net.InetAddress;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
import javax.security.sasl.SaslClient;
|
|
||||||
import javax.security.sasl.SaslException;
|
|
||||||
import org.apache.commons.lang3.RandomStringUtils;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
|
import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
|
||||||
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
|
|
||||||
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
|
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
|
||||||
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
|
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
|
||||||
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
|
||||||
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
|
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
|
||||||
import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
|
import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
|
||||||
import org.apache.hadoop.hbase.security.provider.AuthenticationProviderSelector;
|
|
||||||
import org.apache.hadoop.hbase.security.provider.BuiltInProviderSelector;
|
|
||||||
import org.apache.hadoop.hbase.security.provider.SaslAuthMethod;
|
|
||||||
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.SecurityTests;
|
import org.apache.hadoop.hbase.testclassification.SecurityTests;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
|
||||||
import org.apache.hadoop.minikdc.MiniKdc;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
|
||||||
import org.apache.hadoop.security.token.Token;
|
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
import org.junit.runners.Parameterized.Parameter;
|
import org.junit.runners.Parameterized.Parameter;
|
||||||
import org.junit.runners.Parameterized.Parameters;
|
import org.junit.runners.Parameterized.Parameters;
|
||||||
import org.mockito.Mockito;
|
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
|
|
||||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
|
|
||||||
|
|
||||||
@RunWith(Parameterized.class)
|
@RunWith(Parameterized.class)
|
||||||
@Category({ SecurityTests.class, LargeTests.class })
|
@Category({ SecurityTests.class, LargeTests.class })
|
||||||
public class TestSecureIPC {
|
public class TestSecureIPC extends AbstractTestSecureIPC {
|
||||||
|
|
||||||
@ClassRule
|
@ClassRule
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
HBaseClassTestRule.forClass(TestSecureIPC.class);
|
HBaseClassTestRule.forClass(TestSecureIPC.class);
|
||||||
|
|
||||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
|
||||||
|
|
||||||
private static final File KEYTAB_FILE =
|
|
||||||
new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
|
|
||||||
|
|
||||||
private static MiniKdc KDC;
|
|
||||||
private static String HOST = "localhost";
|
|
||||||
private static String PRINCIPAL;
|
|
||||||
|
|
||||||
private String krbKeytab;
|
|
||||||
private String krbPrincipal;
|
|
||||||
private UserGroupInformation ugi;
|
|
||||||
private Configuration clientConf;
|
|
||||||
private Configuration serverConf;
|
|
||||||
|
|
||||||
@Parameters(name = "{index}: rpcClientImpl={0}, rpcServerImpl={1}")
|
@Parameters(name = "{index}: rpcClientImpl={0}, rpcServerImpl={1}")
|
||||||
public static Collection<Object[]> parameters() {
|
public static Collection<Object[]> parameters() {
|
||||||
List<Object[]> params = new ArrayList<>();
|
List<Object[]> params = new ArrayList<>();
|
||||||
@ -135,317 +71,19 @@ public class TestSecureIPC {
|
|||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUp() throws Exception {
|
public static void setUp() throws Exception {
|
||||||
KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
|
initKDCAndConf();
|
||||||
PRINCIPAL = "hbase/" + HOST;
|
|
||||||
KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
|
|
||||||
HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
|
|
||||||
// set a smaller timeout and retry to speed up tests
|
|
||||||
TEST_UTIL.getConfiguration().setInt(RpcClient.SOCKET_TIMEOUT_READ, 2000);
|
|
||||||
TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxretries", 1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDown() throws IOException {
|
public static void tearDown() throws Exception {
|
||||||
if (KDC != null) {
|
stopKDC();
|
||||||
KDC.stop();
|
|
||||||
}
|
|
||||||
TEST_UTIL.cleanupTestDir();
|
TEST_UTIL.cleanupTestDir();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUpTest() throws Exception {
|
public void setUpTest() throws Exception {
|
||||||
krbKeytab = getKeytabFileForTesting();
|
setUpPrincipalAndConf();
|
||||||
krbPrincipal = getPrincipalForTesting();
|
|
||||||
ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
|
|
||||||
clientConf = new Configuration(TEST_UTIL.getConfiguration());
|
|
||||||
setSecuredConfiguration(clientConf);
|
|
||||||
clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl);
|
clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl);
|
||||||
serverConf = new Configuration(TEST_UTIL.getConfiguration());
|
|
||||||
setSecuredConfiguration(serverConf);
|
|
||||||
serverConf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
|
serverConf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRpcCallWithEnabledKerberosSaslAuth() throws Exception {
|
|
||||||
UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
|
|
||||||
|
|
||||||
// check that the login user is okay:
|
|
||||||
assertSame(ugi2, ugi);
|
|
||||||
assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
|
|
||||||
assertEquals(krbPrincipal, ugi.getUserName());
|
|
||||||
|
|
||||||
callRpcService(User.create(ugi2));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRpcCallWithEnabledKerberosSaslAuth_CanonicalHostname() throws Exception {
|
|
||||||
UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
|
|
||||||
|
|
||||||
// check that the login user is okay:
|
|
||||||
assertSame(ugi2, ugi);
|
|
||||||
assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
|
|
||||||
assertEquals(krbPrincipal, ugi.getUserName());
|
|
||||||
|
|
||||||
enableCanonicalHostnameTesting(clientConf, "localhost");
|
|
||||||
clientConf.setBoolean(
|
|
||||||
SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS, false);
|
|
||||||
clientConf.set(HBaseKerberosUtils.KRB_PRINCIPAL, "hbase/_HOST@" + KDC.getRealm());
|
|
||||||
|
|
||||||
callRpcService(User.create(ugi2));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRpcCallWithEnabledKerberosSaslAuth_NoCanonicalHostname() throws Exception {
|
|
||||||
UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
|
|
||||||
|
|
||||||
// check that the login user is okay:
|
|
||||||
assertSame(ugi2, ugi);
|
|
||||||
assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
|
|
||||||
assertEquals(krbPrincipal, ugi.getUserName());
|
|
||||||
|
|
||||||
enableCanonicalHostnameTesting(clientConf, "127.0.0.1");
|
|
||||||
clientConf
|
|
||||||
.setBoolean(SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS, true);
|
|
||||||
clientConf.set(HBaseKerberosUtils.KRB_PRINCIPAL, "hbase/_HOST@" + KDC.getRealm());
|
|
||||||
|
|
||||||
callRpcService(User.create(ugi2));
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void enableCanonicalHostnameTesting(Configuration conf, String canonicalHostname) {
|
|
||||||
conf.setClass(SELECTOR_KEY, CanonicalHostnameTestingAuthenticationProviderSelector.class,
|
|
||||||
AuthenticationProviderSelector.class);
|
|
||||||
conf.set(CanonicalHostnameTestingAuthenticationProviderSelector.CANONICAL_HOST_NAME_KEY,
|
|
||||||
canonicalHostname);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class CanonicalHostnameTestingAuthenticationProviderSelector
|
|
||||||
extends BuiltInProviderSelector {
|
|
||||||
private static final String CANONICAL_HOST_NAME_KEY =
|
|
||||||
"CanonicalHostnameTestingAuthenticationProviderSelector.canonicalHostName";
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>>
|
|
||||||
selectProvider(String clusterId, User user) {
|
|
||||||
final Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> pair =
|
|
||||||
super.selectProvider(clusterId, user);
|
|
||||||
pair.setFirst(createCanonicalHostNameTestingProvider(pair.getFirst()));
|
|
||||||
return pair;
|
|
||||||
}
|
|
||||||
|
|
||||||
SaslClientAuthenticationProvider
|
|
||||||
createCanonicalHostNameTestingProvider(SaslClientAuthenticationProvider delegate) {
|
|
||||||
return new SaslClientAuthenticationProvider() {
|
|
||||||
@Override
|
|
||||||
public SaslClient createClient(Configuration conf, InetAddress serverAddr,
|
|
||||||
SecurityInfo securityInfo, Token<? extends TokenIdentifier> token,
|
|
||||||
boolean fallbackAllowed, Map<String, String> saslProps) throws IOException {
|
|
||||||
final String s = conf.get(CANONICAL_HOST_NAME_KEY);
|
|
||||||
if (s != null) {
|
|
||||||
try {
|
|
||||||
final Field canonicalHostName =
|
|
||||||
InetAddress.class.getDeclaredField("canonicalHostName");
|
|
||||||
canonicalHostName.setAccessible(true);
|
|
||||||
canonicalHostName.set(serverAddr, s);
|
|
||||||
} catch (NoSuchFieldException | IllegalAccessException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return delegate.createClient(conf, serverAddr, securityInfo, token, fallbackAllowed,
|
|
||||||
saslProps);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public UserInformation getUserInfo(User user) {
|
|
||||||
return delegate.getUserInfo(user);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public UserGroupInformation getRealUser(User ugi) {
|
|
||||||
return delegate.getRealUser(ugi);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean canRetry() {
|
|
||||||
return delegate.canRetry();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void relogin() throws IOException {
|
|
||||||
delegate.relogin();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public SaslAuthMethod getSaslAuthMethod() {
|
|
||||||
return delegate.getSaslAuthMethod();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getTokenKind() {
|
|
||||||
return delegate.getTokenKind();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRpcFallbackToSimpleAuth() throws Exception {
|
|
||||||
String clientUsername = "testuser";
|
|
||||||
UserGroupInformation clientUgi =
|
|
||||||
UserGroupInformation.createUserForTesting(clientUsername, new String[] { clientUsername });
|
|
||||||
|
|
||||||
// check that the client user is insecure
|
|
||||||
assertNotSame(ugi, clientUgi);
|
|
||||||
assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
|
|
||||||
assertEquals(clientUsername, clientUgi.getUserName());
|
|
||||||
|
|
||||||
clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
|
|
||||||
serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, true);
|
|
||||||
callRpcService(User.create(clientUgi));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setRpcProtection(String clientProtection, String serverProtection) {
|
|
||||||
clientConf.set("hbase.rpc.protection", clientProtection);
|
|
||||||
serverConf.set("hbase.rpc.protection", serverProtection);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test various combinations of Server and Client qops. n
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testSaslWithCommonQop() throws Exception {
|
|
||||||
setRpcProtection("privacy,authentication", "authentication");
|
|
||||||
callRpcService(User.create(ugi));
|
|
||||||
|
|
||||||
setRpcProtection("authentication", "privacy,authentication");
|
|
||||||
callRpcService(User.create(ugi));
|
|
||||||
|
|
||||||
setRpcProtection("integrity,authentication", "privacy,authentication");
|
|
||||||
callRpcService(User.create(ugi));
|
|
||||||
|
|
||||||
setRpcProtection("integrity,authentication", "integrity,authentication");
|
|
||||||
callRpcService(User.create(ugi));
|
|
||||||
|
|
||||||
setRpcProtection("privacy,authentication", "privacy,authentication");
|
|
||||||
callRpcService(User.create(ugi));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testSaslNoCommonQop() throws Exception {
|
|
||||||
setRpcProtection("integrity", "privacy");
|
|
||||||
SaslException se = assertThrows(SaslException.class, () -> callRpcService(User.create(ugi)));
|
|
||||||
assertEquals("No common protection layer between client and server", se.getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test sasl encryption with Crypto AES. n
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testSaslWithCryptoAES() throws Exception {
|
|
||||||
setRpcProtection("privacy", "privacy");
|
|
||||||
setCryptoAES("true", "true");
|
|
||||||
callRpcService(User.create(ugi));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test various combinations of Server and Client configuration for Crypto AES. n
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testDifferentConfWithCryptoAES() throws Exception {
|
|
||||||
setRpcProtection("privacy", "privacy");
|
|
||||||
|
|
||||||
setCryptoAES("false", "true");
|
|
||||||
callRpcService(User.create(ugi));
|
|
||||||
|
|
||||||
setCryptoAES("true", "false");
|
|
||||||
try {
|
|
||||||
callRpcService(User.create(ugi));
|
|
||||||
fail("The exception should be thrown out for the rpc timeout.");
|
|
||||||
} catch (Exception e) {
|
|
||||||
// ignore the expected exception
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void setCryptoAES(String clientCryptoAES, String serverCryptoAES) {
|
|
||||||
clientConf.set("hbase.rpc.crypto.encryption.aes.enabled", clientCryptoAES);
|
|
||||||
serverConf.set("hbase.rpc.crypto.encryption.aes.enabled", serverCryptoAES);
|
|
||||||
}
|
|
||||||
|
|
||||||
private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
|
|
||||||
throws Exception {
|
|
||||||
Configuration cnf = new Configuration();
|
|
||||||
cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
|
|
||||||
UserGroupInformation.setConfiguration(cnf);
|
|
||||||
UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
|
|
||||||
return UserGroupInformation.getLoginUser();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets up a RPC Server and a Client. Does a RPC checks the result. If an exception is thrown from
|
|
||||||
* the stub, this function will throw root cause of that exception.
|
|
||||||
*/
|
|
||||||
private void callRpcService(User clientUser) throws Exception {
|
|
||||||
SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
|
|
||||||
Mockito.when(securityInfoMock.getServerPrincipal())
|
|
||||||
.thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
|
|
||||||
SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
|
|
||||||
|
|
||||||
InetSocketAddress isa = new InetSocketAddress(HOST, 0);
|
|
||||||
|
|
||||||
RpcServerInterface rpcServer = RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC",
|
|
||||||
Lists
|
|
||||||
.newArrayList(new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)),
|
|
||||||
isa, serverConf, new FifoRpcScheduler(serverConf, 1));
|
|
||||||
rpcServer.start();
|
|
||||||
try (RpcClient rpcClient =
|
|
||||||
RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) {
|
|
||||||
BlockingInterface stub =
|
|
||||||
newBlockingStub(rpcClient, rpcServer.getListenerAddress(), clientUser);
|
|
||||||
TestThread th1 = new TestThread(stub);
|
|
||||||
final Throwable exception[] = new Throwable[1];
|
|
||||||
Collections.synchronizedList(new ArrayList<Throwable>());
|
|
||||||
Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
|
|
||||||
@Override
|
|
||||||
public void uncaughtException(Thread th, Throwable ex) {
|
|
||||||
exception[0] = ex;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
th1.setUncaughtExceptionHandler(exceptionHandler);
|
|
||||||
th1.start();
|
|
||||||
th1.join();
|
|
||||||
if (exception[0] != null) {
|
|
||||||
// throw root cause.
|
|
||||||
while (exception[0].getCause() != null) {
|
|
||||||
exception[0] = exception[0].getCause();
|
|
||||||
}
|
|
||||||
throw (Exception) exception[0];
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
rpcServer.stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class TestThread extends Thread {
|
|
||||||
private final BlockingInterface stub;
|
|
||||||
|
|
||||||
public TestThread(BlockingInterface stub) {
|
|
||||||
this.stub = stub;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
int[] messageSize = new int[] { 100, 1000, 10000 };
|
|
||||||
for (int i = 0; i < messageSize.length; i++) {
|
|
||||||
String input = RandomStringUtils.random(messageSize[i]);
|
|
||||||
String result =
|
|
||||||
stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(input).build())
|
|
||||||
.getMessage();
|
|
||||||
assertEquals(input, result);
|
|
||||||
}
|
|
||||||
} catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -1,219 +0,0 @@
|
|||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.security;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED;
|
|
||||||
import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_ENABLED;
|
|
||||||
import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT;
|
|
||||||
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
|
|
||||||
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
|
|
||||||
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
|
|
||||||
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
|
|
||||||
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.setSecuredConfiguration;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.security.Security;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
|
|
||||||
import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType;
|
|
||||||
import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
|
|
||||||
import org.apache.hadoop.hbase.io.crypto.tls.X509TestContext;
|
|
||||||
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
|
|
||||||
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
|
|
||||||
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
|
|
||||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
|
||||||
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
|
||||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
|
||||||
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
|
|
||||||
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.SecurityTests;
|
|
||||||
import org.apache.hadoop.minikdc.MiniKdc;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
|
||||||
import org.bouncycastle.jce.provider.BouncyCastleProvider;
|
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.ClassRule;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
import org.mockito.Mockito;
|
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos;
|
|
||||||
|
|
||||||
@Category({ SecurityTests.class, LargeTests.class })
|
|
||||||
public class TestTlsWithKerberos {
|
|
||||||
|
|
||||||
@ClassRule
|
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
|
||||||
HBaseClassTestRule.forClass(TestTlsWithKerberos.class);
|
|
||||||
|
|
||||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
|
||||||
|
|
||||||
private static final File KEYTAB_FILE =
|
|
||||||
new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
|
|
||||||
|
|
||||||
private static MiniKdc KDC;
|
|
||||||
private static final String HOST = "localhost";
|
|
||||||
private static String PRINCIPAL;
|
|
||||||
private static final String RPC_CLIENT_IMPL = NettyRpcClient.class.getName();
|
|
||||||
private static final String RPC_SERVER_IMPL = NettyRpcServer.class.getName();
|
|
||||||
|
|
||||||
private String krbKeytab;
|
|
||||||
private String krbPrincipal;
|
|
||||||
private UserGroupInformation ugi;
|
|
||||||
private Configuration clientConf;
|
|
||||||
private Configuration serverConf;
|
|
||||||
private static X509TestContext x509TestContext;
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void setUp() throws Exception {
|
|
||||||
Security.addProvider(new BouncyCastleProvider());
|
|
||||||
KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
|
|
||||||
PRINCIPAL = "hbase/" + HOST;
|
|
||||||
KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
|
|
||||||
HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
|
|
||||||
x509TestContext = X509TestContext.newBuilder()
|
|
||||||
.setTempDir(new File(TEST_UTIL.getDataTestDir().toUri().getPath()))
|
|
||||||
.setKeyStorePassword("Pa$$word").setKeyStoreKeyType(X509KeyType.RSA)
|
|
||||||
.setTrustStoreKeyType(X509KeyType.RSA).setTrustStorePassword("Pa$$word").build();
|
|
||||||
x509TestContext.setConfigurations(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void tearDown() throws IOException {
|
|
||||||
Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
|
|
||||||
if (KDC != null) {
|
|
||||||
KDC.stop();
|
|
||||||
}
|
|
||||||
TEST_UTIL.cleanupTestDir();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUpTest() throws Exception {
|
|
||||||
krbKeytab = getKeytabFileForTesting();
|
|
||||||
krbPrincipal = getPrincipalForTesting();
|
|
||||||
ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
|
|
||||||
clientConf = HBaseConfiguration.create(x509TestContext.getConf());
|
|
||||||
setSecuredConfiguration(clientConf);
|
|
||||||
clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RPC_CLIENT_IMPL);
|
|
||||||
serverConf = HBaseConfiguration.create(x509TestContext.getConf());
|
|
||||||
setSecuredConfiguration(serverConf);
|
|
||||||
serverConf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, RPC_SERVER_IMPL);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testNoPlaintext() throws Exception {
|
|
||||||
setRpcProtection("authentication", "authentication");
|
|
||||||
setTLSEncryption(true, false, true);
|
|
||||||
callRpcService(User.create(ugi));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRejectPlaintext() {
|
|
||||||
setRpcProtection("authentication", "authentication");
|
|
||||||
setTLSEncryption(true, false, false);
|
|
||||||
Assert.assertThrows(ConnectionClosedException.class, () -> callRpcService(User.create(ugi)));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testAcceptPlaintext() throws Exception {
|
|
||||||
setRpcProtection("authentication", "authentication");
|
|
||||||
setTLSEncryption(true, true, false);
|
|
||||||
callRpcService(User.create(ugi));
|
|
||||||
}
|
|
||||||
|
|
||||||
void setTLSEncryption(Boolean server, Boolean acceptPlaintext, Boolean client) {
|
|
||||||
serverConf.set(HBASE_SERVER_NETTY_TLS_ENABLED, server.toString());
|
|
||||||
serverConf.set(HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, acceptPlaintext.toString());
|
|
||||||
clientConf.set(HBASE_CLIENT_NETTY_TLS_ENABLED, client.toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
void setRpcProtection(String clientProtection, String serverProtection) {
|
|
||||||
clientConf.set("hbase.rpc.protection", clientProtection);
|
|
||||||
serverConf.set("hbase.rpc.protection", serverProtection);
|
|
||||||
}
|
|
||||||
|
|
||||||
private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
|
|
||||||
throws Exception {
|
|
||||||
Configuration cnf = new Configuration();
|
|
||||||
cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
|
|
||||||
UserGroupInformation.setConfiguration(cnf);
|
|
||||||
UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
|
|
||||||
return UserGroupInformation.getLoginUser();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets up a RPC Server and a Client. Does a RPC checks the result. If an exception is thrown from
|
|
||||||
* the stub, this function will throw root cause of that exception.
|
|
||||||
*/
|
|
||||||
private void callRpcService(User clientUser) throws Exception {
|
|
||||||
SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
|
|
||||||
Mockito.when(securityInfoMock.getServerPrincipal())
|
|
||||||
.thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
|
|
||||||
SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
|
|
||||||
|
|
||||||
InetSocketAddress isa = new InetSocketAddress(HOST, 0);
|
|
||||||
|
|
||||||
RpcServerInterface rpcServer = RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC",
|
|
||||||
Lists
|
|
||||||
.newArrayList(new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)),
|
|
||||||
isa, serverConf, new FifoRpcScheduler(serverConf, 1));
|
|
||||||
rpcServer.start();
|
|
||||||
try (RpcClient rpcClient =
|
|
||||||
RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) {
|
|
||||||
TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
|
|
||||||
newBlockingStub(rpcClient, rpcServer.getListenerAddress(), clientUser);
|
|
||||||
TestSecureIPC.TestThread th = new TestSecureIPC.TestThread(stub);
|
|
||||||
AtomicReference<Throwable> exception = new AtomicReference<>();
|
|
||||||
Collections.synchronizedList(new ArrayList<Throwable>());
|
|
||||||
Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
|
|
||||||
@Override
|
|
||||||
public void uncaughtException(Thread th, Throwable ex) {
|
|
||||||
exception.set(ex);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
th.setUncaughtExceptionHandler(exceptionHandler);
|
|
||||||
th.start();
|
|
||||||
th.join();
|
|
||||||
if (exception.get() != null) {
|
|
||||||
// throw root cause.
|
|
||||||
while (exception.get().getCause() != null) {
|
|
||||||
exception.set(exception.get().getCause());
|
|
||||||
}
|
|
||||||
throw (Exception) exception.get();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
rpcServer.stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
x
Reference in New Issue
Block a user