Revert "HBASE-14865 Support passing multiple QOPs to SaslClient/Server via hbase.rpc.protection (Apply)"

Needs newer Hadoop with n HADOOP-10786/HADOOP-11287 (courtesy of Matteo)

This reverts commit e76a2e4e6d.
This commit is contained in:
stack 2016-01-21 08:27:06 -08:00
parent 58521869b0
commit d965d14a63
11 changed files with 189 additions and 315 deletions

View File

@ -32,6 +32,7 @@ import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@ -103,7 +104,8 @@ public class AsyncRpcChannel {
final String serviceName;
final InetSocketAddress address;
private int failureCounter = 0;
private int ioFailureCounter = 0;
private int connectFailureCounter = 0;
boolean useSasl;
AuthMethod authMethod;
@ -164,7 +166,11 @@ public class AsyncRpcChannel {
@Override
public void operationComplete(final ChannelFuture f) throws Exception {
if (!f.isSuccess()) {
retryOrClose(bootstrap, failureCounter++, client.failureSleep, f.cause());
if (f.cause() instanceof SocketException) {
retryOrClose(bootstrap, connectFailureCounter++, f.cause());
} else {
retryOrClose(bootstrap, ioFailureCounter++, f.cause());
}
return;
}
channel = f.channel();
@ -257,8 +263,13 @@ public class AsyncRpcChannel {
// Handle Sasl failure. Try to potentially get new credentials
handleSaslConnectionFailure(retryCount, cause, realTicket);
retryOrClose(bootstrap, failureCounter++, random.nextInt(reloginMaxBackoff) + 1,
cause);
// Try to reconnect
client.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
connect(bootstrap);
}
}, random.nextInt(reloginMaxBackoff) + 1, TimeUnit.MILLISECONDS);
} catch (IOException | InterruptedException e) {
close(e);
}
@ -275,18 +286,16 @@ public class AsyncRpcChannel {
* Retry to connect or close
*
* @param bootstrap to connect with
* @param failureCount failure count
* @param connectCounter amount of tries
* @param e exception of fail
*/
private void retryOrClose(final Bootstrap bootstrap, int failureCount,
long timeout, Throwable e) {
if (failureCount < client.maxRetries) {
private void retryOrClose(final Bootstrap bootstrap, int connectCounter, Throwable e) {
if (connectCounter < client.maxRetries) {
client.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
@Override public void run(Timeout timeout) throws Exception {
connect(bootstrap);
}
}, timeout, TimeUnit.MILLISECONDS);
}, client.failureSleep, TimeUnit.MILLISECONDS);
} else {
client.failedServers.addToFailedServers(address);
close(e);

View File

@ -46,7 +46,6 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import com.google.common.annotations.VisibleForTesting;
@ -60,7 +59,6 @@ public class HBaseSaslRpcClient {
private final SaslClient saslClient;
private final boolean fallbackAllowed;
protected final Map<String, String> saslProps;
/**
* Create a HBaseSaslRpcClient for an authentication method
*
@ -98,7 +96,7 @@ public class HBaseSaslRpcClient {
Token<? extends TokenIdentifier> token, String serverPrincipal, boolean fallbackAllowed,
String rpcProtection) throws IOException {
this.fallbackAllowed = fallbackAllowed;
saslProps = SaslUtil.initSaslProperties(rpcProtection);
SaslUtil.initSaslProperties(rpcProtection);
switch (method) {
case DIGEST:
if (LOG.isDebugEnabled())
@ -140,13 +138,13 @@ public class HBaseSaslRpcClient {
String saslDefaultRealm, CallbackHandler saslClientCallbackHandler)
throws IOException {
return Sasl.createSaslClient(mechanismNames, null, null, saslDefaultRealm,
saslProps, saslClientCallbackHandler);
SaslUtil.SASL_PROPS, saslClientCallbackHandler);
}
protected SaslClient createKerberosSaslClient(String[] mechanismNames,
String userFirstPart, String userSecondPart) throws IOException {
return Sasl.createSaslClient(mechanismNames, null, userFirstPart,
userSecondPart, saslProps, null);
userSecondPart, SaslUtil.SASL_PROPS, null);
}
private static void readStatus(DataInputStream inStream) throws IOException {

View File

@ -41,7 +41,6 @@ import javax.security.sasl.SaslException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.Random;
/**
@ -59,7 +58,6 @@ public class SaslClientHandler extends ChannelDuplexHandler {
* Used for client or server's token to send or receive from each other.
*/
private final SaslClient saslClient;
private final Map<String, String> saslProps;
private final SaslExceptionHandler exceptionHandler;
private final SaslSuccessfulConnectHandler successfulConnectHandler;
private byte[] saslToken;
@ -69,6 +67,8 @@ public class SaslClientHandler extends ChannelDuplexHandler {
private Random random;
/**
* Constructor
*
* @param ticket the ugi
* @param method auth method
* @param token for Sasl
@ -76,6 +76,8 @@ public class SaslClientHandler extends ChannelDuplexHandler {
* @param fallbackAllowed True if server may also fall back to less secure connection
* @param rpcProtection Quality of protection. Can be 'authentication', 'integrity' or
* 'privacy'.
* @param exceptionHandler handler for exceptions
* @param successfulConnectHandler handler for succesful connects
* @throws java.io.IOException if handler could not be created
*/
public SaslClientHandler(UserGroupInformation ticket, AuthMethod method,
@ -88,7 +90,7 @@ public class SaslClientHandler extends ChannelDuplexHandler {
this.exceptionHandler = exceptionHandler;
this.successfulConnectHandler = successfulConnectHandler;
saslProps = SaslUtil.initSaslProperties(rpcProtection);
SaslUtil.initSaslProperties(rpcProtection);
switch (method) {
case DIGEST:
if (LOG.isDebugEnabled())
@ -123,23 +125,32 @@ public class SaslClientHandler extends ChannelDuplexHandler {
/**
* Create a Digest Sasl client
*
* @param mechanismNames names of mechanisms
* @param saslDefaultRealm default realm for sasl
* @param saslClientCallbackHandler handler for the client
* @return new SaslClient
* @throws java.io.IOException if creation went wrong
*/
protected SaslClient createDigestSaslClient(String[] mechanismNames, String saslDefaultRealm,
CallbackHandler saslClientCallbackHandler) throws IOException {
return Sasl.createSaslClient(mechanismNames, null, null, saslDefaultRealm, saslProps,
return Sasl.createSaslClient(mechanismNames, null, null, saslDefaultRealm, SaslUtil.SASL_PROPS,
saslClientCallbackHandler);
}
/**
* Create Kerberos client
*
* @param mechanismNames names of mechanisms
* @param userFirstPart first part of username
* @param userSecondPart second part of username
* @return new SaslClient
* @throws java.io.IOException if fails
*/
protected SaslClient createKerberosSaslClient(String[] mechanismNames, String userFirstPart,
String userSecondPart) throws IOException {
return Sasl
.createSaslClient(mechanismNames, null, userFirstPart, userSecondPart, saslProps,
.createSaslClient(mechanismNames, null, userFirstPart, userSecondPart, SaslUtil.SASL_PROPS,
null);
}
@ -258,6 +269,11 @@ public class SaslClientHandler extends ChannelDuplexHandler {
}
}
/**
* Write SASL token
* @param ctx to write to
* @param saslToken to write
*/
private void writeSaslToken(final ChannelHandlerContext ctx, byte[] saslToken) {
ByteBuf b = ctx.alloc().buffer(4 + saslToken.length);
b.writeInt(saslToken.length);
@ -274,6 +290,9 @@ public class SaslClientHandler extends ChannelDuplexHandler {
/**
* Get the read status
*
* @param inStream to read
* @throws org.apache.hadoop.ipc.RemoteException if status was not success
*/
private static void readStatus(ByteBuf inStream) throws RemoteException {
int status = inStream.readInt(); // read status
@ -341,6 +360,7 @@ public class SaslClientHandler extends ChannelDuplexHandler {
*
* @param retryCount current retry count
* @param random to create new backoff with
* @param cause of fail
*/
public void handle(int retryCount, Random random, Throwable cause);
}

View File

@ -35,31 +35,24 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
public class SaslUtil {
private static final Log log = LogFactory.getLog(SaslUtil.class);
public static final String SASL_DEFAULT_REALM = "default";
public static final Map<String, String> SASL_PROPS =
new TreeMap<String, String>();
public static final int SWITCH_TO_SIMPLE_AUTH = -88;
public enum QualityOfProtection {
public static enum QualityOfProtection {
AUTHENTICATION("auth"),
INTEGRITY("auth-int"),
PRIVACY("auth-conf");
private final String saslQop;
public final String saslQop;
QualityOfProtection(String saslQop) {
private QualityOfProtection(String saslQop) {
this.saslQop = saslQop;
}
public String getSaslQop() {
return saslQop;
}
public boolean matches(String stringQop) {
if (saslQop.equals(stringQop)) {
log.warn("Use authentication/integrity/privacy as value for rpc protection "
+ "configurations instead of auth/auth-int/auth-conf.");
return true;
}
return name().equalsIgnoreCase(stringQop);
}
}
/** Splitting fully qualified Kerberos name into parts */
@ -81,39 +74,40 @@ public class SaslUtil {
/**
* Returns {@link org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection}
* corresponding to the given {@code stringQop} value.
* @throws IllegalArgumentException If stringQop doesn't match any QOP.
* corresponding to the given {@code stringQop} value. Returns null if value is
* invalid.
*/
public static QualityOfProtection getQop(String stringQop) {
for (QualityOfProtection qop : QualityOfProtection.values()) {
if (qop.matches(stringQop)) {
return qop;
}
QualityOfProtection qop = null;
if (QualityOfProtection.AUTHENTICATION.name().toLowerCase().equals(stringQop)
|| QualityOfProtection.AUTHENTICATION.saslQop.equals(stringQop)) {
qop = QualityOfProtection.AUTHENTICATION;
} else if (QualityOfProtection.INTEGRITY.name().toLowerCase().equals(stringQop)
|| QualityOfProtection.INTEGRITY.saslQop.equals(stringQop)) {
qop = QualityOfProtection.INTEGRITY;
} else if (QualityOfProtection.PRIVACY.name().toLowerCase().equals(stringQop)
|| QualityOfProtection.PRIVACY.saslQop.equals(stringQop)) {
qop = QualityOfProtection.PRIVACY;
}
if (qop == null) {
throw new IllegalArgumentException("Invalid qop: " + stringQop
+ ". It must be one of 'authentication', 'integrity', 'privacy'.");
}
if (QualityOfProtection.AUTHENTICATION.saslQop.equals(stringQop)
|| QualityOfProtection.INTEGRITY.saslQop.equals(stringQop)
|| QualityOfProtection.PRIVACY.saslQop.equals(stringQop)) {
log.warn("Use authentication/integrity/privacy as value for rpc protection "
+ "configurations instead of auth/auth-int/auth-conf.");
}
return qop;
}
/**
* @param rpcProtection Value of 'hbase.rpc.protection' configuration.
* @return Map with values for SASL properties.
*/
static Map<String, String> initSaslProperties(String rpcProtection) {
String saslQop;
if (rpcProtection.isEmpty()) {
saslQop = QualityOfProtection.AUTHENTICATION.getSaslQop();
} else {
String[] qops = rpcProtection.split(",");
StringBuilder saslQopBuilder = new StringBuilder();
for (int i = 0; i < qops.length; ++i) {
QualityOfProtection qop = getQop(qops[i]);
saslQopBuilder.append(",").append(qop.getSaslQop());
static void initSaslProperties(String rpcProtection) {
QualityOfProtection saslQOP = getQop(rpcProtection);
if (saslQOP == null) {
saslQOP = QualityOfProtection.AUTHENTICATION;
}
saslQop = saslQopBuilder.substring(1); // remove first ','
}
Map<String, String> saslProps = new TreeMap<>();
saslProps.put(Sasl.QOP, saslQop);
saslProps.put(Sasl.SERVER_AUTH, "true");
return saslProps;
SaslUtil.SASL_PROPS.put(Sasl.QOP, saslQOP.getSaslQop());
SaslUtil.SASL_PROPS.put(Sasl.SERVER_AUTH, "true");
}
}

View File

@ -1,59 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.hbase.testclassification.SecurityTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import javax.security.sasl.Sasl;
import java.util.Map;
@Category({SecurityTests.class, SmallTests.class})
public class TestSaslUtil {
@Rule
public ExpectedException exception = ExpectedException.none();
@Test
public void testInitSaslProperties() {
Map<String, String> props;
props = SaslUtil.initSaslProperties("integrity");
assertEquals(props.get(Sasl.QOP), "auth-int");
props = SaslUtil.initSaslProperties("privacy,authentication");
assertEquals(props.get(Sasl.QOP), "auth-conf,auth");
props = SaslUtil.initSaslProperties("integrity,authentication,privacy");
assertEquals(props.get(Sasl.QOP), "auth-int,auth,auth-conf");
exception.expect(IllegalArgumentException.class);
props = SaslUtil.initSaslProperties("xyz");
assertEquals(props.get(Sasl.QOP), "auth");
exception.expect(IllegalArgumentException.class);
props = SaslUtil.initSaslProperties("");
assertEquals(props.get(Sasl.QOP), "auth");
}
}

View File

@ -1404,7 +1404,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
.getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler(
SaslUtil.SASL_PROPS, new SaslDigestCallbackHandler(
secretManager, this));
break;
default:
@ -1424,7 +1424,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
public Object run() throws SaslException {
saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
.getMechanismName(), names[0], names[1],
HBaseSaslRpcServer.getSaslProps(), new SaslGssCallbackHandler());
SaslUtil.SASL_PROPS, new SaslGssCallbackHandler());
return null;
}
});

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.security;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
@ -49,17 +48,11 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
public class HBaseSaslRpcServer {
private static final Log LOG = LogFactory.getLog(HBaseSaslRpcServer.class);
private static Map<String, String> saslProps = null;
public static void init(Configuration conf) {
saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
}
public static Map<String, String> getSaslProps() {
return saslProps;
}
public static <T extends TokenIdentifier> T getIdentifier(String id,
SecretManager<T> secretManager) throws InvalidToken {
byte[] tokenId = SaslUtil.decodeIdentifier(id);

View File

@ -1,33 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security;
import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.testclassification.SecurityTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.experimental.categories.Category;
@Category({ SecurityTests.class, SmallTests.class })
public class TestAsyncSecureIPC extends AbstractTestSecureIPC {
Class<? extends RpcClient> getRpcClientClass() {
return AsyncRpcClient.class;
}
}

View File

@ -1,4 +1,5 @@
/**
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -42,7 +43,6 @@ import javax.security.sasl.RealmCallback;
import javax.security.sasl.RealmChoiceCallback;
import javax.security.sasl.SaslClient;
import org.apache.hadoop.hbase.testclassification.SecurityTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient.SaslClientCallbackHandler;
import org.apache.hadoop.io.DataInputBuffer;
@ -60,7 +60,7 @@ import org.mockito.Mockito;
import com.google.common.base.Strings;
@Category({SecurityTests.class, SmallTests.class})
@Category(SmallTests.class)
public class TestHBaseSaslRpcClient {
static {
@ -83,18 +83,33 @@ public class TestHBaseSaslRpcClient {
}
@Test
public void testSaslClientUsesGivenRpcProtection() throws Exception {
public void testSaslQOPNotEmpty() throws Exception {
Token<? extends TokenIdentifier> token = createTokenMockWithCredentials(DEFAULT_USER_NAME,
DEFAULT_USER_PASSWORD);
for (SaslUtil.QualityOfProtection qop : SaslUtil.QualityOfProtection.values()) {
String negotiatedQop = new HBaseSaslRpcClient(AuthMethod.DIGEST, token,
"principal/host@DOMAIN.COM", false, qop.name()) {
public String getQop() {
return saslProps.get(Sasl.QOP);
}
}.getQop();
assertEquals(negotiatedQop, qop.getSaslQop());
}
// default QOP is authentication
new HBaseSaslRpcClient(AuthMethod.DIGEST, token, "principal/host@DOMAIN.COM", false);
assertTrue(SaslUtil.SASL_PROPS.get(Sasl.QOP).equals(SaslUtil.QualityOfProtection.
AUTHENTICATION.getSaslQop()));
// check with specific QOPs
new HBaseSaslRpcClient(AuthMethod.DIGEST, token, "principal/host@DOMAIN.COM", false,
"authentication");
assertTrue(SaslUtil.SASL_PROPS.get(Sasl.QOP).equals(SaslUtil.QualityOfProtection.
AUTHENTICATION.getSaslQop()));
new HBaseSaslRpcClient(AuthMethod.DIGEST, token, "principal/host@DOMAIN.COM", false,
"privacy");
assertTrue(SaslUtil.SASL_PROPS.get(Sasl.QOP).equals(SaslUtil.QualityOfProtection.
PRIVACY.getSaslQop()));
new HBaseSaslRpcClient(AuthMethod.DIGEST, token, "principal/host@DOMAIN.COM", false,
"integrity");
assertTrue(SaslUtil.SASL_PROPS.get(Sasl.QOP).equals(SaslUtil.QualityOfProtection.
INTEGRITY.getSaslQop()));
exception.expect(IllegalArgumentException.class);
new HBaseSaslRpcClient(AuthMethod.DIGEST, token, "principal/host@DOMAIN.COM", false,
"wrongvalue");
}
@Test

View File

@ -1,33 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientImpl;
import org.apache.hadoop.hbase.testclassification.SecurityTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.experimental.categories.Category;
@Category({ SecurityTests.class, SmallTests.class })
public class TestSecureIPC extends AbstractTestSecureIPC {
Class<? extends RpcClient> getRpcClientClass() {
return RpcClientImpl.class;
}
}

View File

@ -29,7 +29,6 @@ import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@ -38,32 +37,32 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcClientImpl;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.TestDelayedRpc.TestDelayedImplementation;
import org.apache.hadoop.hbase.ipc.TestDelayedRpc.TestThread;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import com.google.common.collect.Lists;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.BlockingService;
import javax.security.sasl.SaslException;
public abstract class AbstractTestSecureIPC {
@Category(SmallTests.class)
public class TestSecureRPC {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@ -71,20 +70,11 @@ public abstract class AbstractTestSecureIPC {
.getPath());
private static MiniKdc KDC;
private static String HOST = "localhost";
private static String PRINCIPAL;
String krbKeytab;
String krbPrincipal;
UserGroupInformation ugi;
Configuration clientConf;
Configuration serverConf;
abstract Class<? extends RpcClient> getRpcClientClass();
@Rule
public ExpectedException exception = ExpectedException.none();
@BeforeClass
public static void setUp() throws Exception {
Properties conf = MiniKdc.createConf();
@ -105,18 +95,32 @@ public abstract class AbstractTestSecureIPC {
TEST_UTIL.cleanupTestDir();
}
@Before
public void setUpTest() throws Exception {
krbKeytab = getKeytabFileForTesting();
krbPrincipal = getPrincipalForTesting();
ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
clientConf = getSecuredConfiguration();
clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, getRpcClientClass().getName());
serverConf = getSecuredConfiguration();
@Test
public void testRpc() throws Exception {
testRpcCallWithEnabledKerberosSaslAuth(RpcClientImpl.class);
}
@Test
public void testRpcCallWithEnabledKerberosSaslAuth() throws Exception {
public void testRpcWithInsecureFallback() throws Exception {
testRpcFallbackToSimpleAuth(RpcClientImpl.class);
}
@Test
public void testAsyncRpc() throws Exception {
testRpcCallWithEnabledKerberosSaslAuth(AsyncRpcClient.class);
}
@Test
public void testAsyncRpcWithInsecureFallback() throws Exception {
testRpcFallbackToSimpleAuth(AsyncRpcClient.class);
}
private void testRpcCallWithEnabledKerberosSaslAuth(Class<? extends RpcClient> rpcImplClass)
throws Exception {
String krbKeytab = getKeytabFileForTesting();
String krbPrincipal = getPrincipalForTesting();
UserGroupInformation ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
// check that the login user is okay:
@ -124,52 +128,8 @@ public abstract class AbstractTestSecureIPC {
assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
assertEquals(krbPrincipal, ugi.getUserName());
callRpcService(User.create(ugi2));
}
@Test
public void testRpcFallbackToSimpleAuth() throws Exception {
String clientUsername = "testuser";
UserGroupInformation clientUgi = UserGroupInformation.createUserForTesting(clientUsername,
new String[]{clientUsername});
// check that the client user is insecure
assertNotSame(ugi, clientUgi);
assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
assertEquals(clientUsername, clientUgi.getUserName());
clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, true);
callRpcService(User.create(clientUgi));
}
void setRpcProtection(String clientProtection, String serverProtection) {
clientConf.set("hbase.rpc.protection", clientProtection);
serverConf.set("hbase.rpc.protection", serverProtection);
}
/**
* Test various combinations of Server and Client qops.
* @throws Exception
*/
@Test
public void testSaslWithCommonQop() throws Exception {
setRpcProtection("privacy,authentication", "authentication");
callRpcService(User.create(ugi));
setRpcProtection("authentication", "privacy,authentication");
callRpcService(User.create(ugi));
setRpcProtection("integrity,authentication", "privacy,authentication");
callRpcService(User.create(ugi));
}
@Test
public void testSaslNoCommonQop() throws Exception {
exception.expect(SaslException.class);
exception.expectMessage("No common protection layer between client and server");
setRpcProtection("integrity", "privacy");
callRpcService(User.create(ugi));
Configuration clientConf = getSecuredConfiguration();
callRpcService(rpcImplClass, User.create(ugi2), clientConf, false);
}
private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
@ -181,11 +141,15 @@ public abstract class AbstractTestSecureIPC {
return UserGroupInformation.getLoginUser();
}
/**
* Sets up a RPC Server and a Client. Does a RPC checks the result. If an exception is thrown
* from the stub, this function will throw root cause of that exception.
*/
private void callRpcService(User clientUser) throws Exception {
private void callRpcService(Class<? extends RpcClient> rpcImplClass, User clientUser,
Configuration clientConf, boolean allowInsecureFallback)
throws Exception {
Configuration clientConfCopy = new Configuration(clientConf);
clientConfCopy.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcImplClass.getName());
Configuration conf = getSecuredConfiguration();
conf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, allowInsecureFallback);
SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
Mockito.when(securityInfoMock.getServerPrincipal())
.thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
@ -200,10 +164,10 @@ public abstract class AbstractTestSecureIPC {
RpcServerInterface rpcServer =
new RpcServer(null, "testSecuredDelayedRpc",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), isa,
serverConf, new FifoRpcScheduler(serverConf, 1));
conf, new FifoRpcScheduler(conf, 1));
rpcServer.start();
RpcClient rpcClient =
RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString());
RpcClientFactory.createClient(clientConfCopy, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
InetSocketAddress address = rpcServer.getListenerAddress();
if (address == null) {
@ -212,29 +176,13 @@ public abstract class AbstractTestSecureIPC {
BlockingRpcChannel channel =
rpcClient.createBlockingRpcChannel(
ServerName.valueOf(address.getHostName(), address.getPort(),
System.currentTimeMillis()), clientUser, 0);
System.currentTimeMillis()), clientUser, 5000);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
List<Integer> results = new ArrayList<>();
List<Integer> results = new ArrayList<Integer>();
TestThread th1 = new TestThread(stub, true, results);
final Throwable exception[] = new Throwable[1];
Collections.synchronizedList(new ArrayList<Throwable>());
Thread.UncaughtExceptionHandler exceptionHandler =
new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread th, Throwable ex) {
exception[0] = ex;
}
};
th1.setUncaughtExceptionHandler(exceptionHandler);
th1.start();
th1.join();
if (exception[0] != null) {
// throw root cause.
while (exception[0].getCause() != null) {
exception[0] = exception[0].getCause();
}
throw (Exception) exception[0];
}
assertEquals(0xDEADBEEF, results.get(0).intValue());
} finally {
@ -242,4 +190,26 @@ public abstract class AbstractTestSecureIPC {
rpcServer.stop();
}
}
public void testRpcFallbackToSimpleAuth(Class<? extends RpcClient> rpcImplClass) throws Exception {
String krbKeytab = getKeytabFileForTesting();
String krbPrincipal = getPrincipalForTesting();
UserGroupInformation ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
assertEquals(krbPrincipal, ugi.getUserName());
String clientUsername = "testuser";
UserGroupInformation clientUgi = UserGroupInformation.createUserForTesting(clientUsername,
new String[]{clientUsername});
// check that the client user is insecure
assertNotSame(ugi, clientUgi);
assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
assertEquals(clientUsername, clientUgi.getUserName());
Configuration clientConf = new Configuration();
clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
callRpcService(rpcImplClass, User.create(clientUgi), clientConf, true);
}
}