HBASE-18115 Move SaslServer creation to HBaseSaslRpcServer
This commit is contained in:
parent
97484f2aaf
commit
efc7edc81a
|
@ -24,6 +24,7 @@ import java.util.TreeMap;
|
||||||
import javax.security.sasl.Sasl;
|
import javax.security.sasl.Sasl;
|
||||||
import javax.security.sasl.SaslClient;
|
import javax.security.sasl.SaslClient;
|
||||||
import javax.security.sasl.SaslException;
|
import javax.security.sasl.SaslException;
|
||||||
|
import javax.security.sasl.SaslServer;
|
||||||
|
|
||||||
import org.apache.commons.codec.binary.Base64;
|
import org.apache.commons.codec.binary.Base64;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -97,7 +98,7 @@ public class SaslUtil {
|
||||||
* @param rpcProtection Value of 'hbase.rpc.protection' configuration.
|
* @param rpcProtection Value of 'hbase.rpc.protection' configuration.
|
||||||
* @return Map with values for SASL properties.
|
* @return Map with values for SASL properties.
|
||||||
*/
|
*/
|
||||||
static Map<String, String> initSaslProperties(String rpcProtection) {
|
public static Map<String, String> initSaslProperties(String rpcProtection) {
|
||||||
String saslQop;
|
String saslQop;
|
||||||
if (rpcProtection.isEmpty()) {
|
if (rpcProtection.isEmpty()) {
|
||||||
saslQop = QualityOfProtection.AUTHENTICATION.getSaslQop();
|
saslQop = QualityOfProtection.AUTHENTICATION.getSaslQop();
|
||||||
|
@ -123,4 +124,12 @@ public class SaslUtil {
|
||||||
LOG.error("Error disposing of SASL client", e);
|
LOG.error("Error disposing of SASL client", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void safeDispose(SaslServer saslServer) {
|
||||||
|
try {
|
||||||
|
saslServer.dispose();
|
||||||
|
} catch (SaslException e) {
|
||||||
|
LOG.error("Error disposing of SASL server", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,8 +30,10 @@ import java.nio.channels.GatheringByteChannel;
|
||||||
import java.nio.channels.ReadableByteChannel;
|
import java.nio.channels.ReadableByteChannel;
|
||||||
import java.nio.channels.WritableByteChannel;
|
import java.nio.channels.WritableByteChannel;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.LongAdder;
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
|
|
||||||
|
@ -55,7 +57,8 @@ import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||||
import org.apache.hadoop.hbase.nio.MultiByteBuff;
|
import org.apache.hadoop.hbase.nio.MultiByteBuff;
|
||||||
import org.apache.hadoop.hbase.nio.SingleByteBuff;
|
import org.apache.hadoop.hbase.nio.SingleByteBuff;
|
||||||
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
||||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
|
import org.apache.hadoop.hbase.security.SaslUtil;
|
||||||
|
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.security.UserProvider;
|
import org.apache.hadoop.hbase.security.UserProvider;
|
||||||
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
|
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
|
||||||
|
@ -112,6 +115,7 @@ public abstract class RpcServer implements RpcServerInterface,
|
||||||
protected static final Log AUDITLOG = LogFactory.getLog("SecurityLogger."
|
protected static final Log AUDITLOG = LogFactory.getLog("SecurityLogger."
|
||||||
+ Server.class.getName());
|
+ Server.class.getName());
|
||||||
protected SecretManager<TokenIdentifier> secretManager;
|
protected SecretManager<TokenIdentifier> secretManager;
|
||||||
|
protected final Map<String, String> saslProps;
|
||||||
protected ServiceAuthorizationManager authManager;
|
protected ServiceAuthorizationManager authManager;
|
||||||
|
|
||||||
/** This is set to Call object before Handler invokes an RPC and ybdie
|
/** This is set to Call object before Handler invokes an RPC and ybdie
|
||||||
|
@ -307,7 +311,10 @@ public abstract class RpcServer implements RpcServerInterface,
|
||||||
this.userProvider = UserProvider.instantiate(conf);
|
this.userProvider = UserProvider.instantiate(conf);
|
||||||
this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
|
this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
|
||||||
if (isSecurityEnabled) {
|
if (isSecurityEnabled) {
|
||||||
HBaseSaslRpcServer.init(conf);
|
saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
|
||||||
|
QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
|
||||||
|
} else {
|
||||||
|
saslProps = Collections.emptyMap();
|
||||||
}
|
}
|
||||||
|
|
||||||
this.scheduler = scheduler;
|
this.scheduler = scheduler;
|
||||||
|
|
|
@ -29,18 +29,14 @@ import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.Channels;
|
import java.nio.channels.Channels;
|
||||||
import java.nio.channels.ReadableByteChannel;
|
import java.nio.channels.ReadableByteChannel;
|
||||||
import java.security.GeneralSecurityException;
|
import java.security.GeneralSecurityException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
import javax.security.sasl.Sasl;
|
|
||||||
import javax.security.sasl.SaslException;
|
|
||||||
import javax.security.sasl.SaslServer;
|
|
||||||
|
|
||||||
import org.apache.commons.crypto.cipher.CryptoCipherFactory;
|
import org.apache.commons.crypto.cipher.CryptoCipherFactory;
|
||||||
import org.apache.commons.crypto.random.CryptoRandom;
|
import org.apache.commons.crypto.random.CryptoRandom;
|
||||||
import org.apache.commons.crypto.random.CryptoRandomFactory;
|
import org.apache.commons.crypto.random.CryptoRandomFactory;
|
||||||
import org.apache.hadoop.hbase.CellScanner;
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.client.VersionInfoUtil;
|
import org.apache.hadoop.hbase.client.VersionInfoUtil;
|
||||||
import org.apache.hadoop.hbase.codec.Codec;
|
import org.apache.hadoop.hbase.codec.Codec;
|
||||||
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
|
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
|
||||||
|
@ -51,8 +47,6 @@ import org.apache.hadoop.hbase.nio.SingleByteBuff;
|
||||||
import org.apache.hadoop.hbase.security.AccessDeniedException;
|
import org.apache.hadoop.hbase.security.AccessDeniedException;
|
||||||
import org.apache.hadoop.hbase.security.AuthMethod;
|
import org.apache.hadoop.hbase.security.AuthMethod;
|
||||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
|
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
|
||||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
|
|
||||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
|
|
||||||
import org.apache.hadoop.hbase.security.SaslStatus;
|
import org.apache.hadoop.hbase.security.SaslStatus;
|
||||||
import org.apache.hadoop.hbase.security.SaslUtil;
|
import org.apache.hadoop.hbase.security.SaslUtil;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
@ -89,6 +83,7 @@ import org.apache.htrace.TraceInfo;
|
||||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
|
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
|
||||||
value="VO_VOLATILE_INCREMENT",
|
value="VO_VOLATILE_INCREMENT",
|
||||||
justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
|
justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
|
||||||
|
@InterfaceAudience.Private
|
||||||
abstract class ServerRpcConnection implements Closeable {
|
abstract class ServerRpcConnection implements Closeable {
|
||||||
/** */
|
/** */
|
||||||
protected final RpcServer rpcServer;
|
protected final RpcServer rpcServer;
|
||||||
|
@ -121,7 +116,7 @@ abstract class ServerRpcConnection implements Closeable {
|
||||||
// When is this set? FindBugs wants to know! Says NP
|
// When is this set? FindBugs wants to know! Says NP
|
||||||
private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
|
private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
|
||||||
protected boolean useSasl;
|
protected boolean useSasl;
|
||||||
protected SaslServer saslServer;
|
protected HBaseSaslRpcServer saslServer;
|
||||||
protected CryptoAES cryptoAES;
|
protected CryptoAES cryptoAES;
|
||||||
protected boolean useWrap = false;
|
protected boolean useWrap = false;
|
||||||
protected boolean useCryptoAesWrap = false;
|
protected boolean useCryptoAesWrap = false;
|
||||||
|
@ -131,7 +126,6 @@ abstract class ServerRpcConnection implements Closeable {
|
||||||
|
|
||||||
protected boolean retryImmediatelySupported = false;
|
protected boolean retryImmediatelySupported = false;
|
||||||
|
|
||||||
private UserGroupInformation attemptingUser = null; // user name before auth
|
|
||||||
protected User user = null;
|
protected User user = null;
|
||||||
protected UserGroupInformation ugi = null;
|
protected UserGroupInformation ugi = null;
|
||||||
|
|
||||||
|
@ -164,13 +158,13 @@ abstract class ServerRpcConnection implements Closeable {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String getFatalConnectionString(final int version, final byte authByte) {
|
private String getFatalConnectionString(final int version, final byte authByte) {
|
||||||
return "serverVersion=" + RpcServer.CURRENT_VERSION +
|
return "serverVersion=" + RpcServer.CURRENT_VERSION +
|
||||||
", clientVersion=" + version + ", authMethod=" + authByte +
|
", clientVersion=" + version + ", authMethod=" + authByte +
|
||||||
", authSupported=" + (authMethod != null) + " from " + toString();
|
", authSupported=" + (authMethod != null) + " from " + toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected UserGroupInformation getAuthorizedUgi(String authorizedId)
|
private UserGroupInformation getAuthorizedUgi(String authorizedId)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
UserGroupInformation authorizedUgi;
|
UserGroupInformation authorizedUgi;
|
||||||
if (authMethod == AuthMethod.DIGEST) {
|
if (authMethod == AuthMethod.DIGEST) {
|
||||||
|
@ -193,7 +187,7 @@ abstract class ServerRpcConnection implements Closeable {
|
||||||
* Set up cell block codecs
|
* Set up cell block codecs
|
||||||
* @throws FatalConnectionException
|
* @throws FatalConnectionException
|
||||||
*/
|
*/
|
||||||
protected void setupCellBlockCodecs(final ConnectionHeader header)
|
private void setupCellBlockCodecs(final ConnectionHeader header)
|
||||||
throws FatalConnectionException {
|
throws FatalConnectionException {
|
||||||
// TODO: Plug in other supported decoders.
|
// TODO: Plug in other supported decoders.
|
||||||
if (!header.hasCellBlockCodecClass()) return;
|
if (!header.hasCellBlockCodecClass()) return;
|
||||||
|
@ -218,13 +212,13 @@ abstract class ServerRpcConnection implements Closeable {
|
||||||
*
|
*
|
||||||
* @throws FatalConnectionException
|
* @throws FatalConnectionException
|
||||||
*/
|
*/
|
||||||
protected void setupCryptoCipher(final ConnectionHeader header,
|
private void setupCryptoCipher(final ConnectionHeader header,
|
||||||
RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
|
RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
|
||||||
throws FatalConnectionException {
|
throws FatalConnectionException {
|
||||||
// If simple auth, return
|
// If simple auth, return
|
||||||
if (saslServer == null) return;
|
if (saslServer == null) return;
|
||||||
// check if rpc encryption with Crypto AES
|
// check if rpc encryption with Crypto AES
|
||||||
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
|
String qop = saslServer.getNegotiatedQop();
|
||||||
boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY
|
boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY
|
||||||
.getSaslQop().equalsIgnoreCase(qop);
|
.getSaslQop().equalsIgnoreCase(qop);
|
||||||
boolean isCryptoAesEncryption = isEncryption && this.rpcServer.conf.getBoolean(
|
boolean isCryptoAesEncryption = isEncryption && this.rpcServer.conf.getBoolean(
|
||||||
|
@ -289,7 +283,7 @@ abstract class ServerRpcConnection implements Closeable {
|
||||||
return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes);
|
return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected UserGroupInformation createUser(ConnectionHeader head) {
|
private UserGroupInformation createUser(ConnectionHeader head) {
|
||||||
UserGroupInformation ugi = null;
|
UserGroupInformation ugi = null;
|
||||||
|
|
||||||
if (!head.hasUserInfo()) {
|
if (!head.hasUserInfo()) {
|
||||||
|
@ -316,14 +310,10 @@ abstract class ServerRpcConnection implements Closeable {
|
||||||
return ugi;
|
return ugi;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void disposeSasl() {
|
protected final void disposeSasl() {
|
||||||
if (saslServer != null) {
|
if (saslServer != null) {
|
||||||
try {
|
saslServer.dispose();
|
||||||
saslServer.dispose();
|
saslServer = null;
|
||||||
saslServer = null;
|
|
||||||
} catch (SaslException ignored) {
|
|
||||||
// Ignored. This is being disposed of anyway.
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -373,45 +363,11 @@ abstract class ServerRpcConnection implements Closeable {
|
||||||
byte[] replyToken;
|
byte[] replyToken;
|
||||||
try {
|
try {
|
||||||
if (saslServer == null) {
|
if (saslServer == null) {
|
||||||
switch (authMethod) {
|
saslServer =
|
||||||
case DIGEST:
|
new HBaseSaslRpcServer(authMethod, rpcServer.saslProps, rpcServer.secretManager);
|
||||||
if (this.rpcServer.secretManager == null) {
|
|
||||||
throw new AccessDeniedException(
|
|
||||||
"Server is not configured to do DIGEST authentication.");
|
|
||||||
}
|
|
||||||
saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
|
|
||||||
.getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
|
|
||||||
HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler(
|
|
||||||
this.rpcServer.secretManager, ugi -> attemptingUser = ugi));
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
UserGroupInformation current = UserGroupInformation.getCurrentUser();
|
|
||||||
String fullName = current.getUserName();
|
|
||||||
if (RpcServer.LOG.isDebugEnabled()) {
|
|
||||||
RpcServer.LOG.debug("Kerberos principal name is " + fullName);
|
|
||||||
}
|
|
||||||
final String names[] = SaslUtil.splitKerberosName(fullName);
|
|
||||||
if (names.length != 3) {
|
|
||||||
throw new AccessDeniedException(
|
|
||||||
"Kerberos principal name does NOT have the expected "
|
|
||||||
+ "hostname part: " + fullName);
|
|
||||||
}
|
|
||||||
current.doAs(new PrivilegedExceptionAction<Object>() {
|
|
||||||
@Override
|
|
||||||
public Object run() throws SaslException {
|
|
||||||
saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
|
|
||||||
.getMechanismName(), names[0], names[1],
|
|
||||||
HBaseSaslRpcServer.getSaslProps(), new SaslGssCallbackHandler());
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
if (saslServer == null)
|
|
||||||
throw new AccessDeniedException(
|
|
||||||
"Unable to find SASL server implementation for "
|
|
||||||
+ authMethod.getMechanismName());
|
|
||||||
if (RpcServer.LOG.isDebugEnabled()) {
|
if (RpcServer.LOG.isDebugEnabled()) {
|
||||||
RpcServer.LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
|
RpcServer.LOG
|
||||||
|
.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (RpcServer.LOG.isDebugEnabled()) {
|
if (RpcServer.LOG.isDebugEnabled()) {
|
||||||
|
@ -435,7 +391,8 @@ abstract class ServerRpcConnection implements Closeable {
|
||||||
this.rpcServer.metrics.authenticationFailure();
|
this.rpcServer.metrics.authenticationFailure();
|
||||||
String clientIP = this.toString();
|
String clientIP = this.toString();
|
||||||
// attempting user could be null
|
// attempting user could be null
|
||||||
RpcServer.AUDITLOG.warn(RpcServer.AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
|
RpcServer.AUDITLOG
|
||||||
|
.warn(RpcServer.AUTH_FAILED_FOR + clientIP + ":" + saslServer.getAttemptingUser());
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
if (replyToken != null) {
|
if (replyToken != null) {
|
||||||
|
@ -447,13 +404,12 @@ abstract class ServerRpcConnection implements Closeable {
|
||||||
null);
|
null);
|
||||||
}
|
}
|
||||||
if (saslServer.isComplete()) {
|
if (saslServer.isComplete()) {
|
||||||
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
|
String qop = saslServer.getNegotiatedQop();
|
||||||
useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
|
useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
|
||||||
ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
|
ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
|
||||||
if (RpcServer.LOG.isDebugEnabled()) {
|
if (RpcServer.LOG.isDebugEnabled()) {
|
||||||
RpcServer.LOG.debug("SASL server context established. Authenticated client: "
|
RpcServer.LOG.debug("SASL server context established. Authenticated client: " + ugi +
|
||||||
+ ugi + ". Negotiated QoP is "
|
". Negotiated QoP is " + qop);
|
||||||
+ saslServer.getNegotiatedProperty(Sasl.QOP));
|
|
||||||
}
|
}
|
||||||
this.rpcServer.metrics.authenticationSuccess();
|
this.rpcServer.metrics.authenticationSuccess();
|
||||||
RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi);
|
RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi);
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/*
|
/**
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -15,15 +15,13 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.security;
|
package org.apache.hadoop.hbase.security;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Locale;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.function.Consumer;
|
|
||||||
|
|
||||||
import javax.security.auth.callback.Callback;
|
import javax.security.auth.callback.Callback;
|
||||||
import javax.security.auth.callback.CallbackHandler;
|
import javax.security.auth.callback.CallbackHandler;
|
||||||
|
@ -32,33 +30,101 @@ import javax.security.auth.callback.PasswordCallback;
|
||||||
import javax.security.auth.callback.UnsupportedCallbackException;
|
import javax.security.auth.callback.UnsupportedCallbackException;
|
||||||
import javax.security.sasl.AuthorizeCallback;
|
import javax.security.sasl.AuthorizeCallback;
|
||||||
import javax.security.sasl.RealmCallback;
|
import javax.security.sasl.RealmCallback;
|
||||||
|
import javax.security.sasl.Sasl;
|
||||||
|
import javax.security.sasl.SaslException;
|
||||||
|
import javax.security.sasl.SaslServer;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.SecretManager;
|
import org.apache.hadoop.security.token.SecretManager;
|
||||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A utility class for dealing with SASL on RPC server
|
* A utility class that encapsulates SASL logic for RPC server. Copied from
|
||||||
|
* <code>org.apache.hadoop.security</code>
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class HBaseSaslRpcServer {
|
public class HBaseSaslRpcServer {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(HBaseSaslRpcServer.class);
|
private static final Log LOG = LogFactory.getLog(HBaseSaslRpcServer.class);
|
||||||
|
|
||||||
private static Map<String, String> saslProps = null;
|
private final SaslServer saslServer;
|
||||||
|
|
||||||
public static void init(Configuration conf) {
|
private UserGroupInformation attemptingUser; // user name before auth
|
||||||
saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
|
|
||||||
QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
|
public HBaseSaslRpcServer(AuthMethod method, Map<String, String> saslProps,
|
||||||
|
SecretManager<TokenIdentifier> secretManager) throws IOException {
|
||||||
|
switch (method) {
|
||||||
|
case DIGEST:
|
||||||
|
if (secretManager == null) {
|
||||||
|
throw new AccessDeniedException("Server is not configured to do DIGEST authentication.");
|
||||||
|
}
|
||||||
|
saslServer = Sasl.createSaslServer(AuthMethod.DIGEST.getMechanismName(), null,
|
||||||
|
SaslUtil.SASL_DEFAULT_REALM, saslProps, new SaslDigestCallbackHandler(secretManager));
|
||||||
|
break;
|
||||||
|
case KERBEROS:
|
||||||
|
UserGroupInformation current = UserGroupInformation.getCurrentUser();
|
||||||
|
String fullName = current.getUserName();
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Kerberos principal name is " + fullName);
|
||||||
|
}
|
||||||
|
String[] names = SaslUtil.splitKerberosName(fullName);
|
||||||
|
if (names.length != 3) {
|
||||||
|
throw new AccessDeniedException(
|
||||||
|
"Kerberos principal name does NOT have the expected " + "hostname part: " + fullName);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
saslServer = current.doAs(new PrivilegedExceptionAction<SaslServer>() {
|
||||||
|
@Override
|
||||||
|
public SaslServer run() throws SaslException {
|
||||||
|
return Sasl.createSaslServer(AuthMethod.KERBEROS.getMechanismName(), names[0],
|
||||||
|
names[1], saslProps, new SaslGssCallbackHandler());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// should not happen
|
||||||
|
throw new AssertionError(e);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new IOException("Unknown authentication method " + method);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Map<String, String> getSaslProps() {
|
public boolean isComplete() {
|
||||||
return saslProps;
|
return saslServer.isComplete();
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] evaluateResponse(byte[] response) throws SaslException {
|
||||||
|
return saslServer.evaluateResponse(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Release resources used by wrapped saslServer */
|
||||||
|
public void dispose() {
|
||||||
|
SaslUtil.safeDispose(saslServer);
|
||||||
|
}
|
||||||
|
|
||||||
|
public UserGroupInformation getAttemptingUser() {
|
||||||
|
return attemptingUser;
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] wrap(byte[] buf, int off, int len) throws SaslException {
|
||||||
|
return saslServer.wrap(buf, off, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] unwrap(byte[] buf, int off, int len) throws SaslException {
|
||||||
|
return saslServer.unwrap(buf, off, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getNegotiatedQop() {
|
||||||
|
return (String) saslServer.getNegotiatedProperty(Sasl.QOP);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getAuthorizationID() {
|
||||||
|
return saslServer.getAuthorizationID();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T extends TokenIdentifier> T getIdentifier(String id,
|
public static <T extends TokenIdentifier> T getIdentifier(String id,
|
||||||
|
@ -66,25 +132,19 @@ public class HBaseSaslRpcServer {
|
||||||
byte[] tokenId = SaslUtil.decodeIdentifier(id);
|
byte[] tokenId = SaslUtil.decodeIdentifier(id);
|
||||||
T tokenIdentifier = secretManager.createIdentifier();
|
T tokenIdentifier = secretManager.createIdentifier();
|
||||||
try {
|
try {
|
||||||
tokenIdentifier.readFields(new DataInputStream(new ByteArrayInputStream(
|
tokenIdentifier.readFields(new DataInputStream(new ByteArrayInputStream(tokenId)));
|
||||||
tokenId)));
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw (InvalidToken) new InvalidToken(
|
throw (InvalidToken) new InvalidToken("Can't de-serialize tokenIdentifier").initCause(e);
|
||||||
"Can't de-serialize tokenIdentifier").initCause(e);
|
|
||||||
}
|
}
|
||||||
return tokenIdentifier;
|
return tokenIdentifier;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/** CallbackHandler for SASL DIGEST-MD5 mechanism */
|
/** CallbackHandler for SASL DIGEST-MD5 mechanism */
|
||||||
public static class SaslDigestCallbackHandler implements CallbackHandler {
|
private class SaslDigestCallbackHandler implements CallbackHandler {
|
||||||
private SecretManager<TokenIdentifier> secretManager;
|
private SecretManager<TokenIdentifier> secretManager;
|
||||||
private Consumer<UserGroupInformation> attemptingUserConsumer;
|
|
||||||
|
|
||||||
public SaslDigestCallbackHandler(SecretManager<TokenIdentifier> secretManager,
|
public SaslDigestCallbackHandler(SecretManager<TokenIdentifier> secretManager) {
|
||||||
Consumer<UserGroupInformation> attemptingUserConsumer) {
|
|
||||||
this.secretManager = secretManager;
|
this.secretManager = secretManager;
|
||||||
this.attemptingUserConsumer = attemptingUserConsumer;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private char[] getPassword(TokenIdentifier tokenid) throws InvalidToken {
|
private char[] getPassword(TokenIdentifier tokenid) throws InvalidToken {
|
||||||
|
@ -93,8 +153,7 @@ public class HBaseSaslRpcServer {
|
||||||
|
|
||||||
/** {@inheritDoc} */
|
/** {@inheritDoc} */
|
||||||
@Override
|
@Override
|
||||||
public void handle(Callback[] callbacks) throws InvalidToken,
|
public void handle(Callback[] callbacks) throws InvalidToken, UnsupportedCallbackException {
|
||||||
UnsupportedCallbackException {
|
|
||||||
NameCallback nc = null;
|
NameCallback nc = null;
|
||||||
PasswordCallback pc = null;
|
PasswordCallback pc = null;
|
||||||
AuthorizeCallback ac = null;
|
AuthorizeCallback ac = null;
|
||||||
|
@ -108,15 +167,14 @@ public class HBaseSaslRpcServer {
|
||||||
} else if (callback instanceof RealmCallback) {
|
} else if (callback instanceof RealmCallback) {
|
||||||
continue; // realm is ignored
|
continue; // realm is ignored
|
||||||
} else {
|
} else {
|
||||||
throw new UnsupportedCallbackException(callback,
|
throw new UnsupportedCallbackException(callback, "Unrecognized SASL DIGEST-MD5 Callback");
|
||||||
"Unrecognized SASL DIGEST-MD5 Callback");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (pc != null) {
|
if (pc != null) {
|
||||||
TokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName(), secretManager);
|
TokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName(), secretManager);
|
||||||
char[] password = getPassword(tokenIdentifier);
|
char[] password = getPassword(tokenIdentifier);
|
||||||
UserGroupInformation user = tokenIdentifier.getUser(); // may throw exception
|
UserGroupInformation user = tokenIdentifier.getUser(); // may throw exception
|
||||||
attemptingUserConsumer.accept(user);
|
attemptingUser = user;
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("SASL server DIGEST-MD5 callback: setting password " + "for client: " +
|
LOG.trace("SASL server DIGEST-MD5 callback: setting password " + "for client: " +
|
||||||
tokenIdentifier.getUser());
|
tokenIdentifier.getUser());
|
||||||
|
@ -133,10 +191,9 @@ public class HBaseSaslRpcServer {
|
||||||
}
|
}
|
||||||
if (ac.isAuthorized()) {
|
if (ac.isAuthorized()) {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
String username =
|
String username = getIdentifier(authzid, secretManager).getUser().getUserName();
|
||||||
getIdentifier(authzid, secretManager).getUser().getUserName();
|
LOG.trace(
|
||||||
LOG.trace("SASL server DIGEST-MD5 callback: setting "
|
"SASL server DIGEST-MD5 callback: setting " + "canonicalized client ID: " + username);
|
||||||
+ "canonicalized client ID: " + username);
|
|
||||||
}
|
}
|
||||||
ac.setAuthorizedID(authzid);
|
ac.setAuthorizedID(authzid);
|
||||||
}
|
}
|
||||||
|
@ -145,19 +202,17 @@ public class HBaseSaslRpcServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** CallbackHandler for SASL GSSAPI Kerberos mechanism */
|
/** CallbackHandler for SASL GSSAPI Kerberos mechanism */
|
||||||
public static class SaslGssCallbackHandler implements CallbackHandler {
|
private static class SaslGssCallbackHandler implements CallbackHandler {
|
||||||
|
|
||||||
/** {@inheritDoc} */
|
/** {@inheritDoc} */
|
||||||
@Override
|
@Override
|
||||||
public void handle(Callback[] callbacks) throws
|
public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
|
||||||
UnsupportedCallbackException {
|
|
||||||
AuthorizeCallback ac = null;
|
AuthorizeCallback ac = null;
|
||||||
for (Callback callback : callbacks) {
|
for (Callback callback : callbacks) {
|
||||||
if (callback instanceof AuthorizeCallback) {
|
if (callback instanceof AuthorizeCallback) {
|
||||||
ac = (AuthorizeCallback) callback;
|
ac = (AuthorizeCallback) callback;
|
||||||
} else {
|
} else {
|
||||||
throw new UnsupportedCallbackException(callback,
|
throw new UnsupportedCallbackException(callback, "Unrecognized SASL GSSAPI Callback");
|
||||||
"Unrecognized SASL GSSAPI Callback");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (ac != null) {
|
if (ac != null) {
|
||||||
|
@ -169,9 +224,10 @@ public class HBaseSaslRpcServer {
|
||||||
ac.setAuthorized(false);
|
ac.setAuthorized(false);
|
||||||
}
|
}
|
||||||
if (ac.isAuthorized()) {
|
if (ac.isAuthorized()) {
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("SASL server GSSAPI callback: setting "
|
LOG.debug(
|
||||||
+ "canonicalized client ID: " + authzid);
|
"SASL server GSSAPI callback: setting " + "canonicalized client ID: " + authzid);
|
||||||
|
}
|
||||||
ac.setAuthorizedID(authzid);
|
ac.setAuthorizedID(authzid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue