HBASE-14865 Support passing multiple QOPs to SaslClient/Server via hbase.rpc.protection (Appy)
This commit is contained in:
parent
ae7cc0c848
commit
4ac8d4ce61
|
@ -32,7 +32,6 @@ import io.netty.util.concurrent.Promise;
|
|||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
|
@ -104,8 +103,7 @@ public class AsyncRpcChannel {
|
|||
final String serviceName;
|
||||
final InetSocketAddress address;
|
||||
|
||||
private int ioFailureCounter = 0;
|
||||
private int connectFailureCounter = 0;
|
||||
private int failureCounter = 0;
|
||||
|
||||
boolean useSasl;
|
||||
AuthMethod authMethod;
|
||||
|
@ -134,7 +132,7 @@ public class AsyncRpcChannel {
|
|||
* @param bootstrap to construct channel on
|
||||
* @param client to connect with
|
||||
* @param ticket of user which uses connection
|
||||
* @param serviceName name of service to connect to
|
||||
* @param serviceName name of service to connect to
|
||||
* @param address to connect to
|
||||
*/
|
||||
public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket, String
|
||||
|
@ -166,11 +164,7 @@ public class AsyncRpcChannel {
|
|||
@Override
|
||||
public void operationComplete(final ChannelFuture f) throws Exception {
|
||||
if (!f.isSuccess()) {
|
||||
if (f.cause() instanceof SocketException) {
|
||||
retryOrClose(bootstrap, connectFailureCounter++, f.cause());
|
||||
} else {
|
||||
retryOrClose(bootstrap, ioFailureCounter++, f.cause());
|
||||
}
|
||||
retryOrClose(bootstrap, failureCounter++, client.failureSleep, f.cause());
|
||||
return;
|
||||
}
|
||||
channel = f.channel();
|
||||
|
@ -263,13 +257,8 @@ public class AsyncRpcChannel {
|
|||
// Handle Sasl failure. Try to potentially get new credentials
|
||||
handleSaslConnectionFailure(retryCount, cause, realTicket);
|
||||
|
||||
// Try to reconnect
|
||||
client.newTimeout(new TimerTask() {
|
||||
@Override
|
||||
public void run(Timeout timeout) throws Exception {
|
||||
connect(bootstrap);
|
||||
}
|
||||
}, random.nextInt(reloginMaxBackoff) + 1, TimeUnit.MILLISECONDS);
|
||||
retryOrClose(bootstrap, failureCounter++, random.nextInt(reloginMaxBackoff) + 1,
|
||||
cause);
|
||||
} catch (IOException | InterruptedException e) {
|
||||
close(e);
|
||||
}
|
||||
|
@ -286,16 +275,18 @@ public class AsyncRpcChannel {
|
|||
* Retry to connect or close
|
||||
*
|
||||
* @param bootstrap to connect with
|
||||
* @param connectCounter amount of tries
|
||||
* @param failureCount failure count
|
||||
* @param e exception of fail
|
||||
*/
|
||||
private void retryOrClose(final Bootstrap bootstrap, int connectCounter, Throwable e) {
|
||||
if (connectCounter < client.maxRetries) {
|
||||
private void retryOrClose(final Bootstrap bootstrap, int failureCount,
|
||||
long timeout, Throwable e) {
|
||||
if (failureCount < client.maxRetries) {
|
||||
client.newTimeout(new TimerTask() {
|
||||
@Override public void run(Timeout timeout) throws Exception {
|
||||
@Override
|
||||
public void run(Timeout timeout) throws Exception {
|
||||
connect(bootstrap);
|
||||
}
|
||||
}, client.failureSleep, TimeUnit.MILLISECONDS);
|
||||
}, timeout, TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
client.failedServers.addToFailedServers(address);
|
||||
close(e);
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.io.DataOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.security.auth.callback.Callback;
|
||||
import javax.security.auth.callback.CallbackHandler;
|
||||
|
@ -59,6 +60,7 @@ public class HBaseSaslRpcClient {
|
|||
|
||||
private final SaslClient saslClient;
|
||||
private final boolean fallbackAllowed;
|
||||
protected final Map<String, String> saslProps;
|
||||
/**
|
||||
* Create a HBaseSaslRpcClient for an authentication method
|
||||
*
|
||||
|
@ -96,7 +98,7 @@ public class HBaseSaslRpcClient {
|
|||
Token<? extends TokenIdentifier> token, String serverPrincipal, boolean fallbackAllowed,
|
||||
String rpcProtection) throws IOException {
|
||||
this.fallbackAllowed = fallbackAllowed;
|
||||
SaslUtil.initSaslProperties(rpcProtection);
|
||||
saslProps = SaslUtil.initSaslProperties(rpcProtection);
|
||||
switch (method) {
|
||||
case DIGEST:
|
||||
if (LOG.isDebugEnabled())
|
||||
|
@ -138,13 +140,13 @@ public class HBaseSaslRpcClient {
|
|||
String saslDefaultRealm, CallbackHandler saslClientCallbackHandler)
|
||||
throws IOException {
|
||||
return Sasl.createSaslClient(mechanismNames, null, null, saslDefaultRealm,
|
||||
SaslUtil.SASL_PROPS, saslClientCallbackHandler);
|
||||
saslProps, saslClientCallbackHandler);
|
||||
}
|
||||
|
||||
protected SaslClient createKerberosSaslClient(String[] mechanismNames,
|
||||
String userFirstPart, String userSecondPart) throws IOException {
|
||||
return Sasl.createSaslClient(mechanismNames, null, userFirstPart,
|
||||
userSecondPart, SaslUtil.SASL_PROPS, null);
|
||||
userSecondPart, saslProps, null);
|
||||
}
|
||||
|
||||
private static void readStatus(DataInputStream inStream) throws IOException {
|
||||
|
|
|
@ -41,6 +41,7 @@ import javax.security.sasl.SaslException;
|
|||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
/**
|
||||
|
@ -58,6 +59,7 @@ public class SaslClientHandler extends ChannelDuplexHandler {
|
|||
* Used for client or server's token to send or receive from each other.
|
||||
*/
|
||||
private final SaslClient saslClient;
|
||||
private final Map<String, String> saslProps;
|
||||
private final SaslExceptionHandler exceptionHandler;
|
||||
private final SaslSuccessfulConnectHandler successfulConnectHandler;
|
||||
private byte[] saslToken;
|
||||
|
@ -67,8 +69,6 @@ public class SaslClientHandler extends ChannelDuplexHandler {
|
|||
private Random random;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*
|
||||
* @param ticket the ugi
|
||||
* @param method auth method
|
||||
* @param token for Sasl
|
||||
|
@ -76,8 +76,6 @@ public class SaslClientHandler extends ChannelDuplexHandler {
|
|||
* @param fallbackAllowed True if server may also fall back to less secure connection
|
||||
* @param rpcProtection Quality of protection. Can be 'authentication', 'integrity' or
|
||||
* 'privacy'.
|
||||
* @param exceptionHandler handler for exceptions
|
||||
* @param successfulConnectHandler handler for succesful connects
|
||||
* @throws java.io.IOException if handler could not be created
|
||||
*/
|
||||
public SaslClientHandler(UserGroupInformation ticket, AuthMethod method,
|
||||
|
@ -90,7 +88,7 @@ public class SaslClientHandler extends ChannelDuplexHandler {
|
|||
this.exceptionHandler = exceptionHandler;
|
||||
this.successfulConnectHandler = successfulConnectHandler;
|
||||
|
||||
SaslUtil.initSaslProperties(rpcProtection);
|
||||
saslProps = SaslUtil.initSaslProperties(rpcProtection);
|
||||
switch (method) {
|
||||
case DIGEST:
|
||||
if (LOG.isDebugEnabled())
|
||||
|
@ -125,32 +123,23 @@ public class SaslClientHandler extends ChannelDuplexHandler {
|
|||
|
||||
/**
|
||||
* Create a Digest Sasl client
|
||||
*
|
||||
* @param mechanismNames names of mechanisms
|
||||
* @param saslDefaultRealm default realm for sasl
|
||||
* @param saslClientCallbackHandler handler for the client
|
||||
* @return new SaslClient
|
||||
* @throws java.io.IOException if creation went wrong
|
||||
*/
|
||||
protected SaslClient createDigestSaslClient(String[] mechanismNames, String saslDefaultRealm,
|
||||
CallbackHandler saslClientCallbackHandler) throws IOException {
|
||||
return Sasl.createSaslClient(mechanismNames, null, null, saslDefaultRealm, SaslUtil.SASL_PROPS,
|
||||
return Sasl.createSaslClient(mechanismNames, null, null, saslDefaultRealm, saslProps,
|
||||
saslClientCallbackHandler);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create Kerberos client
|
||||
*
|
||||
* @param mechanismNames names of mechanisms
|
||||
* @param userFirstPart first part of username
|
||||
* @param userSecondPart second part of username
|
||||
* @return new SaslClient
|
||||
* @throws java.io.IOException if fails
|
||||
*/
|
||||
protected SaslClient createKerberosSaslClient(String[] mechanismNames, String userFirstPart,
|
||||
String userSecondPart) throws IOException {
|
||||
return Sasl
|
||||
.createSaslClient(mechanismNames, null, userFirstPart, userSecondPart, SaslUtil.SASL_PROPS,
|
||||
.createSaslClient(mechanismNames, null, userFirstPart, userSecondPart, saslProps,
|
||||
null);
|
||||
}
|
||||
|
||||
|
@ -269,11 +258,6 @@ public class SaslClientHandler extends ChannelDuplexHandler {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write SASL token
|
||||
* @param ctx to write to
|
||||
* @param saslToken to write
|
||||
*/
|
||||
private void writeSaslToken(final ChannelHandlerContext ctx, byte[] saslToken) {
|
||||
ByteBuf b = ctx.alloc().buffer(4 + saslToken.length);
|
||||
b.writeInt(saslToken.length);
|
||||
|
@ -290,9 +274,6 @@ public class SaslClientHandler extends ChannelDuplexHandler {
|
|||
|
||||
/**
|
||||
* Get the read status
|
||||
*
|
||||
* @param inStream to read
|
||||
* @throws org.apache.hadoop.ipc.RemoteException if status was not success
|
||||
*/
|
||||
private static void readStatus(ByteBuf inStream) throws RemoteException {
|
||||
int status = inStream.readInt(); // read status
|
||||
|
@ -360,7 +341,6 @@ public class SaslClientHandler extends ChannelDuplexHandler {
|
|||
*
|
||||
* @param retryCount current retry count
|
||||
* @param random to create new backoff with
|
||||
* @param cause of fail
|
||||
*/
|
||||
public void handle(int retryCount, Random random, Throwable cause);
|
||||
}
|
||||
|
|
|
@ -32,24 +32,31 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
public class SaslUtil {
|
||||
private static final Log log = LogFactory.getLog(SaslUtil.class);
|
||||
public static final String SASL_DEFAULT_REALM = "default";
|
||||
public static final Map<String, String> SASL_PROPS =
|
||||
new TreeMap<String, String>();
|
||||
public static final int SWITCH_TO_SIMPLE_AUTH = -88;
|
||||
|
||||
public static enum QualityOfProtection {
|
||||
public enum QualityOfProtection {
|
||||
AUTHENTICATION("auth"),
|
||||
INTEGRITY("auth-int"),
|
||||
PRIVACY("auth-conf");
|
||||
|
||||
public final String saslQop;
|
||||
private final String saslQop;
|
||||
|
||||
private QualityOfProtection(String saslQop) {
|
||||
QualityOfProtection(String saslQop) {
|
||||
this.saslQop = saslQop;
|
||||
}
|
||||
|
||||
public String getSaslQop() {
|
||||
return saslQop;
|
||||
}
|
||||
|
||||
public boolean matches(String stringQop) {
|
||||
if (saslQop.equals(stringQop)) {
|
||||
log.warn("Use authentication/integrity/privacy as value for rpc protection "
|
||||
+ "configurations instead of auth/auth-int/auth-conf.");
|
||||
return true;
|
||||
}
|
||||
return name().equalsIgnoreCase(stringQop);
|
||||
}
|
||||
}
|
||||
|
||||
/** Splitting fully qualified Kerberos name into parts */
|
||||
|
@ -71,40 +78,39 @@ public class SaslUtil {
|
|||
|
||||
/**
|
||||
* Returns {@link org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection}
|
||||
* corresponding to the given {@code stringQop} value. Returns null if value is
|
||||
* invalid.
|
||||
* corresponding to the given {@code stringQop} value.
|
||||
* @throws IllegalArgumentException If stringQop doesn't match any QOP.
|
||||
*/
|
||||
public static QualityOfProtection getQop(String stringQop) {
|
||||
QualityOfProtection qop = null;
|
||||
if (QualityOfProtection.AUTHENTICATION.name().toLowerCase().equals(stringQop)
|
||||
|| QualityOfProtection.AUTHENTICATION.saslQop.equals(stringQop)) {
|
||||
qop = QualityOfProtection.AUTHENTICATION;
|
||||
} else if (QualityOfProtection.INTEGRITY.name().toLowerCase().equals(stringQop)
|
||||
|| QualityOfProtection.INTEGRITY.saslQop.equals(stringQop)) {
|
||||
qop = QualityOfProtection.INTEGRITY;
|
||||
} else if (QualityOfProtection.PRIVACY.name().toLowerCase().equals(stringQop)
|
||||
|| QualityOfProtection.PRIVACY.saslQop.equals(stringQop)) {
|
||||
qop = QualityOfProtection.PRIVACY;
|
||||
for (QualityOfProtection qop : QualityOfProtection.values()) {
|
||||
if (qop.matches(stringQop)) {
|
||||
return qop;
|
||||
}
|
||||
}
|
||||
if (qop == null) {
|
||||
throw new IllegalArgumentException("Invalid qop: " + stringQop
|
||||
+ ". It must be one of 'authentication', 'integrity', 'privacy'.");
|
||||
}
|
||||
if (QualityOfProtection.AUTHENTICATION.saslQop.equals(stringQop)
|
||||
|| QualityOfProtection.INTEGRITY.saslQop.equals(stringQop)
|
||||
|| QualityOfProtection.PRIVACY.saslQop.equals(stringQop)) {
|
||||
log.warn("Use authentication/integrity/privacy as value for rpc protection "
|
||||
+ "configurations instead of auth/auth-int/auth-conf.");
|
||||
}
|
||||
return qop;
|
||||
throw new IllegalArgumentException("Invalid qop: " + stringQop
|
||||
+ ". It must be one of 'authentication', 'integrity', 'privacy'.");
|
||||
}
|
||||
|
||||
static void initSaslProperties(String rpcProtection) {
|
||||
QualityOfProtection saslQOP = getQop(rpcProtection);
|
||||
if (saslQOP == null) {
|
||||
saslQOP = QualityOfProtection.AUTHENTICATION;
|
||||
/**
|
||||
* @param rpcProtection Value of 'hbase.rpc.protection' configuration.
|
||||
* @return Map with values for SASL properties.
|
||||
*/
|
||||
static Map<String, String> initSaslProperties(String rpcProtection) {
|
||||
String saslQop;
|
||||
if (rpcProtection.isEmpty()) {
|
||||
saslQop = QualityOfProtection.AUTHENTICATION.getSaslQop();
|
||||
} else {
|
||||
String[] qops = rpcProtection.split(",");
|
||||
StringBuilder saslQopBuilder = new StringBuilder();
|
||||
for (int i = 0; i < qops.length; ++i) {
|
||||
QualityOfProtection qop = getQop(qops[i]);
|
||||
saslQopBuilder.append(",").append(qop.getSaslQop());
|
||||
}
|
||||
saslQop = saslQopBuilder.substring(1); // remove first ','
|
||||
}
|
||||
SaslUtil.SASL_PROPS.put(Sasl.QOP, saslQOP.getSaslQop());
|
||||
SaslUtil.SASL_PROPS.put(Sasl.SERVER_AUTH, "true");
|
||||
Map<String, String> saslProps = new TreeMap<>();
|
||||
saslProps.put(Sasl.QOP, saslQop);
|
||||
saslProps.put(Sasl.SERVER_AUTH, "true");
|
||||
return saslProps;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/*
|
||||
*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -63,12 +62,12 @@ import com.google.common.base.Strings;
|
|||
|
||||
@Category({SecurityTests.class, SmallTests.class})
|
||||
public class TestHBaseSaslRpcClient {
|
||||
|
||||
|
||||
static {
|
||||
System.setProperty("java.security.krb5.realm", "DOMAIN.COM");
|
||||
System.setProperty("java.security.krb5.kdc", "DOMAIN.COM");
|
||||
}
|
||||
|
||||
|
||||
static final String DEFAULT_USER_NAME = "principal";
|
||||
static final String DEFAULT_USER_PASSWORD = "password";
|
||||
|
||||
|
@ -84,33 +83,18 @@ public class TestHBaseSaslRpcClient {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSaslQOPNotEmpty() throws Exception {
|
||||
public void testSaslClientUsesGivenRpcProtection() throws Exception {
|
||||
Token<? extends TokenIdentifier> token = createTokenMockWithCredentials(DEFAULT_USER_NAME,
|
||||
DEFAULT_USER_PASSWORD);
|
||||
// default QOP is authentication
|
||||
new HBaseSaslRpcClient(AuthMethod.DIGEST, token, "principal/host@DOMAIN.COM", false);
|
||||
assertTrue(SaslUtil.SASL_PROPS.get(Sasl.QOP).equals(SaslUtil.QualityOfProtection.
|
||||
AUTHENTICATION.getSaslQop()));
|
||||
|
||||
// check with specific QOPs
|
||||
new HBaseSaslRpcClient(AuthMethod.DIGEST, token, "principal/host@DOMAIN.COM", false,
|
||||
"authentication");
|
||||
assertTrue(SaslUtil.SASL_PROPS.get(Sasl.QOP).equals(SaslUtil.QualityOfProtection.
|
||||
AUTHENTICATION.getSaslQop()));
|
||||
|
||||
new HBaseSaslRpcClient(AuthMethod.DIGEST, token, "principal/host@DOMAIN.COM", false,
|
||||
"privacy");
|
||||
assertTrue(SaslUtil.SASL_PROPS.get(Sasl.QOP).equals(SaslUtil.QualityOfProtection.
|
||||
PRIVACY.getSaslQop()));
|
||||
|
||||
new HBaseSaslRpcClient(AuthMethod.DIGEST, token, "principal/host@DOMAIN.COM", false,
|
||||
"integrity");
|
||||
assertTrue(SaslUtil.SASL_PROPS.get(Sasl.QOP).equals(SaslUtil.QualityOfProtection.
|
||||
INTEGRITY.getSaslQop()));
|
||||
|
||||
exception.expect(IllegalArgumentException.class);
|
||||
new HBaseSaslRpcClient(AuthMethod.DIGEST, token, "principal/host@DOMAIN.COM", false,
|
||||
"wrongvalue");
|
||||
for (SaslUtil.QualityOfProtection qop : SaslUtil.QualityOfProtection.values()) {
|
||||
String negotiatedQop = new HBaseSaslRpcClient(AuthMethod.DIGEST, token,
|
||||
"principal/host@DOMAIN.COM", false, qop.name()) {
|
||||
public String getQop() {
|
||||
return saslProps.get(Sasl.QOP);
|
||||
}
|
||||
}.getQop();
|
||||
assertEquals(negotiatedQop, qop.getSaslQop());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -210,7 +194,7 @@ public class TestHBaseSaslRpcClient {
|
|||
boolean inState = false;
|
||||
boolean outState = false;
|
||||
|
||||
HBaseSaslRpcClient rpcClient = new HBaseSaslRpcClient(AuthMethod.DIGEST,
|
||||
HBaseSaslRpcClient rpcClient = new HBaseSaslRpcClient(AuthMethod.DIGEST,
|
||||
createTokenMockWithCredentials(principal, password), principal, false) {
|
||||
@Override
|
||||
public SaslClient createDigestSaslClient(String[] mechanismNames,
|
||||
|
@ -225,7 +209,7 @@ public class TestHBaseSaslRpcClient {
|
|||
return Mockito.mock(SaslClient.class);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
try {
|
||||
rpcClient.getInputStream(Mockito.mock(InputStream.class));
|
||||
} catch(IOException ex) {
|
||||
|
@ -245,7 +229,7 @@ public class TestHBaseSaslRpcClient {
|
|||
|
||||
private boolean assertIOExceptionThenSaslClientIsNull(String principal, String password) {
|
||||
try {
|
||||
new HBaseSaslRpcClient(AuthMethod.DIGEST,
|
||||
new HBaseSaslRpcClient(AuthMethod.DIGEST,
|
||||
createTokenMockWithCredentials(principal, password), principal, false) {
|
||||
@Override
|
||||
public SaslClient createDigestSaslClient(String[] mechanismNames,
|
||||
|
@ -253,7 +237,7 @@ public class TestHBaseSaslRpcClient {
|
|||
throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public SaslClient createKerberosSaslClient(String[] mechanismNames,
|
||||
String userFirstPart, String userSecondPart) throws IOException {
|
||||
|
@ -279,7 +263,7 @@ public class TestHBaseSaslRpcClient {
|
|||
private boolean assertSuccessCreationDigestPrincipal(String principal, String password) {
|
||||
HBaseSaslRpcClient rpcClient = null;
|
||||
try {
|
||||
rpcClient = new HBaseSaslRpcClient(AuthMethod.DIGEST,
|
||||
rpcClient = new HBaseSaslRpcClient(AuthMethod.DIGEST,
|
||||
createTokenMockWithCredentials(principal, password), principal, false);
|
||||
} catch(Exception ex) {
|
||||
LOG.error(ex.getMessage(), ex);
|
|
@ -0,0 +1,59 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.security;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.apache.hadoop.hbase.testclassification.SecurityTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import javax.security.sasl.Sasl;
|
||||
import java.util.Map;
|
||||
|
||||
@Category({SecurityTests.class, SmallTests.class})
|
||||
public class TestSaslUtil {
|
||||
|
||||
@Rule
|
||||
public ExpectedException exception = ExpectedException.none();
|
||||
|
||||
@Test
|
||||
public void testInitSaslProperties() {
|
||||
Map<String, String> props;
|
||||
|
||||
props = SaslUtil.initSaslProperties("integrity");
|
||||
assertEquals(props.get(Sasl.QOP), "auth-int");
|
||||
|
||||
props = SaslUtil.initSaslProperties("privacy,authentication");
|
||||
assertEquals(props.get(Sasl.QOP), "auth-conf,auth");
|
||||
|
||||
props = SaslUtil.initSaslProperties("integrity,authentication,privacy");
|
||||
assertEquals(props.get(Sasl.QOP), "auth-int,auth,auth-conf");
|
||||
|
||||
exception.expect(IllegalArgumentException.class);
|
||||
props = SaslUtil.initSaslProperties("xyz");
|
||||
assertEquals(props.get(Sasl.QOP), "auth");
|
||||
|
||||
exception.expect(IllegalArgumentException.class);
|
||||
props = SaslUtil.initSaslProperties("");
|
||||
assertEquals(props.get(Sasl.QOP), "auth");
|
||||
}
|
||||
}
|
|
@ -1413,7 +1413,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
}
|
||||
saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
|
||||
.getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
|
||||
SaslUtil.SASL_PROPS, new SaslDigestCallbackHandler(
|
||||
HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler(
|
||||
secretManager, this));
|
||||
break;
|
||||
default:
|
||||
|
@ -1433,7 +1433,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
public Object run() throws SaslException {
|
||||
saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
|
||||
.getMechanismName(), names[0], names[1],
|
||||
SaslUtil.SASL_PROPS, new SaslGssCallbackHandler());
|
||||
HBaseSaslRpcServer.getSaslProps(), new SaslGssCallbackHandler());
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.security;
|
|||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.security.auth.callback.Callback;
|
||||
import javax.security.auth.callback.CallbackHandler;
|
||||
|
@ -48,11 +49,17 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|||
public class HBaseSaslRpcServer {
|
||||
private static final Log LOG = LogFactory.getLog(HBaseSaslRpcServer.class);
|
||||
|
||||
private static Map<String, String> saslProps = null;
|
||||
|
||||
public static void init(Configuration conf) {
|
||||
SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
|
||||
saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
|
||||
QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
|
||||
}
|
||||
|
||||
public static Map<String, String> getSaslProps() {
|
||||
return saslProps;
|
||||
}
|
||||
|
||||
public static <T extends TokenIdentifier> T getIdentifier(String id,
|
||||
SecretManager<T> secretManager) throws InvalidToken {
|
||||
byte[] tokenId = SaslUtil.decodeIdentifier(id);
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
|
@ -37,33 +38,32 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClientImpl;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||
import org.apache.hadoop.hbase.ipc.TestDelayedRpc.TestDelayedImplementation;
|
||||
import org.apache.hadoop.hbase.ipc.TestDelayedRpc.TestThread;
|
||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos;
|
||||
import org.apache.hadoop.hbase.testclassification.SecurityTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.minikdc.MiniKdc;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.protobuf.BlockingRpcChannel;
|
||||
import com.google.protobuf.BlockingService;
|
||||
|
||||
@Category({ SecurityTests.class, SmallTests.class })
|
||||
public class TestSecureRPC {
|
||||
import javax.security.sasl.SaslException;
|
||||
|
||||
public abstract class AbstractTestSecureIPC {
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
|
@ -71,11 +71,20 @@ public class TestSecureRPC {
|
|||
.getPath());
|
||||
|
||||
private static MiniKdc KDC;
|
||||
|
||||
private static String HOST = "localhost";
|
||||
|
||||
private static String PRINCIPAL;
|
||||
|
||||
String krbKeytab;
|
||||
String krbPrincipal;
|
||||
UserGroupInformation ugi;
|
||||
Configuration clientConf;
|
||||
Configuration serverConf;
|
||||
|
||||
abstract Class<? extends RpcClient> getRpcClientClass();
|
||||
|
||||
@Rule
|
||||
public ExpectedException exception = ExpectedException.none();
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
Properties conf = MiniKdc.createConf();
|
||||
|
@ -96,32 +105,18 @@ public class TestSecureRPC {
|
|||
TEST_UTIL.cleanupTestDir();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRpc() throws Exception {
|
||||
testRpcCallWithEnabledKerberosSaslAuth(RpcClientImpl.class);
|
||||
@Before
|
||||
public void setUpTest() throws Exception {
|
||||
krbKeytab = getKeytabFileForTesting();
|
||||
krbPrincipal = getPrincipalForTesting();
|
||||
ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
|
||||
clientConf = getSecuredConfiguration();
|
||||
clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, getRpcClientClass().getName());
|
||||
serverConf = getSecuredConfiguration();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRpcWithInsecureFallback() throws Exception {
|
||||
testRpcFallbackToSimpleAuth(RpcClientImpl.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAsyncRpc() throws Exception {
|
||||
testRpcCallWithEnabledKerberosSaslAuth(AsyncRpcClient.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAsyncRpcWithInsecureFallback() throws Exception {
|
||||
testRpcFallbackToSimpleAuth(AsyncRpcClient.class);
|
||||
}
|
||||
|
||||
private void testRpcCallWithEnabledKerberosSaslAuth(Class<? extends RpcClient> rpcImplClass)
|
||||
throws Exception {
|
||||
String krbKeytab = getKeytabFileForTesting();
|
||||
String krbPrincipal = getPrincipalForTesting();
|
||||
|
||||
UserGroupInformation ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
|
||||
public void testRpcCallWithEnabledKerberosSaslAuth() throws Exception {
|
||||
UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
|
||||
|
||||
// check that the login user is okay:
|
||||
|
@ -129,8 +124,52 @@ public class TestSecureRPC {
|
|||
assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
|
||||
assertEquals(krbPrincipal, ugi.getUserName());
|
||||
|
||||
Configuration clientConf = getSecuredConfiguration();
|
||||
callRpcService(rpcImplClass, User.create(ugi2), clientConf, false);
|
||||
callRpcService(User.create(ugi2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRpcFallbackToSimpleAuth() throws Exception {
|
||||
String clientUsername = "testuser";
|
||||
UserGroupInformation clientUgi = UserGroupInformation.createUserForTesting(clientUsername,
|
||||
new String[]{clientUsername});
|
||||
|
||||
// check that the client user is insecure
|
||||
assertNotSame(ugi, clientUgi);
|
||||
assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
|
||||
assertEquals(clientUsername, clientUgi.getUserName());
|
||||
|
||||
clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
|
||||
serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, true);
|
||||
callRpcService(User.create(clientUgi));
|
||||
}
|
||||
|
||||
void setRpcProtection(String clientProtection, String serverProtection) {
|
||||
clientConf.set("hbase.rpc.protection", clientProtection);
|
||||
serverConf.set("hbase.rpc.protection", serverProtection);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test various combinations of Server and Client qops.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testSaslWithCommonQop() throws Exception {
|
||||
setRpcProtection("privacy,authentication", "authentication");
|
||||
callRpcService(User.create(ugi));
|
||||
|
||||
setRpcProtection("authentication", "privacy,authentication");
|
||||
callRpcService(User.create(ugi));
|
||||
|
||||
setRpcProtection("integrity,authentication", "privacy,authentication");
|
||||
callRpcService(User.create(ugi));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSaslNoCommonQop() throws Exception {
|
||||
exception.expect(SaslException.class);
|
||||
exception.expectMessage("No common protection layer between client and server");
|
||||
setRpcProtection("integrity", "privacy");
|
||||
callRpcService(User.create(ugi));
|
||||
}
|
||||
|
||||
private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
|
||||
|
@ -142,15 +181,11 @@ public class TestSecureRPC {
|
|||
return UserGroupInformation.getLoginUser();
|
||||
}
|
||||
|
||||
private void callRpcService(Class<? extends RpcClient> rpcImplClass, User clientUser,
|
||||
Configuration clientConf, boolean allowInsecureFallback)
|
||||
throws Exception {
|
||||
Configuration clientConfCopy = new Configuration(clientConf);
|
||||
clientConfCopy.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcImplClass.getName());
|
||||
|
||||
Configuration conf = getSecuredConfiguration();
|
||||
conf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, allowInsecureFallback);
|
||||
|
||||
/**
|
||||
* Sets up a RPC Server and a Client. Does a RPC checks the result. If an exception is thrown
|
||||
* from the stub, this function will throw root cause of that exception.
|
||||
*/
|
||||
private void callRpcService(User clientUser) throws Exception {
|
||||
SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
|
||||
Mockito.when(securityInfoMock.getServerPrincipal())
|
||||
.thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
|
||||
|
@ -165,10 +200,10 @@ public class TestSecureRPC {
|
|||
RpcServerInterface rpcServer =
|
||||
new RpcServer(null, "testSecuredDelayedRpc",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), isa,
|
||||
conf, new FifoRpcScheduler(conf, 1));
|
||||
serverConf, new FifoRpcScheduler(serverConf, 1));
|
||||
rpcServer.start();
|
||||
RpcClient rpcClient =
|
||||
RpcClientFactory.createClient(clientConfCopy, HConstants.DEFAULT_CLUSTER_ID.toString());
|
||||
RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString());
|
||||
try {
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
|
@ -177,13 +212,29 @@ public class TestSecureRPC {
|
|||
BlockingRpcChannel channel =
|
||||
rpcClient.createBlockingRpcChannel(
|
||||
ServerName.valueOf(address.getHostName(), address.getPort(),
|
||||
System.currentTimeMillis()), clientUser, 5000);
|
||||
System.currentTimeMillis()), clientUser, 0);
|
||||
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
|
||||
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
|
||||
List<Integer> results = new ArrayList<Integer>();
|
||||
List<Integer> results = new ArrayList<>();
|
||||
TestThread th1 = new TestThread(stub, true, results);
|
||||
final Throwable exception[] = new Throwable[1];
|
||||
Collections.synchronizedList(new ArrayList<Throwable>());
|
||||
Thread.UncaughtExceptionHandler exceptionHandler =
|
||||
new Thread.UncaughtExceptionHandler() {
|
||||
public void uncaughtException(Thread th, Throwable ex) {
|
||||
exception[0] = ex;
|
||||
}
|
||||
};
|
||||
th1.setUncaughtExceptionHandler(exceptionHandler);
|
||||
th1.start();
|
||||
th1.join();
|
||||
if (exception[0] != null) {
|
||||
// throw root cause.
|
||||
while (exception[0].getCause() != null) {
|
||||
exception[0] = exception[0].getCause();
|
||||
}
|
||||
throw (Exception) exception[0];
|
||||
}
|
||||
|
||||
assertEquals(0xDEADBEEF, results.get(0).intValue());
|
||||
} finally {
|
||||
|
@ -191,26 +242,4 @@ public class TestSecureRPC {
|
|||
rpcServer.stop();
|
||||
}
|
||||
}
|
||||
|
||||
public void testRpcFallbackToSimpleAuth(Class<? extends RpcClient> rpcImplClass) throws Exception {
|
||||
String krbKeytab = getKeytabFileForTesting();
|
||||
String krbPrincipal = getPrincipalForTesting();
|
||||
|
||||
UserGroupInformation ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
|
||||
assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
|
||||
assertEquals(krbPrincipal, ugi.getUserName());
|
||||
|
||||
String clientUsername = "testuser";
|
||||
UserGroupInformation clientUgi = UserGroupInformation.createUserForTesting(clientUsername,
|
||||
new String[]{clientUsername});
|
||||
|
||||
// check that the client user is insecure
|
||||
assertNotSame(ugi, clientUgi);
|
||||
assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
|
||||
assertEquals(clientUsername, clientUgi.getUserName());
|
||||
|
||||
Configuration clientConf = new Configuration();
|
||||
clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
|
||||
callRpcService(rpcImplClass, User.create(clientUgi), clientConf, true);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.security;
|
||||
|
||||
import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||
import org.apache.hadoop.hbase.testclassification.SecurityTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ SecurityTests.class, SmallTests.class })
|
||||
public class TestAsyncSecureIPC extends AbstractTestSecureIPC {
|
||||
|
||||
Class<? extends RpcClient> getRpcClientClass() {
|
||||
return AsyncRpcClient.class;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.security;
|
||||
|
||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClientImpl;
|
||||
import org.apache.hadoop.hbase.testclassification.SecurityTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ SecurityTests.class, SmallTests.class })
|
||||
public class TestSecureIPC extends AbstractTestSecureIPC {
|
||||
|
||||
Class<? extends RpcClient> getRpcClientClass() {
|
||||
return RpcClientImpl.class;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue