Thrift Http implementation
This commit is contained in:
parent
6aa8b3727c
commit
1cf8521191
|
@ -38,4 +38,12 @@ public class SecurityUtil {
|
||||||
}
|
}
|
||||||
return (i > -1) ? principal.substring(0, i) : principal;
|
return (i > -1) ? principal.substring(0, i) : principal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the user name from a principal
|
||||||
|
*/
|
||||||
|
public static String getPrincipalWithoutRealm(final String principal) {
|
||||||
|
int i = principal.indexOf("@");
|
||||||
|
return (i > -1) ? principal.substring(0, i) : principal;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,37 @@
|
||||||
|
/**
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License. See accompanying LICENSE file.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.thrift;
|
||||||
|
|
||||||
|
public class HttpAuthenticationException extends Exception {
|
||||||
|
private static final long serialVersionUID = 0;
|
||||||
|
/**
|
||||||
|
* @param cause original exception
|
||||||
|
*/
|
||||||
|
public HttpAuthenticationException(Throwable cause) {
|
||||||
|
super(cause);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* @param msg exception message
|
||||||
|
*/
|
||||||
|
public HttpAuthenticationException(String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* @param msg exception message
|
||||||
|
* @param cause original exception
|
||||||
|
*/
|
||||||
|
public HttpAuthenticationException(String msg, Throwable cause) {
|
||||||
|
super(msg, cause);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,178 @@
|
||||||
|
package org.apache.hadoop.hbase.thrift;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
|
||||||
|
import javax.servlet.ServletException;
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.commons.net.util.Base64;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.security.SecurityUtil;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||||
|
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||||
|
import org.apache.thrift.TProcessor;
|
||||||
|
import org.apache.thrift.protocol.TProtocolFactory;
|
||||||
|
import org.apache.thrift.server.TServlet;
|
||||||
|
import org.ietf.jgss.GSSContext;
|
||||||
|
import org.ietf.jgss.GSSCredential;
|
||||||
|
import org.ietf.jgss.GSSException;
|
||||||
|
import org.ietf.jgss.GSSManager;
|
||||||
|
import org.ietf.jgss.GSSName;
|
||||||
|
import org.ietf.jgss.Oid;
|
||||||
|
|
||||||
|
|
||||||
|
public class ThriftHttpServlet extends TServlet {
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
public static final Log LOG = LogFactory.getLog(ThriftHttpServlet.class.getName());
|
||||||
|
private final UserGroupInformation realUser;
|
||||||
|
private final Configuration conf;
|
||||||
|
private final boolean securityEnabled;
|
||||||
|
private final boolean doAsEnabled;
|
||||||
|
ThriftServerRunner.HBaseHandler hbaseHandler;
|
||||||
|
|
||||||
|
public ThriftHttpServlet(TProcessor processor, TProtocolFactory protocolFactory,
|
||||||
|
UserGroupInformation realUser, Configuration conf, ThriftServerRunner.HBaseHandler
|
||||||
|
hbaseHandler, boolean securityEnabled, boolean doAsEnabled) {
|
||||||
|
super(processor, protocolFactory);
|
||||||
|
this.realUser = realUser;
|
||||||
|
this.conf = conf;
|
||||||
|
this.hbaseHandler = hbaseHandler;
|
||||||
|
this.securityEnabled = securityEnabled;
|
||||||
|
this.doAsEnabled = doAsEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doPost(HttpServletRequest request, HttpServletResponse response)
|
||||||
|
throws ServletException, IOException {
|
||||||
|
String effectiveUser = realUser.getShortUserName();
|
||||||
|
if (securityEnabled) {
|
||||||
|
try {
|
||||||
|
// As Thrift HTTP transport doesn't support SPNEGO yet (THRIFT-889),
|
||||||
|
// Kerberos authentication is being done at servlet level.
|
||||||
|
effectiveUser = doKerberosAuth(request);
|
||||||
|
} catch (HttpAuthenticationException e) {
|
||||||
|
LOG.error("Kerberos Authentication failed", e);
|
||||||
|
// Send a 401 to the client
|
||||||
|
response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
|
||||||
|
response.getWriter().println("Authentication Error: " + e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
String doAsUserFromQuery = request.getHeader("doAs");
|
||||||
|
if (doAsUserFromQuery != null) {
|
||||||
|
if (!doAsEnabled) {
|
||||||
|
throw new ServletException("Support for proxyuser is not configured");
|
||||||
|
}
|
||||||
|
// create and attempt to authorize a proxy user (the client is attempting
|
||||||
|
// to do proxy user)
|
||||||
|
UserGroupInformation ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, realUser);
|
||||||
|
// validate the proxy user authorization
|
||||||
|
try {
|
||||||
|
ProxyUsers.authorize(ugi, request.getRemoteAddr());
|
||||||
|
} catch (AuthorizationException e) {
|
||||||
|
throw new ServletException(e.getMessage());
|
||||||
|
}
|
||||||
|
effectiveUser = doAsUserFromQuery;
|
||||||
|
}
|
||||||
|
hbaseHandler.setEffectiveUser(effectiveUser);
|
||||||
|
super.doPost(request, response);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Do the GSS-API kerberos authentication.
|
||||||
|
* We already have a logged in subject in the form of serviceUGI,
|
||||||
|
* which GSS-API will extract information from.
|
||||||
|
*/
|
||||||
|
private String doKerberosAuth(HttpServletRequest request)
|
||||||
|
throws HttpAuthenticationException {
|
||||||
|
try {
|
||||||
|
return realUser.doAs(new HttpKerberosServerAction(request, realUser));
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Failed to perform authentication");
|
||||||
|
throw new HttpAuthenticationException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class HttpKerberosServerAction implements PrivilegedExceptionAction<String> {
|
||||||
|
HttpServletRequest request;
|
||||||
|
UserGroupInformation serviceUGI;
|
||||||
|
HttpKerberosServerAction(HttpServletRequest request, UserGroupInformation serviceUGI) {
|
||||||
|
this.request = request;
|
||||||
|
this.serviceUGI = serviceUGI;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String run() throws HttpAuthenticationException {
|
||||||
|
// Get own Kerberos credentials for accepting connection
|
||||||
|
GSSManager manager = GSSManager.getInstance();
|
||||||
|
GSSContext gssContext = null;
|
||||||
|
String serverPrincipal = SecurityUtil.getPrincipalWithoutRealm(serviceUGI.getUserName());
|
||||||
|
try {
|
||||||
|
// This Oid for Kerberos GSS-API mechanism.
|
||||||
|
Oid kerberosMechOid = new Oid("1.2.840.113554.1.2.2");
|
||||||
|
// Oid for SPNego GSS-API mechanism.
|
||||||
|
Oid spnegoMechOid = new Oid("1.3.6.1.5.5.2");
|
||||||
|
// Oid for kerberos principal name
|
||||||
|
Oid krb5PrincipalOid = new Oid("1.2.840.113554.1.2.2.1");
|
||||||
|
// GSS name for server
|
||||||
|
GSSName serverName = manager.createName(serverPrincipal, krb5PrincipalOid);
|
||||||
|
// GSS credentials for server
|
||||||
|
GSSCredential serverCreds = manager.createCredential(serverName,
|
||||||
|
GSSCredential.DEFAULT_LIFETIME,
|
||||||
|
new Oid[]{kerberosMechOid, spnegoMechOid},
|
||||||
|
GSSCredential.ACCEPT_ONLY);
|
||||||
|
// Create a GSS context
|
||||||
|
gssContext = manager.createContext(serverCreds);
|
||||||
|
// Get service ticket from the authorization header
|
||||||
|
String serviceTicketBase64 = getAuthHeader(request);
|
||||||
|
byte[] inToken = Base64.decodeBase64(serviceTicketBase64.getBytes());
|
||||||
|
gssContext.acceptSecContext(inToken, 0, inToken.length);
|
||||||
|
// Authenticate or deny based on its context completion
|
||||||
|
if (!gssContext.isEstablished()) {
|
||||||
|
throw new HttpAuthenticationException("Kerberos authentication failed: " +
|
||||||
|
"unable to establish context with the service ticket " +
|
||||||
|
"provided by the client.");
|
||||||
|
}
|
||||||
|
return SecurityUtil.getUserFromPrincipal(gssContext.getSrcName().toString());
|
||||||
|
} catch (GSSException e) {
|
||||||
|
throw new HttpAuthenticationException("Kerberos authentication failed: ", e);
|
||||||
|
} finally {
|
||||||
|
if (gssContext != null) {
|
||||||
|
try {
|
||||||
|
gssContext.dispose();
|
||||||
|
} catch (GSSException e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the base64 encoded auth header payload
|
||||||
|
*
|
||||||
|
* @throws HttpAuthenticationException if a remote or network exception occurs
|
||||||
|
*/
|
||||||
|
private String getAuthHeader(HttpServletRequest request)
|
||||||
|
throws HttpAuthenticationException {
|
||||||
|
String authHeader = request.getHeader("Authorization");
|
||||||
|
// Each http request must have an Authorization header
|
||||||
|
if (authHeader == null || authHeader.isEmpty()) {
|
||||||
|
throw new HttpAuthenticationException("Authorization header received " +
|
||||||
|
"from the client is empty.");
|
||||||
|
}
|
||||||
|
String authHeaderBase64String;
|
||||||
|
int beginIndex = ("Negotiate ").length();
|
||||||
|
authHeaderBase64String = authHeader.substring(beginIndex);
|
||||||
|
// Authorization header must have a payload
|
||||||
|
if (authHeaderBase64String == null || authHeaderBase64String.isEmpty()) {
|
||||||
|
throw new HttpAuthenticationException("Authorization header received " +
|
||||||
|
"from the client does not contain any data.");
|
||||||
|
}
|
||||||
|
return authHeaderBase64String;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -28,10 +28,10 @@ import org.apache.commons.cli.Options;
|
||||||
import org.apache.commons.cli.PosixParser;
|
import org.apache.commons.cli.PosixParser;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.http.InfoServer;
|
import org.apache.hadoop.hbase.http.InfoServer;
|
||||||
import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType;
|
import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType;
|
||||||
import org.apache.hadoop.hbase.util.VersionInfo;
|
import org.apache.hadoop.hbase.util.VersionInfo;
|
||||||
|
|
|
@ -50,7 +50,6 @@ import org.apache.commons.cli.Option;
|
||||||
import org.apache.commons.cli.OptionGroup;
|
import org.apache.commons.cli.OptionGroup;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
@ -61,6 +60,7 @@ import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.client.Append;
|
import org.apache.hadoop.hbase.client.Append;
|
||||||
import org.apache.hadoop.hbase.client.Delete;
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Durability;
|
import org.apache.hadoop.hbase.client.Durability;
|
||||||
|
@ -99,6 +99,7 @@ import org.apache.hadoop.hbase.util.Strings;
|
||||||
import org.apache.hadoop.net.DNS;
|
import org.apache.hadoop.net.DNS;
|
||||||
import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
|
import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||||
import org.apache.thrift.TException;
|
import org.apache.thrift.TException;
|
||||||
import org.apache.thrift.TProcessor;
|
import org.apache.thrift.TProcessor;
|
||||||
import org.apache.thrift.protocol.TBinaryProtocol;
|
import org.apache.thrift.protocol.TBinaryProtocol;
|
||||||
|
@ -108,6 +109,7 @@ import org.apache.thrift.protocol.TProtocolFactory;
|
||||||
import org.apache.thrift.server.THsHaServer;
|
import org.apache.thrift.server.THsHaServer;
|
||||||
import org.apache.thrift.server.TNonblockingServer;
|
import org.apache.thrift.server.TNonblockingServer;
|
||||||
import org.apache.thrift.server.TServer;
|
import org.apache.thrift.server.TServer;
|
||||||
|
import org.apache.thrift.server.TServlet;
|
||||||
import org.apache.thrift.server.TThreadedSelectorServer;
|
import org.apache.thrift.server.TThreadedSelectorServer;
|
||||||
import org.apache.thrift.transport.TFramedTransport;
|
import org.apache.thrift.transport.TFramedTransport;
|
||||||
import org.apache.thrift.transport.TNonblockingServerSocket;
|
import org.apache.thrift.transport.TNonblockingServerSocket;
|
||||||
|
@ -116,6 +118,13 @@ import org.apache.thrift.transport.TSaslServerTransport;
|
||||||
import org.apache.thrift.transport.TServerSocket;
|
import org.apache.thrift.transport.TServerSocket;
|
||||||
import org.apache.thrift.transport.TServerTransport;
|
import org.apache.thrift.transport.TServerTransport;
|
||||||
import org.apache.thrift.transport.TTransportFactory;
|
import org.apache.thrift.transport.TTransportFactory;
|
||||||
|
import org.mortbay.jetty.Connector;
|
||||||
|
import org.mortbay.jetty.Server;
|
||||||
|
import org.mortbay.jetty.nio.SelectChannelConnector;
|
||||||
|
import org.mortbay.jetty.security.SslSelectChannelConnector;
|
||||||
|
import org.mortbay.jetty.servlet.Context;
|
||||||
|
import org.mortbay.jetty.servlet.ServletHolder;
|
||||||
|
import org.mortbay.thread.QueuedThreadPool;
|
||||||
|
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
@ -139,6 +148,15 @@ public class ThriftServerRunner implements Runnable {
|
||||||
static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
|
static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
|
||||||
static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
|
static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
|
||||||
static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
|
static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
|
||||||
|
static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
|
||||||
|
static final String HTTP_MIN_THREADS = "hbase.thrift.http_threads.min";
|
||||||
|
static final String HTTP_MAX_THREADS = "hbase.thrift.http_threads.max";
|
||||||
|
|
||||||
|
static final String THRIFT_SSL_ENABLED = "hbase.thrift.ssl.enabled";
|
||||||
|
static final String THRIFT_SSL_KEYSTORE_STORE = "hbase.thrift.ssl.keystore.store";
|
||||||
|
static final String THRIFT_SSL_KEYSTORE_PASSWORD = "hbase.thrift.ssl.keystore.password";
|
||||||
|
static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD = "hbase.thrift.ssl.keystore.keypassword";
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Thrift quality of protection configuration key. Valid values can be:
|
* Thrift quality of protection configuration key. Valid values can be:
|
||||||
|
@ -154,10 +172,12 @@ public class ThriftServerRunner implements Runnable {
|
||||||
private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
|
private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
|
||||||
public static final int DEFAULT_LISTEN_PORT = 9090;
|
public static final int DEFAULT_LISTEN_PORT = 9090;
|
||||||
public static final int HREGION_VERSION = 1;
|
public static final int HREGION_VERSION = 1;
|
||||||
|
static final String THRIFT_SUPPORT_PROXYUSER = "hbase.thrift.support.proxyuser";
|
||||||
private final int listenPort;
|
private final int listenPort;
|
||||||
|
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
volatile TServer tserver;
|
volatile TServer tserver;
|
||||||
|
volatile Server httpServer;
|
||||||
private final Hbase.Iface handler;
|
private final Hbase.Iface handler;
|
||||||
private final ThriftMetrics metrics;
|
private final ThriftMetrics metrics;
|
||||||
private final HBaseHandler hbaseHandler;
|
private final HBaseHandler hbaseHandler;
|
||||||
|
@ -166,6 +186,9 @@ public class ThriftServerRunner implements Runnable {
|
||||||
private final String qop;
|
private final String qop;
|
||||||
private String host;
|
private String host;
|
||||||
|
|
||||||
|
private final boolean securityEnabled;
|
||||||
|
private final boolean doAsEnabled;
|
||||||
|
|
||||||
/** An enum of server implementation selections */
|
/** An enum of server implementation selections */
|
||||||
enum ImplType {
|
enum ImplType {
|
||||||
HS_HA("hsha", true, THsHaServer.class, true),
|
HS_HA("hsha", true, THsHaServer.class, true),
|
||||||
|
@ -267,7 +290,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
public ThriftServerRunner(Configuration conf) throws IOException {
|
public ThriftServerRunner(Configuration conf) throws IOException {
|
||||||
UserProvider userProvider = UserProvider.instantiate(conf);
|
UserProvider userProvider = UserProvider.instantiate(conf);
|
||||||
// login the server principal (if using secure Hadoop)
|
// login the server principal (if using secure Hadoop)
|
||||||
boolean securityEnabled = userProvider.isHadoopSecurityEnabled()
|
securityEnabled = userProvider.isHadoopSecurityEnabled()
|
||||||
&& userProvider.isHBaseSecurityEnabled();
|
&& userProvider.isHBaseSecurityEnabled();
|
||||||
if (securityEnabled) {
|
if (securityEnabled) {
|
||||||
host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
|
host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
|
||||||
|
@ -285,6 +308,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
hbaseHandler, metrics, conf);
|
hbaseHandler, metrics, conf);
|
||||||
this.realUser = userProvider.getCurrent().getUGI();
|
this.realUser = userProvider.getCurrent().getUGI();
|
||||||
qop = conf.get(THRIFT_QOP_KEY);
|
qop = conf.get(THRIFT_QOP_KEY);
|
||||||
|
doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER, false);
|
||||||
if (qop != null) {
|
if (qop != null) {
|
||||||
if (!qop.equals("auth") && !qop.equals("auth-int")
|
if (!qop.equals("auth") && !qop.equals("auth-int")
|
||||||
&& !qop.equals("auth-conf")) {
|
&& !qop.equals("auth-conf")) {
|
||||||
|
@ -303,21 +327,27 @@ public class ThriftServerRunner implements Runnable {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
realUser.doAs(
|
realUser.doAs(new PrivilegedAction<Object>() {
|
||||||
new PrivilegedAction<Object>() {
|
@Override
|
||||||
@Override
|
public Object run() {
|
||||||
public Object run() {
|
try {
|
||||||
try {
|
if (conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
|
||||||
|
setupHTTPServer();
|
||||||
|
httpServer.start();
|
||||||
|
httpServer.join();
|
||||||
|
} else {
|
||||||
setupServer();
|
setupServer();
|
||||||
tserver.serve();
|
tserver.serve();
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.fatal("Cannot run ThriftServer", e);
|
|
||||||
// Crash the process if the ThriftServer is not running
|
|
||||||
System.exit(-1);
|
|
||||||
}
|
}
|
||||||
return null;
|
} catch (Exception e) {
|
||||||
|
LOG.fatal("Cannot run ThriftServer", e);
|
||||||
|
// Crash the process if the ThriftServer is not running
|
||||||
|
System.exit(-1);
|
||||||
}
|
}
|
||||||
});
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void shutdown() {
|
public void shutdown() {
|
||||||
|
@ -325,6 +355,70 @@ public class ThriftServerRunner implements Runnable {
|
||||||
tserver.stop();
|
tserver.stop();
|
||||||
tserver = null;
|
tserver = null;
|
||||||
}
|
}
|
||||||
|
if (httpServer != null) {
|
||||||
|
try {
|
||||||
|
httpServer.stop();
|
||||||
|
httpServer = null;
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Problem encountered in shutting down HTTP server " + e.getCause());
|
||||||
|
}
|
||||||
|
httpServer = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setupHTTPServer() throws IOException {
|
||||||
|
TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
|
||||||
|
TProcessor processor = new Hbase.Processor<Hbase.Iface>(handler);
|
||||||
|
TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, realUser,
|
||||||
|
conf, hbaseHandler, securityEnabled, doAsEnabled);
|
||||||
|
|
||||||
|
httpServer = new Server();
|
||||||
|
// Context handler
|
||||||
|
Context context = new Context(httpServer, "/", Context.SESSIONS);
|
||||||
|
context.setContextPath("/");
|
||||||
|
String httpPath = "/*";
|
||||||
|
httpServer.setHandler(context);
|
||||||
|
context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
|
||||||
|
|
||||||
|
// set up Jetty and run the embedded server
|
||||||
|
Connector connector = new SelectChannelConnector();
|
||||||
|
if(conf.getBoolean(THRIFT_SSL_ENABLED, false)) {
|
||||||
|
SslSelectChannelConnector sslConnector = new SslSelectChannelConnector();
|
||||||
|
String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE);
|
||||||
|
String password = HBaseConfiguration.getPassword(conf,
|
||||||
|
THRIFT_SSL_KEYSTORE_PASSWORD, null);
|
||||||
|
String keyPassword = HBaseConfiguration.getPassword(conf,
|
||||||
|
THRIFT_SSL_KEYSTORE_KEYPASSWORD, password);
|
||||||
|
sslConnector.setKeystore(keystore);
|
||||||
|
sslConnector.setPassword(password);
|
||||||
|
sslConnector.setKeyPassword(keyPassword);
|
||||||
|
connector = sslConnector;
|
||||||
|
}
|
||||||
|
String host = getBindAddress(conf).getHostAddress();
|
||||||
|
connector.setPort(listenPort);
|
||||||
|
connector.setHost(host);
|
||||||
|
httpServer.addConnector(connector);
|
||||||
|
|
||||||
|
if (doAsEnabled) {
|
||||||
|
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the default max thread number to 100 to limit
|
||||||
|
// the number of concurrent requests so that Thrfit HTTP server doesn't OOM easily.
|
||||||
|
// Jetty set the default max thread number to 250, if we don't set it.
|
||||||
|
//
|
||||||
|
// Our default min thread number 2 is the same as that used by Jetty.
|
||||||
|
int minThreads = conf.getInt(HTTP_MIN_THREADS, 2);
|
||||||
|
int maxThreads = conf.getInt(HTTP_MAX_THREADS, 100);
|
||||||
|
QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
|
||||||
|
threadPool.setMinThreads(minThreads);
|
||||||
|
httpServer.setThreadPool(threadPool);
|
||||||
|
|
||||||
|
httpServer.setSendServerVersion(false);
|
||||||
|
httpServer.setSendDateHeader(false);
|
||||||
|
httpServer.setStopAtShutdown(true);
|
||||||
|
|
||||||
|
LOG.info("Starting Thrift HTTP Server on " + Integer.toString(listenPort));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -0,0 +1,162 @@
|
||||||
|
/*
|
||||||
|
* Copyright The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.thrift;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.apache.hadoop.hbase.thrift.generated.Hbase;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
|
||||||
|
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
|
||||||
|
import org.apache.thrift.protocol.TBinaryProtocol;
|
||||||
|
import org.apache.thrift.protocol.TProtocol;
|
||||||
|
import org.apache.thrift.transport.THttpClient;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import com.google.common.base.Joiner;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start the HBase Thrift HTTP server on a random port through the command-line
|
||||||
|
* interface and talk to it from client side.
|
||||||
|
*/
|
||||||
|
@Category({ClientTests.class, LargeTests.class})
|
||||||
|
|
||||||
|
public class TestThriftHttpServer {
|
||||||
|
|
||||||
|
public static final Log LOG =
|
||||||
|
LogFactory.getLog(TestThriftHttpServer.class);
|
||||||
|
|
||||||
|
private static final HBaseTestingUtility TEST_UTIL =
|
||||||
|
new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private Thread httpServerThread;
|
||||||
|
private volatile Exception httpServerException;
|
||||||
|
|
||||||
|
private Exception clientSideException;
|
||||||
|
|
||||||
|
private ThriftServer thriftServer;
|
||||||
|
private int port;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.thrift.http", true);
|
||||||
|
TEST_UTIL.startMiniCluster();
|
||||||
|
//ensure that server time increments every time we do an operation, otherwise
|
||||||
|
//successive puts having the same timestamp will override each other
|
||||||
|
EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
EnvironmentEdgeManager.reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startHttpServerThread(final String[] args) {
|
||||||
|
LOG.info("Starting HBase Thrift server with HTTP server: " + Joiner.on(" ").join(args));
|
||||||
|
|
||||||
|
httpServerException = null;
|
||||||
|
httpServerThread = new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
thriftServer.doMain(args);
|
||||||
|
} catch (Exception e) {
|
||||||
|
httpServerException = e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
httpServerThread.setName(ThriftServer.class.getSimpleName() +
|
||||||
|
"-httpServer");
|
||||||
|
httpServerThread.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=600000)
|
||||||
|
public void testRunThriftServer() throws Exception {
|
||||||
|
List<String> args = new ArrayList<String>();
|
||||||
|
port = HBaseTestingUtility.randomFreePort();
|
||||||
|
args.add("-" + ThriftServer.PORT_OPTION);
|
||||||
|
args.add(String.valueOf(port));
|
||||||
|
args.add("start");
|
||||||
|
|
||||||
|
thriftServer = new ThriftServer(TEST_UTIL.getConfiguration());
|
||||||
|
startHttpServerThread(args.toArray(new String[args.size()]));
|
||||||
|
|
||||||
|
// wait up to 10s for the server to start
|
||||||
|
for (int i = 0; i < 100
|
||||||
|
&& ( thriftServer.serverRunner == null || thriftServer.serverRunner.httpServer ==
|
||||||
|
null); i++) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
talkToThriftServer();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
clientSideException = ex;
|
||||||
|
} finally {
|
||||||
|
stopHttpServerThread();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (clientSideException != null) {
|
||||||
|
LOG.error("Thrift client threw an exception " + clientSideException);
|
||||||
|
throw new Exception(clientSideException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static volatile boolean tableCreated = false;
|
||||||
|
|
||||||
|
private void talkToThriftServer() throws Exception {
|
||||||
|
THttpClient httpClient = new THttpClient(
|
||||||
|
"http://"+ HConstants.LOCALHOST + ":" + port);
|
||||||
|
httpClient.open();
|
||||||
|
try {
|
||||||
|
TProtocol prot;
|
||||||
|
prot = new TBinaryProtocol(httpClient);
|
||||||
|
Hbase.Client client = new Hbase.Client(prot);
|
||||||
|
if (!tableCreated){
|
||||||
|
TestThriftServer.createTestTables(client);
|
||||||
|
tableCreated = true;
|
||||||
|
}
|
||||||
|
TestThriftServer.checkTableList(client);
|
||||||
|
} finally {
|
||||||
|
httpClient.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void stopHttpServerThread() throws Exception {
|
||||||
|
LOG.debug("Stopping " + " Thrift HTTP server");
|
||||||
|
thriftServer.stop();
|
||||||
|
httpServerThread.join();
|
||||||
|
if (httpServerException != null) {
|
||||||
|
LOG.error("Command-line invocation of HBase Thrift server threw an " +
|
||||||
|
"exception", httpServerException);
|
||||||
|
throw new Exception(httpServerException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -241,7 +241,5 @@ public class TestThriftServerCmdLine {
|
||||||
throw new Exception(cmdLineException);
|
throw new Exception(cmdLineException);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue