HBASE-5732 Remove the SecureRPCEngine and merge the security-related logic in the core engine

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1337396 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2012-05-11 22:06:57 +00:00
parent b259229b57
commit ec57be0ada
41 changed files with 1586 additions and 738 deletions

58
pom.xml
View File

@ -1624,64 +1624,6 @@
</build>
</profile>
<!-- profile for building against Hadoop 0.20+security-->
<profile>
<id>security</id>
<properties>
<hadoop.version>1.0.2</hadoop.version>
</properties>
<build>
<finalName>${project.artifactId}-${project.version}-security</finalName>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-source</id>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${project.basedir}/security/src/main/java</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-test-source</id>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>${project.basedir}/security/src/test/java</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-test-resource</id>
<goals>
<goal>add-test-resource</goal>
</goals>
<configuration>
<resources>
<resource>
<directory>${project.basedir}/security/src/test/resources</directory>
<includes>
<include>hbase-site.xml</include>
</includes>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<!--
profile for building against Hadoop 0.22.0. Activate using:
mvn -Dhadoop.profile=22

View File

@ -1,153 +0,0 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<configuration>
<property>
<name>hbase.regionserver.msginterval</name>
<value>1000</value>
<description>Interval between messages from the RegionServer to HMaster
in milliseconds. Default is 15. Set this value low if you want unit
tests to be responsive.
</description>
</property>
<property>
<name>hbase.client.pause</name>
<value>1000</value>
<description>General client pause value. Used mostly as value to wait
before running a retry of a failed get, region lookup, etc.</description>
</property>
<property>
<name>hbase.client.retries.number</name>
<value>10</value>
<description>Maximum retries. Used as maximum for all retryable
operations such as fetching of the root region from root region
server, getting a cell's value, starting a row update, etc.
Default: 10.
</description>
</property>
<property>
<name>hbase.server.thread.wakefrequency</name>
<value>1000</value>
<description>Time to sleep in between searches for work (in milliseconds).
Used as sleep interval by service threads such as META scanner and log roller.
</description>
</property>
<property>
<name>hbase.master.event.waiting.time</name>
<value>50</value>
<description>Time to sleep between checks to see if a table event took place.
</description>
</property>
<property>
<name>hbase.regionserver.handler.count</name>
<value>5</value>
<description>Count of RPC Server instances spun up on RegionServers
Same property is used by the HMaster for count of master handlers.
Default is 10.
</description>
</property>
<property>
<name>hbase.master.info.port</name>
<value>-1</value>
<description>The port for the hbase master web UI
Set to -1 if you do not want the info server to run.
</description>
</property>
<property>
<name>hbase.regionserver.info.port</name>
<value>-1</value>
<description>The port for the hbase regionserver web UI
Set to -1 if you do not want the info server to run.
</description>
</property>
<property>
<name>hbase.regionserver.info.port.auto</name>
<value>true</value>
<description>Info server auto port bind. Enables automatic port
search if hbase.regionserver.info.port is already in use.
Enabled for testing to run multiple tests on one machine.
</description>
</property>
<property>
<name>hbase.master.lease.thread.wakefrequency</name>
<value>3000</value>
<description>The interval between checks for expired region server leases.
This value has been reduced due to the other reduced values above so that
the master will notice a dead region server sooner. The default is 15 seconds.
</description>
</property>
<property>
<name>hbase.regionserver.optionalcacheflushinterval</name>
<value>1000</value>
<description>
Amount of time to wait since the last time a region was flushed before
invoking an optional cache flush. Default 60,000.
</description>
</property>
<property>
<name>hbase.regionserver.safemode</name>
<value>false</value>
<description>
Turn on/off safe mode in region server. Always on for production, always off
for tests.
</description>
</property>
<property>
<name>hbase.hregion.max.filesize</name>
<value>67108864</value>
<description>
Maximum desired file size for an HRegion. If filesize exceeds
value + (value / 2), the HRegion is split in two. Default: 256M.
Keep the maximum filesize small so we split more often in tests.
</description>
</property>
<property>
<name>hadoop.log.dir</name>
<value>${user.dir}/../logs</value>
</property>
<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>21818</value>
<description>Property from ZooKeeper's config zoo.cfg.
The port at which the clients will connect.
</description>
</property>
<property>
<name>hbase.defaults.for.version.skip</name>
<value>true</value>
<description>
Set to true to skip the 'hbase.defaults.for.version'.
Setting this to true can be useful in contexts other than
the other side of a maven generation; i.e. running in an
ide. You'll want to set this boolean to true to avoid
seeing the RuntimException complaint: "hbase-default.xml file
seems to be for and old version of HBase (@@@VERSION@@@), this
version is X.X.X-SNAPSHOT"
</description>
</property>
<property>
<name>hbase.rpc.engine</name>
<value>org.apache.hadoop.hbase.ipc.SecureRpcEngine</value>
</property>
</configuration>

View File

@ -22,7 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.security.TokenInfo;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.hbase.security.KerberosInfo;
/**
* Protocol that a HBase client uses to communicate with a region server.

View File

@ -23,7 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.security.TokenInfo;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.hbase.security.KerberosInfo;
/**
* Protocol that a HBase client uses to communicate with a region server.

View File

@ -1,77 +0,0 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.security.User;
/**
* The IPC connection header sent by the client to the server
* on connection establishment.
*/
@InterfaceAudience.Private
class ConnectionHeader implements Writable {
protected String protocol;
public ConnectionHeader() {}
/**
* Create a new {@link ConnectionHeader} with the given <code>protocol</code>
* and {@link User}.
* @param protocol protocol used for communication between the IPC client
* and the server
* @param user {@link User} of the client communicating with
* the server
*/
public ConnectionHeader(String protocol, User user) {
this.protocol = protocol;
}
@Override
public void readFields(DataInput in) throws IOException {
protocol = Text.readString(in);
if (protocol.isEmpty()) {
protocol = null;
}
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, (protocol == null) ? "" : protocol);
}
public String getProtocol() {
return protocol;
}
public User getUser() {
return null;
}
public String toString() {
return protocol;
}
}

View File

@ -28,13 +28,18 @@ import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
@ -48,18 +53,32 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
import org.apache.hadoop.hbase.security.KerberosInfo;
import org.apache.hadoop.hbase.security.TokenInfo;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.hbase.io.DataOutputOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.protobuf.ByteString;
@ -213,7 +232,12 @@ public class HBaseClient {
return this.startTime;
}
}
protected static Map<String,TokenSelector<? extends TokenIdentifier>> tokenHandlers =
new HashMap<String,TokenSelector<? extends TokenIdentifier>>();
static {
tokenHandlers.put(AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE.toString(),
new AuthenticationTokenSelector());
}
/** Thread that reads responses and notifies callers. Each connection owns a
* socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */
@ -223,6 +247,13 @@ public class HBaseClient {
protected Socket socket = null; // connected socket
protected DataInputStream in;
protected DataOutputStream out;
private InetSocketAddress server; // server ip:port
private String serverPrincipal; // server's krb5 principal name
private AuthMethod authMethod; // authentication method
private boolean useSasl;
private Token<? extends TokenIdentifier> token;
private HBaseSaslRpcClient saslRpcClient;
private int reloginMaxBackoff; // max pause before relogin on sasl failure
// currently active calls
protected final ConcurrentSkipListMap<Integer, Call> calls = new ConcurrentSkipListMap<Integer, Call>();
@ -235,20 +266,90 @@ public class HBaseClient {
throw new UnknownHostException("unknown host: " +
remoteId.getAddress().getHostName());
}
this.server = remoteId.getAddress();
UserGroupInformation ticket = remoteId.getTicket().getUGI();
Class<?> protocol = remoteId.getProtocol();
this.useSasl = UserGroupInformation.isSecurityEnabled();
if (useSasl && protocol != null) {
TokenInfo tokenInfo = protocol.getAnnotation(TokenInfo.class);
if (tokenInfo != null) {
TokenSelector<? extends TokenIdentifier> tokenSelector =
tokenHandlers.get(tokenInfo.value());
if (tokenSelector != null) {
token = tokenSelector.selectToken(new Text(clusterId),
ticket.getTokens());
} else if (LOG.isDebugEnabled()) {
LOG.debug("No token selector found for type "+tokenInfo.value());
}
}
KerberosInfo krbInfo = protocol.getAnnotation(KerberosInfo.class);
if (krbInfo != null) {
String serverKey = krbInfo.serverPrincipal();
if (serverKey == null) {
throw new IOException(
"Can't obtain server Kerberos config key from KerberosInfo");
}
serverPrincipal = SecurityUtil.getServerPrincipal(
conf.get(serverKey), server.getAddress().getCanonicalHostName().toLowerCase());
if (LOG.isDebugEnabled()) {
LOG.debug("RPC Server Kerberos principal name for protocol="
+ protocol.getCanonicalName() + " is " + serverPrincipal);
}
}
}
if (!useSasl) {
authMethod = AuthMethod.SIMPLE;
} else if (token != null) {
authMethod = AuthMethod.DIGEST;
} else {
authMethod = AuthMethod.KERBEROS;
}
if (LOG.isDebugEnabled())
LOG.debug("Use " + authMethod + " authentication for protocol "
+ protocol.getSimpleName());
reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000);
this.remoteId = remoteId;
User ticket = remoteId.getTicket();
Class<? extends VersionedProtocol> protocol = remoteId.getProtocol();
ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
builder.setProtocol(protocol == null ? "" : protocol.getName());
UserInformation userInfoPB;
if ((userInfoPB = getUserInfoPB(ticket)) != null) {
builder.setUserInfo(userInfoPB);
}
this.header = builder.build();
this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
remoteId.getAddress().toString() +
((ticket==null)?" from an unknown user": (" from " + ticket.getName())));
((ticket==null)?" from an unknown user": (" from "
+ ticket.getUserName())));
this.setDaemon(true);
}
private UserInformation getUserInfoPB(UserGroupInformation ugi) {
if (ugi == null || authMethod == AuthMethod.DIGEST) {
// Don't send user for token auth
return null;
}
UserInformation.Builder userInfoPB = UserInformation.newBuilder();
if (ugi != null) {
if (authMethod == AuthMethod.KERBEROS) {
// Send effective user for Kerberos auth
userInfoPB.setEffectiveUser(ugi.getUserName());
} else if (authMethod == AuthMethod.SIMPLE) {
//Send both effective user and real user for simple auth
userInfoPB.setEffectiveUser(ugi.getUserName());
if (ugi.getRealUser() != null) {
userInfoPB.setRealUser(ugi.getRealUser().getUserName());
}
}
}
return userInfoPB.build();
}
/** Update lastActivity with the current time. */
protected void touch() {
lastActivity.set(System.currentTimeMillis());
@ -352,42 +453,6 @@ public class HBaseClient {
}
}
/** Connect to the server and set up the I/O streams. It then sends
* a header to the server and starts
* the connection thread that waits for responses.
* @throws java.io.IOException e
*/
protected synchronized void setupIOstreams()
throws IOException, InterruptedException {
if (socket != null || shouldCloseConnection.get()) {
return;
}
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to "+remoteId);
}
setupConnection();
this.in = new DataInputStream(new BufferedInputStream
(new PingInputStream(NetUtils.getInputStream(socket))));
this.out = new DataOutputStream
(new BufferedOutputStream(NetUtils.getOutputStream(socket)));
writeHeader();
// update last activity time
touch();
// start the receiver thread after the socket connection has been set up
start();
} catch (IOException e) {
markClosed(e);
close();
throw e;
}
}
protected void closeConnection() {
// close the current connection
if (socket != null) {
@ -437,16 +502,6 @@ public class HBaseClient {
" time(s).");
}
/* Write the header for each connection
* Out is not synchronized because only the first thread does this.
*/
private void writeHeader() throws IOException {
out.write(HBaseServer.HEADER.array());
out.write(HBaseServer.CURRENT_VERSION);
out.writeInt(header.getSerializedSize());
header.writeTo(out);
}
/* wait till someone signals us to start reading RPC response or
* it is idle too long, it is marked as to be closed,
* or the client is marked as not running.
@ -519,6 +574,230 @@ public class HBaseClient {
+ connections.size());
}
private synchronized void disposeSasl() {
if (saslRpcClient != null) {
try {
saslRpcClient.dispose();
saslRpcClient = null;
} catch (IOException ioe) {
LOG.error("Error disposing of SASL client", ioe);
}
}
}
private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
UserGroupInformation currentUser =
UserGroupInformation.getCurrentUser();
UserGroupInformation realUser = currentUser.getRealUser();
return authMethod == AuthMethod.KERBEROS &&
loginUser != null &&
//Make sure user logged in using Kerberos either keytab or TGT
loginUser.hasKerberosCredentials() &&
// relogin only in case it is the login user (e.g. JT)
// or superuser (like oozie).
(loginUser.equals(currentUser) || loginUser.equals(realUser));
}
private synchronized boolean setupSaslConnection(final InputStream in2,
final OutputStream out2) throws IOException {
saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal);
return saslRpcClient.saslConnect(in2, out2);
}
/**
* If multiple clients with the same principal try to connect
* to the same server at the same time, the server assumes a
* replay attack is in progress. This is a feature of kerberos.
* In order to work around this, what is done is that the client
* backs off randomly and tries to initiate the connection
* again.
* The other problem is to do with ticket expiry. To handle that,
* a relogin is attempted.
*/
private synchronized void handleSaslConnectionFailure(
final int currRetries,
final int maxRetries, final Exception ex, final Random rand,
final UserGroupInformation user)
throws IOException, InterruptedException{
user.doAs(new PrivilegedExceptionAction<Object>() {
public Object run() throws IOException, InterruptedException {
closeConnection();
if (shouldAuthenticateOverKrb()) {
if (currRetries < maxRetries) {
LOG.debug("Exception encountered while connecting to " +
"the server : " + ex);
//try re-login
if (UserGroupInformation.isLoginKeytabBased()) {
UserGroupInformation.getLoginUser().reloginFromKeytab();
} else {
UserGroupInformation.getLoginUser().reloginFromTicketCache();
}
disposeSasl();
//have granularity of milliseconds
//we are sleeping with the Connection lock held but since this
//connection instance is being used for connecting to the server
//in question, it is okay
Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1));
return null;
} else {
String msg = "Couldn't setup connection for " +
UserGroupInformation.getLoginUser().getUserName() +
" to " + serverPrincipal;
LOG.warn(msg);
throw (IOException) new IOException(msg).initCause(ex);
}
} else {
LOG.warn("Exception encountered while connecting to " +
"the server : " + ex);
}
if (ex instanceof RemoteException)
throw (RemoteException)ex;
throw new IOException(ex);
}
});
}
protected synchronized void setupIOstreams()
throws IOException, InterruptedException {
if (socket != null || shouldCloseConnection.get()) {
return;
}
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to "+server);
}
short numRetries = 0;
final short MAX_RETRIES = 5;
Random rand = null;
while (true) {
setupConnection();
InputStream inStream = NetUtils.getInputStream(socket);
OutputStream outStream = NetUtils.getOutputStream(socket);
writeRpcHeader(outStream);
if (useSasl) {
final InputStream in2 = inStream;
final OutputStream out2 = outStream;
UserGroupInformation ticket = remoteId.getTicket().getUGI();
if (authMethod == AuthMethod.KERBEROS) {;
if (ticket != null && ticket.getRealUser() != null) {
ticket = ticket.getRealUser();
}
}
boolean continueSasl = false;
try {
continueSasl =
ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws IOException {
return setupSaslConnection(in2, out2);
}
});
} catch (Exception ex) {
if (rand == null) {
rand = new Random();
}
handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand,
ticket);
continue;
}
if (continueSasl) {
// Sasl connect is successful. Let's set up Sasl i/o streams.
inStream = saslRpcClient.getInputStream(inStream);
outStream = saslRpcClient.getOutputStream(outStream);
} else {
// fall back to simple auth because server told us so.
authMethod = AuthMethod.SIMPLE;
useSasl = false;
}
}
this.in = new DataInputStream(new BufferedInputStream
(new PingInputStream(inStream)));
this.out = new DataOutputStream
(new BufferedOutputStream(outStream));
writeHeader();
// update last activity time
touch();
// start the receiver thread after the socket connection has been set up
start();
return;
}
} catch (IOException e) {
markClosed(e);
close();
throw e;
}
}
/* Write the RPC header */
private void writeRpcHeader(OutputStream outStream) throws IOException {
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
// Write out the header, version and authentication method
out.write(HBaseServer.HEADER.array());
out.write(HBaseServer.CURRENT_VERSION);
authMethod.write(out);
out.flush();
}
/**
* Write the protocol header for each connection
* Out is not synchronized because only the first thread does this.
*/
private void writeHeader() throws IOException {
// Write out the ConnectionHeader
out.writeInt(header.getSerializedSize());
header.writeTo(out);
}
/** Close the connection. */
protected synchronized void close() {
if (!shouldCloseConnection.get()) {
LOG.error("The connection is not in the closed state");
return;
}
// release the resources
// first thing to do;take the connection out of the connection list
synchronized (connections) {
if (connections.get(remoteId) == this) {
connections.remove(remoteId);
}
}
// close the streams and therefore the socket
IOUtils.closeStream(out);
IOUtils.closeStream(in);
disposeSasl();
// clean up all calls
if (closeException == null) {
if (!calls.isEmpty()) {
LOG.warn(
"A connection is closed for no cause and calls are not empty. " +
"#Calls: " + calls.size());
// clean up calls anyway
closeException = new IOException("Unexpected closed connection");
cleanupCalls();
}
} else {
// log the info
if (LOG.isDebugEnabled()) {
LOG.debug("closing ipc connection to " + server + ": " +
closeException.getMessage(),closeException);
}
// cleanup calls
cleanupCalls();
}
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": closed");
}
/* Initiates a call by sending the parameter to the remote server.
* Note: this is not called from the Connection thread, but by other
* threads.
@ -575,15 +854,8 @@ public class HBaseClient {
LOG.debug(getName() + " got value #" + id);
Call call = calls.remove(id);
boolean isError = response.getError();
if (isError) {
if (call != null) {
//noinspection ThrowableInstanceNeverThrown
call.setException(new RemoteException(
response.getException().getExceptionName(),
response.getException().getStackTrace()));
}
} else {
Status status = response.getStatus();
if (status == Status.SUCCESS) {
ByteString responseObj = response.getResponse();
DataInputStream dis =
new DataInputStream(responseObj.newInput());
@ -594,6 +866,18 @@ public class HBaseClient {
if (call != null) {
call.setValue(value);
}
} else if (status == Status.ERROR) {
if (call != null) {
//noinspection ThrowableInstanceNeverThrown
call.setException(new RemoteException(
response.getException().getExceptionName(),
response.getException().getStackTrace()));
}
} else if (status == Status.FATAL) {
// Close the connection
markClosed(new RemoteException(
response.getException().getExceptionName(),
response.getException().getStackTrace()));
}
} catch (IOException e) {
if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
@ -620,47 +904,6 @@ public class HBaseClient {
}
}
/** Close the connection. */
protected synchronized void close() {
if (!shouldCloseConnection.get()) {
LOG.error("The connection is not in the closed state");
return;
}
// release the resources
// first thing to do;take the connection out of the connection list
synchronized (connections) {
connections.remove(remoteId, this);
}
// close the streams and therefore the socket
IOUtils.closeStream(out);
IOUtils.closeStream(in);
// clean up all calls
if (closeException == null) {
if (!calls.isEmpty()) {
LOG.warn(
"A connection is closed for no cause and calls are not empty");
// clean up calls anyway
closeException = new IOException("Unexpected closed connection");
cleanupCalls();
}
} else {
// log the info
if (LOG.isDebugEnabled()) {
LOG.debug("closing ipc connection to " + remoteId.address + ": " +
closeException.getMessage(),closeException);
}
// cleanup calls
cleanupCalls();
}
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": closed");
}
/* Cleanup all calls and mark them as done */
protected void cleanupCalls() {
cleanupCalls(0);

View File

@ -20,6 +20,8 @@
package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@ -34,6 +36,7 @@ import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
@ -41,6 +44,7 @@ import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -56,6 +60,10 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -68,14 +76,34 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus;
import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
@ -96,7 +124,8 @@ import org.cliffc.high_scale_lib.Counter;
*/
@InterfaceAudience.Private
public abstract class HBaseServer implements RpcServer {
private final boolean authorize;
private boolean isSecurityEnabled;
/**
* The first four bytes of Hadoop RPC connections
*/
@ -130,6 +159,13 @@ public abstract class HBaseServer implements RpcServer {
protected static final Log TRACELOG =
LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer.trace");
private static final String AUTH_FAILED_FOR = "Auth failed for ";
private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
private static final Log AUDITLOG =
LogFactory.getLog("SecurityLogger."+Server.class.getName());
protected SecretManager<TokenIdentifier> secretManager;
protected ServiceAuthorizationManager authManager;
protected static final ThreadLocal<RpcServer> SERVER =
new ThreadLocal<RpcServer>();
private volatile boolean started = false;
@ -303,11 +339,12 @@ public abstract class HBaseServer implements RpcServer {
return param.toString() + " from " + connection.toString();
}
protected synchronized void setSaslTokenResponse(ByteBuffer response) {
this.response = response;
}
protected synchronized void setResponse(Object value, Status status,
String errorClass, String error) {
// Avoid overwriting an error value in the response. This can happen if
// endDelayThrowing is called by another thread before the actual call
// returning.
if (this.isError)
return;
if (errorClass != null) {
@ -328,8 +365,7 @@ public abstract class HBaseServer implements RpcServer {
if (result instanceof WritableWithSize) {
// get the size hint.
WritableWithSize ohint = (WritableWithSize) result;
long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE +
(2 * Bytes.SIZEOF_INT);
long hint = ohint.getWritableSize() + 2*Bytes.SIZEOF_INT;
if (hint > Integer.MAX_VALUE) {
// oops, new problem.
IOException ioe =
@ -342,12 +378,11 @@ public abstract class HBaseServer implements RpcServer {
}
ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
DataOutputStream out = new DataOutputStream(buf);
try {
RpcResponse.Builder builder = RpcResponse.newBuilder();
// Call id.
builder.setCallId(this.id);
builder.setError(error != null);
builder.setStatus(status);
if (error != null) {
RpcException.Builder b = RpcException.newBuilder();
b.setExceptionName(errorClass);
@ -359,8 +394,10 @@ public abstract class HBaseServer implements RpcServer {
byte[] response = d.getData();
builder.setResponse(ByteString.copyFrom(response));
}
builder.build().writeDelimitedTo(
DataOutputOutputStream.constructOutputStream(out));
builder.build().writeDelimitedTo(buf);
if (connection.useWrap) {
wrapWithSasl(buf);
}
} catch (IOException e) {
LOG.warn("Exception while creating response " + e);
}
@ -369,6 +406,28 @@ public abstract class HBaseServer implements RpcServer {
this.response = bb;
}
private void wrapWithSasl(ByteBufferOutputStream response)
throws IOException {
if (connection.useSasl) {
// getByteBuffer calls flip()
ByteBuffer buf = response.getByteBuffer();
byte[] token;
// synchronization may be needed since there can be multiple Handler
// threads using saslServer to wrap responses.
synchronized (connection.saslServer) {
token = connection.saslServer.wrap(buf.array(),
buf.arrayOffset(), buf.remaining());
}
if (LOG.isDebugEnabled())
LOG.debug("Adding saslServer wrapped token of size " + token.length
+ " as call response.");
buf.clear();
DataOutputStream saslOut = new DataOutputStream(response);
saslOut.writeInt(token.length);
saslOut.write(token, 0, token.length);
}
}
@Override
public synchronized void endDelay(Object result) throws IOException {
assert this.delayResponse;
@ -1056,8 +1115,8 @@ public abstract class HBaseServer implements RpcServer {
}
/** Reads calls from a connection and queues them for handling. */
protected class Connection {
private boolean versionRead = false; //if initial signature and
public class Connection {
private boolean rpcHeaderRead = false; //if initial signature and
//version are read
private boolean headerRead = false; //if the connection header that
//follows version is read.
@ -1068,6 +1127,7 @@ public abstract class HBaseServer implements RpcServer {
private volatile int rpcCount = 0; // number of outstanding rpcs
private long lastContact;
private int dataLength;
private InetAddress addr;
protected Socket socket;
// Cache the remote host & port info so that even if the socket is
// disconnected, we can say where it used to connect to.
@ -1075,8 +1135,27 @@ public abstract class HBaseServer implements RpcServer {
protected int remotePort;
ConnectionHeader header;
Class<? extends VersionedProtocol> protocol;
protected User user = null;
protected UserGroupInformation user = null;
private AuthMethod authMethod;
private boolean saslContextEstablished;
private boolean skipInitialSaslHandshake;
private ByteBuffer rpcHeaderBuffer;
private ByteBuffer unwrappedData;
private ByteBuffer unwrappedDataLengthBuffer;
boolean useSasl;
SaslServer saslServer;
private boolean useWrap = false;
// Fake 'call' for failed authorization response
private final int AUTHROIZATION_FAILED_CALLID = -1;
private final Call authFailedCall =
new Call(AUTHROIZATION_FAILED_CALLID, null, this, null, 0);
private ByteArrayOutputStream authFailedResponse =
new ByteArrayOutputStream();
// Fake 'call' for SASL context setup
private static final int SASL_CALLID = -33;
private final Call saslCall = new Call(SASL_CALLID, null, this, null, 0);
public UserGroupInformation attemptingUser = null; // user name before auth
public Connection(SocketChannel channel, long lastContact) {
this.channel = channel;
this.lastContact = lastContact;
@ -1110,6 +1189,10 @@ public abstract class HBaseServer implements RpcServer {
return hostAddress;
}
public InetAddress getHostInetAddress() {
return addr;
}
public int getRemotePort() {
return remotePort;
}
@ -1141,39 +1224,218 @@ public abstract class HBaseServer implements RpcServer {
return isIdle() && currentTime - lastContact > maxIdleTime;
}
private UserGroupInformation getAuthorizedUgi(String authorizedId)
throws IOException {
if (authMethod == AuthMethod.DIGEST) {
TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
secretManager);
UserGroupInformation ugi = tokenId.getUser();
if (ugi == null) {
throw new AccessControlException(
"Can't retrieve username from tokenIdentifier.");
}
ugi.addTokenIdentifier(tokenId);
return ugi;
} else {
return UserGroupInformation.createRemoteUser(authorizedId);
}
}
private void saslReadAndProcess(byte[] saslToken) throws IOException,
InterruptedException {
if (saslContextEstablished) {
if (LOG.isDebugEnabled())
LOG.debug("Have read input token of size " + saslToken.length
+ " for processing by saslServer.unwrap()");
if (!useWrap) {
processOneRpc(saslToken);
} else {
byte[] plaintextData = saslServer.unwrap(saslToken, 0,
saslToken.length);
processUnwrappedData(plaintextData);
}
} else {
byte[] replyToken = null;
try {
if (saslServer == null) {
switch (authMethod) {
case DIGEST:
if (secretManager == null) {
throw new AccessControlException(
"Server is not configured to do DIGEST authentication.");
}
saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
.getMechanismName(), null, HBaseSaslRpcServer.SASL_DEFAULT_REALM,
HBaseSaslRpcServer.SASL_PROPS, new SaslDigestCallbackHandler(
secretManager, this));
break;
default:
UserGroupInformation current = UserGroupInformation
.getCurrentUser();
String fullName = current.getUserName();
if (LOG.isDebugEnabled())
LOG.debug("Kerberos principal name is " + fullName);
final String names[] = HBaseSaslRpcServer.splitKerberosName(fullName);
if (names.length != 3) {
throw new AccessControlException(
"Kerberos principal name does NOT have the expected "
+ "hostname part: " + fullName);
}
current.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws SaslException {
saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
.getMechanismName(), names[0], names[1],
HBaseSaslRpcServer.SASL_PROPS, new SaslGssCallbackHandler());
return null;
}
});
}
if (saslServer == null)
throw new AccessControlException(
"Unable to find SASL server implementation for "
+ authMethod.getMechanismName());
if (LOG.isDebugEnabled())
LOG.debug("Created SASL server with mechanism = "
+ authMethod.getMechanismName());
}
if (LOG.isDebugEnabled())
LOG.debug("Have read input token of size " + saslToken.length
+ " for processing by saslServer.evaluateResponse()");
replyToken = saslServer.evaluateResponse(saslToken);
} catch (IOException e) {
IOException sendToClient = e;
Throwable cause = e;
while (cause != null) {
if (cause instanceof InvalidToken) {
sendToClient = (InvalidToken) cause;
break;
}
cause = cause.getCause();
}
doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
sendToClient.getLocalizedMessage());
rpcMetrics.authenticationFailures.inc();
String clientIP = this.toString();
// attempting user could be null
AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
throw e;
}
if (replyToken != null) {
if (LOG.isDebugEnabled())
LOG.debug("Will send token of size " + replyToken.length
+ " from saslServer.");
doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
null);
}
if (saslServer.isComplete()) {
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
user = getAuthorizedUgi(saslServer.getAuthorizationID());
if (LOG.isDebugEnabled()) {
LOG.debug("SASL server context established. Authenticated client: "
+ user + ". Negotiated QoP is "
+ saslServer.getNegotiatedProperty(Sasl.QOP));
}
rpcMetrics.authenticationSuccesses.inc();
AUDITLOG.trace(AUTH_SUCCESSFUL_FOR + user);
saslContextEstablished = true;
}
}
}
/**
* No protobuf encoding of raw sasl messages
*/
private void doRawSaslReply(SaslStatus status, Writable rv,
String errorClass, String error) throws IOException {
//In my testing, have noticed that sasl messages are usually
//in the ballpark of 100-200. That's why the initialcapacity is 256.
ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256);
DataOutputStream out = new DataOutputStream(saslResponse);
out.writeInt(status.state); // write status
if (status == SaslStatus.SUCCESS) {
rv.write(out);
} else {
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
}
saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
saslCall.responder = responder;
saslCall.sendResponseIfReady();
}
private void disposeSasl() {
if (saslServer != null) {
try {
saslServer.dispose();
saslServer = null;
} catch (SaslException ignored) {
}
}
}
public int readAndProcess() throws IOException, InterruptedException {
while (true) {
/* Read at most one RPC. If the header is not read completely yet
* then iterate until we read first RPC or until there is no data left.
*/
int count;
int count = -1;
if (dataLengthBuffer.remaining() > 0) {
count = channelRead(channel, dataLengthBuffer);
if (count < 0 || dataLengthBuffer.remaining() > 0)
return count;
}
if (!versionRead) {
if (!rpcHeaderRead) {
//Every connection is expected to send the header.
ByteBuffer versionBuffer = ByteBuffer.allocate(1);
count = channelRead(channel, versionBuffer);
if (count <= 0) {
if (rpcHeaderBuffer == null) {
rpcHeaderBuffer = ByteBuffer.allocate(2);
}
count = channelRead(channel, rpcHeaderBuffer);
if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
return count;
}
int version = versionBuffer.get(0);
int version = rpcHeaderBuffer.get(0);
byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
authMethod = AuthMethod.read(new DataInputStream(
new ByteArrayInputStream(method)));
dataLengthBuffer.flip();
if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
//Warning is ok since this is not supposed to happen.
LOG.warn("Incorrect header or version mismatch from " +
hostAddress + ":" + remotePort +
" got version " + version +
" expected version " + CURRENT_VERSION);
LOG.warn("Incorrect header or version mismatch from " +
hostAddress + ":" + remotePort +
" got version " + version +
" expected version " + CURRENT_VERSION);
setupBadVersionResponse(version);
return -1;
}
dataLengthBuffer.clear();
versionRead = true;
if (authMethod == null) {
throw new IOException("Unable to read authentication method");
}
if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
AccessControlException ae = new AccessControlException(
"Authentication is required");
setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
null, ae.getClass().getName(), ae.getMessage());
responder.doRespond(authFailedCall);
throw ae;
}
if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
HBaseSaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null);
authMethod = AuthMethod.SIMPLE;
// client has already sent the initial Sasl message and we
// should ignore it. Both client and server should fall back
// to simple auth from now on.
skipInitialSaslHandshake = true;
}
if (authMethod != AuthMethod.SIMPLE) {
useSasl = true;
}
rpcHeaderBuffer = null;
rpcHeaderRead = true;
continue;
}
@ -1182,8 +1444,14 @@ public abstract class HBaseServer implements RpcServer {
dataLength = dataLengthBuffer.getInt();
if (dataLength == HBaseClient.PING_CALL_ID) {
dataLengthBuffer.clear();
return 0; //ping message
if(!useWrap) { //covers the !useSasl too
dataLengthBuffer.clear();
return 0; //ping message
}
}
if (dataLength < 0) {
throw new IllegalArgumentException("Unexpected data length "
+ dataLength + "!! from " + getHostAddress());
}
data = ByteBuffer.allocate(dataLength);
incRpcCount(); // Increment the rpc count
@ -1194,15 +1462,21 @@ public abstract class HBaseServer implements RpcServer {
if (data.remaining() == 0) {
dataLengthBuffer.clear();
data.flip();
if (headerRead) {
processData(data.array());
if (skipInitialSaslHandshake) {
data = null;
return count;
skipInitialSaslHandshake = false;
continue;
}
boolean isHeaderRead = headerRead;
if (useSasl) {
saslReadAndProcess(data.array());
} else {
processOneRpc(data.array());
}
processHeader();
headerRead = true;
data = null;
continue;
if (!isHeaderRead) {
continue;
}
}
return count;
}
@ -1238,16 +1512,104 @@ public abstract class HBaseServer implements RpcServer {
}
/// Reads the connection header following version
private void processHeader() throws IOException {
header = ConnectionHeader.parseFrom(new ByteArrayInputStream(data.array()));
private void processHeader(byte[] buf) throws IOException {
DataInputStream in =
new DataInputStream(new ByteArrayInputStream(buf));
header = ConnectionHeader.parseFrom(in);
try {
String protocolClassName = header.getProtocol();
protocol = getProtocolClass(protocolClassName, conf);
if (protocolClassName != null) {
protocol = getProtocolClass(header.getProtocol(), conf);
}
} catch (ClassNotFoundException cnfe) {
throw new IOException("Unknown protocol: " + header.getProtocol());
}
user = User.createUser(header);
UserGroupInformation protocolUser = createUser(header);
if (!useSasl) {
user = protocolUser;
if (user != null) {
user.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
}
} else {
// user is authenticated
user.setAuthenticationMethod(authMethod.authenticationMethod);
//Now we check if this is a proxy user case. If the protocol user is
//different from the 'user', it is a proxy user scenario. However,
//this is not allowed if user authenticated with DIGEST.
if ((protocolUser != null)
&& (!protocolUser.getUserName().equals(user.getUserName()))) {
if (authMethod == AuthMethod.DIGEST) {
// Not allowed to doAs if token authentication is used
throw new AccessControlException("Authenticated user (" + user
+ ") doesn't match what the client claims to be ("
+ protocolUser + ")");
} else {
// Effective user can be different from authenticated user
// for simple auth or kerberos auth
// The user is the real user. Now we create a proxy user
UserGroupInformation realUser = user;
user = UserGroupInformation.createProxyUser(protocolUser
.getUserName(), realUser);
// Now the user is a proxy user, set Authentication method Proxy.
user.setAuthenticationMethod(AuthenticationMethod.PROXY);
}
}
}
}
private void processUnwrappedData(byte[] inBuf) throws IOException,
InterruptedException {
ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
inBuf));
// Read all RPCs contained in the inBuf, even partial ones
while (true) {
int count = -1;
if (unwrappedDataLengthBuffer.remaining() > 0) {
count = channelRead(ch, unwrappedDataLengthBuffer);
if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
return;
}
if (unwrappedData == null) {
unwrappedDataLengthBuffer.flip();
int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
if (unwrappedDataLength == HBaseClient.PING_CALL_ID) {
if (LOG.isDebugEnabled())
LOG.debug("Received ping message");
unwrappedDataLengthBuffer.clear();
continue; // ping message
}
unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
}
count = channelRead(ch, unwrappedData);
if (count <= 0 || unwrappedData.remaining() > 0)
return;
if (unwrappedData.remaining() == 0) {
unwrappedDataLengthBuffer.clear();
unwrappedData.flip();
processOneRpc(unwrappedData.array());
unwrappedData = null;
}
}
}
private void processOneRpc(byte[] buf) throws IOException,
InterruptedException {
if (headerRead) {
processData(buf);
} else {
processHeader(buf);
headerRead = true;
if (!authorizeConnection()) {
throw new AccessControlException("Connection from " + this
+ " for protocol " + header.getProtocol()
+ " is unauthorized for user " + user);
}
}
}
protected void processData(byte[] buf) throws IOException, InterruptedException {
@ -1303,7 +1665,34 @@ public abstract class HBaseServer implements RpcServer {
}
}
private boolean authorizeConnection() throws IOException {
try {
// If auth method is DIGEST, the token was obtained by the
// real user for the effective user, therefore not required to
// authorize real user. doAs is allowed only for simple or kerberos
// authentication
if (user != null && user.getRealUser() != null
&& (authMethod != AuthMethod.DIGEST)) {
ProxyUsers.authorize(user, this.getHostAddress(), conf);
}
authorize(user, header, getHostInetAddress());
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully authorized " + header);
}
rpcMetrics.authorizationSuccesses.inc();
} catch (AuthorizationException ae) {
LOG.debug("Connection authorization failed: "+ae.getMessage(), ae);
rpcMetrics.authorizationFailures.inc();
setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null,
ae.getClass().getName(), ae.getMessage());
responder.doRespond(authFailedCall);
return false;
}
return true;
}
protected synchronized void close() {
disposeSasl();
data = null;
dataLengthBuffer = null;
if (!channel.isOpen())
@ -1314,6 +1703,33 @@ public abstract class HBaseServer implements RpcServer {
}
try {socket.close();} catch(Exception ignored) {}
}
private UserGroupInformation createUser(ConnectionHeader head) {
UserGroupInformation ugi = null;
if (!head.hasUserInfo()) {
return null;
}
UserInformation userInfoProto = head.getUserInfo();
String effectiveUser = null;
if (userInfoProto.hasEffectiveUser()) {
effectiveUser = userInfoProto.getEffectiveUser();
}
String realUser = null;
if (userInfoProto.hasRealUser()) {
realUser = userInfoProto.getRealUser();
}
if (effectiveUser != null) {
if (realUser != null) {
UserGroupInformation realUserUgi =
UserGroupInformation.createRemoteUser(realUser);
ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
} else {
ugi = UserGroupInformation.createRemoteUser(effectiveUser);
}
}
return ugi;
}
}
/**
@ -1377,15 +1793,16 @@ public abstract class HBaseServer implements RpcServer {
throw new ServerNotRunningYetException("Server is not running yet");
if (LOG.isDebugEnabled()) {
User remoteUser = call.connection.user;
UserGroupInformation remoteUser = call.connection.user;
LOG.debug(getName() + ": call #" + call.id + " executing as "
+ (remoteUser == null ? "NULL principal" : remoteUser.getName()));
+ (remoteUser == null ? "NULL principal" :
remoteUser.getUserName()));
}
RequestContext.set(call.connection.user, getRemoteIp(),
RequestContext.set(User.create(call.connection.user), getRemoteIp(),
call.connection.protocol);
// make the call
value = call(call.connection.protocol, call.param, call.timestamp,
value = call(call.connection.protocol, call.param, call.timestamp,
status);
} catch (Throwable e) {
LOG.debug(getName()+", call "+call+": error: " + e, e);
@ -1517,6 +1934,12 @@ public abstract class HBaseServer implements RpcServer {
// Create the responder here
responder = new Responder();
this.authorize =
conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
if (isSecurityEnabled) {
HBaseSaslRpcServer.init(conf);
}
}
/**
@ -1572,6 +1995,10 @@ public abstract class HBaseServer implements RpcServer {
rpcMetrics.numOpenConnections.set(numConnections);
}
Configuration getConf() {
return conf;
}
/** Sets the socket buffer size used for responding to RPCs.
* @param size send size
*/
@ -1617,6 +2044,14 @@ public abstract class HBaseServer implements RpcServer {
}
}
public SecretManager<? extends TokenIdentifier> getSecretManager() {
return this.secretManager;
}
public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
}
/** Stops the service. No new calls will be handled after this is called. */
@Override
public synchronized void stop() {
@ -1682,6 +2117,31 @@ public abstract class HBaseServer implements RpcServer {
return rpcMetrics;
}
/**
* Authorize the incoming client connection.
*
* @param user client user
* @param connection incoming connection
* @param addr InetAddress of incoming connection
* @throws org.apache.hadoop.security.authorize.AuthorizationException when the client isn't authorized to talk the protocol
*/
public void authorize(UserGroupInformation user,
ConnectionHeader connection,
InetAddress addr
) throws AuthorizationException {
if (authorize) {
Class<?> protocol = null;
try {
protocol = getProtocolClass(connection.getProtocol(), getConf());
} catch (ClassNotFoundException cfne) {
throw new AuthorizationException("Unknown protocol: " +
connection.getProtocol());
}
authManager.authorize(user != null ? user : null,
protocol, getConf(), addr);
}
}
/**
* When the read or write buffer size is larger than this limit, i/o will be
* done in chunks of this size. Most RPC requests and responses would be

View File

@ -23,7 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.security.TokenInfo;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.hbase.security.KerberosInfo;
/**
* Protocol that a RegionServer uses to communicate its status to the Master.

View File

@ -50,8 +50,11 @@ import org.apache.hadoop.hbase.util.Objects;
import org.apache.hadoop.io.*;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.*;
@ -252,9 +255,6 @@ class WritableRpcEngine implements RpcEngine {
private Class<?>[] ifaces;
private boolean verbose;
// for JSON encoding
private static ObjectMapper mapper = new ObjectMapper();
private static final String WARN_RESPONSE_TIME =
"hbase.ipc.warn.response.time";
private static final String WARN_RESPONSE_SIZE =
@ -310,6 +310,36 @@ class WritableRpcEngine implements RpcEngine {
DEFAULT_WARN_RESPONSE_SIZE);
}
public AuthenticationTokenSecretManager createSecretManager(){
if (!User.isSecurityEnabled() ||
!(instance instanceof org.apache.hadoop.hbase.Server)) {
return null;
}
org.apache.hadoop.hbase.Server server =
(org.apache.hadoop.hbase.Server)instance;
Configuration conf = server.getConfiguration();
long keyUpdateInterval =
conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
long maxAge =
conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
server.getServerName().toString(), keyUpdateInterval, maxAge);
}
@Override
public void startThreads() {
AuthenticationTokenSecretManager mgr = createSecretManager();
if (mgr != null) {
setSecretManager(mgr);
mgr.start();
}
this.authManager = new ServiceAuthorizationManager();
HBasePolicyProvider.init(conf, authManager);
// continue with base startup
super.startThreads();
}
@Override
public Writable call(Class<? extends VersionedProtocol> protocol,
Writable param, long receivedTime, MonitoredRPCHandler status)

View File

@ -15,7 +15,7 @@ public final class RPCProtos {
boolean hasEffectiveUser();
String getEffectiveUser();
// required string realUser = 2;
// optional string realUser = 2;
boolean hasRealUser();
String getRealUser();
}
@ -80,7 +80,7 @@ public final class RPCProtos {
}
}
// required string realUser = 2;
// optional string realUser = 2;
public static final int REALUSER_FIELD_NUMBER = 2;
private java.lang.Object realUser_;
public boolean hasRealUser() {
@ -125,10 +125,6 @@ public final class RPCProtos {
memoizedIsInitialized = 0;
return false;
}
if (!hasRealUser()) {
memoizedIsInitialized = 0;
return false;
}
memoizedIsInitialized = 1;
return true;
}
@ -406,10 +402,6 @@ public final class RPCProtos {
return false;
}
if (!hasRealUser()) {
return false;
}
return true;
}
@ -488,7 +480,7 @@ public final class RPCProtos {
onChanged();
}
// required string realUser = 2;
// optional string realUser = 2;
private java.lang.Object realUser_ = "";
public boolean hasRealUser() {
return ((bitField0_ & 0x00000002) == 0x00000002);
@ -2081,9 +2073,9 @@ public final class RPCProtos {
boolean hasCallId();
int getCallId();
// required bool error = 2;
boolean hasError();
boolean getError();
// required .RpcResponse.Status status = 2;
boolean hasStatus();
org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status getStatus();
// optional bytes response = 3;
boolean hasResponse();
@ -2122,6 +2114,78 @@ public final class RPCProtos {
return org.apache.hadoop.hbase.protobuf.generated.RPCProtos.internal_static_RpcResponse_fieldAccessorTable;
}
public enum Status
implements com.google.protobuf.ProtocolMessageEnum {
SUCCESS(0, 0),
ERROR(1, 1),
FATAL(2, 2),
;
public static final int SUCCESS_VALUE = 0;
public static final int ERROR_VALUE = 1;
public static final int FATAL_VALUE = 2;
public final int getNumber() { return value; }
public static Status valueOf(int value) {
switch (value) {
case 0: return SUCCESS;
case 1: return ERROR;
case 2: return FATAL;
default: return null;
}
}
public static com.google.protobuf.Internal.EnumLiteMap<Status>
internalGetValueMap() {
return internalValueMap;
}
private static com.google.protobuf.Internal.EnumLiteMap<Status>
internalValueMap =
new com.google.protobuf.Internal.EnumLiteMap<Status>() {
public Status findValueByNumber(int number) {
return Status.valueOf(number);
}
};
public final com.google.protobuf.Descriptors.EnumValueDescriptor
getValueDescriptor() {
return getDescriptor().getValues().get(index);
}
public final com.google.protobuf.Descriptors.EnumDescriptor
getDescriptorForType() {
return getDescriptor();
}
public static final com.google.protobuf.Descriptors.EnumDescriptor
getDescriptor() {
return org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.getDescriptor().getEnumTypes().get(0);
}
private static final Status[] VALUES = {
SUCCESS, ERROR, FATAL,
};
public static Status valueOf(
com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
if (desc.getType() != getDescriptor()) {
throw new java.lang.IllegalArgumentException(
"EnumValueDescriptor is not for this type.");
}
return VALUES[desc.getIndex()];
}
private final int index;
private final int value;
private Status(int index, int value) {
this.index = index;
this.value = value;
}
// @@protoc_insertion_point(enum_scope:RpcResponse.Status)
}
private int bitField0_;
// required int32 callId = 1;
public static final int CALLID_FIELD_NUMBER = 1;
@ -2133,14 +2197,14 @@ public final class RPCProtos {
return callId_;
}
// required bool error = 2;
public static final int ERROR_FIELD_NUMBER = 2;
private boolean error_;
public boolean hasError() {
// required .RpcResponse.Status status = 2;
public static final int STATUS_FIELD_NUMBER = 2;
private org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status status_;
public boolean hasStatus() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
public boolean getError() {
return error_;
public org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status getStatus() {
return status_;
}
// optional bytes response = 3;
@ -2168,7 +2232,7 @@ public final class RPCProtos {
private void initFields() {
callId_ = 0;
error_ = false;
status_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status.SUCCESS;
response_ = com.google.protobuf.ByteString.EMPTY;
exception_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException.getDefaultInstance();
}
@ -2181,7 +2245,7 @@ public final class RPCProtos {
memoizedIsInitialized = 0;
return false;
}
if (!hasError()) {
if (!hasStatus()) {
memoizedIsInitialized = 0;
return false;
}
@ -2202,7 +2266,7 @@ public final class RPCProtos {
output.writeInt32(1, callId_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeBool(2, error_);
output.writeEnum(2, status_.getNumber());
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeBytes(3, response_);
@ -2225,7 +2289,7 @@ public final class RPCProtos {
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(2, error_);
.computeEnumSize(2, status_.getNumber());
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
size += com.google.protobuf.CodedOutputStream
@ -2263,10 +2327,10 @@ public final class RPCProtos {
result = result && (getCallId()
== other.getCallId());
}
result = result && (hasError() == other.hasError());
if (hasError()) {
result = result && (getError()
== other.getError());
result = result && (hasStatus() == other.hasStatus());
if (hasStatus()) {
result = result &&
(getStatus() == other.getStatus());
}
result = result && (hasResponse() == other.hasResponse());
if (hasResponse()) {
@ -2291,9 +2355,9 @@ public final class RPCProtos {
hash = (37 * hash) + CALLID_FIELD_NUMBER;
hash = (53 * hash) + getCallId();
}
if (hasError()) {
hash = (37 * hash) + ERROR_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getError());
if (hasStatus()) {
hash = (37 * hash) + STATUS_FIELD_NUMBER;
hash = (53 * hash) + hashEnum(getStatus());
}
if (hasResponse()) {
hash = (37 * hash) + RESPONSE_FIELD_NUMBER;
@ -2422,7 +2486,7 @@ public final class RPCProtos {
super.clear();
callId_ = 0;
bitField0_ = (bitField0_ & ~0x00000001);
error_ = false;
status_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status.SUCCESS;
bitField0_ = (bitField0_ & ~0x00000002);
response_ = com.google.protobuf.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000004);
@ -2477,7 +2541,7 @@ public final class RPCProtos {
if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
to_bitField0_ |= 0x00000002;
}
result.error_ = error_;
result.status_ = status_;
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000004;
}
@ -2509,8 +2573,8 @@ public final class RPCProtos {
if (other.hasCallId()) {
setCallId(other.getCallId());
}
if (other.hasError()) {
setError(other.getError());
if (other.hasStatus()) {
setStatus(other.getStatus());
}
if (other.hasResponse()) {
setResponse(other.getResponse());
@ -2527,7 +2591,7 @@ public final class RPCProtos {
return false;
}
if (!hasError()) {
if (!hasStatus()) {
return false;
}
@ -2569,8 +2633,14 @@ public final class RPCProtos {
break;
}
case 16: {
bitField0_ |= 0x00000002;
error_ = input.readBool();
int rawValue = input.readEnum();
org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status value = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status.valueOf(rawValue);
if (value == null) {
unknownFields.mergeVarintField(2, rawValue);
} else {
bitField0_ |= 0x00000002;
status_ = value;
}
break;
}
case 26: {
@ -2614,23 +2684,26 @@ public final class RPCProtos {
return this;
}
// required bool error = 2;
private boolean error_ ;
public boolean hasError() {
// required .RpcResponse.Status status = 2;
private org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status status_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status.SUCCESS;
public boolean hasStatus() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
public boolean getError() {
return error_;
public org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status getStatus() {
return status_;
}
public Builder setError(boolean value) {
public Builder setStatus(org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000002;
error_ = value;
status_ = value;
onChanged();
return this;
}
public Builder clearError() {
public Builder clearStatus() {
bitField0_ = (bitField0_ & ~0x00000002);
error_ = false;
status_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status.SUCCESS;
onChanged();
return this;
}
@ -2795,17 +2868,19 @@ public final class RPCProtos {
static {
java.lang.String[] descriptorData = {
"\n\tRPC.proto\":\n\017UserInformation\022\025\n\reffect" +
"iveUser\030\001 \002(\t\022\020\n\010realUser\030\002 \002(\t\"w\n\020Conne" +
"iveUser\030\001 \002(\t\022\020\n\010realUser\030\002 \001(\t\"w\n\020Conne" +
"ctionHeader\022\"\n\010userInfo\030\001 \001(\0132\020.UserInfo" +
"rmation\022?\n\010protocol\030\002 \001(\t:-org.apache.ha" +
"doop.hbase.client.ClientProtocol\"-\n\nRpcR" +
"equest\022\016\n\006callId\030\001 \002(\005\022\017\n\007request\030\002 \001(\014\"" +
"9\n\014RpcException\022\025\n\rexceptionName\030\001 \002(\t\022\022" +
"\n\nstackTrace\030\002 \001(\t\"`\n\013RpcResponse\022\016\n\006cal" +
"lId\030\001 \002(\005\022\r\n\005error\030\002 \002(\010\022\020\n\010response\030\003 \001" +
"(\014\022 \n\texception\030\004 \001(\0132\r.RpcExceptionB<\n*",
"org.apache.hadoop.hbase.protobuf.generat" +
"edB\tRPCProtosH\001\240\001\001"
"\n\nstackTrace\030\002 \001(\t\"\243\001\n\013RpcResponse\022\016\n\006ca" +
"llId\030\001 \002(\005\022#\n\006status\030\002 \002(\0162\023.RpcResponse" +
".Status\022\020\n\010response\030\003 \001(\014\022 \n\texception\030\004",
" \001(\0132\r.RpcException\"+\n\006Status\022\013\n\007SUCCESS" +
"\020\000\022\t\n\005ERROR\020\001\022\t\n\005FATAL\020\002B<\n*org.apache.h" +
"adoop.hbase.protobuf.generatedB\tRPCProto" +
"sH\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -2849,7 +2924,7 @@ public final class RPCProtos {
internal_static_RpcResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_RpcResponse_descriptor,
new java.lang.String[] { "CallId", "Error", "Response", "Exception", },
new java.lang.String[] { "CallId", "Status", "Response", "Exception", },
org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.class,
org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Builder.class);
return null;

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security;
import org.apache.hadoop.hbase.DoNotRetryIOException;
/**
* Exception thrown by access-related methods.
*/
public class AccessDeniedException extends DoNotRetryIOException {
private static final long serialVersionUID = 1913879564363001780L;
public AccessDeniedException() {
super();
}
public AccessDeniedException(Class<?> clazz, String s) {
super( "AccessDenied [" + clazz.getName() + "]: " + s);
}
public AccessDeniedException(String s) {
super(s);
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.ipc.RegionServerStatusProtocol;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
/**
* Implementation of secure Hadoop policy provider for mapping
* protocol interfaces to hbase-policy.xml entries.
*/
public class HBasePolicyProvider extends PolicyProvider {
protected static Service[] services = {
new Service("security.client.protocol.acl", ClientProtocol.class),
new Service("security.client.protocol.acl", AdminProtocol.class),
new Service("security.admin.protocol.acl", HMasterInterface.class),
new Service("security.masterregion.protocol.acl", RegionServerStatusProtocol.class)
};
@Override
public Service[] getServices() {
return services;
}
public static void init(Configuration conf,
ServiceAuthorizationManager authManager) {
// set service-level authorization security policy
conf.set("hadoop.policy.file", "hbase-policy.xml");
if (conf.getBoolean(
ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
authManager.refresh(conf, new HBasePolicyProvider());
}
}
}

View File

@ -0,0 +1,279 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.RealmCallback;
import javax.security.sasl.RealmChoiceCallback;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslClient;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus;
import org.apache.hadoop.security.SaslInputStream;
import org.apache.hadoop.security.SaslOutputStream;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
/**
* A utility class that encapsulates SASL logic for RPC client.
* Copied from <code>org.apache.hadoop.security</code>
*/
public class HBaseSaslRpcClient {
public static final Log LOG = LogFactory.getLog(HBaseSaslRpcClient.class);
private final SaslClient saslClient;
/**
* Create a HBaseSaslRpcClient for an authentication method
*
* @param method
* the requested authentication method
* @param token
* token to use if needed by the authentication method
*/
public HBaseSaslRpcClient(AuthMethod method,
Token<? extends TokenIdentifier> token, String serverPrincipal)
throws IOException {
switch (method) {
case DIGEST:
if (LOG.isDebugEnabled())
LOG.debug("Creating SASL " + AuthMethod.DIGEST.getMechanismName()
+ " client to authenticate to service at " + token.getService());
saslClient = Sasl.createSaslClient(new String[] { AuthMethod.DIGEST
.getMechanismName() }, null, null, HBaseSaslRpcServer.SASL_DEFAULT_REALM,
HBaseSaslRpcServer.SASL_PROPS, new SaslClientCallbackHandler(token));
break;
case KERBEROS:
if (LOG.isDebugEnabled()) {
LOG
.debug("Creating SASL " + AuthMethod.KERBEROS.getMechanismName()
+ " client. Server's Kerberos principal name is "
+ serverPrincipal);
}
if (serverPrincipal == null || serverPrincipal.length() == 0) {
throw new IOException(
"Failed to specify server's Kerberos principal name");
}
String names[] = HBaseSaslRpcServer.splitKerberosName(serverPrincipal);
if (names.length != 3) {
throw new IOException(
"Kerberos principal does not have the expected format: "
+ serverPrincipal);
}
saslClient = Sasl.createSaslClient(new String[] { AuthMethod.KERBEROS
.getMechanismName() }, null, names[0], names[1],
HBaseSaslRpcServer.SASL_PROPS, null);
break;
default:
throw new IOException("Unknown authentication method " + method);
}
if (saslClient == null)
throw new IOException("Unable to find SASL client implementation");
}
private static void readStatus(DataInputStream inStream) throws IOException {
int status = inStream.readInt(); // read status
if (status != SaslStatus.SUCCESS.state) {
throw new RemoteException(WritableUtils.readString(inStream),
WritableUtils.readString(inStream));
}
}
/**
* Do client side SASL authentication with server via the given InputStream
* and OutputStream
*
* @param inS
* InputStream to use
* @param outS
* OutputStream to use
* @return true if connection is set up, or false if needs to switch
* to simple Auth.
* @throws IOException
*/
public boolean saslConnect(InputStream inS, OutputStream outS)
throws IOException {
DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS));
DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(
outS));
try {
byte[] saslToken = new byte[0];
if (saslClient.hasInitialResponse())
saslToken = saslClient.evaluateChallenge(saslToken);
if (saslToken != null) {
outStream.writeInt(saslToken.length);
outStream.write(saslToken, 0, saslToken.length);
outStream.flush();
if (LOG.isDebugEnabled())
LOG.debug("Have sent token of size " + saslToken.length
+ " from initSASLContext.");
}
if (!saslClient.isComplete()) {
readStatus(inStream);
int len = inStream.readInt();
if (len == HBaseSaslRpcServer.SWITCH_TO_SIMPLE_AUTH) {
if (LOG.isDebugEnabled())
LOG.debug("Server asks us to fall back to simple auth.");
saslClient.dispose();
return false;
}
saslToken = new byte[len];
if (LOG.isDebugEnabled())
LOG.debug("Will read input token of size " + saslToken.length
+ " for processing by initSASLContext");
inStream.readFully(saslToken);
}
while (!saslClient.isComplete()) {
saslToken = saslClient.evaluateChallenge(saslToken);
if (saslToken != null) {
if (LOG.isDebugEnabled())
LOG.debug("Will send token of size " + saslToken.length
+ " from initSASLContext.");
outStream.writeInt(saslToken.length);
outStream.write(saslToken, 0, saslToken.length);
outStream.flush();
}
if (!saslClient.isComplete()) {
readStatus(inStream);
saslToken = new byte[inStream.readInt()];
if (LOG.isDebugEnabled())
LOG.debug("Will read input token of size " + saslToken.length
+ " for processing by initSASLContext");
inStream.readFully(saslToken);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("SASL client context established. Negotiated QoP: "
+ saslClient.getNegotiatedProperty(Sasl.QOP));
}
return true;
} catch (IOException e) {
try {
saslClient.dispose();
} catch (SaslException ignored) {
// ignore further exceptions during cleanup
}
throw e;
}
}
/**
* Get a SASL wrapped InputStream. Can be called only after saslConnect() has
* been called.
*
* @param in
* the InputStream to wrap
* @return a SASL wrapped InputStream
* @throws IOException
*/
public InputStream getInputStream(InputStream in) throws IOException {
if (!saslClient.isComplete()) {
throw new IOException("Sasl authentication exchange hasn't completed yet");
}
return new SaslInputStream(in, saslClient);
}
/**
* Get a SASL wrapped OutputStream. Can be called only after saslConnect() has
* been called.
*
* @param out
* the OutputStream to wrap
* @return a SASL wrapped OutputStream
* @throws IOException
*/
public OutputStream getOutputStream(OutputStream out) throws IOException {
if (!saslClient.isComplete()) {
throw new IOException("Sasl authentication exchange hasn't completed yet");
}
return new SaslOutputStream(out, saslClient);
}
/** Release resources used by wrapped saslClient */
public void dispose() throws SaslException {
saslClient.dispose();
}
private static class SaslClientCallbackHandler implements CallbackHandler {
private final String userName;
private final char[] userPassword;
public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
this.userName = HBaseSaslRpcServer.encodeIdentifier(token.getIdentifier());
this.userPassword = HBaseSaslRpcServer.encodePassword(token.getPassword());
}
public void handle(Callback[] callbacks)
throws UnsupportedCallbackException {
NameCallback nc = null;
PasswordCallback pc = null;
RealmCallback rc = null;
for (Callback callback : callbacks) {
if (callback instanceof RealmChoiceCallback) {
continue;
} else if (callback instanceof NameCallback) {
nc = (NameCallback) callback;
} else if (callback instanceof PasswordCallback) {
pc = (PasswordCallback) callback;
} else if (callback instanceof RealmCallback) {
rc = (RealmCallback) callback;
} else {
throw new UnsupportedCallbackException(callback,
"Unrecognized SASL client callback");
}
}
if (nc != null) {
if (LOG.isDebugEnabled())
LOG.debug("SASL client callback: setting username: " + userName);
nc.setName(userName);
}
if (pc != null) {
if (LOG.isDebugEnabled())
LOG.debug("SASL client callback: setting userPassword");
pc.setPassword(userPassword);
}
if (rc != null) {
if (LOG.isDebugEnabled())
LOG.debug("SASL client callback: setting realm: "
+ rc.getDefaultText());
rc.setText(rc.getDefaultText());
}
}
}
}

View File

@ -40,8 +40,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.SecureServer;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
@ -177,11 +175,11 @@ public class HBaseSaslRpcServer {
/** CallbackHandler for SASL DIGEST-MD5 mechanism */
public static class SaslDigestCallbackHandler implements CallbackHandler {
private SecretManager<TokenIdentifier> secretManager;
private SecureServer.SecureConnection connection;
private HBaseServer.Connection connection;
public SaslDigestCallbackHandler(
SecretManager<TokenIdentifier> secretManager,
SecureServer.SecureConnection connection) {
HBaseServer.Connection connection) {
this.secretManager = secretManager;
this.connection = connection;
}

View File

@ -22,17 +22,14 @@ package org.apache.hadoop.hbase.security;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.util.Methods;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
@ -50,24 +47,12 @@ import org.apache.commons.logging.Log;
* HBase, but can be extended as needs change.
* </p>
*/
@InterfaceAudience.Private
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class User {
public static final String HBASE_SECURITY_CONF_KEY =
"hbase.security.authentication";
/**
* Flag to differentiate between API-incompatible changes to
* {@link org.apache.hadoop.security.UserGroupInformation} between vanilla
* Hadoop 0.20.x and secure Hadoop 0.20+.
*/
private static boolean IS_SECURE_HADOOP = true;
static {
try {
UserGroupInformation.class.getMethod("isSecurityEnabled");
} catch (NoSuchMethodException nsme) {
IS_SECURE_HADOOP = false;
}
}
private static Log LOG = LogFactory.getLog(User.class);
protected UserGroupInformation ugi;
@ -138,12 +123,7 @@ public abstract class User {
* Returns the {@code User} instance within current execution context.
*/
public static User getCurrent() throws IOException {
User user;
if (IS_SECURE_HADOOP) {
user = new SecureHadoopUser();
} else {
user = new HadoopUser();
}
User user = new SecureHadoopUser();
if (user.getUGI() == null) {
return null;
}
@ -159,38 +139,7 @@ public abstract class User {
if (ugi == null) {
return null;
}
if (IS_SECURE_HADOOP) {
return new SecureHadoopUser(ugi);
}
return new HadoopUser(ugi);
}
public static User createUser(ConnectionHeader head) {
UserGroupInformation ugi = null;
if (!head.hasUserInfo()) {
return create(null);
}
UserInformation userInfoProto = head.getUserInfo();
String effectiveUser = null;
if (userInfoProto.hasEffectiveUser()) {
effectiveUser = userInfoProto.getEffectiveUser();
}
String realUser = null;
if (userInfoProto.hasRealUser()) {
realUser = userInfoProto.getRealUser();
}
if (effectiveUser != null) {
if (realUser != null) {
UserGroupInformation realUserUgi =
UserGroupInformation.createRemoteUser(realUser);
ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
} else {
ugi = UserGroupInformation.createRemoteUser(effectiveUser);
}
}
return create(ugi);
return new SecureHadoopUser(ugi);
}
/**
@ -201,10 +150,7 @@ public abstract class User {
*/
public static User createUserForTesting(Configuration conf,
String name, String[] groups) {
if (IS_SECURE_HADOOP) {
return SecureHadoopUser.createUserForTesting(conf, name, groups);
}
return HadoopUser.createUserForTesting(conf, name, groups);
return SecureHadoopUser.createUserForTesting(conf, name, groups);
}
/**
@ -225,11 +171,7 @@ public abstract class User {
*/
public static void login(Configuration conf, String fileConfKey,
String principalConfKey, String localhost) throws IOException {
if (IS_SECURE_HADOOP) {
SecureHadoopUser.login(conf, fileConfKey, principalConfKey, localhost);
} else {
HadoopUser.login(conf, fileConfKey, principalConfKey, localhost);
}
SecureHadoopUser.login(conf, fileConfKey, principalConfKey, localhost);
}
/**
@ -239,11 +181,7 @@ public abstract class User {
* {@code UserGroupInformation.isSecurityEnabled()}.
*/
public static boolean isSecurityEnabled() {
if (IS_SECURE_HADOOP) {
return SecureHadoopUser.isSecurityEnabled();
} else {
return HadoopUser.isSecurityEnabled();
}
return SecureHadoopUser.isSecurityEnabled();
}
/**
@ -257,160 +195,6 @@ public abstract class User {
/* Concrete implementations */
/**
* Bridges {@link User} calls to invocations of the appropriate methods
* in {@link org.apache.hadoop.security.UserGroupInformation} in regular
* Hadoop 0.20 (ASF Hadoop and other versions without the backported security
* features).
*/
private static class HadoopUser extends User {
private HadoopUser() {
try {
ugi = (UserGroupInformation) callStatic("getCurrentUGI");
if (ugi == null) {
// Secure Hadoop UGI will perform an implicit login if the current
// user is null. Emulate the same behavior here for consistency
Configuration conf = HBaseConfiguration.create();
ugi = (UserGroupInformation) callStatic("login",
new Class[]{ Configuration.class }, new Object[]{ conf });
if (ugi != null) {
callStatic("setCurrentUser",
new Class[]{ UserGroupInformation.class }, new Object[]{ ugi });
}
}
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
throw new UndeclaredThrowableException(e,
"Unexpected exception HadoopUser<init>");
}
}
private HadoopUser(UserGroupInformation ugi) {
this.ugi = ugi;
}
@Override
public String getShortName() {
return ugi != null ? ugi.getUserName() : null;
}
@Override
public <T> T runAs(PrivilegedAction<T> action) {
T result = null;
UserGroupInformation previous = null;
try {
previous = (UserGroupInformation) callStatic("getCurrentUGI");
try {
if (ugi != null) {
callStatic("setCurrentUser", new Class[]{UserGroupInformation.class},
new Object[]{ugi});
}
result = action.run();
} finally {
callStatic("setCurrentUser", new Class[]{UserGroupInformation.class},
new Object[]{previous});
}
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
throw new UndeclaredThrowableException(e,
"Unexpected exception in runAs()");
}
return result;
}
@Override
public <T> T runAs(PrivilegedExceptionAction<T> action)
throws IOException, InterruptedException {
T result = null;
try {
UserGroupInformation previous =
(UserGroupInformation) callStatic("getCurrentUGI");
try {
if (ugi != null) {
callStatic("setCurrentUGI", new Class[]{UserGroupInformation.class},
new Object[]{ugi});
}
result = action.run();
} finally {
callStatic("setCurrentUGI", new Class[]{UserGroupInformation.class},
new Object[]{previous});
}
} catch (Exception e) {
if (e instanceof IOException) {
throw (IOException)e;
} else if (e instanceof InterruptedException) {
throw (InterruptedException)e;
} else if (e instanceof RuntimeException) {
throw (RuntimeException)e;
} else {
throw new UndeclaredThrowableException(e, "Unknown exception in runAs()");
}
}
return result;
}
@Override
public void obtainAuthTokenForJob(Configuration conf, Job job)
throws IOException, InterruptedException {
// this is a no-op. token creation is only supported for kerberos
// authenticated clients
}
@Override
public void obtainAuthTokenForJob(JobConf job)
throws IOException, InterruptedException {
// this is a no-op. token creation is only supported for kerberos
// authenticated clients
}
/** @see User#createUserForTesting(org.apache.hadoop.conf.Configuration, String, String[]) */
public static User createUserForTesting(Configuration conf,
String name, String[] groups) {
try {
Class c = Class.forName("org.apache.hadoop.security.UnixUserGroupInformation");
Constructor constructor = c.getConstructor(String.class, String[].class);
if (constructor == null) {
throw new NullPointerException(
);
}
UserGroupInformation newUser =
(UserGroupInformation)constructor.newInstance(name, groups);
// set user in configuration -- hack for regular hadoop
conf.set("hadoop.job.ugi", newUser.toString());
return new HadoopUser(newUser);
} catch (ClassNotFoundException cnfe) {
throw new RuntimeException(
"UnixUserGroupInformation not found, is this secure Hadoop?", cnfe);
} catch (NoSuchMethodException nsme) {
throw new RuntimeException(
"No valid constructor found for UnixUserGroupInformation!", nsme);
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
throw new UndeclaredThrowableException(e,
"Unexpected exception instantiating new UnixUserGroupInformation");
}
}
/**
* No-op since we're running on a version of Hadoop that doesn't support
* logins.
* @see User#login(org.apache.hadoop.conf.Configuration, String, String, String)
*/
public static void login(Configuration conf, String fileConfKey,
String principalConfKey, String localhost) throws IOException {
LOG.info("Skipping login, not running on secure Hadoop");
}
/** Always returns {@code false}. */
public static boolean isSecurityEnabled() {
return false;
}
}
/**
* Bridges {@code User} invocations to underlying calls to
* {@link org.apache.hadoop.security.UserGroupInformation} for secure Hadoop

View File

@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security.access;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
import java.util.List;
/**
* Handles synchronization of access control list entries and updates
* throughout all nodes in the cluster. The {@link AccessController} instance
* on the {@code _acl_} table regions, creates a znode for each table as
* {@code /hbase/acl/tablename}, with the znode data containing a serialized
* list of the permissions granted for the table. The {@code AccessController}
* instances on all other cluster hosts watch the znodes for updates, which
* trigger updates in the {@link TableAuthManager} permission cache.
*/
public class ZKPermissionWatcher extends ZooKeeperListener {
private static Log LOG = LogFactory.getLog(ZKPermissionWatcher.class);
// parent node for permissions lists
static final String ACL_NODE = "acl";
TableAuthManager authManager;
String aclZNode;
public ZKPermissionWatcher(ZooKeeperWatcher watcher,
TableAuthManager authManager, Configuration conf) {
super(watcher);
this.authManager = authManager;
String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE);
this.aclZNode = ZKUtil.joinZNode(watcher.baseZNode, aclZnodeParent);
}
public void start() throws KeeperException {
watcher.registerListener(this);
if (ZKUtil.watchAndCheckExists(watcher, aclZNode)) {
List<ZKUtil.NodeAndData> existing =
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
if (existing != null) {
refreshNodes(existing);
}
}
}
@Override
public void nodeCreated(String path) {
if (path.equals(aclZNode)) {
try {
List<ZKUtil.NodeAndData> nodes =
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
refreshNodes(nodes);
} catch (KeeperException ke) {
LOG.error("Error reading data from zookeeper", ke);
// only option is to abort
watcher.abort("Zookeeper error obtaining acl node children", ke);
}
}
}
@Override
public void nodeDeleted(String path) {
if (aclZNode.equals(ZKUtil.getParent(path))) {
String table = ZKUtil.getNodeName(path);
authManager.remove(Bytes.toBytes(table));
}
}
@Override
public void nodeDataChanged(String path) {
if (aclZNode.equals(ZKUtil.getParent(path))) {
// update cache on an existing table node
String table = ZKUtil.getNodeName(path);
try {
byte[] data = ZKUtil.getDataAndWatch(watcher, path);
authManager.refreshCacheFromWritable(Bytes.toBytes(table), data);
} catch (KeeperException ke) {
LOG.error("Error reading data from zookeeper for node "+table, ke);
// only option is to abort
watcher.abort("Zookeeper error getting data for node " + table, ke);
} catch (IOException ioe) {
LOG.error("Error reading permissions writables", ioe);
}
}
}
@Override
public void nodeChildrenChanged(String path) {
if (path.equals(aclZNode)) {
// table permissions changed
try {
List<ZKUtil.NodeAndData> nodes =
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
refreshNodes(nodes);
} catch (KeeperException ke) {
LOG.error("Error reading data from zookeeper for path "+path, ke);
watcher.abort("Zookeeper error get node children for path "+path, ke);
}
}
}
private void refreshNodes(List<ZKUtil.NodeAndData> nodes) {
for (ZKUtil.NodeAndData n : nodes) {
if (n.isEmpty()) continue;
String path = n.getNode();
String table = ZKUtil.getNodeName(path);
try {
byte[] nodeData = n.getData();
if (LOG.isDebugEnabled()) {
LOG.debug("Updating permissions cache from node "+table+" with data: "+
Bytes.toStringBinary(nodeData));
}
authManager.refreshCacheFromWritable(Bytes.toBytes(table),
nodeData);
} catch (IOException ioe) {
LOG.error("Failed parsing permissions for table '" + table +
"' from zk", ioe);
}
}
}
/***
* Write a table's access controls to the permissions mirror in zookeeper
* @param tableName
* @param permsData
*/
public void writeToZookeeper(String tableName,
byte[] permsData) {
String zkNode =
ZKUtil.joinZNode(ZKUtil.joinZNode(watcher.baseZNode, ACL_NODE),
tableName);
try {
ZKUtil.createWithParents(watcher, zkNode);
ZKUtil.updateExistingNodeData(watcher, zkNode,
permsData, -1);
} catch (KeeperException e) {
LOG.error("Failed updating permissions for table '" + tableName +
"'", e);
watcher.abort("Failed writing node "+zkNode+" to zookeeper", e);
}
}
}

View File

@ -233,6 +233,10 @@ public class AuthenticationTokenSecretManager
}
}
}
synchronized boolean isCurrentKeyRolled() {
return currentKey != null;
}
synchronized void rollCurrentKey() {
if (!leaderElector.isMaster()) {

View File

@ -25,9 +25,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.RequestContext;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.SecureServer;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.security.UserGroupInformation;
@ -56,11 +56,9 @@ public class TokenProvider extends BaseEndpointCoprocessor
RegionCoprocessorEnvironment regionEnv =
(RegionCoprocessorEnvironment)env;
RpcServer server = regionEnv.getRegionServerServices().getRpcServer();
if (server instanceof SecureServer) {
SecretManager mgr = ((SecureServer)server).getSecretManager();
if (mgr instanceof AuthenticationTokenSecretManager) {
secretManager = (AuthenticationTokenSecretManager)mgr;
}
SecretManager<?> mgr = ((HBaseServer)server).getSecretManager();
if (mgr instanceof AuthenticationTokenSecretManager) {
secretManager = (AuthenticationTokenSecretManager)mgr;
}
}
}

View File

@ -47,7 +47,7 @@ option optimize_for = SPEED;
message UserInformation {
required string effectiveUser = 1;
required string realUser = 2;
optional string realUser = 2;
}
message ConnectionHeader {
@ -94,9 +94,14 @@ message RpcResponse {
/** Echo back the callId the client sent */
required int32 callId = 1;
/** Did the RPC execution encounter an error at the server */
required bool error = 2;
enum Status {
SUCCESS = 0;
ERROR = 1;
FATAL = 2;
}
required Status status = 2;
/** Optional response bytes */
optional bytes response = 3;
/** Optional exception when error is true*/
optional RpcException exception = 4;
}
}

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.security.access;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ipc.SecureRpcEngine;
import org.apache.hadoop.hbase.security.User;
/**
@ -31,7 +30,6 @@ public class SecureTestUtil {
public static void enableSecurity(Configuration conf) throws IOException {
conf.set("hadoop.security.authorization", "false");
conf.set("hadoop.security.authentication", "simple");
conf.set("hbase.rpc.engine", SecureRpcEngine.class.getName());
conf.set("hbase.coprocessor.master.classes", AccessController.class.getName());
conf.set("hbase.coprocessor.region.classes", AccessController.class.getName());
// add the process running user to superusers

View File

@ -32,10 +32,9 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.RequestContext;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.SecureRpcEngine;
import org.apache.hadoop.hbase.ipc.SecureServer;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
@ -84,17 +83,18 @@ public class TestTokenAuthentication {
public static void setupBeforeClass() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(HBaseRPC.RPC_ENGINE_PROP, SecureRpcEngine.class.getName());
conf.set("hbase.coprocessor.region.classes",
IdentityCoprocessor.class.getName());
TEST_UTIL.startMiniCluster();
HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
RpcServer server = rs.getRpcServer();
assertTrue(server instanceof SecureServer);
SecretManager mgr =
((SecureServer)server).getSecretManager();
assertTrue(mgr instanceof AuthenticationTokenSecretManager);
secretManager = (AuthenticationTokenSecretManager)mgr;
secretManager = new AuthenticationTokenSecretManager(conf, rs.getZooKeeper(),
rs.getServerName().toString(),
conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000),
conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000));
secretManager.start();
while(secretManager.getCurrentKey() == null) {
Thread.sleep(1);
}
}
@AfterClass

View File

@ -122,6 +122,10 @@
Keep the maximum filesize small so we split more often in tests.
</description>
</property>
<property>
<name>hadoop.log.dir</name>
<value>${user.dir}/../logs</value>
</property>
<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>21818</value>