HBASE-5732 Remove the SecureRPCEngine and merge the security-related logic in the core engine; I MISSED A FEW DELETES
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1337398 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ec57be0ada
commit
cae042bb81
|
@ -1,522 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
|
||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
|
||||
import org.apache.hadoop.hbase.security.KerberosInfo;
|
||||
import org.apache.hadoop.hbase.security.TokenInfo;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
|
||||
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.PoolMap;
|
||||
import org.apache.hadoop.io.*;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.security.token.TokenSelector;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
import javax.net.SocketFactory;
|
||||
import java.io.*;
|
||||
import java.net.*;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.HashMap;
|
||||
import java.util.Hashtable;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* A client for an IPC service, which support SASL authentication of connections
|
||||
* using either GSSAPI for Kerberos authentication or DIGEST-MD5 for
|
||||
* authentication using signed tokens.
|
||||
*
|
||||
* <p>
|
||||
* This is a copy of org.apache.hadoop.ipc.Client from secure Hadoop,
|
||||
* reworked to remove code duplicated with
|
||||
* {@link org.apache.hadoop.hbase.HBaseClient}. This is part of the loadable
|
||||
* {@link SecureRpcEngine}, and only functions in connection with a
|
||||
* {@link SecureServer} instance.
|
||||
* </p>
|
||||
*/
|
||||
public class SecureClient extends HBaseClient {
|
||||
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog("org.apache.hadoop.ipc.SecureClient");
|
||||
|
||||
protected static Map<String,TokenSelector<? extends TokenIdentifier>> tokenHandlers =
|
||||
new HashMap<String,TokenSelector<? extends TokenIdentifier>>();
|
||||
static {
|
||||
tokenHandlers.put(AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE.toString(),
|
||||
new AuthenticationTokenSelector());
|
||||
}
|
||||
|
||||
/** Thread that reads responses and notifies callers. Each connection owns a
|
||||
* socket connected to a remote address. Calls are multiplexed through this
|
||||
* socket: responses may be delivered out of order. */
|
||||
protected class SecureConnection extends Connection {
|
||||
private InetSocketAddress server; // server ip:port
|
||||
private String serverPrincipal; // server's krb5 principal name
|
||||
private SecureConnectionHeader header; // connection header
|
||||
private AuthMethod authMethod; // authentication method
|
||||
private boolean useSasl;
|
||||
private Token<? extends TokenIdentifier> token;
|
||||
private HBaseSaslRpcClient saslRpcClient;
|
||||
private int reloginMaxBackoff; // max pause before relogin on sasl failure
|
||||
|
||||
public SecureConnection(ConnectionId remoteId) throws IOException {
|
||||
super(remoteId);
|
||||
this.server = remoteId.getAddress();
|
||||
|
||||
User ticket = remoteId.getTicket();
|
||||
Class<?> protocol = remoteId.getProtocol();
|
||||
this.useSasl = User.isSecurityEnabled();
|
||||
if (useSasl && protocol != null) {
|
||||
TokenInfo tokenInfo = protocol.getAnnotation(TokenInfo.class);
|
||||
if (tokenInfo != null) {
|
||||
TokenSelector<? extends TokenIdentifier> tokenSelector =
|
||||
tokenHandlers.get(tokenInfo.value());
|
||||
if (tokenSelector != null) {
|
||||
token = tokenSelector.selectToken(new Text(clusterId),
|
||||
ticket.getUGI().getTokens());
|
||||
} else if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("No token selector found for type "+tokenInfo.value());
|
||||
}
|
||||
}
|
||||
KerberosInfo krbInfo = protocol.getAnnotation(KerberosInfo.class);
|
||||
if (krbInfo != null) {
|
||||
String serverKey = krbInfo.serverPrincipal();
|
||||
if (serverKey == null) {
|
||||
throw new IOException(
|
||||
"Can't obtain server Kerberos config key from KerberosInfo");
|
||||
}
|
||||
serverPrincipal = SecurityUtil.getServerPrincipal(
|
||||
conf.get(serverKey), server.getAddress().getCanonicalHostName().toLowerCase());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("RPC Server Kerberos principal name for protocol="
|
||||
+ protocol.getCanonicalName() + " is " + serverPrincipal);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!useSasl) {
|
||||
authMethod = AuthMethod.SIMPLE;
|
||||
} else if (token != null) {
|
||||
authMethod = AuthMethod.DIGEST;
|
||||
} else {
|
||||
authMethod = AuthMethod.KERBEROS;
|
||||
}
|
||||
|
||||
header = new SecureConnectionHeader(
|
||||
protocol == null ? null : protocol.getName(), ticket, authMethod);
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Use " + authMethod + " authentication for protocol "
|
||||
+ protocol.getSimpleName());
|
||||
|
||||
reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000);
|
||||
}
|
||||
|
||||
private synchronized void disposeSasl() {
|
||||
if (saslRpcClient != null) {
|
||||
try {
|
||||
saslRpcClient.dispose();
|
||||
saslRpcClient = null;
|
||||
} catch (IOException ioe) {
|
||||
LOG.info("Error disposing of SASL client", ioe);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void sendParam(Call call) {
|
||||
if (shouldCloseConnection.get()) {
|
||||
return;
|
||||
}
|
||||
// For serializing the data to be written.
|
||||
|
||||
final DataOutputBuffer d = new DataOutputBuffer();
|
||||
try {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(getName() + " sending #" + call.id);
|
||||
}
|
||||
d.writeInt(0xdeadbeef); // placeholder for data length
|
||||
d.writeInt(call.id);
|
||||
call.param.write(d);
|
||||
byte[] data = d.getData();
|
||||
int dataLength = d.getLength();
|
||||
// fill in the placeholder
|
||||
Bytes.putInt(data, 0, dataLength - 4);
|
||||
//noinspection SynchronizeOnNonFinalField
|
||||
synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
|
||||
out.write(data, 0, dataLength);
|
||||
out.flush();
|
||||
}
|
||||
} catch(IOException e) {
|
||||
markClosed(e);
|
||||
} finally {
|
||||
//the buffer is just an in-memory buffer, but it is still polite to
|
||||
// close early
|
||||
IOUtils.closeStream(d);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
|
||||
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
|
||||
UserGroupInformation currentUser =
|
||||
UserGroupInformation.getCurrentUser();
|
||||
UserGroupInformation realUser = currentUser.getRealUser();
|
||||
return authMethod == AuthMethod.KERBEROS &&
|
||||
loginUser != null &&
|
||||
//Make sure user logged in using Kerberos either keytab or TGT
|
||||
loginUser.hasKerberosCredentials() &&
|
||||
// relogin only in case it is the login user (e.g. JT)
|
||||
// or superuser (like oozie).
|
||||
(loginUser.equals(currentUser) || loginUser.equals(realUser));
|
||||
}
|
||||
|
||||
private synchronized boolean setupSaslConnection(final InputStream in2,
|
||||
final OutputStream out2)
|
||||
throws IOException {
|
||||
saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal);
|
||||
return saslRpcClient.saslConnect(in2, out2);
|
||||
}
|
||||
|
||||
/**
|
||||
* If multiple clients with the same principal try to connect
|
||||
* to the same server at the same time, the server assumes a
|
||||
* replay attack is in progress. This is a feature of kerberos.
|
||||
* In order to work around this, what is done is that the client
|
||||
* backs off randomly and tries to initiate the connection
|
||||
* again.
|
||||
* The other problem is to do with ticket expiry. To handle that,
|
||||
* a relogin is attempted.
|
||||
*/
|
||||
private synchronized void handleSaslConnectionFailure(
|
||||
final int currRetries,
|
||||
final int maxRetries, final Exception ex, final Random rand,
|
||||
final User user)
|
||||
throws IOException, InterruptedException{
|
||||
user.runAs(new PrivilegedExceptionAction<Object>() {
|
||||
public Object run() throws IOException, InterruptedException {
|
||||
closeConnection();
|
||||
if (shouldAuthenticateOverKrb()) {
|
||||
if (currRetries < maxRetries) {
|
||||
LOG.debug("Exception encountered while connecting to " +
|
||||
"the server : " + ex);
|
||||
//try re-login
|
||||
if (UserGroupInformation.isLoginKeytabBased()) {
|
||||
UserGroupInformation.getLoginUser().reloginFromKeytab();
|
||||
} else {
|
||||
UserGroupInformation.getLoginUser().reloginFromTicketCache();
|
||||
}
|
||||
disposeSasl();
|
||||
//have granularity of milliseconds
|
||||
//we are sleeping with the Connection lock held but since this
|
||||
//connection instance is being used for connecting to the server
|
||||
//in question, it is okay
|
||||
Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1));
|
||||
return null;
|
||||
} else {
|
||||
String msg = "Couldn't setup connection for " +
|
||||
UserGroupInformation.getLoginUser().getUserName() +
|
||||
" to " + serverPrincipal;
|
||||
LOG.warn(msg);
|
||||
throw (IOException) new IOException(msg).initCause(ex);
|
||||
}
|
||||
} else {
|
||||
LOG.warn("Exception encountered while connecting to " +
|
||||
"the server : " + ex);
|
||||
}
|
||||
if (ex instanceof RemoteException)
|
||||
throw (RemoteException)ex;
|
||||
throw new IOException(ex);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void setupIOstreams()
|
||||
throws IOException, InterruptedException {
|
||||
if (socket != null || shouldCloseConnection.get()) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Connecting to "+server);
|
||||
}
|
||||
short numRetries = 0;
|
||||
final short MAX_RETRIES = 5;
|
||||
Random rand = null;
|
||||
while (true) {
|
||||
setupConnection();
|
||||
InputStream inStream = NetUtils.getInputStream(socket);
|
||||
OutputStream outStream = NetUtils.getOutputStream(socket);
|
||||
writeRpcHeader(outStream);
|
||||
if (useSasl) {
|
||||
final InputStream in2 = inStream;
|
||||
final OutputStream out2 = outStream;
|
||||
User ticket = remoteId.getTicket();
|
||||
if (authMethod == AuthMethod.KERBEROS) {
|
||||
UserGroupInformation ugi = ticket.getUGI();
|
||||
if (ugi != null && ugi.getRealUser() != null) {
|
||||
ticket = User.create(ugi.getRealUser());
|
||||
}
|
||||
}
|
||||
boolean continueSasl = false;
|
||||
try {
|
||||
continueSasl =
|
||||
ticket.runAs(new PrivilegedExceptionAction<Boolean>() {
|
||||
@Override
|
||||
public Boolean run() throws IOException {
|
||||
return setupSaslConnection(in2, out2);
|
||||
}
|
||||
});
|
||||
} catch (Exception ex) {
|
||||
if (rand == null) {
|
||||
rand = new Random();
|
||||
}
|
||||
handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand,
|
||||
ticket);
|
||||
continue;
|
||||
}
|
||||
if (continueSasl) {
|
||||
// Sasl connect is successful. Let's set up Sasl i/o streams.
|
||||
inStream = saslRpcClient.getInputStream(inStream);
|
||||
outStream = saslRpcClient.getOutputStream(outStream);
|
||||
} else {
|
||||
// fall back to simple auth because server told us so.
|
||||
authMethod = AuthMethod.SIMPLE;
|
||||
header = new SecureConnectionHeader(header.getProtocol(),
|
||||
header.getUser(), authMethod);
|
||||
useSasl = false;
|
||||
}
|
||||
}
|
||||
this.in = new DataInputStream(new BufferedInputStream
|
||||
(new PingInputStream(inStream)));
|
||||
this.out = new DataOutputStream
|
||||
(new BufferedOutputStream(outStream));
|
||||
writeHeader();
|
||||
|
||||
// update last activity time
|
||||
touch();
|
||||
|
||||
// start the receiver thread after the socket connection has been set up
|
||||
start();
|
||||
return;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
markClosed(e);
|
||||
close();
|
||||
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/* Write the RPC header */
|
||||
private void writeRpcHeader(OutputStream outStream) throws IOException {
|
||||
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
|
||||
// Write out the header, version and authentication method
|
||||
out.write(SecureServer.HEADER.array());
|
||||
out.write(SecureServer.CURRENT_VERSION);
|
||||
authMethod.write(out);
|
||||
out.flush();
|
||||
}
|
||||
|
||||
/**
|
||||
* Write the protocol header for each connection
|
||||
* Out is not synchronized because only the first thread does this.
|
||||
*/
|
||||
private void writeHeader() throws IOException {
|
||||
// Write out the ConnectionHeader
|
||||
DataOutputBuffer buf = new DataOutputBuffer();
|
||||
header.write(buf);
|
||||
|
||||
// Write out the payload length
|
||||
int bufLen = buf.getLength();
|
||||
out.writeInt(bufLen);
|
||||
out.write(buf.getData(), 0, bufLen);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void receiveResponse() {
|
||||
if (shouldCloseConnection.get()) {
|
||||
return;
|
||||
}
|
||||
touch();
|
||||
|
||||
try {
|
||||
int id = in.readInt(); // try to read an id
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug(getName() + " got value #" + id);
|
||||
|
||||
Call call = calls.remove(id);
|
||||
|
||||
int state = in.readInt(); // read call status
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("call #"+id+" state is " + state);
|
||||
}
|
||||
if (state == Status.SUCCESS.state) {
|
||||
Writable value = ReflectionUtils.newInstance(valueClass, conf);
|
||||
value.readFields(in); // read value
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("call #"+id+", response is:\n"+value.toString());
|
||||
}
|
||||
call.setValue(value);
|
||||
} else if (state == Status.ERROR.state) {
|
||||
call.setException(new RemoteException(WritableUtils.readString(in),
|
||||
WritableUtils.readString(in)));
|
||||
} else if (state == Status.FATAL.state) {
|
||||
// Close the connection
|
||||
markClosed(new RemoteException(WritableUtils.readString(in),
|
||||
WritableUtils.readString(in)));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
|
||||
// Clean up open calls but don't treat this as a fatal condition,
|
||||
// since we expect certain responses to not make it by the specified
|
||||
// {@link ConnectionId#rpcTimeout}.
|
||||
closeException = e;
|
||||
} else {
|
||||
// Since the server did not respond within the default ping interval
|
||||
// time, treat this as a fatal condition and close this connection
|
||||
markClosed(e);
|
||||
}
|
||||
} finally {
|
||||
if (remoteId.rpcTimeout > 0) {
|
||||
cleanupCalls(remoteId.rpcTimeout);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Close the connection. */
|
||||
protected synchronized void close() {
|
||||
if (!shouldCloseConnection.get()) {
|
||||
LOG.error("The connection is not in the closed state");
|
||||
return;
|
||||
}
|
||||
|
||||
// release the resources
|
||||
// first thing to do;take the connection out of the connection list
|
||||
synchronized (connections) {
|
||||
if (connections.get(remoteId) == this) {
|
||||
connections.remove(remoteId);
|
||||
}
|
||||
}
|
||||
|
||||
// close the streams and therefore the socket
|
||||
IOUtils.closeStream(out);
|
||||
IOUtils.closeStream(in);
|
||||
disposeSasl();
|
||||
|
||||
// clean up all calls
|
||||
if (closeException == null) {
|
||||
if (!calls.isEmpty()) {
|
||||
LOG.warn(
|
||||
"A connection is closed for no cause and calls are not empty");
|
||||
|
||||
// clean up calls anyway
|
||||
closeException = new IOException("Unexpected closed connection");
|
||||
cleanupCalls();
|
||||
}
|
||||
} else {
|
||||
// log the info
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("closing ipc connection to " + server + ": " +
|
||||
closeException.getMessage(),closeException);
|
||||
}
|
||||
|
||||
// cleanup calls
|
||||
cleanupCalls();
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug(getName() + ": closed");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct an IPC client whose values are of the given {@link org.apache.hadoop.io.Writable}
|
||||
* class.
|
||||
* @param valueClass value class
|
||||
* @param conf configuration
|
||||
* @param factory socket factory
|
||||
*/
|
||||
public SecureClient(Class<? extends Writable> valueClass, Configuration conf,
|
||||
SocketFactory factory) {
|
||||
super(valueClass, conf, factory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct an IPC client with the default SocketFactory
|
||||
* @param valueClass value class
|
||||
* @param conf configuration
|
||||
*/
|
||||
public SecureClient(Class<? extends Writable> valueClass, Configuration conf) {
|
||||
this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SecureConnection getConnection(InetSocketAddress addr,
|
||||
Class<? extends VersionedProtocol> protocol,
|
||||
User ticket,
|
||||
int rpcTimeout,
|
||||
Call call)
|
||||
throws IOException, InterruptedException {
|
||||
if (!running.get()) {
|
||||
// the client is stopped
|
||||
throw new IOException("The client is stopped");
|
||||
}
|
||||
SecureConnection connection;
|
||||
/* we could avoid this allocation for each RPC by having a
|
||||
* connectionsId object and with set() method. We need to manage the
|
||||
* refs for keys in HashMap properly. For now its ok.
|
||||
*/
|
||||
ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout);
|
||||
do {
|
||||
synchronized (connections) {
|
||||
connection = (SecureConnection)connections.get(remoteId);
|
||||
if (connection == null) {
|
||||
connection = new SecureConnection(remoteId);
|
||||
connections.put(remoteId, connection);
|
||||
}
|
||||
}
|
||||
} while (!connection.addCall(call));
|
||||
|
||||
//we don't invoke the method below inside "synchronized (connections)"
|
||||
//block above. The reason for that is if the server happens to be slow,
|
||||
//it will take longer to establish a connection and that will slow the
|
||||
//entire system down.
|
||||
connection.setupIOstreams();
|
||||
return connection;
|
||||
}
|
||||
}
|
|
@ -1,118 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
/**
|
||||
* The IPC connection header sent by the client to the server
|
||||
* on connection establishment. Part of the {@link SecureRpcEngine}
|
||||
* implementation.
|
||||
*/
|
||||
class SecureConnectionHeader extends ConnectionHeader {
|
||||
private User user = null;
|
||||
private AuthMethod authMethod;
|
||||
|
||||
public SecureConnectionHeader() {}
|
||||
|
||||
/**
|
||||
* Create a new {@link org.apache.hadoop.hbase.ipc.SecureConnectionHeader} with the given <code>protocol</code>
|
||||
* and {@link org.apache.hadoop.security.UserGroupInformation}.
|
||||
* @param protocol protocol used for communication between the IPC client
|
||||
* and the server
|
||||
* @param ugi {@link org.apache.hadoop.security.UserGroupInformation} of the client communicating with
|
||||
* the server
|
||||
*/
|
||||
public SecureConnectionHeader(String protocol, User user, AuthMethod authMethod) {
|
||||
this.protocol = protocol;
|
||||
this.user = user;
|
||||
this.authMethod = authMethod;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
protocol = Text.readString(in);
|
||||
if (protocol.isEmpty()) {
|
||||
protocol = null;
|
||||
}
|
||||
boolean ugiUsernamePresent = in.readBoolean();
|
||||
if (ugiUsernamePresent) {
|
||||
String username = in.readUTF();
|
||||
boolean realUserNamePresent = in.readBoolean();
|
||||
if (realUserNamePresent) {
|
||||
String realUserName = in.readUTF();
|
||||
UserGroupInformation realUserUgi =
|
||||
UserGroupInformation.createRemoteUser(realUserName);
|
||||
user = User.create(
|
||||
UserGroupInformation.createProxyUser(username, realUserUgi));
|
||||
} else {
|
||||
user = User.create(UserGroupInformation.createRemoteUser(username));
|
||||
}
|
||||
} else {
|
||||
user = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
Text.writeString(out, (protocol == null) ? "" : protocol);
|
||||
if (user != null) {
|
||||
UserGroupInformation ugi = user.getUGI();
|
||||
if (authMethod == AuthMethod.KERBEROS) {
|
||||
// Send effective user for Kerberos auth
|
||||
out.writeBoolean(true);
|
||||
out.writeUTF(ugi.getUserName());
|
||||
out.writeBoolean(false);
|
||||
} else if (authMethod == AuthMethod.DIGEST) {
|
||||
// Don't send user for token auth
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
//Send both effective user and real user for simple auth
|
||||
out.writeBoolean(true);
|
||||
out.writeUTF(ugi.getUserName());
|
||||
if (ugi.getRealUser() != null) {
|
||||
out.writeBoolean(true);
|
||||
out.writeUTF(ugi.getRealUser().getUserName());
|
||||
} else {
|
||||
out.writeBoolean(false);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
out.writeBoolean(false);
|
||||
}
|
||||
}
|
||||
|
||||
public String getProtocol() {
|
||||
return protocol;
|
||||
}
|
||||
|
||||
public User getUser() {
|
||||
return user;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return protocol + "-" + user;
|
||||
}
|
||||
}
|
|
@ -1,437 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
|
||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
|
||||
import org.apache.hadoop.hbase.util.Objects;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
|
||||
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
import javax.net.SocketFactory;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.*;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* A loadable RPC engine supporting SASL authentication of connections, using
|
||||
* GSSAPI for Kerberos authentication or DIGEST-MD5 for authentication via
|
||||
* signed tokens.
|
||||
*
|
||||
* <p>
|
||||
* This is a fork of the {@code org.apache.hadoop.ipc.WriteableRpcEngine} from
|
||||
* secure Hadoop, reworked to eliminate code duplication with the existing
|
||||
* HBase {@link WritableRpcEngine}.
|
||||
* </p>
|
||||
*
|
||||
* @see SecureClient
|
||||
* @see SecureServer
|
||||
*/
|
||||
public class SecureRpcEngine implements RpcEngine {
|
||||
// Leave this out in the hadoop ipc package but keep class name. Do this
|
||||
// so that we dont' get the logging of this class's invocations by doing our
|
||||
// blanket enabling DEBUG on the o.a.h.h. package.
|
||||
protected static final Log LOG =
|
||||
LogFactory.getLog("org.apache.hadoop.ipc.SecureRpcEngine");
|
||||
|
||||
private SecureRpcEngine() {
|
||||
super();
|
||||
} // no public ctor
|
||||
|
||||
/* Cache a client using its socket factory as the hash key */
|
||||
static private class ClientCache {
|
||||
private Map<SocketFactory, SecureClient> clients =
|
||||
new HashMap<SocketFactory, SecureClient>();
|
||||
|
||||
protected ClientCache() {}
|
||||
|
||||
/**
|
||||
* Construct & cache an IPC client with the user-provided SocketFactory
|
||||
* if no cached client exists.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @param factory socket factory
|
||||
* @return an IPC client
|
||||
*/
|
||||
protected synchronized SecureClient getClient(Configuration conf,
|
||||
SocketFactory factory) {
|
||||
// Construct & cache client. The configuration is only used for timeout,
|
||||
// and Clients have connection pools. So we can either (a) lose some
|
||||
// connection pooling and leak sockets, or (b) use the same timeout for all
|
||||
// configurations. Since the IPC is usually intended globally, not
|
||||
// per-job, we choose (a).
|
||||
SecureClient client = clients.get(factory);
|
||||
if (client == null) {
|
||||
// Make an hbase client instead of hadoop Client.
|
||||
client = new SecureClient(HbaseObjectWritable.class, conf, factory);
|
||||
clients.put(factory, client);
|
||||
} else {
|
||||
client.incCount();
|
||||
}
|
||||
return client;
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct & cache an IPC client with the default SocketFactory
|
||||
* if no cached client exists.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @return an IPC client
|
||||
*/
|
||||
protected synchronized SecureClient getClient(Configuration conf) {
|
||||
return getClient(conf, SocketFactory.getDefault());
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop a RPC client connection
|
||||
* A RPC client is closed only when its reference count becomes zero.
|
||||
* @param client client to stop
|
||||
*/
|
||||
protected void stopClient(SecureClient client) {
|
||||
synchronized (this) {
|
||||
client.decCount();
|
||||
if (client.isZeroReference()) {
|
||||
clients.remove(client.getSocketFactory());
|
||||
}
|
||||
}
|
||||
if (client.isZeroReference()) {
|
||||
client.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected final static ClientCache CLIENTS = new ClientCache();
|
||||
|
||||
private static class Invoker implements InvocationHandler {
|
||||
private Class<? extends VersionedProtocol> protocol;
|
||||
private InetSocketAddress address;
|
||||
private User ticket;
|
||||
private SecureClient client;
|
||||
private boolean isClosed = false;
|
||||
final private int rpcTimeout;
|
||||
|
||||
public Invoker(Class<? extends VersionedProtocol> protocol,
|
||||
InetSocketAddress address, User ticket,
|
||||
Configuration conf, SocketFactory factory, int rpcTimeout) {
|
||||
this.protocol = protocol;
|
||||
this.address = address;
|
||||
this.ticket = ticket;
|
||||
this.client = CLIENTS.getClient(conf, factory);
|
||||
this.rpcTimeout = rpcTimeout;
|
||||
}
|
||||
|
||||
public Object invoke(Object proxy, Method method, Object[] args)
|
||||
throws Throwable {
|
||||
final boolean logDebug = LOG.isDebugEnabled();
|
||||
long startTime = 0;
|
||||
if (logDebug) {
|
||||
startTime = System.currentTimeMillis();
|
||||
}
|
||||
try {
|
||||
HbaseObjectWritable value = (HbaseObjectWritable)
|
||||
client.call(new Invocation(method, args), address,
|
||||
protocol, ticket, rpcTimeout);
|
||||
if (logDebug) {
|
||||
long callTime = System.currentTimeMillis() - startTime;
|
||||
LOG.debug("Call: " + method.getName() + " " + callTime);
|
||||
}
|
||||
return value.get();
|
||||
} catch (Throwable t) {
|
||||
// For protobuf protocols, ServiceException is expected
|
||||
if (Invocation.PROTOBUF_PROTOCOLS.contains(protocol)) {
|
||||
if (t instanceof RemoteException) {
|
||||
Throwable cause = ((RemoteException)t).unwrapRemoteException();
|
||||
throw new ServiceException(cause);
|
||||
}
|
||||
throw new ServiceException(t);
|
||||
}
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
|
||||
/* close the IPC client that's responsible for this invoker's RPCs */
|
||||
synchronized protected void close() {
|
||||
if (!isClosed) {
|
||||
isClosed = true;
|
||||
CLIENTS.stopClient(client);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a client-side proxy object that implements the named protocol,
|
||||
* talking to a server at the named address.
|
||||
*
|
||||
* @param protocol interface
|
||||
* @param clientVersion version we are expecting
|
||||
* @param addr remote address
|
||||
* @param ticket ticket
|
||||
* @param conf configuration
|
||||
* @param factory socket factory
|
||||
* @return proxy
|
||||
* @throws java.io.IOException e
|
||||
*/
|
||||
public VersionedProtocol getProxy(
|
||||
Class<? extends VersionedProtocol> protocol, long clientVersion,
|
||||
InetSocketAddress addr, User ticket,
|
||||
Configuration conf, SocketFactory factory, int rpcTimeout)
|
||||
throws IOException {
|
||||
if (User.isSecurityEnabled()) {
|
||||
HBaseSaslRpcServer.init(conf);
|
||||
}
|
||||
VersionedProtocol proxy =
|
||||
(VersionedProtocol) Proxy.newProxyInstance(
|
||||
protocol.getClassLoader(), new Class[] { protocol },
|
||||
new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
|
||||
try {
|
||||
long serverVersion = proxy.getProtocolVersion(protocol.getName(),
|
||||
clientVersion);
|
||||
if (serverVersion != clientVersion) {
|
||||
throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion,
|
||||
serverVersion);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
if (t instanceof UndeclaredThrowableException) {
|
||||
t = t.getCause();
|
||||
}
|
||||
if (t instanceof ServiceException) {
|
||||
throw ProtobufUtil.getRemoteException((ServiceException)t);
|
||||
}
|
||||
if (!(t instanceof IOException)) {
|
||||
LOG.error("Unexpected throwable object ", t);
|
||||
throw new IOException(t);
|
||||
}
|
||||
throw (IOException)t;
|
||||
}
|
||||
return proxy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop this proxy and release its invoker's resource
|
||||
* @param proxy the proxy to be stopped
|
||||
*/
|
||||
public void stopProxy(VersionedProtocol proxy) {
|
||||
if (proxy!=null) {
|
||||
((Invoker)Proxy.getInvocationHandler(proxy)).close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/** Expert: Make multiple, parallel calls to a set of servers. */
|
||||
public Object[] call(Method method, Object[][] params,
|
||||
InetSocketAddress[] addrs,
|
||||
Class<? extends VersionedProtocol> protocol,
|
||||
User ticket, Configuration conf)
|
||||
throws IOException, InterruptedException {
|
||||
|
||||
Invocation[] invocations = new Invocation[params.length];
|
||||
for (int i = 0; i < params.length; i++)
|
||||
invocations[i] = new Invocation(method, params[i]);
|
||||
SecureClient client = CLIENTS.getClient(conf);
|
||||
try {
|
||||
Writable[] wrappedValues =
|
||||
client.call(invocations, addrs, protocol, ticket);
|
||||
|
||||
if (method.getReturnType() == Void.TYPE) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Object[] values =
|
||||
(Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
|
||||
for (int i = 0; i < values.length; i++)
|
||||
if (wrappedValues[i] != null)
|
||||
values[i] = ((HbaseObjectWritable)wrappedValues[i]).get();
|
||||
|
||||
return values;
|
||||
} finally {
|
||||
CLIENTS.stopClient(client);
|
||||
}
|
||||
}
|
||||
|
||||
/** Construct a server for a protocol implementation instance listening on a
|
||||
* port and address, with a secret manager. */
|
||||
public Server getServer(Class<? extends VersionedProtocol> protocol,
|
||||
final Object instance,
|
||||
Class<?>[] ifaces,
|
||||
final String bindAddress, final int port,
|
||||
final int numHandlers,
|
||||
int metaHandlerCount, final boolean verbose, Configuration conf,
|
||||
int highPriorityLevel)
|
||||
throws IOException {
|
||||
Server server = new Server(instance, ifaces, conf, bindAddress, port,
|
||||
numHandlers, metaHandlerCount, verbose,
|
||||
highPriorityLevel);
|
||||
return server;
|
||||
}
|
||||
|
||||
/** An RPC Server. */
|
||||
public static class Server extends SecureServer {
|
||||
private Object instance;
|
||||
private Class<?> implementation;
|
||||
private Class<?>[] ifaces;
|
||||
private boolean verbose;
|
||||
|
||||
private static String classNameBase(String className) {
|
||||
String[] names = className.split("\\.", -1);
|
||||
if (names == null || names.length == 0) {
|
||||
return className;
|
||||
}
|
||||
return names[names.length-1];
|
||||
}
|
||||
|
||||
/** Construct an RPC server.
|
||||
* @param instance the instance whose methods will be called
|
||||
* @param conf the configuration to use
|
||||
* @param bindAddress the address to bind on to listen for connection
|
||||
* @param port the port to listen for connections on
|
||||
* @param numHandlers the number of method handler threads to run
|
||||
* @param verbose whether each call should be logged
|
||||
* @throws java.io.IOException e
|
||||
*/
|
||||
public Server(Object instance, final Class<?>[] ifaces,
|
||||
Configuration conf, String bindAddress, int port,
|
||||
int numHandlers, int metaHandlerCount, boolean verbose,
|
||||
int highPriorityLevel)
|
||||
throws IOException {
|
||||
super(bindAddress, port, Invocation.class, numHandlers, metaHandlerCount, conf,
|
||||
classNameBase(instance.getClass().getName()), highPriorityLevel);
|
||||
this.instance = instance;
|
||||
this.implementation = instance.getClass();
|
||||
this.verbose = verbose;
|
||||
|
||||
this.ifaces = ifaces;
|
||||
|
||||
// create metrics for the advertised interfaces this server implements.
|
||||
this.rpcMetrics.createMetrics(this.ifaces);
|
||||
}
|
||||
|
||||
public AuthenticationTokenSecretManager createSecretManager(){
|
||||
if (instance instanceof org.apache.hadoop.hbase.Server) {
|
||||
org.apache.hadoop.hbase.Server server =
|
||||
(org.apache.hadoop.hbase.Server)instance;
|
||||
Configuration conf = server.getConfiguration();
|
||||
long keyUpdateInterval =
|
||||
conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
|
||||
long maxAge =
|
||||
conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
|
||||
return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
|
||||
server.getServerName().toString(), keyUpdateInterval, maxAge);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startThreads() {
|
||||
AuthenticationTokenSecretManager mgr = createSecretManager();
|
||||
if (mgr != null) {
|
||||
setSecretManager(mgr);
|
||||
mgr.start();
|
||||
}
|
||||
this.authManager = new ServiceAuthorizationManager();
|
||||
HBasePolicyProvider.init(conf, authManager);
|
||||
|
||||
// continue with base startup
|
||||
super.startThreads();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Writable call(Class<? extends VersionedProtocol> protocol,
|
||||
Writable param, long receivedTime, MonitoredRPCHandler status)
|
||||
throws IOException {
|
||||
try {
|
||||
Invocation call = (Invocation)param;
|
||||
if(call.getMethodName() == null) {
|
||||
throw new IOException("Could not find requested method, the usual " +
|
||||
"cause is a version mismatch between client and server.");
|
||||
}
|
||||
if (verbose) log("Call: " + call);
|
||||
|
||||
Method method =
|
||||
protocol.getMethod(call.getMethodName(),
|
||||
call.getParameterClasses());
|
||||
method.setAccessible(true);
|
||||
|
||||
Object impl = null;
|
||||
if (protocol.isAssignableFrom(this.implementation)) {
|
||||
impl = this.instance;
|
||||
}
|
||||
else {
|
||||
throw new HBaseRPC.UnknownProtocolException(protocol);
|
||||
}
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
Object[] params = call.getParameters();
|
||||
Object value = method.invoke(impl, params);
|
||||
int processingTime = (int) (System.currentTimeMillis() - startTime);
|
||||
int qTime = (int) (startTime-receivedTime);
|
||||
if (TRACELOG.isDebugEnabled()) {
|
||||
TRACELOG.debug("Call #" + CurCall.get().id +
|
||||
"; Served: " + protocol.getSimpleName()+"#"+call.getMethodName() +
|
||||
" queueTime=" + qTime +
|
||||
" processingTime=" + processingTime +
|
||||
" contents=" + Objects.describeQuantity(params));
|
||||
}
|
||||
rpcMetrics.rpcQueueTime.inc(qTime);
|
||||
rpcMetrics.rpcProcessingTime.inc(processingTime);
|
||||
rpcMetrics.inc(call.getMethodName(), processingTime);
|
||||
if (verbose) log("Return: "+value);
|
||||
|
||||
return new HbaseObjectWritable(method.getReturnType(), value);
|
||||
} catch (InvocationTargetException e) {
|
||||
Throwable target = e.getTargetException();
|
||||
if (target instanceof IOException) {
|
||||
throw (IOException)target;
|
||||
}
|
||||
if (target instanceof ServiceException) {
|
||||
throw ProtobufUtil.getRemoteException((ServiceException)target);
|
||||
}
|
||||
IOException ioe = new IOException(target.toString());
|
||||
ioe.setStackTrace(target.getStackTrace());
|
||||
throw ioe;
|
||||
} catch (Throwable e) {
|
||||
if (!(e instanceof IOException)) {
|
||||
LOG.error("Unexpected throwable object ", e);
|
||||
}
|
||||
IOException ioe = new IOException(e.toString());
|
||||
ioe.setStackTrace(e.getStackTrace());
|
||||
throw ioe;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected static void log(String value) {
|
||||
String v = value;
|
||||
if (v != null && v.length() > 55)
|
||||
v = v.substring(0, 55)+"...";
|
||||
LOG.info(v);
|
||||
}
|
||||
}
|
|
@ -1,739 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
|
||||
import org.apache.hadoop.hbase.io.WritableWithSize;
|
||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
|
||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
|
||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
|
||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
|
||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.BytesWritable;
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableUtils;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
import javax.security.sasl.Sasl;
|
||||
import javax.security.sasl.SaslException;
|
||||
import javax.security.sasl.SaslServer;
|
||||
import java.io.*;
|
||||
import java.net.*;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.*;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.*;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION;
|
||||
|
||||
/**
|
||||
* An abstract IPC service, supporting SASL authentication of connections,
|
||||
* using GSSAPI for Kerberos authentication or DIGEST-MD5 for authentication
|
||||
* via signed tokens.
|
||||
*
|
||||
* <p>
|
||||
* This is part of the {@link SecureRpcEngine} implementation.
|
||||
* </p>
|
||||
*
|
||||
* @see org.apache.hadoop.hbase.ipc.SecureClient
|
||||
*/
|
||||
public abstract class SecureServer extends HBaseServer {
|
||||
private final boolean authorize;
|
||||
private boolean isSecurityEnabled;
|
||||
|
||||
/**
|
||||
* The first four bytes of secure RPC connections
|
||||
*/
|
||||
public static final ByteBuffer HEADER = ByteBuffer.wrap("srpc".getBytes());
|
||||
|
||||
// 1 : Introduce ping and server does not throw away RPCs
|
||||
// 3 : Introduce the protocol into the RPC connection header
|
||||
// 4 : Introduced SASL security layer
|
||||
public static final byte CURRENT_VERSION = 4;
|
||||
public static final Set<Byte> INSECURE_VERSIONS = ImmutableSet.of((byte) 5);
|
||||
|
||||
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.SecureServer");
|
||||
private static final Log AUDITLOG =
|
||||
LogFactory.getLog("SecurityLogger.org.apache.hadoop.ipc.SecureServer");
|
||||
private static final String AUTH_FAILED_FOR = "Auth failed for ";
|
||||
private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
|
||||
|
||||
protected SecretManager<TokenIdentifier> secretManager;
|
||||
protected ServiceAuthorizationManager authManager;
|
||||
|
||||
protected class SecureCall extends HBaseServer.Call {
|
||||
public SecureCall(int id, Writable param, Connection connection,
|
||||
Responder responder, long size) {
|
||||
super(id, param, connection, responder, size);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void setResponse(Object value, Status status,
|
||||
String errorClass, String error) {
|
||||
Writable result = null;
|
||||
if (value instanceof Writable) {
|
||||
result = (Writable) value;
|
||||
} else {
|
||||
/* We might have a null value and errors. Avoid creating a
|
||||
* HbaseObjectWritable, because the constructor fails on null. */
|
||||
if (value != null) {
|
||||
result = new HbaseObjectWritable(value);
|
||||
}
|
||||
}
|
||||
|
||||
int size = BUFFER_INITIAL_SIZE;
|
||||
if (result instanceof WritableWithSize) {
|
||||
// get the size hint.
|
||||
WritableWithSize ohint = (WritableWithSize) result;
|
||||
long hint = ohint.getWritableSize() + Bytes.SIZEOF_INT + Bytes.SIZEOF_INT;
|
||||
if (hint > Integer.MAX_VALUE) {
|
||||
// oops, new problem.
|
||||
IOException ioe =
|
||||
new IOException("Result buffer size too large: " + hint);
|
||||
errorClass = ioe.getClass().getName();
|
||||
error = StringUtils.stringifyException(ioe);
|
||||
} else {
|
||||
size = (int)hint;
|
||||
}
|
||||
}
|
||||
|
||||
ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
|
||||
DataOutputStream out = new DataOutputStream(buf);
|
||||
try {
|
||||
out.writeInt(this.id); // write call id
|
||||
out.writeInt(status.state); // write status
|
||||
} catch (IOException e) {
|
||||
errorClass = e.getClass().getName();
|
||||
error = StringUtils.stringifyException(e);
|
||||
}
|
||||
|
||||
try {
|
||||
if (status == Status.SUCCESS) {
|
||||
result.write(out);
|
||||
} else {
|
||||
WritableUtils.writeString(out, errorClass);
|
||||
WritableUtils.writeString(out, error);
|
||||
}
|
||||
if (((SecureConnection)connection).useWrap) {
|
||||
wrapWithSasl(buf);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Error sending response to call: ", e);
|
||||
}
|
||||
|
||||
this.response = buf.getByteBuffer();
|
||||
}
|
||||
|
||||
private void wrapWithSasl(ByteBufferOutputStream response)
|
||||
throws IOException {
|
||||
if (((SecureConnection)connection).useSasl) {
|
||||
// getByteBuffer calls flip()
|
||||
ByteBuffer buf = response.getByteBuffer();
|
||||
byte[] token;
|
||||
// synchronization may be needed since there can be multiple Handler
|
||||
// threads using saslServer to wrap responses.
|
||||
synchronized (((SecureConnection)connection).saslServer) {
|
||||
token = ((SecureConnection)connection).saslServer.wrap(buf.array(),
|
||||
buf.arrayOffset(), buf.remaining());
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Adding saslServer wrapped token of size " + token.length
|
||||
+ " as call response.");
|
||||
buf.clear();
|
||||
DataOutputStream saslOut = new DataOutputStream(response);
|
||||
saslOut.writeInt(token.length);
|
||||
saslOut.write(token, 0, token.length);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Reads calls from a connection and queues them for handling. */
|
||||
public class SecureConnection extends HBaseServer.Connection {
|
||||
private boolean rpcHeaderRead = false; // if initial rpc header is read
|
||||
private boolean headerRead = false; //if the connection header that
|
||||
//follows version is read.
|
||||
private ByteBuffer data;
|
||||
private ByteBuffer dataLengthBuffer;
|
||||
protected final LinkedList<SecureCall> responseQueue;
|
||||
private int dataLength;
|
||||
private InetAddress addr;
|
||||
|
||||
boolean useSasl;
|
||||
SaslServer saslServer;
|
||||
private AuthMethod authMethod;
|
||||
private boolean saslContextEstablished;
|
||||
private boolean skipInitialSaslHandshake;
|
||||
private ByteBuffer rpcHeaderBuffer;
|
||||
private ByteBuffer unwrappedData;
|
||||
private ByteBuffer unwrappedDataLengthBuffer;
|
||||
private SecureConnectionHeader header;
|
||||
|
||||
public UserGroupInformation attemptingUser = null; // user name before auth
|
||||
|
||||
// Fake 'call' for failed authorization response
|
||||
private final int AUTHORIZATION_FAILED_CALLID = -1;
|
||||
// Fake 'call' for SASL context setup
|
||||
private static final int SASL_CALLID = -33;
|
||||
private final SecureCall saslCall = new SecureCall(SASL_CALLID, null, this, null, 0);
|
||||
|
||||
private boolean useWrap = false;
|
||||
|
||||
public SecureConnection(SocketChannel channel, long lastContact) {
|
||||
super(channel, lastContact);
|
||||
this.header = new SecureConnectionHeader();
|
||||
this.channel = channel;
|
||||
this.data = null;
|
||||
this.dataLengthBuffer = ByteBuffer.allocate(4);
|
||||
this.unwrappedData = null;
|
||||
this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
|
||||
this.socket = channel.socket();
|
||||
this.addr = socket.getInetAddress();
|
||||
this.responseQueue = new LinkedList<SecureCall>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getHostAddress() + ":" + remotePort;
|
||||
}
|
||||
|
||||
public String getHostAddress() {
|
||||
return hostAddress;
|
||||
}
|
||||
|
||||
public InetAddress getHostInetAddress() {
|
||||
return addr;
|
||||
}
|
||||
|
||||
private User getAuthorizedUgi(String authorizedId)
|
||||
throws IOException {
|
||||
if (authMethod == AuthMethod.DIGEST) {
|
||||
TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
|
||||
secretManager);
|
||||
UserGroupInformation ugi = tokenId.getUser();
|
||||
if (ugi == null) {
|
||||
throw new AccessControlException(
|
||||
"Can't retrieve username from tokenIdentifier.");
|
||||
}
|
||||
ugi.addTokenIdentifier(tokenId);
|
||||
return User.create(ugi);
|
||||
} else {
|
||||
return User.create(UserGroupInformation.createRemoteUser(authorizedId));
|
||||
}
|
||||
}
|
||||
|
||||
private void saslReadAndProcess(byte[] saslToken) throws IOException,
|
||||
InterruptedException {
|
||||
if (!saslContextEstablished) {
|
||||
byte[] replyToken = null;
|
||||
try {
|
||||
if (saslServer == null) {
|
||||
switch (authMethod) {
|
||||
case DIGEST:
|
||||
if (secretManager == null) {
|
||||
throw new AccessControlException(
|
||||
"Server is not configured to do DIGEST authentication.");
|
||||
}
|
||||
saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
|
||||
.getMechanismName(), null, HBaseSaslRpcServer.SASL_DEFAULT_REALM,
|
||||
HBaseSaslRpcServer.SASL_PROPS, new SaslDigestCallbackHandler(
|
||||
secretManager, this));
|
||||
break;
|
||||
default:
|
||||
UserGroupInformation current = UserGroupInformation
|
||||
.getCurrentUser();
|
||||
String fullName = current.getUserName();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Kerberos principal name is " + fullName);
|
||||
final String names[] = HBaseSaslRpcServer.splitKerberosName(fullName);
|
||||
if (names.length != 3) {
|
||||
throw new AccessControlException(
|
||||
"Kerberos principal name does NOT have the expected "
|
||||
+ "hostname part: " + fullName);
|
||||
}
|
||||
current.doAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws SaslException {
|
||||
saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
|
||||
.getMechanismName(), names[0], names[1],
|
||||
HBaseSaslRpcServer.SASL_PROPS, new SaslGssCallbackHandler());
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
if (saslServer == null)
|
||||
throw new AccessControlException(
|
||||
"Unable to find SASL server implementation for "
|
||||
+ authMethod.getMechanismName());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Created SASL server with mechanism = "
|
||||
+ authMethod.getMechanismName());
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Have read input token of size " + saslToken.length
|
||||
+ " for processing by saslServer.evaluateResponse()");
|
||||
replyToken = saslServer.evaluateResponse(saslToken);
|
||||
} catch (IOException e) {
|
||||
IOException sendToClient = e;
|
||||
Throwable cause = e;
|
||||
while (cause != null) {
|
||||
if (cause instanceof InvalidToken) {
|
||||
sendToClient = (InvalidToken) cause;
|
||||
break;
|
||||
}
|
||||
cause = cause.getCause();
|
||||
}
|
||||
doSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
|
||||
sendToClient.getLocalizedMessage());
|
||||
rpcMetrics.authenticationFailures.inc();
|
||||
String clientIP = this.toString();
|
||||
// attempting user could be null
|
||||
AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
|
||||
throw e;
|
||||
}
|
||||
if (replyToken != null) {
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Will send token of size " + replyToken.length
|
||||
+ " from saslServer.");
|
||||
doSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
|
||||
null);
|
||||
}
|
||||
if (saslServer.isComplete()) {
|
||||
LOG.debug("SASL server context established. Negotiated QoP is "
|
||||
+ saslServer.getNegotiatedProperty(Sasl.QOP));
|
||||
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
|
||||
useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
|
||||
user = getAuthorizedUgi(saslServer.getAuthorizationID());
|
||||
LOG.debug("SASL server successfully authenticated client: " + user);
|
||||
rpcMetrics.authenticationSuccesses.inc();
|
||||
AUDITLOG.trace(AUTH_SUCCESSFUL_FOR + user);
|
||||
saslContextEstablished = true;
|
||||
}
|
||||
} else {
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Have read input token of size " + saslToken.length
|
||||
+ " for processing by saslServer.unwrap()");
|
||||
|
||||
if (!useWrap) {
|
||||
processOneRpc(saslToken);
|
||||
} else {
|
||||
byte[] plaintextData = saslServer.unwrap(saslToken, 0,
|
||||
saslToken.length);
|
||||
processUnwrappedData(plaintextData);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doSaslReply(SaslStatus status, Writable rv,
|
||||
String errorClass, String error) throws IOException {
|
||||
saslCall.setResponse(rv,
|
||||
status == SaslStatus.SUCCESS ? Status.SUCCESS : Status.ERROR,
|
||||
errorClass, error);
|
||||
saslCall.responder = responder;
|
||||
saslCall.sendResponseIfReady();
|
||||
}
|
||||
|
||||
private void disposeSasl() {
|
||||
if (saslServer != null) {
|
||||
try {
|
||||
saslServer.dispose();
|
||||
} catch (SaslException ignored) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public int readAndProcess() throws IOException, InterruptedException {
|
||||
while (true) {
|
||||
/* Read at most one RPC. If the header is not read completely yet
|
||||
* then iterate until we read first RPC or until there is no data left.
|
||||
*/
|
||||
int count = -1;
|
||||
if (dataLengthBuffer.remaining() > 0) {
|
||||
count = channelRead(channel, dataLengthBuffer);
|
||||
if (count < 0 || dataLengthBuffer.remaining() > 0)
|
||||
return count;
|
||||
}
|
||||
|
||||
if (!rpcHeaderRead) {
|
||||
//Every connection is expected to send the header.
|
||||
if (rpcHeaderBuffer == null) {
|
||||
rpcHeaderBuffer = ByteBuffer.allocate(2);
|
||||
}
|
||||
count = channelRead(channel, rpcHeaderBuffer);
|
||||
if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
|
||||
return count;
|
||||
}
|
||||
int version = rpcHeaderBuffer.get(0);
|
||||
byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
|
||||
authMethod = AuthMethod.read(new DataInputStream(
|
||||
new ByteArrayInputStream(method)));
|
||||
dataLengthBuffer.flip();
|
||||
if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
|
||||
//Warning is ok since this is not supposed to happen.
|
||||
if (INSECURE_VERSIONS.contains(version)) {
|
||||
LOG.warn("An insecure client (version '" + version + "') is attempting to connect " +
|
||||
" to this version '" + CURRENT_VERSION + "' secure server from " +
|
||||
hostAddress + ":" + remotePort);
|
||||
} else {
|
||||
LOG.warn("Incorrect header or version mismatch from " +
|
||||
hostAddress + ":" + remotePort +
|
||||
" got version " + version +
|
||||
" expected version " + CURRENT_VERSION);
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
dataLengthBuffer.clear();
|
||||
if (authMethod == null) {
|
||||
throw new IOException("Unable to read authentication method");
|
||||
}
|
||||
if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
|
||||
AccessControlException ae = new AccessControlException(
|
||||
"Authentication is required");
|
||||
SecureCall failedCall = new SecureCall(AUTHORIZATION_FAILED_CALLID, null, this,
|
||||
null, 0);
|
||||
failedCall.setResponse(null, Status.FATAL, ae.getClass().getName(),
|
||||
ae.getMessage());
|
||||
responder.doRespond(failedCall);
|
||||
throw ae;
|
||||
}
|
||||
if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
|
||||
doSaslReply(SaslStatus.SUCCESS, new IntWritable(
|
||||
HBaseSaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null);
|
||||
authMethod = AuthMethod.SIMPLE;
|
||||
// client has already sent the initial Sasl message and we
|
||||
// should ignore it. Both client and server should fall back
|
||||
// to simple auth from now on.
|
||||
skipInitialSaslHandshake = true;
|
||||
}
|
||||
if (authMethod != AuthMethod.SIMPLE) {
|
||||
useSasl = true;
|
||||
}
|
||||
|
||||
rpcHeaderBuffer = null;
|
||||
rpcHeaderRead = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (data == null) {
|
||||
dataLengthBuffer.flip();
|
||||
dataLength = dataLengthBuffer.getInt();
|
||||
|
||||
if (dataLength == HBaseClient.PING_CALL_ID) {
|
||||
if(!useWrap) { //covers the !useSasl too
|
||||
dataLengthBuffer.clear();
|
||||
return 0; //ping message
|
||||
}
|
||||
}
|
||||
if (dataLength < 0) {
|
||||
LOG.warn("Unexpected data length " + dataLength + "!! from " +
|
||||
getHostAddress());
|
||||
}
|
||||
data = ByteBuffer.allocate(dataLength);
|
||||
incRpcCount(); // Increment the rpc count
|
||||
}
|
||||
|
||||
count = channelRead(channel, data);
|
||||
|
||||
if (data.remaining() == 0) {
|
||||
dataLengthBuffer.clear();
|
||||
data.flip();
|
||||
if (skipInitialSaslHandshake) {
|
||||
data = null;
|
||||
skipInitialSaslHandshake = false;
|
||||
continue;
|
||||
}
|
||||
boolean isHeaderRead = headerRead;
|
||||
if (useSasl) {
|
||||
saslReadAndProcess(data.array());
|
||||
} else {
|
||||
processOneRpc(data.array());
|
||||
}
|
||||
data = null;
|
||||
if (!isHeaderRead) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
return count;
|
||||
}
|
||||
}
|
||||
|
||||
/// Reads the connection header following version
|
||||
private void processHeader(byte[] buf) throws IOException {
|
||||
DataInputStream in =
|
||||
new DataInputStream(new ByteArrayInputStream(buf));
|
||||
header.readFields(in);
|
||||
try {
|
||||
String protocolClassName = header.getProtocol();
|
||||
if (protocolClassName != null) {
|
||||
protocol = getProtocolClass(header.getProtocol(), conf);
|
||||
}
|
||||
} catch (ClassNotFoundException cnfe) {
|
||||
throw new IOException("Unknown protocol: " + header.getProtocol());
|
||||
}
|
||||
|
||||
User protocolUser = header.getUser();
|
||||
if (!useSasl) {
|
||||
user = protocolUser;
|
||||
if (user != null) {
|
||||
user.getUGI().setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
|
||||
}
|
||||
} else {
|
||||
// user is authenticated
|
||||
user.getUGI().setAuthenticationMethod(authMethod.authenticationMethod);
|
||||
//Now we check if this is a proxy user case. If the protocol user is
|
||||
//different from the 'user', it is a proxy user scenario. However,
|
||||
//this is not allowed if user authenticated with DIGEST.
|
||||
if ((protocolUser != null)
|
||||
&& (!protocolUser.getName().equals(user.getName()))) {
|
||||
if (authMethod == AuthMethod.DIGEST) {
|
||||
// Not allowed to doAs if token authentication is used
|
||||
throw new AccessControlException("Authenticated user (" + user
|
||||
+ ") doesn't match what the client claims to be ("
|
||||
+ protocolUser + ")");
|
||||
} else {
|
||||
// Effective user can be different from authenticated user
|
||||
// for simple auth or kerberos auth
|
||||
// The user is the real user. Now we create a proxy user
|
||||
UserGroupInformation realUser = user.getUGI();
|
||||
user = User.create(
|
||||
UserGroupInformation.createProxyUser(protocolUser.getName(),
|
||||
realUser));
|
||||
// Now the user is a proxy user, set Authentication method Proxy.
|
||||
user.getUGI().setAuthenticationMethod(AuthenticationMethod.PROXY);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void processUnwrappedData(byte[] inBuf) throws IOException,
|
||||
InterruptedException {
|
||||
ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
|
||||
inBuf));
|
||||
// Read all RPCs contained in the inBuf, even partial ones
|
||||
while (true) {
|
||||
int count = -1;
|
||||
if (unwrappedDataLengthBuffer.remaining() > 0) {
|
||||
count = channelRead(ch, unwrappedDataLengthBuffer);
|
||||
if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
|
||||
return;
|
||||
}
|
||||
|
||||
if (unwrappedData == null) {
|
||||
unwrappedDataLengthBuffer.flip();
|
||||
int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
|
||||
|
||||
if (unwrappedDataLength == HBaseClient.PING_CALL_ID) {
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Received ping message");
|
||||
unwrappedDataLengthBuffer.clear();
|
||||
continue; // ping message
|
||||
}
|
||||
unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
|
||||
}
|
||||
|
||||
count = channelRead(ch, unwrappedData);
|
||||
if (count <= 0 || unwrappedData.remaining() > 0)
|
||||
return;
|
||||
|
||||
if (unwrappedData.remaining() == 0) {
|
||||
unwrappedDataLengthBuffer.clear();
|
||||
unwrappedData.flip();
|
||||
processOneRpc(unwrappedData.array());
|
||||
unwrappedData = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void processOneRpc(byte[] buf) throws IOException,
|
||||
InterruptedException {
|
||||
if (headerRead) {
|
||||
processData(buf);
|
||||
} else {
|
||||
processHeader(buf);
|
||||
headerRead = true;
|
||||
if (!authorizeConnection()) {
|
||||
throw new AccessControlException("Connection from " + this
|
||||
+ " for protocol " + header.getProtocol()
|
||||
+ " is unauthorized for user " + user);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void processData(byte[] buf) throws IOException, InterruptedException {
|
||||
DataInputStream dis =
|
||||
new DataInputStream(new ByteArrayInputStream(buf));
|
||||
int id = dis.readInt(); // try to read an id
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(" got #" + id);
|
||||
}
|
||||
|
||||
Writable param = ReflectionUtils.newInstance(paramClass, conf); // read param
|
||||
param.readFields(dis);
|
||||
|
||||
SecureCall call = new SecureCall(id, param, this, responder, buf.length);
|
||||
|
||||
if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {
|
||||
priorityCallQueue.put(call);
|
||||
} else {
|
||||
callQueue.put(call); // queue the call; maybe blocked here
|
||||
}
|
||||
}
|
||||
|
||||
private boolean authorizeConnection() throws IOException {
|
||||
try {
|
||||
// If auth method is DIGEST, the token was obtained by the
|
||||
// real user for the effective user, therefore not required to
|
||||
// authorize real user. doAs is allowed only for simple or kerberos
|
||||
// authentication
|
||||
if (user != null && user.getUGI().getRealUser() != null
|
||||
&& (authMethod != AuthMethod.DIGEST)) {
|
||||
ProxyUsers.authorize(user.getUGI(), this.getHostAddress(), conf);
|
||||
}
|
||||
authorize(user, header, getHostInetAddress());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Successfully authorized " + header);
|
||||
}
|
||||
rpcMetrics.authorizationSuccesses.inc();
|
||||
} catch (AuthorizationException ae) {
|
||||
LOG.debug("Connection authorization failed: "+ae.getMessage(), ae);
|
||||
rpcMetrics.authorizationFailures.inc();
|
||||
SecureCall failedCall = new SecureCall(AUTHORIZATION_FAILED_CALLID, null, this,
|
||||
null, 0);
|
||||
failedCall.setResponse(null, Status.FATAL, ae.getClass().getName(),
|
||||
ae.getMessage());
|
||||
responder.doRespond(failedCall);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
protected synchronized void close() {
|
||||
disposeSasl();
|
||||
data = null;
|
||||
dataLengthBuffer = null;
|
||||
if (!channel.isOpen())
|
||||
return;
|
||||
try {socket.shutdownOutput();} catch(Exception ignored) {} // FindBugs DE_MIGHT_IGNORE
|
||||
if (channel.isOpen()) {
|
||||
try {channel.close();} catch(Exception ignored) {}
|
||||
}
|
||||
try {socket.close();} catch(Exception ignored) {}
|
||||
}
|
||||
}
|
||||
|
||||
/** Constructs a server listening on the named port and address. Parameters passed must
|
||||
* be of the named class. The <code>handlerCount</handlerCount> determines
|
||||
* the number of handler threads that will be used to process calls.
|
||||
*
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
protected SecureServer(String bindAddress, int port,
|
||||
Class<? extends Writable> paramClass, int handlerCount,
|
||||
int priorityHandlerCount, Configuration conf, String serverName,
|
||||
int highPriorityLevel)
|
||||
throws IOException {
|
||||
super(bindAddress, port, paramClass, handlerCount, priorityHandlerCount,
|
||||
conf, serverName, highPriorityLevel);
|
||||
this.authorize =
|
||||
conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
|
||||
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
|
||||
LOG.debug("security enabled="+isSecurityEnabled);
|
||||
|
||||
if (isSecurityEnabled) {
|
||||
HBaseSaslRpcServer.init(conf);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Connection getConnection(SocketChannel channel, long time) {
|
||||
return new SecureConnection(channel, time);
|
||||
}
|
||||
|
||||
Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
/** for unit testing only, should be called before server is started */
|
||||
void disableSecurity() {
|
||||
this.isSecurityEnabled = false;
|
||||
}
|
||||
|
||||
/** for unit testing only, should be called before server is started */
|
||||
void enableSecurity() {
|
||||
this.isSecurityEnabled = true;
|
||||
}
|
||||
|
||||
/** Stops the service. No new calls will be handled after this is called. */
|
||||
public synchronized void stop() {
|
||||
super.stop();
|
||||
}
|
||||
|
||||
public SecretManager<? extends TokenIdentifier> getSecretManager() {
|
||||
return this.secretManager;
|
||||
}
|
||||
|
||||
public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
|
||||
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Authorize the incoming client connection.
|
||||
*
|
||||
* @param user client user
|
||||
* @param connection incoming connection
|
||||
* @param addr InetAddress of incoming connection
|
||||
* @throws org.apache.hadoop.security.authorize.AuthorizationException when the client isn't authorized to talk the protocol
|
||||
*/
|
||||
public void authorize(User user,
|
||||
ConnectionHeader connection,
|
||||
InetAddress addr
|
||||
) throws AuthorizationException {
|
||||
if (authorize) {
|
||||
Class<?> protocol = null;
|
||||
try {
|
||||
protocol = getProtocolClass(connection.getProtocol(), getConf());
|
||||
} catch (ClassNotFoundException cfne) {
|
||||
throw new AuthorizationException("Unknown protocol: " +
|
||||
connection.getProtocol());
|
||||
}
|
||||
authManager.authorize(user != null ? user.getUGI() : null,
|
||||
protocol, getConf(), addr);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,39 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.security;
|
||||
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
|
||||
/**
|
||||
* Exception thrown by access-related methods.
|
||||
*/
|
||||
public class AccessDeniedException extends DoNotRetryIOException {
|
||||
private static final long serialVersionUID = 1913879564363001780L;
|
||||
|
||||
public AccessDeniedException() {
|
||||
super();
|
||||
}
|
||||
|
||||
public AccessDeniedException(Class<?> clazz, String s) {
|
||||
super( "AccessDenied [" + clazz.getName() + "]: " + s);
|
||||
}
|
||||
|
||||
public AccessDeniedException(String s) {
|
||||
super(s);
|
||||
}
|
||||
}
|
|
@ -1,55 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.security;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.client.AdminProtocol;
|
||||
import org.apache.hadoop.hbase.client.ClientProtocol;
|
||||
import org.apache.hadoop.hbase.ipc.HMasterInterface;
|
||||
import org.apache.hadoop.hbase.ipc.RegionServerStatusProtocol;
|
||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||
import org.apache.hadoop.security.authorize.Service;
|
||||
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
|
||||
|
||||
/**
|
||||
* Implementation of secure Hadoop policy provider for mapping
|
||||
* protocol interfaces to hbase-policy.xml entries.
|
||||
*/
|
||||
public class HBasePolicyProvider extends PolicyProvider {
|
||||
protected static Service[] services = {
|
||||
new Service("security.client.protocol.acl", ClientProtocol.class),
|
||||
new Service("security.client.protocol.acl", AdminProtocol.class),
|
||||
new Service("security.admin.protocol.acl", HMasterInterface.class),
|
||||
new Service("security.masterregion.protocol.acl", RegionServerStatusProtocol.class)
|
||||
};
|
||||
|
||||
@Override
|
||||
public Service[] getServices() {
|
||||
return services;
|
||||
}
|
||||
|
||||
public static void init(Configuration conf,
|
||||
ServiceAuthorizationManager authManager) {
|
||||
// set service-level authorization security policy
|
||||
conf.set("hadoop.policy.file", "hbase-policy.xml");
|
||||
if (conf.getBoolean(
|
||||
ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
|
||||
authManager.refresh(conf, new HBasePolicyProvider());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,280 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.security;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
|
||||
import javax.security.auth.callback.Callback;
|
||||
import javax.security.auth.callback.CallbackHandler;
|
||||
import javax.security.auth.callback.NameCallback;
|
||||
import javax.security.auth.callback.PasswordCallback;
|
||||
import javax.security.auth.callback.UnsupportedCallbackException;
|
||||
import javax.security.sasl.RealmCallback;
|
||||
import javax.security.sasl.RealmChoiceCallback;
|
||||
import javax.security.sasl.Sasl;
|
||||
import javax.security.sasl.SaslException;
|
||||
import javax.security.sasl.SaslClient;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.io.WritableUtils;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
|
||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus;
|
||||
import org.apache.hadoop.security.SaslInputStream;
|
||||
import org.apache.hadoop.security.SaslOutputStream;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
|
||||
/**
|
||||
* A utility class that encapsulates SASL logic for RPC client.
|
||||
* Copied from <code>org.apache.hadoop.security</code>
|
||||
*/
|
||||
public class HBaseSaslRpcClient {
|
||||
public static final Log LOG = LogFactory.getLog(HBaseSaslRpcClient.class);
|
||||
|
||||
private final SaslClient saslClient;
|
||||
|
||||
/**
|
||||
* Create a HBaseSaslRpcClient for an authentication method
|
||||
*
|
||||
* @param method
|
||||
* the requested authentication method
|
||||
* @param token
|
||||
* token to use if needed by the authentication method
|
||||
*/
|
||||
public HBaseSaslRpcClient(AuthMethod method,
|
||||
Token<? extends TokenIdentifier> token, String serverPrincipal)
|
||||
throws IOException {
|
||||
switch (method) {
|
||||
case DIGEST:
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Creating SASL " + AuthMethod.DIGEST.getMechanismName()
|
||||
+ " client to authenticate to service at " + token.getService());
|
||||
saslClient = Sasl.createSaslClient(new String[] { AuthMethod.DIGEST
|
||||
.getMechanismName() }, null, null, HBaseSaslRpcServer.SASL_DEFAULT_REALM,
|
||||
HBaseSaslRpcServer.SASL_PROPS, new SaslClientCallbackHandler(token));
|
||||
break;
|
||||
case KERBEROS:
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG
|
||||
.debug("Creating SASL " + AuthMethod.KERBEROS.getMechanismName()
|
||||
+ " client. Server's Kerberos principal name is "
|
||||
+ serverPrincipal);
|
||||
}
|
||||
if (serverPrincipal == null || serverPrincipal.length() == 0) {
|
||||
throw new IOException(
|
||||
"Failed to specify server's Kerberos principal name");
|
||||
}
|
||||
String names[] = HBaseSaslRpcServer.splitKerberosName(serverPrincipal);
|
||||
if (names.length != 3) {
|
||||
throw new IOException(
|
||||
"Kerberos principal name does NOT have the expected hostname part: "
|
||||
+ serverPrincipal);
|
||||
}
|
||||
saslClient = Sasl.createSaslClient(new String[] { AuthMethod.KERBEROS
|
||||
.getMechanismName() }, null, names[0], names[1],
|
||||
HBaseSaslRpcServer.SASL_PROPS, null);
|
||||
break;
|
||||
default:
|
||||
throw new IOException("Unknown authentication method " + method);
|
||||
}
|
||||
if (saslClient == null)
|
||||
throw new IOException("Unable to find SASL client implementation");
|
||||
}
|
||||
|
||||
private static void readStatus(DataInputStream inStream) throws IOException {
|
||||
int id = inStream.readInt(); // read and discard dummy id
|
||||
int status = inStream.readInt(); // read status
|
||||
if (status != SaslStatus.SUCCESS.state) {
|
||||
throw new RemoteException(WritableUtils.readString(inStream),
|
||||
WritableUtils.readString(inStream));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Do client side SASL authentication with server via the given InputStream
|
||||
* and OutputStream
|
||||
*
|
||||
* @param inS
|
||||
* InputStream to use
|
||||
* @param outS
|
||||
* OutputStream to use
|
||||
* @return true if connection is set up, or false if needs to switch
|
||||
* to simple Auth.
|
||||
* @throws IOException
|
||||
*/
|
||||
public boolean saslConnect(InputStream inS, OutputStream outS)
|
||||
throws IOException {
|
||||
DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS));
|
||||
DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(
|
||||
outS));
|
||||
|
||||
try {
|
||||
byte[] saslToken = new byte[0];
|
||||
if (saslClient.hasInitialResponse())
|
||||
saslToken = saslClient.evaluateChallenge(saslToken);
|
||||
if (saslToken != null) {
|
||||
outStream.writeInt(saslToken.length);
|
||||
outStream.write(saslToken, 0, saslToken.length);
|
||||
outStream.flush();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Have sent token of size " + saslToken.length
|
||||
+ " from initSASLContext.");
|
||||
}
|
||||
if (!saslClient.isComplete()) {
|
||||
readStatus(inStream);
|
||||
int len = inStream.readInt();
|
||||
if (len == HBaseSaslRpcServer.SWITCH_TO_SIMPLE_AUTH) {
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Server asks us to fall back to simple auth.");
|
||||
saslClient.dispose();
|
||||
return false;
|
||||
}
|
||||
saslToken = new byte[len];
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Will read input token of size " + saslToken.length
|
||||
+ " for processing by initSASLContext");
|
||||
inStream.readFully(saslToken);
|
||||
}
|
||||
|
||||
while (!saslClient.isComplete()) {
|
||||
saslToken = saslClient.evaluateChallenge(saslToken);
|
||||
if (saslToken != null) {
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Will send token of size " + saslToken.length
|
||||
+ " from initSASLContext.");
|
||||
outStream.writeInt(saslToken.length);
|
||||
outStream.write(saslToken, 0, saslToken.length);
|
||||
outStream.flush();
|
||||
}
|
||||
if (!saslClient.isComplete()) {
|
||||
readStatus(inStream);
|
||||
saslToken = new byte[inStream.readInt()];
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Will read input token of size " + saslToken.length
|
||||
+ " for processing by initSASLContext");
|
||||
inStream.readFully(saslToken);
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("SASL client context established. Negotiated QoP: "
|
||||
+ saslClient.getNegotiatedProperty(Sasl.QOP));
|
||||
}
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
try {
|
||||
saslClient.dispose();
|
||||
} catch (SaslException ignored) {
|
||||
// ignore further exceptions during cleanup
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a SASL wrapped InputStream. Can be called only after saslConnect() has
|
||||
* been called.
|
||||
*
|
||||
* @param in
|
||||
* the InputStream to wrap
|
||||
* @return a SASL wrapped InputStream
|
||||
* @throws IOException
|
||||
*/
|
||||
public InputStream getInputStream(InputStream in) throws IOException {
|
||||
if (!saslClient.isComplete()) {
|
||||
throw new IOException("Sasl authentication exchange hasn't completed yet");
|
||||
}
|
||||
return new SaslInputStream(in, saslClient);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a SASL wrapped OutputStream. Can be called only after saslConnect() has
|
||||
* been called.
|
||||
*
|
||||
* @param out
|
||||
* the OutputStream to wrap
|
||||
* @return a SASL wrapped OutputStream
|
||||
* @throws IOException
|
||||
*/
|
||||
public OutputStream getOutputStream(OutputStream out) throws IOException {
|
||||
if (!saslClient.isComplete()) {
|
||||
throw new IOException("Sasl authentication exchange hasn't completed yet");
|
||||
}
|
||||
return new SaslOutputStream(out, saslClient);
|
||||
}
|
||||
|
||||
/** Release resources used by wrapped saslClient */
|
||||
public void dispose() throws SaslException {
|
||||
saslClient.dispose();
|
||||
}
|
||||
|
||||
private static class SaslClientCallbackHandler implements CallbackHandler {
|
||||
private final String userName;
|
||||
private final char[] userPassword;
|
||||
|
||||
public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
|
||||
this.userName = HBaseSaslRpcServer.encodeIdentifier(token.getIdentifier());
|
||||
this.userPassword = HBaseSaslRpcServer.encodePassword(token.getPassword());
|
||||
}
|
||||
|
||||
public void handle(Callback[] callbacks)
|
||||
throws UnsupportedCallbackException {
|
||||
NameCallback nc = null;
|
||||
PasswordCallback pc = null;
|
||||
RealmCallback rc = null;
|
||||
for (Callback callback : callbacks) {
|
||||
if (callback instanceof RealmChoiceCallback) {
|
||||
continue;
|
||||
} else if (callback instanceof NameCallback) {
|
||||
nc = (NameCallback) callback;
|
||||
} else if (callback instanceof PasswordCallback) {
|
||||
pc = (PasswordCallback) callback;
|
||||
} else if (callback instanceof RealmCallback) {
|
||||
rc = (RealmCallback) callback;
|
||||
} else {
|
||||
throw new UnsupportedCallbackException(callback,
|
||||
"Unrecognized SASL client callback");
|
||||
}
|
||||
}
|
||||
if (nc != null) {
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("SASL client callback: setting username: " + userName);
|
||||
nc.setName(userName);
|
||||
}
|
||||
if (pc != null) {
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("SASL client callback: setting userPassword");
|
||||
pc.setPassword(userPassword);
|
||||
}
|
||||
if (rc != null) {
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("SASL client callback: setting realm: "
|
||||
+ rc.getDefaultText());
|
||||
rc.setText(rc.getDefaultText());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,164 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.security.access;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Handles synchronization of access control list entries and updates
|
||||
* throughout all nodes in the cluster. The {@link AccessController} instance
|
||||
* on the {@code _acl_} table regions, creates a znode for each table as
|
||||
* {@code /hbase/acl/tablename}, with the znode data containing a serialized
|
||||
* list of the permissions granted for the table. The {@code AccessController}
|
||||
* instances on all other cluster hosts watch the znodes for updates, which
|
||||
* trigger updates in the {@link TableAuthManager} permission cache.
|
||||
*/
|
||||
public class ZKPermissionWatcher extends ZooKeeperListener {
|
||||
private static Log LOG = LogFactory.getLog(ZKPermissionWatcher.class);
|
||||
// parent node for permissions lists
|
||||
static final String ACL_NODE = "acl";
|
||||
TableAuthManager authManager;
|
||||
String aclZNode;
|
||||
|
||||
public ZKPermissionWatcher(ZooKeeperWatcher watcher,
|
||||
TableAuthManager authManager, Configuration conf) {
|
||||
super(watcher);
|
||||
this.authManager = authManager;
|
||||
String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE);
|
||||
this.aclZNode = ZKUtil.joinZNode(watcher.baseZNode, aclZnodeParent);
|
||||
}
|
||||
|
||||
public void start() throws KeeperException {
|
||||
watcher.registerListener(this);
|
||||
if (ZKUtil.watchAndCheckExists(watcher, aclZNode)) {
|
||||
List<ZKUtil.NodeAndData> existing =
|
||||
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
|
||||
if (existing != null) {
|
||||
refreshNodes(existing);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nodeCreated(String path) {
|
||||
if (path.equals(aclZNode)) {
|
||||
try {
|
||||
List<ZKUtil.NodeAndData> nodes =
|
||||
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
|
||||
refreshNodes(nodes);
|
||||
} catch (KeeperException ke) {
|
||||
LOG.error("Error reading data from zookeeper", ke);
|
||||
// only option is to abort
|
||||
watcher.abort("Zookeeper error obtaining acl node children", ke);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nodeDeleted(String path) {
|
||||
if (aclZNode.equals(ZKUtil.getParent(path))) {
|
||||
String table = ZKUtil.getNodeName(path);
|
||||
authManager.remove(Bytes.toBytes(table));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nodeDataChanged(String path) {
|
||||
if (aclZNode.equals(ZKUtil.getParent(path))) {
|
||||
// update cache on an existing table node
|
||||
String table = ZKUtil.getNodeName(path);
|
||||
try {
|
||||
byte[] data = ZKUtil.getDataAndWatch(watcher, path);
|
||||
authManager.refreshCacheFromWritable(Bytes.toBytes(table), data);
|
||||
} catch (KeeperException ke) {
|
||||
LOG.error("Error reading data from zookeeper for node "+table, ke);
|
||||
// only option is to abort
|
||||
watcher.abort("Zookeeper error getting data for node " + table, ke);
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Error reading permissions writables", ioe);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nodeChildrenChanged(String path) {
|
||||
if (path.equals(aclZNode)) {
|
||||
// table permissions changed
|
||||
try {
|
||||
List<ZKUtil.NodeAndData> nodes =
|
||||
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
|
||||
refreshNodes(nodes);
|
||||
} catch (KeeperException ke) {
|
||||
LOG.error("Error reading data from zookeeper for path "+path, ke);
|
||||
watcher.abort("Zookeeper error get node children for path "+path, ke);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void refreshNodes(List<ZKUtil.NodeAndData> nodes) {
|
||||
for (ZKUtil.NodeAndData n : nodes) {
|
||||
if (n.isEmpty()) continue;
|
||||
String path = n.getNode();
|
||||
String table = ZKUtil.getNodeName(path);
|
||||
try {
|
||||
byte[] nodeData = n.getData();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Updating permissions cache from node "+table+" with data: "+
|
||||
Bytes.toStringBinary(nodeData));
|
||||
}
|
||||
authManager.refreshCacheFromWritable(Bytes.toBytes(table),
|
||||
nodeData);
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Failed parsing permissions for table '" + table +
|
||||
"' from zk", ioe);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/***
|
||||
* Write a table's access controls to the permissions mirror in zookeeper
|
||||
* @param tableName
|
||||
* @param permsData
|
||||
*/
|
||||
public void writeToZookeeper(String tableName,
|
||||
byte[] permsData) {
|
||||
String zkNode =
|
||||
ZKUtil.joinZNode(ZKUtil.joinZNode(watcher.baseZNode, ACL_NODE),
|
||||
tableName);
|
||||
try {
|
||||
ZKUtil.createWithParents(watcher, zkNode);
|
||||
ZKUtil.updateExistingNodeData(watcher, zkNode,
|
||||
permsData, -1);
|
||||
} catch (KeeperException e) {
|
||||
LOG.error("Failed updating permissions for table '" + tableName +
|
||||
"'", e);
|
||||
watcher.abort("Failed writing node "+zkNode+" to zookeeper", e);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue