diff --git a/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureClient.java b/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureClient.java deleted file mode 100644 index a1604db8bd2..00000000000 --- a/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureClient.java +++ /dev/null @@ -1,522 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.ipc; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.security.HBaseSaslRpcClient; -import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod; -import org.apache.hadoop.hbase.security.KerberosInfo; -import org.apache.hadoop.hbase.security.TokenInfo; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier; -import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.PoolMap; -import org.apache.hadoop.io.*; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.security.token.TokenSelector; -import org.apache.hadoop.util.ReflectionUtils; - -import javax.net.SocketFactory; -import java.io.*; -import java.net.*; -import java.security.PrivilegedExceptionAction; -import java.util.HashMap; -import java.util.Hashtable; -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Random; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -/** - * A client for an IPC service, which support SASL authentication of connections - * using either GSSAPI for Kerberos authentication or DIGEST-MD5 for - * authentication using signed tokens. - * - *

- * This is a copy of org.apache.hadoop.ipc.Client from secure Hadoop, - * reworked to remove code duplicated with - * {@link org.apache.hadoop.hbase.HBaseClient}. This is part of the loadable - * {@link SecureRpcEngine}, and only functions in connection with a - * {@link SecureServer} instance. - *

- */ -public class SecureClient extends HBaseClient { - - private static final Log LOG = - LogFactory.getLog("org.apache.hadoop.ipc.SecureClient"); - - protected static Map> tokenHandlers = - new HashMap>(); - static { - tokenHandlers.put(AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE.toString(), - new AuthenticationTokenSelector()); - } - - /** Thread that reads responses and notifies callers. Each connection owns a - * socket connected to a remote address. Calls are multiplexed through this - * socket: responses may be delivered out of order. */ - protected class SecureConnection extends Connection { - private InetSocketAddress server; // server ip:port - private String serverPrincipal; // server's krb5 principal name - private SecureConnectionHeader header; // connection header - private AuthMethod authMethod; // authentication method - private boolean useSasl; - private Token token; - private HBaseSaslRpcClient saslRpcClient; - private int reloginMaxBackoff; // max pause before relogin on sasl failure - - public SecureConnection(ConnectionId remoteId) throws IOException { - super(remoteId); - this.server = remoteId.getAddress(); - - User ticket = remoteId.getTicket(); - Class protocol = remoteId.getProtocol(); - this.useSasl = User.isSecurityEnabled(); - if (useSasl && protocol != null) { - TokenInfo tokenInfo = protocol.getAnnotation(TokenInfo.class); - if (tokenInfo != null) { - TokenSelector tokenSelector = - tokenHandlers.get(tokenInfo.value()); - if (tokenSelector != null) { - token = tokenSelector.selectToken(new Text(clusterId), - ticket.getUGI().getTokens()); - } else if (LOG.isDebugEnabled()) { - LOG.debug("No token selector found for type "+tokenInfo.value()); - } - } - KerberosInfo krbInfo = protocol.getAnnotation(KerberosInfo.class); - if (krbInfo != null) { - String serverKey = krbInfo.serverPrincipal(); - if (serverKey == null) { - throw new IOException( - "Can't obtain server Kerberos config key from KerberosInfo"); - } - serverPrincipal = SecurityUtil.getServerPrincipal( - conf.get(serverKey), server.getAddress().getCanonicalHostName().toLowerCase()); - if (LOG.isDebugEnabled()) { - LOG.debug("RPC Server Kerberos principal name for protocol=" - + protocol.getCanonicalName() + " is " + serverPrincipal); - } - } - } - - if (!useSasl) { - authMethod = AuthMethod.SIMPLE; - } else if (token != null) { - authMethod = AuthMethod.DIGEST; - } else { - authMethod = AuthMethod.KERBEROS; - } - - header = new SecureConnectionHeader( - protocol == null ? null : protocol.getName(), ticket, authMethod); - - if (LOG.isDebugEnabled()) - LOG.debug("Use " + authMethod + " authentication for protocol " - + protocol.getSimpleName()); - - reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000); - } - - private synchronized void disposeSasl() { - if (saslRpcClient != null) { - try { - saslRpcClient.dispose(); - saslRpcClient = null; - } catch (IOException ioe) { - LOG.info("Error disposing of SASL client", ioe); - } - } - } - - @Override - protected void sendParam(Call call) { - if (shouldCloseConnection.get()) { - return; - } - // For serializing the data to be written. - - final DataOutputBuffer d = new DataOutputBuffer(); - try { - if (LOG.isDebugEnabled()) { - LOG.debug(getName() + " sending #" + call.id); - } - d.writeInt(0xdeadbeef); // placeholder for data length - d.writeInt(call.id); - call.param.write(d); - byte[] data = d.getData(); - int dataLength = d.getLength(); - // fill in the placeholder - Bytes.putInt(data, 0, dataLength - 4); - //noinspection SynchronizeOnNonFinalField - synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC - out.write(data, 0, dataLength); - out.flush(); - } - } catch(IOException e) { - markClosed(e); - } finally { - //the buffer is just an in-memory buffer, but it is still polite to - // close early - IOUtils.closeStream(d); - } - } - - private synchronized boolean shouldAuthenticateOverKrb() throws IOException { - UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); - UserGroupInformation currentUser = - UserGroupInformation.getCurrentUser(); - UserGroupInformation realUser = currentUser.getRealUser(); - return authMethod == AuthMethod.KERBEROS && - loginUser != null && - //Make sure user logged in using Kerberos either keytab or TGT - loginUser.hasKerberosCredentials() && - // relogin only in case it is the login user (e.g. JT) - // or superuser (like oozie). - (loginUser.equals(currentUser) || loginUser.equals(realUser)); - } - - private synchronized boolean setupSaslConnection(final InputStream in2, - final OutputStream out2) - throws IOException { - saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal); - return saslRpcClient.saslConnect(in2, out2); - } - - /** - * If multiple clients with the same principal try to connect - * to the same server at the same time, the server assumes a - * replay attack is in progress. This is a feature of kerberos. - * In order to work around this, what is done is that the client - * backs off randomly and tries to initiate the connection - * again. - * The other problem is to do with ticket expiry. To handle that, - * a relogin is attempted. - */ - private synchronized void handleSaslConnectionFailure( - final int currRetries, - final int maxRetries, final Exception ex, final Random rand, - final User user) - throws IOException, InterruptedException{ - user.runAs(new PrivilegedExceptionAction() { - public Object run() throws IOException, InterruptedException { - closeConnection(); - if (shouldAuthenticateOverKrb()) { - if (currRetries < maxRetries) { - LOG.debug("Exception encountered while connecting to " + - "the server : " + ex); - //try re-login - if (UserGroupInformation.isLoginKeytabBased()) { - UserGroupInformation.getLoginUser().reloginFromKeytab(); - } else { - UserGroupInformation.getLoginUser().reloginFromTicketCache(); - } - disposeSasl(); - //have granularity of milliseconds - //we are sleeping with the Connection lock held but since this - //connection instance is being used for connecting to the server - //in question, it is okay - Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1)); - return null; - } else { - String msg = "Couldn't setup connection for " + - UserGroupInformation.getLoginUser().getUserName() + - " to " + serverPrincipal; - LOG.warn(msg); - throw (IOException) new IOException(msg).initCause(ex); - } - } else { - LOG.warn("Exception encountered while connecting to " + - "the server : " + ex); - } - if (ex instanceof RemoteException) - throw (RemoteException)ex; - throw new IOException(ex); - } - }); - } - - @Override - protected synchronized void setupIOstreams() - throws IOException, InterruptedException { - if (socket != null || shouldCloseConnection.get()) { - return; - } - - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Connecting to "+server); - } - short numRetries = 0; - final short MAX_RETRIES = 5; - Random rand = null; - while (true) { - setupConnection(); - InputStream inStream = NetUtils.getInputStream(socket); - OutputStream outStream = NetUtils.getOutputStream(socket); - writeRpcHeader(outStream); - if (useSasl) { - final InputStream in2 = inStream; - final OutputStream out2 = outStream; - User ticket = remoteId.getTicket(); - if (authMethod == AuthMethod.KERBEROS) { - UserGroupInformation ugi = ticket.getUGI(); - if (ugi != null && ugi.getRealUser() != null) { - ticket = User.create(ugi.getRealUser()); - } - } - boolean continueSasl = false; - try { - continueSasl = - ticket.runAs(new PrivilegedExceptionAction() { - @Override - public Boolean run() throws IOException { - return setupSaslConnection(in2, out2); - } - }); - } catch (Exception ex) { - if (rand == null) { - rand = new Random(); - } - handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand, - ticket); - continue; - } - if (continueSasl) { - // Sasl connect is successful. Let's set up Sasl i/o streams. - inStream = saslRpcClient.getInputStream(inStream); - outStream = saslRpcClient.getOutputStream(outStream); - } else { - // fall back to simple auth because server told us so. - authMethod = AuthMethod.SIMPLE; - header = new SecureConnectionHeader(header.getProtocol(), - header.getUser(), authMethod); - useSasl = false; - } - } - this.in = new DataInputStream(new BufferedInputStream - (new PingInputStream(inStream))); - this.out = new DataOutputStream - (new BufferedOutputStream(outStream)); - writeHeader(); - - // update last activity time - touch(); - - // start the receiver thread after the socket connection has been set up - start(); - return; - } - } catch (IOException e) { - markClosed(e); - close(); - - throw e; - } - } - - /* Write the RPC header */ - private void writeRpcHeader(OutputStream outStream) throws IOException { - DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream)); - // Write out the header, version and authentication method - out.write(SecureServer.HEADER.array()); - out.write(SecureServer.CURRENT_VERSION); - authMethod.write(out); - out.flush(); - } - - /** - * Write the protocol header for each connection - * Out is not synchronized because only the first thread does this. - */ - private void writeHeader() throws IOException { - // Write out the ConnectionHeader - DataOutputBuffer buf = new DataOutputBuffer(); - header.write(buf); - - // Write out the payload length - int bufLen = buf.getLength(); - out.writeInt(bufLen); - out.write(buf.getData(), 0, bufLen); - } - - @Override - protected void receiveResponse() { - if (shouldCloseConnection.get()) { - return; - } - touch(); - - try { - int id = in.readInt(); // try to read an id - - if (LOG.isDebugEnabled()) - LOG.debug(getName() + " got value #" + id); - - Call call = calls.remove(id); - - int state = in.readInt(); // read call status - if (LOG.isDebugEnabled()) { - LOG.debug("call #"+id+" state is " + state); - } - if (state == Status.SUCCESS.state) { - Writable value = ReflectionUtils.newInstance(valueClass, conf); - value.readFields(in); // read value - if (LOG.isDebugEnabled()) { - LOG.debug("call #"+id+", response is:\n"+value.toString()); - } - call.setValue(value); - } else if (state == Status.ERROR.state) { - call.setException(new RemoteException(WritableUtils.readString(in), - WritableUtils.readString(in))); - } else if (state == Status.FATAL.state) { - // Close the connection - markClosed(new RemoteException(WritableUtils.readString(in), - WritableUtils.readString(in))); - } - } catch (IOException e) { - if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) { - // Clean up open calls but don't treat this as a fatal condition, - // since we expect certain responses to not make it by the specified - // {@link ConnectionId#rpcTimeout}. - closeException = e; - } else { - // Since the server did not respond within the default ping interval - // time, treat this as a fatal condition and close this connection - markClosed(e); - } - } finally { - if (remoteId.rpcTimeout > 0) { - cleanupCalls(remoteId.rpcTimeout); - } - } - } - - /** Close the connection. */ - protected synchronized void close() { - if (!shouldCloseConnection.get()) { - LOG.error("The connection is not in the closed state"); - return; - } - - // release the resources - // first thing to do;take the connection out of the connection list - synchronized (connections) { - if (connections.get(remoteId) == this) { - connections.remove(remoteId); - } - } - - // close the streams and therefore the socket - IOUtils.closeStream(out); - IOUtils.closeStream(in); - disposeSasl(); - - // clean up all calls - if (closeException == null) { - if (!calls.isEmpty()) { - LOG.warn( - "A connection is closed for no cause and calls are not empty"); - - // clean up calls anyway - closeException = new IOException("Unexpected closed connection"); - cleanupCalls(); - } - } else { - // log the info - if (LOG.isDebugEnabled()) { - LOG.debug("closing ipc connection to " + server + ": " + - closeException.getMessage(),closeException); - } - - // cleanup calls - cleanupCalls(); - } - if (LOG.isDebugEnabled()) - LOG.debug(getName() + ": closed"); - } - } - - /** - * Construct an IPC client whose values are of the given {@link org.apache.hadoop.io.Writable} - * class. - * @param valueClass value class - * @param conf configuration - * @param factory socket factory - */ - public SecureClient(Class valueClass, Configuration conf, - SocketFactory factory) { - super(valueClass, conf, factory); - } - - /** - * Construct an IPC client with the default SocketFactory - * @param valueClass value class - * @param conf configuration - */ - public SecureClient(Class valueClass, Configuration conf) { - this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf)); - } - - @Override - protected SecureConnection getConnection(InetSocketAddress addr, - Class protocol, - User ticket, - int rpcTimeout, - Call call) - throws IOException, InterruptedException { - if (!running.get()) { - // the client is stopped - throw new IOException("The client is stopped"); - } - SecureConnection connection; - /* we could avoid this allocation for each RPC by having a - * connectionsId object and with set() method. We need to manage the - * refs for keys in HashMap properly. For now its ok. - */ - ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout); - do { - synchronized (connections) { - connection = (SecureConnection)connections.get(remoteId); - if (connection == null) { - connection = new SecureConnection(remoteId); - connections.put(remoteId, connection); - } - } - } while (!connection.addCall(call)); - - //we don't invoke the method below inside "synchronized (connections)" - //block above. The reason for that is if the server happens to be slow, - //it will take longer to establish a connection and that will slow the - //entire system down. - connection.setupIOstreams(); - return connection; - } -} \ No newline at end of file diff --git a/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureConnectionHeader.java b/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureConnectionHeader.java deleted file mode 100644 index 50608215199..00000000000 --- a/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureConnectionHeader.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.ipc; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.security.UserGroupInformation; - -/** - * The IPC connection header sent by the client to the server - * on connection establishment. Part of the {@link SecureRpcEngine} - * implementation. - */ -class SecureConnectionHeader extends ConnectionHeader { - private User user = null; - private AuthMethod authMethod; - - public SecureConnectionHeader() {} - - /** - * Create a new {@link org.apache.hadoop.hbase.ipc.SecureConnectionHeader} with the given protocol - * and {@link org.apache.hadoop.security.UserGroupInformation}. - * @param protocol protocol used for communication between the IPC client - * and the server - * @param ugi {@link org.apache.hadoop.security.UserGroupInformation} of the client communicating with - * the server - */ - public SecureConnectionHeader(String protocol, User user, AuthMethod authMethod) { - this.protocol = protocol; - this.user = user; - this.authMethod = authMethod; - } - - @Override - public void readFields(DataInput in) throws IOException { - protocol = Text.readString(in); - if (protocol.isEmpty()) { - protocol = null; - } - boolean ugiUsernamePresent = in.readBoolean(); - if (ugiUsernamePresent) { - String username = in.readUTF(); - boolean realUserNamePresent = in.readBoolean(); - if (realUserNamePresent) { - String realUserName = in.readUTF(); - UserGroupInformation realUserUgi = - UserGroupInformation.createRemoteUser(realUserName); - user = User.create( - UserGroupInformation.createProxyUser(username, realUserUgi)); - } else { - user = User.create(UserGroupInformation.createRemoteUser(username)); - } - } else { - user = null; - } - } - - @Override - public void write(DataOutput out) throws IOException { - Text.writeString(out, (protocol == null) ? "" : protocol); - if (user != null) { - UserGroupInformation ugi = user.getUGI(); - if (authMethod == AuthMethod.KERBEROS) { - // Send effective user for Kerberos auth - out.writeBoolean(true); - out.writeUTF(ugi.getUserName()); - out.writeBoolean(false); - } else if (authMethod == AuthMethod.DIGEST) { - // Don't send user for token auth - out.writeBoolean(false); - } else { - //Send both effective user and real user for simple auth - out.writeBoolean(true); - out.writeUTF(ugi.getUserName()); - if (ugi.getRealUser() != null) { - out.writeBoolean(true); - out.writeUTF(ugi.getRealUser().getUserName()); - } else { - out.writeBoolean(false); - } - } - } else { - out.writeBoolean(false); - } - } - - public String getProtocol() { - return protocol; - } - - public User getUser() { - return user; - } - - public String toString() { - return protocol + "-" + user; - } -} diff --git a/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java b/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java deleted file mode 100644 index 8383d6c8e88..00000000000 --- a/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java +++ /dev/null @@ -1,437 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.ipc; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.io.HbaseObjectWritable; -import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.security.HBasePolicyProvider; -import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; -import org.apache.hadoop.hbase.util.Objects; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; - -import com.google.protobuf.ServiceException; - -import javax.net.SocketFactory; -import java.io.IOException; -import java.lang.reflect.*; -import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.Map; - -/** - * A loadable RPC engine supporting SASL authentication of connections, using - * GSSAPI for Kerberos authentication or DIGEST-MD5 for authentication via - * signed tokens. - * - *

- * This is a fork of the {@code org.apache.hadoop.ipc.WriteableRpcEngine} from - * secure Hadoop, reworked to eliminate code duplication with the existing - * HBase {@link WritableRpcEngine}. - *

- * - * @see SecureClient - * @see SecureServer - */ -public class SecureRpcEngine implements RpcEngine { - // Leave this out in the hadoop ipc package but keep class name. Do this - // so that we dont' get the logging of this class's invocations by doing our - // blanket enabling DEBUG on the o.a.h.h. package. - protected static final Log LOG = - LogFactory.getLog("org.apache.hadoop.ipc.SecureRpcEngine"); - - private SecureRpcEngine() { - super(); - } // no public ctor - - /* Cache a client using its socket factory as the hash key */ - static private class ClientCache { - private Map clients = - new HashMap(); - - protected ClientCache() {} - - /** - * Construct & cache an IPC client with the user-provided SocketFactory - * if no cached client exists. - * - * @param conf Configuration - * @param factory socket factory - * @return an IPC client - */ - protected synchronized SecureClient getClient(Configuration conf, - SocketFactory factory) { - // Construct & cache client. The configuration is only used for timeout, - // and Clients have connection pools. So we can either (a) lose some - // connection pooling and leak sockets, or (b) use the same timeout for all - // configurations. Since the IPC is usually intended globally, not - // per-job, we choose (a). - SecureClient client = clients.get(factory); - if (client == null) { - // Make an hbase client instead of hadoop Client. - client = new SecureClient(HbaseObjectWritable.class, conf, factory); - clients.put(factory, client); - } else { - client.incCount(); - } - return client; - } - - /** - * Construct & cache an IPC client with the default SocketFactory - * if no cached client exists. - * - * @param conf Configuration - * @return an IPC client - */ - protected synchronized SecureClient getClient(Configuration conf) { - return getClient(conf, SocketFactory.getDefault()); - } - - /** - * Stop a RPC client connection - * A RPC client is closed only when its reference count becomes zero. - * @param client client to stop - */ - protected void stopClient(SecureClient client) { - synchronized (this) { - client.decCount(); - if (client.isZeroReference()) { - clients.remove(client.getSocketFactory()); - } - } - if (client.isZeroReference()) { - client.stop(); - } - } - } - - protected final static ClientCache CLIENTS = new ClientCache(); - - private static class Invoker implements InvocationHandler { - private Class protocol; - private InetSocketAddress address; - private User ticket; - private SecureClient client; - private boolean isClosed = false; - final private int rpcTimeout; - - public Invoker(Class protocol, - InetSocketAddress address, User ticket, - Configuration conf, SocketFactory factory, int rpcTimeout) { - this.protocol = protocol; - this.address = address; - this.ticket = ticket; - this.client = CLIENTS.getClient(conf, factory); - this.rpcTimeout = rpcTimeout; - } - - public Object invoke(Object proxy, Method method, Object[] args) - throws Throwable { - final boolean logDebug = LOG.isDebugEnabled(); - long startTime = 0; - if (logDebug) { - startTime = System.currentTimeMillis(); - } - try { - HbaseObjectWritable value = (HbaseObjectWritable) - client.call(new Invocation(method, args), address, - protocol, ticket, rpcTimeout); - if (logDebug) { - long callTime = System.currentTimeMillis() - startTime; - LOG.debug("Call: " + method.getName() + " " + callTime); - } - return value.get(); - } catch (Throwable t) { - // For protobuf protocols, ServiceException is expected - if (Invocation.PROTOBUF_PROTOCOLS.contains(protocol)) { - if (t instanceof RemoteException) { - Throwable cause = ((RemoteException)t).unwrapRemoteException(); - throw new ServiceException(cause); - } - throw new ServiceException(t); - } - throw t; - } - } - - /* close the IPC client that's responsible for this invoker's RPCs */ - synchronized protected void close() { - if (!isClosed) { - isClosed = true; - CLIENTS.stopClient(client); - } - } - } - - /** - * Construct a client-side proxy object that implements the named protocol, - * talking to a server at the named address. - * - * @param protocol interface - * @param clientVersion version we are expecting - * @param addr remote address - * @param ticket ticket - * @param conf configuration - * @param factory socket factory - * @return proxy - * @throws java.io.IOException e - */ - public VersionedProtocol getProxy( - Class protocol, long clientVersion, - InetSocketAddress addr, User ticket, - Configuration conf, SocketFactory factory, int rpcTimeout) - throws IOException { - if (User.isSecurityEnabled()) { - HBaseSaslRpcServer.init(conf); - } - VersionedProtocol proxy = - (VersionedProtocol) Proxy.newProxyInstance( - protocol.getClassLoader(), new Class[] { protocol }, - new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)); - try { - long serverVersion = proxy.getProtocolVersion(protocol.getName(), - clientVersion); - if (serverVersion != clientVersion) { - throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion, - serverVersion); - } - } catch (Throwable t) { - if (t instanceof UndeclaredThrowableException) { - t = t.getCause(); - } - if (t instanceof ServiceException) { - throw ProtobufUtil.getRemoteException((ServiceException)t); - } - if (!(t instanceof IOException)) { - LOG.error("Unexpected throwable object ", t); - throw new IOException(t); - } - throw (IOException)t; - } - return proxy; - } - - /** - * Stop this proxy and release its invoker's resource - * @param proxy the proxy to be stopped - */ - public void stopProxy(VersionedProtocol proxy) { - if (proxy!=null) { - ((Invoker)Proxy.getInvocationHandler(proxy)).close(); - } - } - - - /** Expert: Make multiple, parallel calls to a set of servers. */ - public Object[] call(Method method, Object[][] params, - InetSocketAddress[] addrs, - Class protocol, - User ticket, Configuration conf) - throws IOException, InterruptedException { - - Invocation[] invocations = new Invocation[params.length]; - for (int i = 0; i < params.length; i++) - invocations[i] = new Invocation(method, params[i]); - SecureClient client = CLIENTS.getClient(conf); - try { - Writable[] wrappedValues = - client.call(invocations, addrs, protocol, ticket); - - if (method.getReturnType() == Void.TYPE) { - return null; - } - - Object[] values = - (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length); - for (int i = 0; i < values.length; i++) - if (wrappedValues[i] != null) - values[i] = ((HbaseObjectWritable)wrappedValues[i]).get(); - - return values; - } finally { - CLIENTS.stopClient(client); - } - } - - /** Construct a server for a protocol implementation instance listening on a - * port and address, with a secret manager. */ - public Server getServer(Class protocol, - final Object instance, - Class[] ifaces, - final String bindAddress, final int port, - final int numHandlers, - int metaHandlerCount, final boolean verbose, Configuration conf, - int highPriorityLevel) - throws IOException { - Server server = new Server(instance, ifaces, conf, bindAddress, port, - numHandlers, metaHandlerCount, verbose, - highPriorityLevel); - return server; - } - - /** An RPC Server. */ - public static class Server extends SecureServer { - private Object instance; - private Class implementation; - private Class[] ifaces; - private boolean verbose; - - private static String classNameBase(String className) { - String[] names = className.split("\\.", -1); - if (names == null || names.length == 0) { - return className; - } - return names[names.length-1]; - } - - /** Construct an RPC server. - * @param instance the instance whose methods will be called - * @param conf the configuration to use - * @param bindAddress the address to bind on to listen for connection - * @param port the port to listen for connections on - * @param numHandlers the number of method handler threads to run - * @param verbose whether each call should be logged - * @throws java.io.IOException e - */ - public Server(Object instance, final Class[] ifaces, - Configuration conf, String bindAddress, int port, - int numHandlers, int metaHandlerCount, boolean verbose, - int highPriorityLevel) - throws IOException { - super(bindAddress, port, Invocation.class, numHandlers, metaHandlerCount, conf, - classNameBase(instance.getClass().getName()), highPriorityLevel); - this.instance = instance; - this.implementation = instance.getClass(); - this.verbose = verbose; - - this.ifaces = ifaces; - - // create metrics for the advertised interfaces this server implements. - this.rpcMetrics.createMetrics(this.ifaces); - } - - public AuthenticationTokenSecretManager createSecretManager(){ - if (instance instanceof org.apache.hadoop.hbase.Server) { - org.apache.hadoop.hbase.Server server = - (org.apache.hadoop.hbase.Server)instance; - Configuration conf = server.getConfiguration(); - long keyUpdateInterval = - conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000); - long maxAge = - conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000); - return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(), - server.getServerName().toString(), keyUpdateInterval, maxAge); - } - return null; - } - - @Override - public void startThreads() { - AuthenticationTokenSecretManager mgr = createSecretManager(); - if (mgr != null) { - setSecretManager(mgr); - mgr.start(); - } - this.authManager = new ServiceAuthorizationManager(); - HBasePolicyProvider.init(conf, authManager); - - // continue with base startup - super.startThreads(); - } - - @Override - public Writable call(Class protocol, - Writable param, long receivedTime, MonitoredRPCHandler status) - throws IOException { - try { - Invocation call = (Invocation)param; - if(call.getMethodName() == null) { - throw new IOException("Could not find requested method, the usual " + - "cause is a version mismatch between client and server."); - } - if (verbose) log("Call: " + call); - - Method method = - protocol.getMethod(call.getMethodName(), - call.getParameterClasses()); - method.setAccessible(true); - - Object impl = null; - if (protocol.isAssignableFrom(this.implementation)) { - impl = this.instance; - } - else { - throw new HBaseRPC.UnknownProtocolException(protocol); - } - - long startTime = System.currentTimeMillis(); - Object[] params = call.getParameters(); - Object value = method.invoke(impl, params); - int processingTime = (int) (System.currentTimeMillis() - startTime); - int qTime = (int) (startTime-receivedTime); - if (TRACELOG.isDebugEnabled()) { - TRACELOG.debug("Call #" + CurCall.get().id + - "; Served: " + protocol.getSimpleName()+"#"+call.getMethodName() + - " queueTime=" + qTime + - " processingTime=" + processingTime + - " contents=" + Objects.describeQuantity(params)); - } - rpcMetrics.rpcQueueTime.inc(qTime); - rpcMetrics.rpcProcessingTime.inc(processingTime); - rpcMetrics.inc(call.getMethodName(), processingTime); - if (verbose) log("Return: "+value); - - return new HbaseObjectWritable(method.getReturnType(), value); - } catch (InvocationTargetException e) { - Throwable target = e.getTargetException(); - if (target instanceof IOException) { - throw (IOException)target; - } - if (target instanceof ServiceException) { - throw ProtobufUtil.getRemoteException((ServiceException)target); - } - IOException ioe = new IOException(target.toString()); - ioe.setStackTrace(target.getStackTrace()); - throw ioe; - } catch (Throwable e) { - if (!(e instanceof IOException)) { - LOG.error("Unexpected throwable object ", e); - } - IOException ioe = new IOException(e.toString()); - ioe.setStackTrace(e.getStackTrace()); - throw ioe; - } - } - } - - protected static void log(String value) { - String v = value; - if (v != null && v.length() > 55) - v = v.substring(0, 55)+"..."; - LOG.info(v); - } -} \ No newline at end of file diff --git a/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java b/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java deleted file mode 100644 index 8f285560243..00000000000 --- a/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java +++ /dev/null @@ -1,739 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.ipc; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.io.HbaseObjectWritable; -import org.apache.hadoop.hbase.io.WritableWithSize; -import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; -import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod; -import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler; -import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler; -import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.ByteBufferOutputStream; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; -import org.apache.hadoop.security.authorize.AuthorizationException; -import org.apache.hadoop.security.authorize.ProxyUsers; -import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; -import org.apache.hadoop.security.token.SecretManager; -import org.apache.hadoop.security.token.SecretManager.InvalidToken; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.StringUtils; - -import com.google.common.collect.ImmutableSet; - -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; -import java.io.*; -import java.net.*; -import java.nio.ByteBuffer; -import java.nio.channels.*; -import java.security.PrivilegedExceptionAction; -import java.util.*; - -import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION; - -/** - * An abstract IPC service, supporting SASL authentication of connections, - * using GSSAPI for Kerberos authentication or DIGEST-MD5 for authentication - * via signed tokens. - * - *

- * This is part of the {@link SecureRpcEngine} implementation. - *

- * - * @see org.apache.hadoop.hbase.ipc.SecureClient - */ -public abstract class SecureServer extends HBaseServer { - private final boolean authorize; - private boolean isSecurityEnabled; - - /** - * The first four bytes of secure RPC connections - */ - public static final ByteBuffer HEADER = ByteBuffer.wrap("srpc".getBytes()); - - // 1 : Introduce ping and server does not throw away RPCs - // 3 : Introduce the protocol into the RPC connection header - // 4 : Introduced SASL security layer - public static final byte CURRENT_VERSION = 4; - public static final Set INSECURE_VERSIONS = ImmutableSet.of((byte) 5); - - public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.SecureServer"); - private static final Log AUDITLOG = - LogFactory.getLog("SecurityLogger.org.apache.hadoop.ipc.SecureServer"); - private static final String AUTH_FAILED_FOR = "Auth failed for "; - private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for "; - - protected SecretManager secretManager; - protected ServiceAuthorizationManager authManager; - - protected class SecureCall extends HBaseServer.Call { - public SecureCall(int id, Writable param, Connection connection, - Responder responder, long size) { - super(id, param, connection, responder, size); - } - - @Override - protected synchronized void setResponse(Object value, Status status, - String errorClass, String error) { - Writable result = null; - if (value instanceof Writable) { - result = (Writable) value; - } else { - /* We might have a null value and errors. Avoid creating a - * HbaseObjectWritable, because the constructor fails on null. */ - if (value != null) { - result = new HbaseObjectWritable(value); - } - } - - int size = BUFFER_INITIAL_SIZE; - if (result instanceof WritableWithSize) { - // get the size hint. - WritableWithSize ohint = (WritableWithSize) result; - long hint = ohint.getWritableSize() + Bytes.SIZEOF_INT + Bytes.SIZEOF_INT; - if (hint > Integer.MAX_VALUE) { - // oops, new problem. - IOException ioe = - new IOException("Result buffer size too large: " + hint); - errorClass = ioe.getClass().getName(); - error = StringUtils.stringifyException(ioe); - } else { - size = (int)hint; - } - } - - ByteBufferOutputStream buf = new ByteBufferOutputStream(size); - DataOutputStream out = new DataOutputStream(buf); - try { - out.writeInt(this.id); // write call id - out.writeInt(status.state); // write status - } catch (IOException e) { - errorClass = e.getClass().getName(); - error = StringUtils.stringifyException(e); - } - - try { - if (status == Status.SUCCESS) { - result.write(out); - } else { - WritableUtils.writeString(out, errorClass); - WritableUtils.writeString(out, error); - } - if (((SecureConnection)connection).useWrap) { - wrapWithSasl(buf); - } - } catch (IOException e) { - LOG.warn("Error sending response to call: ", e); - } - - this.response = buf.getByteBuffer(); - } - - private void wrapWithSasl(ByteBufferOutputStream response) - throws IOException { - if (((SecureConnection)connection).useSasl) { - // getByteBuffer calls flip() - ByteBuffer buf = response.getByteBuffer(); - byte[] token; - // synchronization may be needed since there can be multiple Handler - // threads using saslServer to wrap responses. - synchronized (((SecureConnection)connection).saslServer) { - token = ((SecureConnection)connection).saslServer.wrap(buf.array(), - buf.arrayOffset(), buf.remaining()); - } - if (LOG.isDebugEnabled()) - LOG.debug("Adding saslServer wrapped token of size " + token.length - + " as call response."); - buf.clear(); - DataOutputStream saslOut = new DataOutputStream(response); - saslOut.writeInt(token.length); - saslOut.write(token, 0, token.length); - } - } - } - - /** Reads calls from a connection and queues them for handling. */ - public class SecureConnection extends HBaseServer.Connection { - private boolean rpcHeaderRead = false; // if initial rpc header is read - private boolean headerRead = false; //if the connection header that - //follows version is read. - private ByteBuffer data; - private ByteBuffer dataLengthBuffer; - protected final LinkedList responseQueue; - private int dataLength; - private InetAddress addr; - - boolean useSasl; - SaslServer saslServer; - private AuthMethod authMethod; - private boolean saslContextEstablished; - private boolean skipInitialSaslHandshake; - private ByteBuffer rpcHeaderBuffer; - private ByteBuffer unwrappedData; - private ByteBuffer unwrappedDataLengthBuffer; - private SecureConnectionHeader header; - - public UserGroupInformation attemptingUser = null; // user name before auth - - // Fake 'call' for failed authorization response - private final int AUTHORIZATION_FAILED_CALLID = -1; - // Fake 'call' for SASL context setup - private static final int SASL_CALLID = -33; - private final SecureCall saslCall = new SecureCall(SASL_CALLID, null, this, null, 0); - - private boolean useWrap = false; - - public SecureConnection(SocketChannel channel, long lastContact) { - super(channel, lastContact); - this.header = new SecureConnectionHeader(); - this.channel = channel; - this.data = null; - this.dataLengthBuffer = ByteBuffer.allocate(4); - this.unwrappedData = null; - this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4); - this.socket = channel.socket(); - this.addr = socket.getInetAddress(); - this.responseQueue = new LinkedList(); - } - - @Override - public String toString() { - return getHostAddress() + ":" + remotePort; - } - - public String getHostAddress() { - return hostAddress; - } - - public InetAddress getHostInetAddress() { - return addr; - } - - private User getAuthorizedUgi(String authorizedId) - throws IOException { - if (authMethod == AuthMethod.DIGEST) { - TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId, - secretManager); - UserGroupInformation ugi = tokenId.getUser(); - if (ugi == null) { - throw new AccessControlException( - "Can't retrieve username from tokenIdentifier."); - } - ugi.addTokenIdentifier(tokenId); - return User.create(ugi); - } else { - return User.create(UserGroupInformation.createRemoteUser(authorizedId)); - } - } - - private void saslReadAndProcess(byte[] saslToken) throws IOException, - InterruptedException { - if (!saslContextEstablished) { - byte[] replyToken = null; - try { - if (saslServer == null) { - switch (authMethod) { - case DIGEST: - if (secretManager == null) { - throw new AccessControlException( - "Server is not configured to do DIGEST authentication."); - } - saslServer = Sasl.createSaslServer(AuthMethod.DIGEST - .getMechanismName(), null, HBaseSaslRpcServer.SASL_DEFAULT_REALM, - HBaseSaslRpcServer.SASL_PROPS, new SaslDigestCallbackHandler( - secretManager, this)); - break; - default: - UserGroupInformation current = UserGroupInformation - .getCurrentUser(); - String fullName = current.getUserName(); - if (LOG.isDebugEnabled()) - LOG.debug("Kerberos principal name is " + fullName); - final String names[] = HBaseSaslRpcServer.splitKerberosName(fullName); - if (names.length != 3) { - throw new AccessControlException( - "Kerberos principal name does NOT have the expected " - + "hostname part: " + fullName); - } - current.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws SaslException { - saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS - .getMechanismName(), names[0], names[1], - HBaseSaslRpcServer.SASL_PROPS, new SaslGssCallbackHandler()); - return null; - } - }); - } - if (saslServer == null) - throw new AccessControlException( - "Unable to find SASL server implementation for " - + authMethod.getMechanismName()); - if (LOG.isDebugEnabled()) - LOG.debug("Created SASL server with mechanism = " - + authMethod.getMechanismName()); - } - if (LOG.isDebugEnabled()) - LOG.debug("Have read input token of size " + saslToken.length - + " for processing by saslServer.evaluateResponse()"); - replyToken = saslServer.evaluateResponse(saslToken); - } catch (IOException e) { - IOException sendToClient = e; - Throwable cause = e; - while (cause != null) { - if (cause instanceof InvalidToken) { - sendToClient = (InvalidToken) cause; - break; - } - cause = cause.getCause(); - } - doSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(), - sendToClient.getLocalizedMessage()); - rpcMetrics.authenticationFailures.inc(); - String clientIP = this.toString(); - // attempting user could be null - AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser); - throw e; - } - if (replyToken != null) { - if (LOG.isDebugEnabled()) - LOG.debug("Will send token of size " + replyToken.length - + " from saslServer."); - doSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null, - null); - } - if (saslServer.isComplete()) { - LOG.debug("SASL server context established. Negotiated QoP is " - + saslServer.getNegotiatedProperty(Sasl.QOP)); - String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP); - useWrap = qop != null && !"auth".equalsIgnoreCase(qop); - user = getAuthorizedUgi(saslServer.getAuthorizationID()); - LOG.debug("SASL server successfully authenticated client: " + user); - rpcMetrics.authenticationSuccesses.inc(); - AUDITLOG.trace(AUTH_SUCCESSFUL_FOR + user); - saslContextEstablished = true; - } - } else { - if (LOG.isDebugEnabled()) - LOG.debug("Have read input token of size " + saslToken.length - + " for processing by saslServer.unwrap()"); - - if (!useWrap) { - processOneRpc(saslToken); - } else { - byte[] plaintextData = saslServer.unwrap(saslToken, 0, - saslToken.length); - processUnwrappedData(plaintextData); - } - } - } - - private void doSaslReply(SaslStatus status, Writable rv, - String errorClass, String error) throws IOException { - saslCall.setResponse(rv, - status == SaslStatus.SUCCESS ? Status.SUCCESS : Status.ERROR, - errorClass, error); - saslCall.responder = responder; - saslCall.sendResponseIfReady(); - } - - private void disposeSasl() { - if (saslServer != null) { - try { - saslServer.dispose(); - } catch (SaslException ignored) { - } - } - } - - public int readAndProcess() throws IOException, InterruptedException { - while (true) { - /* Read at most one RPC. If the header is not read completely yet - * then iterate until we read first RPC or until there is no data left. - */ - int count = -1; - if (dataLengthBuffer.remaining() > 0) { - count = channelRead(channel, dataLengthBuffer); - if (count < 0 || dataLengthBuffer.remaining() > 0) - return count; - } - - if (!rpcHeaderRead) { - //Every connection is expected to send the header. - if (rpcHeaderBuffer == null) { - rpcHeaderBuffer = ByteBuffer.allocate(2); - } - count = channelRead(channel, rpcHeaderBuffer); - if (count < 0 || rpcHeaderBuffer.remaining() > 0) { - return count; - } - int version = rpcHeaderBuffer.get(0); - byte[] method = new byte[] {rpcHeaderBuffer.get(1)}; - authMethod = AuthMethod.read(new DataInputStream( - new ByteArrayInputStream(method))); - dataLengthBuffer.flip(); - if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) { - //Warning is ok since this is not supposed to happen. - if (INSECURE_VERSIONS.contains(version)) { - LOG.warn("An insecure client (version '" + version + "') is attempting to connect " + - " to this version '" + CURRENT_VERSION + "' secure server from " + - hostAddress + ":" + remotePort); - } else { - LOG.warn("Incorrect header or version mismatch from " + - hostAddress + ":" + remotePort + - " got version " + version + - " expected version " + CURRENT_VERSION); - } - - return -1; - } - dataLengthBuffer.clear(); - if (authMethod == null) { - throw new IOException("Unable to read authentication method"); - } - if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) { - AccessControlException ae = new AccessControlException( - "Authentication is required"); - SecureCall failedCall = new SecureCall(AUTHORIZATION_FAILED_CALLID, null, this, - null, 0); - failedCall.setResponse(null, Status.FATAL, ae.getClass().getName(), - ae.getMessage()); - responder.doRespond(failedCall); - throw ae; - } - if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) { - doSaslReply(SaslStatus.SUCCESS, new IntWritable( - HBaseSaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null); - authMethod = AuthMethod.SIMPLE; - // client has already sent the initial Sasl message and we - // should ignore it. Both client and server should fall back - // to simple auth from now on. - skipInitialSaslHandshake = true; - } - if (authMethod != AuthMethod.SIMPLE) { - useSasl = true; - } - - rpcHeaderBuffer = null; - rpcHeaderRead = true; - continue; - } - - if (data == null) { - dataLengthBuffer.flip(); - dataLength = dataLengthBuffer.getInt(); - - if (dataLength == HBaseClient.PING_CALL_ID) { - if(!useWrap) { //covers the !useSasl too - dataLengthBuffer.clear(); - return 0; //ping message - } - } - if (dataLength < 0) { - LOG.warn("Unexpected data length " + dataLength + "!! from " + - getHostAddress()); - } - data = ByteBuffer.allocate(dataLength); - incRpcCount(); // Increment the rpc count - } - - count = channelRead(channel, data); - - if (data.remaining() == 0) { - dataLengthBuffer.clear(); - data.flip(); - if (skipInitialSaslHandshake) { - data = null; - skipInitialSaslHandshake = false; - continue; - } - boolean isHeaderRead = headerRead; - if (useSasl) { - saslReadAndProcess(data.array()); - } else { - processOneRpc(data.array()); - } - data = null; - if (!isHeaderRead) { - continue; - } - } - return count; - } - } - - /// Reads the connection header following version - private void processHeader(byte[] buf) throws IOException { - DataInputStream in = - new DataInputStream(new ByteArrayInputStream(buf)); - header.readFields(in); - try { - String protocolClassName = header.getProtocol(); - if (protocolClassName != null) { - protocol = getProtocolClass(header.getProtocol(), conf); - } - } catch (ClassNotFoundException cnfe) { - throw new IOException("Unknown protocol: " + header.getProtocol()); - } - - User protocolUser = header.getUser(); - if (!useSasl) { - user = protocolUser; - if (user != null) { - user.getUGI().setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod); - } - } else { - // user is authenticated - user.getUGI().setAuthenticationMethod(authMethod.authenticationMethod); - //Now we check if this is a proxy user case. If the protocol user is - //different from the 'user', it is a proxy user scenario. However, - //this is not allowed if user authenticated with DIGEST. - if ((protocolUser != null) - && (!protocolUser.getName().equals(user.getName()))) { - if (authMethod == AuthMethod.DIGEST) { - // Not allowed to doAs if token authentication is used - throw new AccessControlException("Authenticated user (" + user - + ") doesn't match what the client claims to be (" - + protocolUser + ")"); - } else { - // Effective user can be different from authenticated user - // for simple auth or kerberos auth - // The user is the real user. Now we create a proxy user - UserGroupInformation realUser = user.getUGI(); - user = User.create( - UserGroupInformation.createProxyUser(protocolUser.getName(), - realUser)); - // Now the user is a proxy user, set Authentication method Proxy. - user.getUGI().setAuthenticationMethod(AuthenticationMethod.PROXY); - } - } - } - } - - private void processUnwrappedData(byte[] inBuf) throws IOException, - InterruptedException { - ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream( - inBuf)); - // Read all RPCs contained in the inBuf, even partial ones - while (true) { - int count = -1; - if (unwrappedDataLengthBuffer.remaining() > 0) { - count = channelRead(ch, unwrappedDataLengthBuffer); - if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0) - return; - } - - if (unwrappedData == null) { - unwrappedDataLengthBuffer.flip(); - int unwrappedDataLength = unwrappedDataLengthBuffer.getInt(); - - if (unwrappedDataLength == HBaseClient.PING_CALL_ID) { - if (LOG.isDebugEnabled()) - LOG.debug("Received ping message"); - unwrappedDataLengthBuffer.clear(); - continue; // ping message - } - unwrappedData = ByteBuffer.allocate(unwrappedDataLength); - } - - count = channelRead(ch, unwrappedData); - if (count <= 0 || unwrappedData.remaining() > 0) - return; - - if (unwrappedData.remaining() == 0) { - unwrappedDataLengthBuffer.clear(); - unwrappedData.flip(); - processOneRpc(unwrappedData.array()); - unwrappedData = null; - } - } - } - - private void processOneRpc(byte[] buf) throws IOException, - InterruptedException { - if (headerRead) { - processData(buf); - } else { - processHeader(buf); - headerRead = true; - if (!authorizeConnection()) { - throw new AccessControlException("Connection from " + this - + " for protocol " + header.getProtocol() - + " is unauthorized for user " + user); - } - } - } - - protected void processData(byte[] buf) throws IOException, InterruptedException { - DataInputStream dis = - new DataInputStream(new ByteArrayInputStream(buf)); - int id = dis.readInt(); // try to read an id - - if (LOG.isDebugEnabled()) { - LOG.debug(" got #" + id); - } - - Writable param = ReflectionUtils.newInstance(paramClass, conf); // read param - param.readFields(dis); - - SecureCall call = new SecureCall(id, param, this, responder, buf.length); - - if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) { - priorityCallQueue.put(call); - } else { - callQueue.put(call); // queue the call; maybe blocked here - } - } - - private boolean authorizeConnection() throws IOException { - try { - // If auth method is DIGEST, the token was obtained by the - // real user for the effective user, therefore not required to - // authorize real user. doAs is allowed only for simple or kerberos - // authentication - if (user != null && user.getUGI().getRealUser() != null - && (authMethod != AuthMethod.DIGEST)) { - ProxyUsers.authorize(user.getUGI(), this.getHostAddress(), conf); - } - authorize(user, header, getHostInetAddress()); - if (LOG.isDebugEnabled()) { - LOG.debug("Successfully authorized " + header); - } - rpcMetrics.authorizationSuccesses.inc(); - } catch (AuthorizationException ae) { - LOG.debug("Connection authorization failed: "+ae.getMessage(), ae); - rpcMetrics.authorizationFailures.inc(); - SecureCall failedCall = new SecureCall(AUTHORIZATION_FAILED_CALLID, null, this, - null, 0); - failedCall.setResponse(null, Status.FATAL, ae.getClass().getName(), - ae.getMessage()); - responder.doRespond(failedCall); - return false; - } - return true; - } - - protected synchronized void close() { - disposeSasl(); - data = null; - dataLengthBuffer = null; - if (!channel.isOpen()) - return; - try {socket.shutdownOutput();} catch(Exception ignored) {} // FindBugs DE_MIGHT_IGNORE - if (channel.isOpen()) { - try {channel.close();} catch(Exception ignored) {} - } - try {socket.close();} catch(Exception ignored) {} - } - } - - /** Constructs a server listening on the named port and address. Parameters passed must - * be of the named class. The handlerCount determines - * the number of handler threads that will be used to process calls. - * - */ - @SuppressWarnings("unchecked") - protected SecureServer(String bindAddress, int port, - Class paramClass, int handlerCount, - int priorityHandlerCount, Configuration conf, String serverName, - int highPriorityLevel) - throws IOException { - super(bindAddress, port, paramClass, handlerCount, priorityHandlerCount, - conf, serverName, highPriorityLevel); - this.authorize = - conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false); - this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled(); - LOG.debug("security enabled="+isSecurityEnabled); - - if (isSecurityEnabled) { - HBaseSaslRpcServer.init(conf); - } - } - - @Override - protected Connection getConnection(SocketChannel channel, long time) { - return new SecureConnection(channel, time); - } - - Configuration getConf() { - return conf; - } - - /** for unit testing only, should be called before server is started */ - void disableSecurity() { - this.isSecurityEnabled = false; - } - - /** for unit testing only, should be called before server is started */ - void enableSecurity() { - this.isSecurityEnabled = true; - } - - /** Stops the service. No new calls will be handled after this is called. */ - public synchronized void stop() { - super.stop(); - } - - public SecretManager getSecretManager() { - return this.secretManager; - } - - public void setSecretManager(SecretManager secretManager) { - this.secretManager = (SecretManager) secretManager; - } - - /** - * Authorize the incoming client connection. - * - * @param user client user - * @param connection incoming connection - * @param addr InetAddress of incoming connection - * @throws org.apache.hadoop.security.authorize.AuthorizationException when the client isn't authorized to talk the protocol - */ - public void authorize(User user, - ConnectionHeader connection, - InetAddress addr - ) throws AuthorizationException { - if (authorize) { - Class protocol = null; - try { - protocol = getProtocolClass(connection.getProtocol(), getConf()); - } catch (ClassNotFoundException cfne) { - throw new AuthorizationException("Unknown protocol: " + - connection.getProtocol()); - } - authManager.authorize(user != null ? user.getUGI() : null, - protocol, getConf(), addr); - } - } -} \ No newline at end of file diff --git a/security/src/main/java/org/apache/hadoop/hbase/security/AccessDeniedException.java b/security/src/main/java/org/apache/hadoop/hbase/security/AccessDeniedException.java deleted file mode 100644 index b8c5d3b929c..00000000000 --- a/security/src/main/java/org/apache/hadoop/hbase/security/AccessDeniedException.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.security; - -import org.apache.hadoop.hbase.DoNotRetryIOException; - -/** - * Exception thrown by access-related methods. - */ -public class AccessDeniedException extends DoNotRetryIOException { - private static final long serialVersionUID = 1913879564363001780L; - - public AccessDeniedException() { - super(); - } - - public AccessDeniedException(Class clazz, String s) { - super( "AccessDenied [" + clazz.getName() + "]: " + s); - } - - public AccessDeniedException(String s) { - super(s); - } -} diff --git a/security/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java b/security/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java deleted file mode 100644 index 79bc0d00a4b..00000000000 --- a/security/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.security; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.AdminProtocol; -import org.apache.hadoop.hbase.client.ClientProtocol; -import org.apache.hadoop.hbase.ipc.HMasterInterface; -import org.apache.hadoop.hbase.ipc.RegionServerStatusProtocol; -import org.apache.hadoop.security.authorize.PolicyProvider; -import org.apache.hadoop.security.authorize.Service; -import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; - -/** - * Implementation of secure Hadoop policy provider for mapping - * protocol interfaces to hbase-policy.xml entries. - */ -public class HBasePolicyProvider extends PolicyProvider { - protected static Service[] services = { - new Service("security.client.protocol.acl", ClientProtocol.class), - new Service("security.client.protocol.acl", AdminProtocol.class), - new Service("security.admin.protocol.acl", HMasterInterface.class), - new Service("security.masterregion.protocol.acl", RegionServerStatusProtocol.class) - }; - - @Override - public Service[] getServices() { - return services; - } - - public static void init(Configuration conf, - ServiceAuthorizationManager authManager) { - // set service-level authorization security policy - conf.set("hadoop.policy.file", "hbase-policy.xml"); - if (conf.getBoolean( - ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) { - authManager.refresh(conf, new HBasePolicyProvider()); - } - } -} diff --git a/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java b/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java deleted file mode 100644 index 809097305b4..00000000000 --- a/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.security; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.CallbackHandler; -import javax.security.auth.callback.NameCallback; -import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.callback.UnsupportedCallbackException; -import javax.security.sasl.RealmCallback; -import javax.security.sasl.RealmChoiceCallback; -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslClient; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod; -import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus; -import org.apache.hadoop.security.SaslInputStream; -import org.apache.hadoop.security.SaslOutputStream; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; - -/** - * A utility class that encapsulates SASL logic for RPC client. - * Copied from org.apache.hadoop.security - */ -public class HBaseSaslRpcClient { - public static final Log LOG = LogFactory.getLog(HBaseSaslRpcClient.class); - - private final SaslClient saslClient; - - /** - * Create a HBaseSaslRpcClient for an authentication method - * - * @param method - * the requested authentication method - * @param token - * token to use if needed by the authentication method - */ - public HBaseSaslRpcClient(AuthMethod method, - Token token, String serverPrincipal) - throws IOException { - switch (method) { - case DIGEST: - if (LOG.isDebugEnabled()) - LOG.debug("Creating SASL " + AuthMethod.DIGEST.getMechanismName() - + " client to authenticate to service at " + token.getService()); - saslClient = Sasl.createSaslClient(new String[] { AuthMethod.DIGEST - .getMechanismName() }, null, null, HBaseSaslRpcServer.SASL_DEFAULT_REALM, - HBaseSaslRpcServer.SASL_PROPS, new SaslClientCallbackHandler(token)); - break; - case KERBEROS: - if (LOG.isDebugEnabled()) { - LOG - .debug("Creating SASL " + AuthMethod.KERBEROS.getMechanismName() - + " client. Server's Kerberos principal name is " - + serverPrincipal); - } - if (serverPrincipal == null || serverPrincipal.length() == 0) { - throw new IOException( - "Failed to specify server's Kerberos principal name"); - } - String names[] = HBaseSaslRpcServer.splitKerberosName(serverPrincipal); - if (names.length != 3) { - throw new IOException( - "Kerberos principal name does NOT have the expected hostname part: " - + serverPrincipal); - } - saslClient = Sasl.createSaslClient(new String[] { AuthMethod.KERBEROS - .getMechanismName() }, null, names[0], names[1], - HBaseSaslRpcServer.SASL_PROPS, null); - break; - default: - throw new IOException("Unknown authentication method " + method); - } - if (saslClient == null) - throw new IOException("Unable to find SASL client implementation"); - } - - private static void readStatus(DataInputStream inStream) throws IOException { - int id = inStream.readInt(); // read and discard dummy id - int status = inStream.readInt(); // read status - if (status != SaslStatus.SUCCESS.state) { - throw new RemoteException(WritableUtils.readString(inStream), - WritableUtils.readString(inStream)); - } - } - - /** - * Do client side SASL authentication with server via the given InputStream - * and OutputStream - * - * @param inS - * InputStream to use - * @param outS - * OutputStream to use - * @return true if connection is set up, or false if needs to switch - * to simple Auth. - * @throws IOException - */ - public boolean saslConnect(InputStream inS, OutputStream outS) - throws IOException { - DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS)); - DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream( - outS)); - - try { - byte[] saslToken = new byte[0]; - if (saslClient.hasInitialResponse()) - saslToken = saslClient.evaluateChallenge(saslToken); - if (saslToken != null) { - outStream.writeInt(saslToken.length); - outStream.write(saslToken, 0, saslToken.length); - outStream.flush(); - if (LOG.isDebugEnabled()) - LOG.debug("Have sent token of size " + saslToken.length - + " from initSASLContext."); - } - if (!saslClient.isComplete()) { - readStatus(inStream); - int len = inStream.readInt(); - if (len == HBaseSaslRpcServer.SWITCH_TO_SIMPLE_AUTH) { - if (LOG.isDebugEnabled()) - LOG.debug("Server asks us to fall back to simple auth."); - saslClient.dispose(); - return false; - } - saslToken = new byte[len]; - if (LOG.isDebugEnabled()) - LOG.debug("Will read input token of size " + saslToken.length - + " for processing by initSASLContext"); - inStream.readFully(saslToken); - } - - while (!saslClient.isComplete()) { - saslToken = saslClient.evaluateChallenge(saslToken); - if (saslToken != null) { - if (LOG.isDebugEnabled()) - LOG.debug("Will send token of size " + saslToken.length - + " from initSASLContext."); - outStream.writeInt(saslToken.length); - outStream.write(saslToken, 0, saslToken.length); - outStream.flush(); - } - if (!saslClient.isComplete()) { - readStatus(inStream); - saslToken = new byte[inStream.readInt()]; - if (LOG.isDebugEnabled()) - LOG.debug("Will read input token of size " + saslToken.length - + " for processing by initSASLContext"); - inStream.readFully(saslToken); - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("SASL client context established. Negotiated QoP: " - + saslClient.getNegotiatedProperty(Sasl.QOP)); - } - return true; - } catch (IOException e) { - try { - saslClient.dispose(); - } catch (SaslException ignored) { - // ignore further exceptions during cleanup - } - throw e; - } - } - - /** - * Get a SASL wrapped InputStream. Can be called only after saslConnect() has - * been called. - * - * @param in - * the InputStream to wrap - * @return a SASL wrapped InputStream - * @throws IOException - */ - public InputStream getInputStream(InputStream in) throws IOException { - if (!saslClient.isComplete()) { - throw new IOException("Sasl authentication exchange hasn't completed yet"); - } - return new SaslInputStream(in, saslClient); - } - - /** - * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has - * been called. - * - * @param out - * the OutputStream to wrap - * @return a SASL wrapped OutputStream - * @throws IOException - */ - public OutputStream getOutputStream(OutputStream out) throws IOException { - if (!saslClient.isComplete()) { - throw new IOException("Sasl authentication exchange hasn't completed yet"); - } - return new SaslOutputStream(out, saslClient); - } - - /** Release resources used by wrapped saslClient */ - public void dispose() throws SaslException { - saslClient.dispose(); - } - - private static class SaslClientCallbackHandler implements CallbackHandler { - private final String userName; - private final char[] userPassword; - - public SaslClientCallbackHandler(Token token) { - this.userName = HBaseSaslRpcServer.encodeIdentifier(token.getIdentifier()); - this.userPassword = HBaseSaslRpcServer.encodePassword(token.getPassword()); - } - - public void handle(Callback[] callbacks) - throws UnsupportedCallbackException { - NameCallback nc = null; - PasswordCallback pc = null; - RealmCallback rc = null; - for (Callback callback : callbacks) { - if (callback instanceof RealmChoiceCallback) { - continue; - } else if (callback instanceof NameCallback) { - nc = (NameCallback) callback; - } else if (callback instanceof PasswordCallback) { - pc = (PasswordCallback) callback; - } else if (callback instanceof RealmCallback) { - rc = (RealmCallback) callback; - } else { - throw new UnsupportedCallbackException(callback, - "Unrecognized SASL client callback"); - } - } - if (nc != null) { - if (LOG.isDebugEnabled()) - LOG.debug("SASL client callback: setting username: " + userName); - nc.setName(userName); - } - if (pc != null) { - if (LOG.isDebugEnabled()) - LOG.debug("SASL client callback: setting userPassword"); - pc.setPassword(userPassword); - } - if (rc != null) { - if (LOG.isDebugEnabled()) - LOG.debug("SASL client callback: setting realm: " - + rc.getDefaultText()); - rc.setText(rc.getDefaultText()); - } - } - } -} diff --git a/security/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java b/security/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java deleted file mode 100644 index f7e8654abe0..00000000000 --- a/security/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.security.access; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; - -import java.io.IOException; -import java.util.List; - -/** - * Handles synchronization of access control list entries and updates - * throughout all nodes in the cluster. The {@link AccessController} instance - * on the {@code _acl_} table regions, creates a znode for each table as - * {@code /hbase/acl/tablename}, with the znode data containing a serialized - * list of the permissions granted for the table. The {@code AccessController} - * instances on all other cluster hosts watch the znodes for updates, which - * trigger updates in the {@link TableAuthManager} permission cache. - */ -public class ZKPermissionWatcher extends ZooKeeperListener { - private static Log LOG = LogFactory.getLog(ZKPermissionWatcher.class); - // parent node for permissions lists - static final String ACL_NODE = "acl"; - TableAuthManager authManager; - String aclZNode; - - public ZKPermissionWatcher(ZooKeeperWatcher watcher, - TableAuthManager authManager, Configuration conf) { - super(watcher); - this.authManager = authManager; - String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE); - this.aclZNode = ZKUtil.joinZNode(watcher.baseZNode, aclZnodeParent); - } - - public void start() throws KeeperException { - watcher.registerListener(this); - if (ZKUtil.watchAndCheckExists(watcher, aclZNode)) { - List existing = - ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); - if (existing != null) { - refreshNodes(existing); - } - } - } - - @Override - public void nodeCreated(String path) { - if (path.equals(aclZNode)) { - try { - List nodes = - ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); - refreshNodes(nodes); - } catch (KeeperException ke) { - LOG.error("Error reading data from zookeeper", ke); - // only option is to abort - watcher.abort("Zookeeper error obtaining acl node children", ke); - } - } - } - - @Override - public void nodeDeleted(String path) { - if (aclZNode.equals(ZKUtil.getParent(path))) { - String table = ZKUtil.getNodeName(path); - authManager.remove(Bytes.toBytes(table)); - } - } - - @Override - public void nodeDataChanged(String path) { - if (aclZNode.equals(ZKUtil.getParent(path))) { - // update cache on an existing table node - String table = ZKUtil.getNodeName(path); - try { - byte[] data = ZKUtil.getDataAndWatch(watcher, path); - authManager.refreshCacheFromWritable(Bytes.toBytes(table), data); - } catch (KeeperException ke) { - LOG.error("Error reading data from zookeeper for node "+table, ke); - // only option is to abort - watcher.abort("Zookeeper error getting data for node " + table, ke); - } catch (IOException ioe) { - LOG.error("Error reading permissions writables", ioe); - } - } - } - - @Override - public void nodeChildrenChanged(String path) { - if (path.equals(aclZNode)) { - // table permissions changed - try { - List nodes = - ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); - refreshNodes(nodes); - } catch (KeeperException ke) { - LOG.error("Error reading data from zookeeper for path "+path, ke); - watcher.abort("Zookeeper error get node children for path "+path, ke); - } - } - } - - private void refreshNodes(List nodes) { - for (ZKUtil.NodeAndData n : nodes) { - if (n.isEmpty()) continue; - String path = n.getNode(); - String table = ZKUtil.getNodeName(path); - try { - byte[] nodeData = n.getData(); - if (LOG.isDebugEnabled()) { - LOG.debug("Updating permissions cache from node "+table+" with data: "+ - Bytes.toStringBinary(nodeData)); - } - authManager.refreshCacheFromWritable(Bytes.toBytes(table), - nodeData); - } catch (IOException ioe) { - LOG.error("Failed parsing permissions for table '" + table + - "' from zk", ioe); - } - } - } - - /*** - * Write a table's access controls to the permissions mirror in zookeeper - * @param tableName - * @param permsData - */ - public void writeToZookeeper(String tableName, - byte[] permsData) { - String zkNode = - ZKUtil.joinZNode(ZKUtil.joinZNode(watcher.baseZNode, ACL_NODE), - tableName); - try { - ZKUtil.createWithParents(watcher, zkNode); - ZKUtil.updateExistingNodeData(watcher, zkNode, - permsData, -1); - } catch (KeeperException e) { - LOG.error("Failed updating permissions for table '" + tableName + - "'", e); - watcher.abort("Failed writing node "+zkNode+" to zookeeper", e); - } - } -}