HBASE-19852 HBase Thrift should use a SPNEGO HTTP/hostname principal for checking HTTP Kerberos authentication
Return 401 sooner when AUTHORIZATION header is missing HBase Thrift server was checking for the AUTHORIZATION header and assuming it was always present even when it was the first request. Many clients will not send the AUTHORIZATION header until a 401 is received. HBase Thrift in the case of no header was throwing multiple exceptions and filling the logs with exceptions. This was fixed by checking that if the AUTHORIZATION header is empty then return a 401 immediately if security is enabled. Signed-off-by: Josh Elser <elserj@apache.org>
This commit is contained in:
parent
d44e8a7aff
commit
a8ea49bfb7
|
@ -284,6 +284,11 @@
|
||||||
<groupId>org.glassfish</groupId>
|
<groupId>org.glassfish</groupId>
|
||||||
<artifactId>javax.el</artifactId>
|
<artifactId>javax.el</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.kerby</groupId>
|
||||||
|
<artifactId>kerb-simplekdc</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<profiles>
|
<profiles>
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -18,6 +18,9 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.thrift;
|
package org.apache.hadoop.hbase.thrift;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.THRIFT_SPNEGO_KEYTAB_FILE_KEY;
|
||||||
|
import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.THRIFT_SPNEGO_PRINCIPAL_KEY;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
|
||||||
|
@ -31,6 +34,7 @@ import org.apache.hadoop.hbase.util.Base64;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||||
import org.apache.hadoop.security.authorize.ProxyUsers;
|
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||||
|
import org.apache.http.HttpHeaders;
|
||||||
import org.apache.thrift.TProcessor;
|
import org.apache.thrift.TProcessor;
|
||||||
import org.apache.thrift.protocol.TProtocolFactory;
|
import org.apache.thrift.protocol.TProtocolFactory;
|
||||||
import org.apache.thrift.server.TServlet;
|
import org.apache.thrift.server.TServlet;
|
||||||
|
@ -52,26 +56,35 @@ import org.slf4j.LoggerFactory;
|
||||||
public class ThriftHttpServlet extends TServlet {
|
public class ThriftHttpServlet extends TServlet {
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 1L;
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(ThriftHttpServlet.class.getName());
|
private static final Logger LOG = LoggerFactory.getLogger(ThriftHttpServlet.class.getName());
|
||||||
private transient final UserGroupInformation realUser;
|
private final transient UserGroupInformation serviceUGI;
|
||||||
private transient final Configuration conf;
|
private final transient UserGroupInformation httpUGI;
|
||||||
private final boolean securityEnabled;
|
private final transient ThriftServerRunner.HBaseHandler hbaseHandler;
|
||||||
private final boolean doAsEnabled;
|
private final boolean doAsEnabled;
|
||||||
private transient ThriftServerRunner.HBaseHandler hbaseHandler;
|
private final boolean securityEnabled;
|
||||||
|
|
||||||
// HTTP Header related constants.
|
// HTTP Header related constants.
|
||||||
public static final String WWW_AUTHENTICATE = "WWW-Authenticate";
|
|
||||||
public static final String AUTHORIZATION = "Authorization";
|
|
||||||
public static final String NEGOTIATE = "Negotiate";
|
public static final String NEGOTIATE = "Negotiate";
|
||||||
|
|
||||||
public ThriftHttpServlet(TProcessor processor, TProtocolFactory protocolFactory,
|
public ThriftHttpServlet(TProcessor processor, TProtocolFactory protocolFactory,
|
||||||
UserGroupInformation realUser, Configuration conf, ThriftServerRunner.HBaseHandler
|
UserGroupInformation serviceUGI, Configuration conf,
|
||||||
hbaseHandler, boolean securityEnabled, boolean doAsEnabled) {
|
ThriftServerRunner.HBaseHandler hbaseHandler, boolean securityEnabled, boolean doAsEnabled)
|
||||||
|
throws IOException {
|
||||||
super(processor, protocolFactory);
|
super(processor, protocolFactory);
|
||||||
this.realUser = realUser;
|
this.serviceUGI = serviceUGI;
|
||||||
this.conf = conf;
|
|
||||||
this.hbaseHandler = hbaseHandler;
|
this.hbaseHandler = hbaseHandler;
|
||||||
this.securityEnabled = securityEnabled;
|
this.securityEnabled = securityEnabled;
|
||||||
this.doAsEnabled = doAsEnabled;
|
this.doAsEnabled = doAsEnabled;
|
||||||
|
|
||||||
|
if (securityEnabled) {
|
||||||
|
// login the spnego principal
|
||||||
|
UserGroupInformation.setConfiguration(conf);
|
||||||
|
this.httpUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
|
||||||
|
conf.get(THRIFT_SPNEGO_PRINCIPAL_KEY),
|
||||||
|
conf.get(THRIFT_SPNEGO_KEYTAB_FILE_KEY)
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
this.httpUGI = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -79,6 +92,19 @@ public class ThriftHttpServlet extends TServlet {
|
||||||
throws ServletException, IOException {
|
throws ServletException, IOException {
|
||||||
String effectiveUser = request.getRemoteUser();
|
String effectiveUser = request.getRemoteUser();
|
||||||
if (securityEnabled) {
|
if (securityEnabled) {
|
||||||
|
/*
|
||||||
|
Check that the AUTHORIZATION header has any content. If it does not then return a 401
|
||||||
|
requesting AUTHORIZATION header to be sent. This is typical where the first request doesn't
|
||||||
|
send the AUTHORIZATION header initially.
|
||||||
|
*/
|
||||||
|
String authHeader = request.getHeader(HttpHeaders.AUTHORIZATION);
|
||||||
|
if (authHeader == null || authHeader.isEmpty()) {
|
||||||
|
// Send a 401 to the client
|
||||||
|
response.addHeader(HttpHeaders.WWW_AUTHENTICATE, NEGOTIATE);
|
||||||
|
response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// As Thrift HTTP transport doesn't support SPNEGO yet (THRIFT-889),
|
// As Thrift HTTP transport doesn't support SPNEGO yet (THRIFT-889),
|
||||||
// Kerberos authentication is being done at servlet level.
|
// Kerberos authentication is being done at servlet level.
|
||||||
|
@ -86,20 +112,22 @@ public class ThriftHttpServlet extends TServlet {
|
||||||
effectiveUser = identity.principal;
|
effectiveUser = identity.principal;
|
||||||
// It is standard for client applications expect this header.
|
// It is standard for client applications expect this header.
|
||||||
// Please see http://tools.ietf.org/html/rfc4559 for more details.
|
// Please see http://tools.ietf.org/html/rfc4559 for more details.
|
||||||
response.addHeader(WWW_AUTHENTICATE, NEGOTIATE + " " + identity.outToken);
|
response.addHeader(HttpHeaders.WWW_AUTHENTICATE, NEGOTIATE + " " + identity.outToken);
|
||||||
} catch (HttpAuthenticationException e) {
|
} catch (HttpAuthenticationException e) {
|
||||||
LOG.error("Kerberos Authentication failed", e);
|
LOG.error("Kerberos Authentication failed", e);
|
||||||
// Send a 401 to the client
|
// Send a 401 to the client
|
||||||
response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
|
response.addHeader(HttpHeaders.WWW_AUTHENTICATE, NEGOTIATE);
|
||||||
response.addHeader(WWW_AUTHENTICATE, NEGOTIATE);
|
response.sendError(HttpServletResponse.SC_UNAUTHORIZED,
|
||||||
response.getWriter().println("Authentication Error: " + e.getMessage());
|
"Authentication Error: " + e.getMessage());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
String doAsUserFromQuery = request.getHeader("doAs");
|
|
||||||
if(effectiveUser == null) {
|
if(effectiveUser == null) {
|
||||||
effectiveUser = realUser.getShortUserName();
|
effectiveUser = serviceUGI.getShortUserName();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String doAsUserFromQuery = request.getHeader("doAs");
|
||||||
if (doAsUserFromQuery != null) {
|
if (doAsUserFromQuery != null) {
|
||||||
if (!doAsEnabled) {
|
if (!doAsEnabled) {
|
||||||
throw new ServletException("Support for proxyuser is not configured");
|
throw new ServletException("Support for proxyuser is not configured");
|
||||||
|
@ -112,9 +140,9 @@ public class ThriftHttpServlet extends TServlet {
|
||||||
remoteUser);
|
remoteUser);
|
||||||
// validate the proxy user authorization
|
// validate the proxy user authorization
|
||||||
try {
|
try {
|
||||||
ProxyUsers.authorize(ugi, request.getRemoteAddr(), conf);
|
ProxyUsers.authorize(ugi, request.getRemoteAddr());
|
||||||
} catch (AuthorizationException e) {
|
} catch (AuthorizationException e) {
|
||||||
throw new ServletException(e.getMessage());
|
throw new ServletException(e);
|
||||||
}
|
}
|
||||||
effectiveUser = doAsUserFromQuery;
|
effectiveUser = doAsUserFromQuery;
|
||||||
}
|
}
|
||||||
|
@ -124,17 +152,17 @@ public class ThriftHttpServlet extends TServlet {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Do the GSS-API kerberos authentication.
|
* Do the GSS-API kerberos authentication.
|
||||||
* We already have a logged in subject in the form of serviceUGI,
|
* We already have a logged in subject in the form of httpUGI,
|
||||||
* which GSS-API will extract information from.
|
* which GSS-API will extract information from.
|
||||||
*/
|
*/
|
||||||
private RemoteUserIdentity doKerberosAuth(HttpServletRequest request)
|
private RemoteUserIdentity doKerberosAuth(HttpServletRequest request)
|
||||||
throws HttpAuthenticationException {
|
throws HttpAuthenticationException {
|
||||||
HttpKerberosServerAction action = new HttpKerberosServerAction(request, realUser);
|
HttpKerberosServerAction action = new HttpKerberosServerAction(request, httpUGI);
|
||||||
try {
|
try {
|
||||||
String principal = realUser.doAs(action);
|
String principal = httpUGI.doAs(action);
|
||||||
return new RemoteUserIdentity(principal, action.outToken);
|
return new RemoteUserIdentity(principal, action.outToken);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Failed to perform authentication");
|
LOG.info("Failed to authenticate with {} kerberos principal", httpUGI.getUserName());
|
||||||
throw new HttpAuthenticationException(e);
|
throw new HttpAuthenticationException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -154,12 +182,12 @@ public class ThriftHttpServlet extends TServlet {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class HttpKerberosServerAction implements PrivilegedExceptionAction<String> {
|
private static class HttpKerberosServerAction implements PrivilegedExceptionAction<String> {
|
||||||
HttpServletRequest request;
|
final HttpServletRequest request;
|
||||||
UserGroupInformation serviceUGI;
|
final UserGroupInformation httpUGI;
|
||||||
String outToken = null;
|
String outToken = null;
|
||||||
HttpKerberosServerAction(HttpServletRequest request, UserGroupInformation serviceUGI) {
|
HttpKerberosServerAction(HttpServletRequest request, UserGroupInformation httpUGI) {
|
||||||
this.request = request;
|
this.request = request;
|
||||||
this.serviceUGI = serviceUGI;
|
this.httpUGI = httpUGI;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -167,7 +195,7 @@ public class ThriftHttpServlet extends TServlet {
|
||||||
// Get own Kerberos credentials for accepting connection
|
// Get own Kerberos credentials for accepting connection
|
||||||
GSSManager manager = GSSManager.getInstance();
|
GSSManager manager = GSSManager.getInstance();
|
||||||
GSSContext gssContext = null;
|
GSSContext gssContext = null;
|
||||||
String serverPrincipal = SecurityUtil.getPrincipalWithoutRealm(serviceUGI.getUserName());
|
String serverPrincipal = SecurityUtil.getPrincipalWithoutRealm(httpUGI.getUserName());
|
||||||
try {
|
try {
|
||||||
// This Oid for Kerberos GSS-API mechanism.
|
// This Oid for Kerberos GSS-API mechanism.
|
||||||
Oid kerberosMechOid = new Oid("1.2.840.113554.1.2.2");
|
Oid kerberosMechOid = new Oid("1.2.840.113554.1.2.2");
|
||||||
|
@ -218,7 +246,7 @@ public class ThriftHttpServlet extends TServlet {
|
||||||
*/
|
*/
|
||||||
private String getAuthHeader(HttpServletRequest request)
|
private String getAuthHeader(HttpServletRequest request)
|
||||||
throws HttpAuthenticationException {
|
throws HttpAuthenticationException {
|
||||||
String authHeader = request.getHeader(AUTHORIZATION);
|
String authHeader = request.getHeader(HttpHeaders.AUTHORIZATION);
|
||||||
// Each http request must have an Authorization header
|
// Each http request must have an Authorization header
|
||||||
if (authHeader == null || authHeader.isEmpty()) {
|
if (authHeader == null || authHeader.isEmpty()) {
|
||||||
throw new HttpAuthenticationException("Authorization header received " +
|
throw new HttpAuthenticationException("Authorization header received " +
|
||||||
|
@ -228,7 +256,7 @@ public class ThriftHttpServlet extends TServlet {
|
||||||
int beginIndex = (NEGOTIATE + " ").length();
|
int beginIndex = (NEGOTIATE + " ").length();
|
||||||
authHeaderBase64String = authHeader.substring(beginIndex);
|
authHeaderBase64String = authHeader.substring(beginIndex);
|
||||||
// Authorization header must have a payload
|
// Authorization header must have a payload
|
||||||
if (authHeaderBase64String == null || authHeaderBase64String.isEmpty()) {
|
if (authHeaderBase64String.isEmpty()) {
|
||||||
throw new HttpAuthenticationException("Authorization header received " +
|
throw new HttpAuthenticationException("Authorization header received " +
|
||||||
"from the client does not contain any data.");
|
"from the client does not contain any data.");
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,6 +52,7 @@ public class ThriftServer {
|
||||||
static final String COMPACT_OPTION = "compact";
|
static final String COMPACT_OPTION = "compact";
|
||||||
static final String FRAMED_OPTION = "framed";
|
static final String FRAMED_OPTION = "framed";
|
||||||
static final String PORT_OPTION = "port";
|
static final String PORT_OPTION = "port";
|
||||||
|
static final String INFOPORT_OPTION = "infoport";
|
||||||
|
|
||||||
private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
|
private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
|
||||||
private static final int DEFAULT_LISTEN_PORT = 9090;
|
private static final int DEFAULT_LISTEN_PORT = 9090;
|
||||||
|
@ -116,7 +117,7 @@ public class ThriftServer {
|
||||||
options.addOption("f", FRAMED_OPTION, false, "Use framed transport");
|
options.addOption("f", FRAMED_OPTION, false, "Use framed transport");
|
||||||
options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol");
|
options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol");
|
||||||
options.addOption("h", "help", false, "Print help information");
|
options.addOption("h", "help", false, "Print help information");
|
||||||
options.addOption(null, "infoport", true, "Port for web UI");
|
options.addOption(null, INFOPORT_OPTION, true, "Port for web UI");
|
||||||
|
|
||||||
options.addOption("m", MIN_WORKERS_OPTION, true,
|
options.addOption("m", MIN_WORKERS_OPTION, true,
|
||||||
"The minimum number of worker threads for " +
|
"The minimum number of worker threads for " +
|
||||||
|
@ -161,13 +162,14 @@ public class ThriftServer {
|
||||||
|
|
||||||
// check for user-defined info server port setting, if so override the conf
|
// check for user-defined info server port setting, if so override the conf
|
||||||
try {
|
try {
|
||||||
if (cmd.hasOption("infoport")) {
|
if (cmd.hasOption(INFOPORT_OPTION)) {
|
||||||
String val = cmd.getOptionValue("infoport");
|
String val = cmd.getOptionValue(INFOPORT_OPTION);
|
||||||
conf.setInt("hbase.thrift.info.port", Integer.parseInt(val));
|
conf.setInt("hbase.thrift.info.port", Integer.parseInt(val));
|
||||||
LOG.debug("Web UI port set to " + val);
|
LOG.debug("Web UI port set to " + val);
|
||||||
}
|
}
|
||||||
} catch (NumberFormatException e) {
|
} catch (NumberFormatException e) {
|
||||||
LOG.error("Could not parse the value provided for the infoport option", e);
|
LOG.error("Could not parse the value provided for the " + INFOPORT_OPTION +
|
||||||
|
" option", e);
|
||||||
printUsageAndExit(options, -1);
|
printUsageAndExit(options, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -110,7 +110,6 @@ import org.apache.thrift.TException;
|
||||||
import org.apache.thrift.TProcessor;
|
import org.apache.thrift.TProcessor;
|
||||||
import org.apache.thrift.protocol.TBinaryProtocol;
|
import org.apache.thrift.protocol.TBinaryProtocol;
|
||||||
import org.apache.thrift.protocol.TCompactProtocol;
|
import org.apache.thrift.protocol.TCompactProtocol;
|
||||||
import org.apache.thrift.protocol.TProtocol;
|
|
||||||
import org.apache.thrift.protocol.TProtocolFactory;
|
import org.apache.thrift.protocol.TProtocolFactory;
|
||||||
import org.apache.thrift.server.THsHaServer;
|
import org.apache.thrift.server.THsHaServer;
|
||||||
import org.apache.thrift.server.TNonblockingServer;
|
import org.apache.thrift.server.TNonblockingServer;
|
||||||
|
@ -167,17 +166,28 @@ public class ThriftServerRunner implements Runnable {
|
||||||
static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
|
static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
|
||||||
static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
|
static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
|
||||||
static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
|
static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
|
||||||
static final String HTTP_MIN_THREADS = "hbase.thrift.http_threads.min";
|
static final String HTTP_MIN_THREADS_KEY = "hbase.thrift.http_threads.min";
|
||||||
static final String HTTP_MAX_THREADS = "hbase.thrift.http_threads.max";
|
static final String HTTP_MAX_THREADS_KEY = "hbase.thrift.http_threads.max";
|
||||||
|
|
||||||
static final String THRIFT_SSL_ENABLED = "hbase.thrift.ssl.enabled";
|
static final String THRIFT_SSL_ENABLED_KEY = "hbase.thrift.ssl.enabled";
|
||||||
static final String THRIFT_SSL_KEYSTORE_STORE = "hbase.thrift.ssl.keystore.store";
|
static final String THRIFT_SSL_KEYSTORE_STORE_KEY = "hbase.thrift.ssl.keystore.store";
|
||||||
static final String THRIFT_SSL_KEYSTORE_PASSWORD = "hbase.thrift.ssl.keystore.password";
|
static final String THRIFT_SSL_KEYSTORE_PASSWORD_KEY = "hbase.thrift.ssl.keystore.password";
|
||||||
static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD = "hbase.thrift.ssl.keystore.keypassword";
|
static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY = "hbase.thrift.ssl.keystore.keypassword";
|
||||||
static final String THRIFT_SSL_EXCLUDE_CIPHER_SUITES = "hbase.thrift.ssl.exclude.cipher.suites";
|
static final String THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY =
|
||||||
static final String THRIFT_SSL_INCLUDE_CIPHER_SUITES = "hbase.thrift.ssl.include.cipher.suites";
|
"hbase.thrift.ssl.exclude.cipher.suites";
|
||||||
static final String THRIFT_SSL_EXCLUDE_PROTOCOLS = "hbase.thrift.ssl.exclude.protocols";
|
static final String THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY =
|
||||||
static final String THRIFT_SSL_INCLUDE_PROTOCOLS = "hbase.thrift.ssl.include.protocols";
|
"hbase.thrift.ssl.include.cipher.suites";
|
||||||
|
static final String THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY = "hbase.thrift.ssl.exclude.protocols";
|
||||||
|
static final String THRIFT_SSL_INCLUDE_PROTOCOLS_KEY = "hbase.thrift.ssl.include.protocols";
|
||||||
|
|
||||||
|
static final String THRIFT_SUPPORT_PROXYUSER_KEY = "hbase.thrift.support.proxyuser";
|
||||||
|
|
||||||
|
static final String THRIFT_DNS_INTERFACE_KEY = "hbase.thrift.dns.interface";
|
||||||
|
static final String THRIFT_DNS_NAMESERVER_KEY = "hbase.thrift.dns.nameserver";
|
||||||
|
static final String THRIFT_KERBEROS_PRINCIPAL_KEY = "hbase.thrift.kerberos.principal";
|
||||||
|
static final String THRIFT_KEYTAB_FILE_KEY = "hbase.thrift.keytab.file";
|
||||||
|
static final String THRIFT_SPNEGO_PRINCIPAL_KEY = "hbase.thrift.spnego.principal";
|
||||||
|
static final String THRIFT_SPNEGO_KEYTAB_FILE_KEY = "hbase.thrift.spnego.keytab.file";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Amount of time in milliseconds before a server thread will timeout
|
* Amount of time in milliseconds before a server thread will timeout
|
||||||
|
@ -204,7 +214,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
|
private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
|
||||||
public static final int DEFAULT_LISTEN_PORT = 9090;
|
public static final int DEFAULT_LISTEN_PORT = 9090;
|
||||||
public static final int HREGION_VERSION = 1;
|
public static final int HREGION_VERSION = 1;
|
||||||
static final String THRIFT_SUPPORT_PROXYUSER = "hbase.thrift.support.proxyuser";
|
|
||||||
private final int listenPort;
|
private final int listenPort;
|
||||||
|
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
|
@ -213,7 +223,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
private final Hbase.Iface handler;
|
private final Hbase.Iface handler;
|
||||||
private final ThriftMetrics metrics;
|
private final ThriftMetrics metrics;
|
||||||
private final HBaseHandler hbaseHandler;
|
private final HBaseHandler hbaseHandler;
|
||||||
private final UserGroupInformation realUser;
|
private final UserGroupInformation serviceUGI;
|
||||||
|
|
||||||
private SaslUtil.QualityOfProtection qop;
|
private SaslUtil.QualityOfProtection qop;
|
||||||
private String host;
|
private String host;
|
||||||
|
@ -231,8 +241,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
HS_HA("hsha", true, THsHaServer.class, true),
|
HS_HA("hsha", true, THsHaServer.class, true),
|
||||||
NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
|
NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
|
||||||
THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
|
THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
|
||||||
THREADED_SELECTOR(
|
THREADED_SELECTOR("threadedselector", true, TThreadedSelectorServer.class, true);
|
||||||
"threadedselector", true, TThreadedSelectorServer.class, true);
|
|
||||||
|
|
||||||
public static final ImplType DEFAULT = THREAD_POOL;
|
public static final ImplType DEFAULT = THREAD_POOL;
|
||||||
|
|
||||||
|
@ -250,8 +259,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return <code>-option</code> so we can get the list of options from
|
* @return <code>-option</code>
|
||||||
* {@link #values()}
|
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
|
@ -329,45 +337,44 @@ public class ThriftServerRunner implements Runnable {
|
||||||
}
|
}
|
||||||
return l;
|
return l;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public ThriftServerRunner(Configuration conf) throws IOException {
|
public ThriftServerRunner(Configuration conf) throws IOException {
|
||||||
UserProvider userProvider = UserProvider.instantiate(conf);
|
|
||||||
// login the server principal (if using secure Hadoop)
|
// login the server principal (if using secure Hadoop)
|
||||||
|
UserProvider userProvider = UserProvider.instantiate(conf);
|
||||||
securityEnabled = userProvider.isHadoopSecurityEnabled()
|
securityEnabled = userProvider.isHadoopSecurityEnabled()
|
||||||
&& userProvider.isHBaseSecurityEnabled();
|
&& userProvider.isHBaseSecurityEnabled();
|
||||||
if (securityEnabled) {
|
if (securityEnabled) {
|
||||||
host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
|
host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
|
||||||
conf.get("hbase.thrift.dns.interface", "default"),
|
conf.get(THRIFT_DNS_INTERFACE_KEY, "default"),
|
||||||
conf.get("hbase.thrift.dns.nameserver", "default")));
|
conf.get(THRIFT_DNS_NAMESERVER_KEY, "default")));
|
||||||
userProvider.login("hbase.thrift.keytab.file",
|
userProvider.login(THRIFT_KEYTAB_FILE_KEY, THRIFT_KERBEROS_PRINCIPAL_KEY, host);
|
||||||
"hbase.thrift.kerberos.principal", host);
|
|
||||||
}
|
}
|
||||||
|
this.serviceUGI = userProvider.getCurrent().getUGI();
|
||||||
|
|
||||||
this.conf = HBaseConfiguration.create(conf);
|
this.conf = HBaseConfiguration.create(conf);
|
||||||
this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
|
this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
|
||||||
this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
|
this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
|
||||||
this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource());
|
this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource());
|
||||||
this.hbaseHandler = new HBaseHandler(conf, userProvider);
|
this.hbaseHandler = new HBaseHandler(conf, userProvider);
|
||||||
this.hbaseHandler.initMetrics(metrics);
|
this.hbaseHandler.initMetrics(metrics);
|
||||||
this.handler = HbaseHandlerMetricsProxy.newInstance(
|
this.handler = HbaseHandlerMetricsProxy.newInstance(hbaseHandler, metrics, conf);
|
||||||
hbaseHandler, metrics, conf);
|
|
||||||
this.realUser = userProvider.getCurrent().getUGI();
|
boolean httpEnabled = conf.getBoolean(USE_HTTP_CONF_KEY, false);
|
||||||
|
doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER_KEY, false);
|
||||||
|
if (doAsEnabled && !httpEnabled) {
|
||||||
|
LOG.warn("Fail to enable the doAs feature. " + USE_HTTP_CONF_KEY + " is not configured");
|
||||||
|
}
|
||||||
|
|
||||||
String strQop = conf.get(THRIFT_QOP_KEY);
|
String strQop = conf.get(THRIFT_QOP_KEY);
|
||||||
if (strQop != null) {
|
if (strQop != null) {
|
||||||
this.qop = SaslUtil.getQop(strQop);
|
this.qop = SaslUtil.getQop(strQop);
|
||||||
}
|
}
|
||||||
doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER, false);
|
|
||||||
if (doAsEnabled) {
|
|
||||||
if (!conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
|
|
||||||
LOG.warn("Fail to enable the doAs feature. hbase.regionserver.thrift.http is not configured ");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (qop != null) {
|
if (qop != null) {
|
||||||
if (qop != QualityOfProtection.AUTHENTICATION &&
|
if (qop != QualityOfProtection.AUTHENTICATION &&
|
||||||
qop != QualityOfProtection.INTEGRITY &&
|
qop != QualityOfProtection.INTEGRITY &&
|
||||||
qop != QualityOfProtection.PRIVACY) {
|
qop != QualityOfProtection.PRIVACY) {
|
||||||
throw new IOException(String.format("Invalide %s: It must be one of %s, %s, or %s.",
|
throw new IOException(String.format("Invalid %s: It must be one of %s, %s, or %s.",
|
||||||
THRIFT_QOP_KEY,
|
THRIFT_QOP_KEY,
|
||||||
QualityOfProtection.AUTHENTICATION.name(),
|
QualityOfProtection.AUTHENTICATION.name(),
|
||||||
QualityOfProtection.INTEGRITY.name(),
|
QualityOfProtection.INTEGRITY.name(),
|
||||||
|
@ -375,8 +382,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
}
|
}
|
||||||
checkHttpSecurity(qop, conf);
|
checkHttpSecurity(qop, conf);
|
||||||
if (!securityEnabled) {
|
if (!securityEnabled) {
|
||||||
throw new IOException("Thrift server must"
|
throw new IOException("Thrift server must run in secure mode to support authentication");
|
||||||
+ " run in secure mode to support authentication");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -384,9 +390,9 @@ public class ThriftServerRunner implements Runnable {
|
||||||
private void checkHttpSecurity(QualityOfProtection qop, Configuration conf) {
|
private void checkHttpSecurity(QualityOfProtection qop, Configuration conf) {
|
||||||
if (qop == QualityOfProtection.PRIVACY &&
|
if (qop == QualityOfProtection.PRIVACY &&
|
||||||
conf.getBoolean(USE_HTTP_CONF_KEY, false) &&
|
conf.getBoolean(USE_HTTP_CONF_KEY, false) &&
|
||||||
!conf.getBoolean(THRIFT_SSL_ENABLED, false)) {
|
!conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
|
||||||
throw new IllegalArgumentException("Thrift HTTP Server's QoP is privacy, but " +
|
throw new IllegalArgumentException("Thrift HTTP Server's QoP is privacy, but " +
|
||||||
THRIFT_SSL_ENABLED + " is false");
|
THRIFT_SSL_ENABLED_KEY + " is false");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -395,7 +401,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
realUser.doAs(new PrivilegedAction<Object>() {
|
serviceUGI.doAs(new PrivilegedAction<Object>() {
|
||||||
@Override
|
@Override
|
||||||
public Object run() {
|
public Object run() {
|
||||||
try {
|
try {
|
||||||
|
@ -432,7 +438,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
httpServer.stop();
|
httpServer.stop();
|
||||||
httpServer = null;
|
httpServer = null;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Problem encountered in shutting down HTTP server " + e.getCause());
|
LOG.error("Problem encountered in shutting down HTTP server", e);
|
||||||
}
|
}
|
||||||
httpServer = null;
|
httpServer = null;
|
||||||
}
|
}
|
||||||
|
@ -441,7 +447,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
private void setupHTTPServer() throws IOException {
|
private void setupHTTPServer() throws IOException {
|
||||||
TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
|
TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
|
||||||
TProcessor processor = new Hbase.Processor<>(handler);
|
TProcessor processor = new Hbase.Processor<>(handler);
|
||||||
TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, realUser,
|
TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, serviceUGI,
|
||||||
conf, hbaseHandler, securityEnabled, doAsEnabled);
|
conf, hbaseHandler, securityEnabled, doAsEnabled);
|
||||||
|
|
||||||
// Set the default max thread number to 100 to limit
|
// Set the default max thread number to 100 to limit
|
||||||
|
@ -449,8 +455,8 @@ public class ThriftServerRunner implements Runnable {
|
||||||
// Jetty set the default max thread number to 250, if we don't set it.
|
// Jetty set the default max thread number to 250, if we don't set it.
|
||||||
//
|
//
|
||||||
// Our default min thread number 2 is the same as that used by Jetty.
|
// Our default min thread number 2 is the same as that used by Jetty.
|
||||||
int minThreads = conf.getInt(HTTP_MIN_THREADS, 2);
|
int minThreads = conf.getInt(HTTP_MIN_THREADS_KEY, 2);
|
||||||
int maxThreads = conf.getInt(HTTP_MAX_THREADS, 100);
|
int maxThreads = conf.getInt(HTTP_MAX_THREADS_KEY, 100);
|
||||||
QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
|
QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
|
||||||
threadPool.setMinThreads(minThreads);
|
threadPool.setMinThreads(minThreads);
|
||||||
httpServer = new Server(threadPool);
|
httpServer = new Server(threadPool);
|
||||||
|
@ -472,39 +478,39 @@ public class ThriftServerRunner implements Runnable {
|
||||||
httpConfig.setSendDateHeader(false);
|
httpConfig.setSendDateHeader(false);
|
||||||
|
|
||||||
ServerConnector serverConnector;
|
ServerConnector serverConnector;
|
||||||
if(conf.getBoolean(THRIFT_SSL_ENABLED, false)) {
|
if(conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
|
||||||
HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig);
|
HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig);
|
||||||
httpsConfig.addCustomizer(new SecureRequestCustomizer());
|
httpsConfig.addCustomizer(new SecureRequestCustomizer());
|
||||||
|
|
||||||
SslContextFactory sslCtxFactory = new SslContextFactory();
|
SslContextFactory sslCtxFactory = new SslContextFactory();
|
||||||
String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE);
|
String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE_KEY);
|
||||||
String password = HBaseConfiguration.getPassword(conf,
|
String password = HBaseConfiguration.getPassword(conf,
|
||||||
THRIFT_SSL_KEYSTORE_PASSWORD, null);
|
THRIFT_SSL_KEYSTORE_PASSWORD_KEY, null);
|
||||||
String keyPassword = HBaseConfiguration.getPassword(conf,
|
String keyPassword = HBaseConfiguration.getPassword(conf,
|
||||||
THRIFT_SSL_KEYSTORE_KEYPASSWORD, password);
|
THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY, password);
|
||||||
sslCtxFactory.setKeyStorePath(keystore);
|
sslCtxFactory.setKeyStorePath(keystore);
|
||||||
sslCtxFactory.setKeyStorePassword(password);
|
sslCtxFactory.setKeyStorePassword(password);
|
||||||
sslCtxFactory.setKeyManagerPassword(keyPassword);
|
sslCtxFactory.setKeyManagerPassword(keyPassword);
|
||||||
|
|
||||||
String[] excludeCiphers = conf.getStrings(
|
String[] excludeCiphers = conf.getStrings(
|
||||||
THRIFT_SSL_EXCLUDE_CIPHER_SUITES, ArrayUtils.EMPTY_STRING_ARRAY);
|
THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
|
||||||
if (excludeCiphers.length != 0) {
|
if (excludeCiphers.length != 0) {
|
||||||
sslCtxFactory.setExcludeCipherSuites(excludeCiphers);
|
sslCtxFactory.setExcludeCipherSuites(excludeCiphers);
|
||||||
}
|
}
|
||||||
String[] includeCiphers = conf.getStrings(
|
String[] includeCiphers = conf.getStrings(
|
||||||
THRIFT_SSL_INCLUDE_CIPHER_SUITES, ArrayUtils.EMPTY_STRING_ARRAY);
|
THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
|
||||||
if (includeCiphers.length != 0) {
|
if (includeCiphers.length != 0) {
|
||||||
sslCtxFactory.setIncludeCipherSuites(includeCiphers);
|
sslCtxFactory.setIncludeCipherSuites(includeCiphers);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Disable SSLv3 by default due to "Poodle" Vulnerability - CVE-2014-3566
|
// Disable SSLv3 by default due to "Poodle" Vulnerability - CVE-2014-3566
|
||||||
String[] excludeProtocols = conf.getStrings(
|
String[] excludeProtocols = conf.getStrings(
|
||||||
THRIFT_SSL_EXCLUDE_PROTOCOLS, "SSLv3");
|
THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY, "SSLv3");
|
||||||
if (excludeProtocols.length != 0) {
|
if (excludeProtocols.length != 0) {
|
||||||
sslCtxFactory.setExcludeProtocols(excludeProtocols);
|
sslCtxFactory.setExcludeProtocols(excludeProtocols);
|
||||||
}
|
}
|
||||||
String[] includeProtocols = conf.getStrings(
|
String[] includeProtocols = conf.getStrings(
|
||||||
THRIFT_SSL_INCLUDE_PROTOCOLS, ArrayUtils.EMPTY_STRING_ARRAY);
|
THRIFT_SSL_INCLUDE_PROTOCOLS_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
|
||||||
if (includeProtocols.length != 0) {
|
if (includeProtocols.length != 0) {
|
||||||
sslCtxFactory.setIncludeProtocols(includeProtocols);
|
sslCtxFactory.setIncludeProtocols(includeProtocols);
|
||||||
}
|
}
|
||||||
|
@ -516,8 +522,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
serverConnector = new ServerConnector(httpServer, new HttpConnectionFactory(httpConfig));
|
serverConnector = new ServerConnector(httpServer, new HttpConnectionFactory(httpConfig));
|
||||||
}
|
}
|
||||||
serverConnector.setPort(listenPort);
|
serverConnector.setPort(listenPort);
|
||||||
String host = getBindAddress(conf).getHostAddress();
|
serverConnector.setHost(getBindAddress(conf).getHostAddress());
|
||||||
serverConnector.setHost(host);
|
|
||||||
httpServer.addConnector(serverConnector);
|
httpServer.addConnector(serverConnector);
|
||||||
httpServer.setStopAtShutdown(true);
|
httpServer.setStopAtShutdown(true);
|
||||||
|
|
||||||
|
@ -525,7 +530,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
|
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("Starting Thrift HTTP Server on " + Integer.toString(listenPort));
|
LOG.info("Starting Thrift HTTP Server on {}", Integer.toString(listenPort));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -560,8 +565,11 @@ public class ThriftServerRunner implements Runnable {
|
||||||
transportFactory = new TTransportFactory();
|
transportFactory = new TTransportFactory();
|
||||||
} else {
|
} else {
|
||||||
// Extract the name from the principal
|
// Extract the name from the principal
|
||||||
String name = SecurityUtil.getUserFromPrincipal(
|
String thriftKerberosPrincipal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY);
|
||||||
conf.get("hbase.thrift.kerberos.principal"));
|
if (thriftKerberosPrincipal == null) {
|
||||||
|
throw new IllegalArgumentException(THRIFT_KERBEROS_PRINCIPAL_KEY + " cannot be null");
|
||||||
|
}
|
||||||
|
String name = SecurityUtil.getUserFromPrincipal(thriftKerberosPrincipal);
|
||||||
Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name());
|
Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name());
|
||||||
TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
|
TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
|
||||||
saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
|
saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
|
||||||
|
@ -586,7 +594,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
} else {
|
} else {
|
||||||
ac.setAuthorized(true);
|
ac.setAuthorized(true);
|
||||||
String userName = SecurityUtil.getUserFromPrincipal(authzid);
|
String userName = SecurityUtil.getUserFromPrincipal(authzid);
|
||||||
LOG.info("Effective user: " + userName);
|
LOG.info("Effective user: {}", userName);
|
||||||
ac.setAuthorizedID(userName);
|
ac.setAuthorizedID(userName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -595,27 +603,21 @@ public class ThriftServerRunner implements Runnable {
|
||||||
transportFactory = saslFactory;
|
transportFactory = saslFactory;
|
||||||
|
|
||||||
// Create a processor wrapper, to get the caller
|
// Create a processor wrapper, to get the caller
|
||||||
processor = new TProcessor() {
|
processor = (inProt, outProt) -> {
|
||||||
@Override
|
TSaslServerTransport saslServerTransport =
|
||||||
public boolean process(TProtocol inProt,
|
(TSaslServerTransport)inProt.getTransport();
|
||||||
TProtocol outProt) throws TException {
|
SaslServer saslServer = saslServerTransport.getSaslServer();
|
||||||
TSaslServerTransport saslServerTransport =
|
String principal = saslServer.getAuthorizationID();
|
||||||
(TSaslServerTransport)inProt.getTransport();
|
hbaseHandler.setEffectiveUser(principal);
|
||||||
SaslServer saslServer = saslServerTransport.getSaslServer();
|
return p.process(inProt, outProt);
|
||||||
String principal = saslServer.getAuthorizationID();
|
|
||||||
hbaseHandler.setEffectiveUser(principal);
|
|
||||||
return p.process(inProt, outProt);
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
|
if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
|
||||||
LOG.error("Server types " + Joiner.on(", ").join(
|
LOG.error("Server types {} don't support IP address binding at the moment. See " +
|
||||||
ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
|
"https://issues.apache.org/jira/browse/HBASE-2155 for details.",
|
||||||
"address binding at the moment. See " +
|
Joiner.on(", ").join(ImplType.serversThatCannotSpecifyBindIP()));
|
||||||
"https://issues.apache.org/jira/browse/HBASE-2155 for details.");
|
throw new RuntimeException("-" + BIND_CONF_KEY + " not supported with " + implType);
|
||||||
throw new RuntimeException(
|
|
||||||
"-" + BIND_CONF_KEY + " not supported with " + implType);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Thrift's implementation uses '0' as a placeholder for 'use the default.'
|
// Thrift's implementation uses '0' as a placeholder for 'use the default.'
|
||||||
|
@ -656,8 +658,8 @@ public class ThriftServerRunner implements Runnable {
|
||||||
.protocolFactory(protocolFactory);
|
.protocolFactory(protocolFactory);
|
||||||
tserver = new TThreadedSelectorServer(serverArgs);
|
tserver = new TThreadedSelectorServer(serverArgs);
|
||||||
}
|
}
|
||||||
LOG.info("starting HBase " + implType.simpleClassName() +
|
LOG.info("starting HBase {} server on {}", implType.simpleClassName(),
|
||||||
" server on " + Integer.toString(listenPort));
|
Integer.toString(listenPort));
|
||||||
} else if (implType == ImplType.THREAD_POOL) {
|
} else if (implType == ImplType.THREAD_POOL) {
|
||||||
// Thread pool server. Get the IP address to bind to.
|
// Thread pool server. Get the IP address to bind to.
|
||||||
InetAddress listenAddress = getBindAddress(conf);
|
InetAddress listenAddress = getBindAddress(conf);
|
||||||
|
@ -743,11 +745,11 @@ public class ThriftServerRunner implements Runnable {
|
||||||
|
|
||||||
// nextScannerId and scannerMap are used to manage scanner state
|
// nextScannerId and scannerMap are used to manage scanner state
|
||||||
protected int nextScannerId = 0;
|
protected int nextScannerId = 0;
|
||||||
protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
|
protected HashMap<Integer, ResultScannerWrapper> scannerMap;
|
||||||
private ThriftMetrics metrics = null;
|
private ThriftMetrics metrics = null;
|
||||||
|
|
||||||
private final ConnectionCache connectionCache;
|
private final ConnectionCache connectionCache;
|
||||||
IncrementCoalescer coalescer = null;
|
IncrementCoalescer coalescer;
|
||||||
|
|
||||||
static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
|
static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
|
||||||
static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
|
static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
|
||||||
|
@ -913,8 +915,8 @@ public class ThriftServerRunner implements Runnable {
|
||||||
try {
|
try {
|
||||||
TableName[] tableNames = this.getAdmin().listTableNames();
|
TableName[] tableNames = this.getAdmin().listTableNames();
|
||||||
ArrayList<ByteBuffer> list = new ArrayList<>(tableNames.length);
|
ArrayList<ByteBuffer> list = new ArrayList<>(tableNames.length);
|
||||||
for (int i = 0; i < tableNames.length; i++) {
|
for (TableName tableName : tableNames) {
|
||||||
list.add(ByteBuffer.wrap(tableNames[i].getName()));
|
list.add(ByteBuffer.wrap(tableName.getName()));
|
||||||
}
|
}
|
||||||
return list;
|
return list;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -927,8 +929,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
* @return the list of regions in the given table, or an empty list if the table does not exist
|
* @return the list of regions in the given table, or an empty list if the table does not exist
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
|
public List<TRegionInfo> getTableRegions(ByteBuffer tableName) throws IOError {
|
||||||
throws IOError {
|
|
||||||
try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
|
try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
|
||||||
List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
|
List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
|
||||||
List<TRegionInfo> results = new ArrayList<>(regionLocations.size());
|
List<TRegionInfo> results = new ArrayList<>(regionLocations.size());
|
||||||
|
@ -1318,7 +1319,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
public void deleteTable(ByteBuffer in_tableName) throws IOError {
|
public void deleteTable(ByteBuffer in_tableName) throws IOError {
|
||||||
TableName tableName = getTableName(in_tableName);
|
TableName tableName = getTableName(in_tableName);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("deleteTable: table=" + tableName);
|
LOG.debug("deleteTable: table={}", tableName);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
if (!getAdmin().tableExists(tableName)) {
|
if (!getAdmin().tableExists(tableName)) {
|
||||||
|
@ -1335,8 +1336,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
public void mutateRow(ByteBuffer tableName, ByteBuffer row,
|
public void mutateRow(ByteBuffer tableName, ByteBuffer row,
|
||||||
List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
|
List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
|
||||||
throws IOError, IllegalArgument {
|
throws IOError, IllegalArgument {
|
||||||
mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
|
mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP, attributes);
|
||||||
attributes);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1366,8 +1366,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
} else {
|
} else {
|
||||||
delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
|
delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
|
||||||
}
|
}
|
||||||
delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
|
delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
|
||||||
: Durability.SKIP_WAL);
|
|
||||||
} else {
|
} else {
|
||||||
if(famAndQf.length == 1) {
|
if(famAndQf.length == 1) {
|
||||||
LOG.warn("No column qualifier specified. Delete is the only mutation supported "
|
LOG.warn("No column qualifier specified. Delete is the only mutation supported "
|
||||||
|
@ -1472,7 +1471,6 @@ public class ThriftServerRunner implements Runnable {
|
||||||
table.put(puts);
|
table.put(puts);
|
||||||
if (!deletes.isEmpty())
|
if (!deletes.isEmpty())
|
||||||
table.delete(deletes);
|
table.delete(deletes);
|
||||||
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn(e.getMessage(), e);
|
LOG.warn(e.getMessage(), e);
|
||||||
throw getIOError(e);
|
throw getIOError(e);
|
||||||
|
@ -1513,11 +1511,10 @@ public class ThriftServerRunner implements Runnable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void scannerClose(int id) throws IOError, IllegalArgument {
|
public void scannerClose(int id) throws IOError, IllegalArgument {
|
||||||
LOG.debug("scannerClose: id=" + id);
|
LOG.debug("scannerClose: id={}", id);
|
||||||
ResultScannerWrapper resultScannerWrapper = getScanner(id);
|
ResultScannerWrapper resultScannerWrapper = getScanner(id);
|
||||||
if (resultScannerWrapper == null) {
|
if (resultScannerWrapper == null) {
|
||||||
String message = "scanner ID is invalid";
|
LOG.warn("scanner ID is invalid");
|
||||||
LOG.warn(message);
|
|
||||||
throw new IllegalArgument("scanner ID is invalid");
|
throw new IllegalArgument("scanner ID is invalid");
|
||||||
}
|
}
|
||||||
resultScannerWrapper.getScanner().close();
|
resultScannerWrapper.getScanner().close();
|
||||||
|
@ -1527,7 +1524,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
@Override
|
@Override
|
||||||
public List<TRowResult> scannerGetList(int id,int nbRows)
|
public List<TRowResult> scannerGetList(int id,int nbRows)
|
||||||
throws IllegalArgument, IOError {
|
throws IllegalArgument, IOError {
|
||||||
LOG.debug("scannerGetList: id=" + id);
|
LOG.debug("scannerGetList: id={}", id);
|
||||||
ResultScannerWrapper resultScannerWrapper = getScanner(id);
|
ResultScannerWrapper resultScannerWrapper = getScanner(id);
|
||||||
if (null == resultScannerWrapper) {
|
if (null == resultScannerWrapper) {
|
||||||
String message = "scanner ID is invalid";
|
String message = "scanner ID is invalid";
|
||||||
|
@ -1535,7 +1532,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
throw new IllegalArgument("scanner ID is invalid");
|
throw new IllegalArgument("scanner ID is invalid");
|
||||||
}
|
}
|
||||||
|
|
||||||
Result [] results = null;
|
Result [] results;
|
||||||
try {
|
try {
|
||||||
results = resultScannerWrapper.getScanner().next(nbRows);
|
results = resultScannerWrapper.getScanner().next(nbRows);
|
||||||
if (null == results) {
|
if (null == results) {
|
||||||
|
@ -1578,7 +1575,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
if (tScan.isSetBatchSize()) {
|
if (tScan.isSetBatchSize()) {
|
||||||
scan.setBatch(tScan.getBatchSize());
|
scan.setBatch(tScan.getBatchSize());
|
||||||
}
|
}
|
||||||
if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
|
if (tScan.isSetColumns() && !tScan.getColumns().isEmpty()) {
|
||||||
for(ByteBuffer column : tScan.getColumns()) {
|
for(ByteBuffer column : tScan.getColumns()) {
|
||||||
byte [][] famQf = CellUtil.parseColumn(getBytes(column));
|
byte [][] famQf = CellUtil.parseColumn(getBytes(column));
|
||||||
if(famQf.length == 1) {
|
if(famQf.length == 1) {
|
||||||
|
@ -1618,7 +1615,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
table = getTable(tableName);
|
table = getTable(tableName);
|
||||||
Scan scan = new Scan(getBytes(startRow));
|
Scan scan = new Scan(getBytes(startRow));
|
||||||
addAttributes(scan, attributes);
|
addAttributes(scan, attributes);
|
||||||
if(columns != null && columns.size() != 0) {
|
if(columns != null && !columns.isEmpty()) {
|
||||||
for(ByteBuffer column : columns) {
|
for(ByteBuffer column : columns) {
|
||||||
byte [][] famQf = CellUtil.parseColumn(getBytes(column));
|
byte [][] famQf = CellUtil.parseColumn(getBytes(column));
|
||||||
if(famQf.length == 1) {
|
if(famQf.length == 1) {
|
||||||
|
@ -1648,7 +1645,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
table = getTable(tableName);
|
table = getTable(tableName);
|
||||||
Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
|
Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
|
||||||
addAttributes(scan, attributes);
|
addAttributes(scan, attributes);
|
||||||
if(columns != null && columns.size() != 0) {
|
if(columns != null && !columns.isEmpty()) {
|
||||||
for(ByteBuffer column : columns) {
|
for(ByteBuffer column : columns) {
|
||||||
byte [][] famQf = CellUtil.parseColumn(getBytes(column));
|
byte [][] famQf = CellUtil.parseColumn(getBytes(column));
|
||||||
if(famQf.length == 1) {
|
if(famQf.length == 1) {
|
||||||
|
@ -1682,7 +1679,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
Filter f = new WhileMatchFilter(
|
Filter f = new WhileMatchFilter(
|
||||||
new PrefixFilter(getBytes(startAndPrefix)));
|
new PrefixFilter(getBytes(startAndPrefix)));
|
||||||
scan.setFilter(f);
|
scan.setFilter(f);
|
||||||
if (columns != null && columns.size() != 0) {
|
if (columns != null && !columns.isEmpty()) {
|
||||||
for(ByteBuffer column : columns) {
|
for(ByteBuffer column : columns) {
|
||||||
byte [][] famQf = CellUtil.parseColumn(getBytes(column));
|
byte [][] famQf = CellUtil.parseColumn(getBytes(column));
|
||||||
if(famQf.length == 1) {
|
if(famQf.length == 1) {
|
||||||
|
@ -1712,7 +1709,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
Scan scan = new Scan(getBytes(startRow));
|
Scan scan = new Scan(getBytes(startRow));
|
||||||
addAttributes(scan, attributes);
|
addAttributes(scan, attributes);
|
||||||
scan.setTimeRange(0, timestamp);
|
scan.setTimeRange(0, timestamp);
|
||||||
if (columns != null && columns.size() != 0) {
|
if (columns != null && !columns.isEmpty()) {
|
||||||
for (ByteBuffer column : columns) {
|
for (ByteBuffer column : columns) {
|
||||||
byte [][] famQf = CellUtil.parseColumn(getBytes(column));
|
byte [][] famQf = CellUtil.parseColumn(getBytes(column));
|
||||||
if(famQf.length == 1) {
|
if(famQf.length == 1) {
|
||||||
|
@ -1743,7 +1740,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
|
Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
|
||||||
addAttributes(scan, attributes);
|
addAttributes(scan, attributes);
|
||||||
scan.setTimeRange(0, timestamp);
|
scan.setTimeRange(0, timestamp);
|
||||||
if (columns != null && columns.size() != 0) {
|
if (columns != null && !columns.isEmpty()) {
|
||||||
for (ByteBuffer column : columns) {
|
for (ByteBuffer column : columns) {
|
||||||
byte [][] famQf = CellUtil.parseColumn(getBytes(column));
|
byte [][] famQf = CellUtil.parseColumn(getBytes(column));
|
||||||
if(famQf.length == 1) {
|
if(famQf.length == 1) {
|
||||||
|
@ -1844,13 +1841,9 @@ public class ThriftServerRunner implements Runnable {
|
||||||
scan.setReversed(true);
|
scan.setReversed(true);
|
||||||
scan.addFamily(family);
|
scan.addFamily(family);
|
||||||
scan.setStartRow(row);
|
scan.setStartRow(row);
|
||||||
Table table = getTable(tableName);
|
try (Table table = getTable(tableName);
|
||||||
try (ResultScanner scanner = table.getScanner(scan)) {
|
ResultScanner scanner = table.getScanner(scan)) {
|
||||||
return scanner.next();
|
return scanner.next();
|
||||||
} finally{
|
|
||||||
if(table != null){
|
|
||||||
table.close();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1962,7 +1955,6 @@ public class ThriftServerRunner implements Runnable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static IOError getIOError(Throwable throwable) {
|
private static IOError getIOError(Throwable throwable) {
|
||||||
IOError error = new IOErrorWithCause(throwable);
|
IOError error = new IOErrorWithCause(throwable);
|
||||||
error.setMessage(Throwables.getStackTraceAsString(throwable));
|
error.setMessage(Throwables.getStackTraceAsString(throwable));
|
||||||
|
@ -2000,7 +1992,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class IOErrorWithCause extends IOError {
|
public static class IOErrorWithCause extends IOError {
|
||||||
private Throwable cause;
|
private final Throwable cause;
|
||||||
public IOErrorWithCause(Throwable cause) {
|
public IOErrorWithCause(Throwable cause) {
|
||||||
this.cause = cause;
|
this.cause = cause;
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,7 +58,6 @@ import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
|
||||||
*/
|
*/
|
||||||
@Category({ClientTests.class, LargeTests.class})
|
@Category({ClientTests.class, LargeTests.class})
|
||||||
public class TestThriftHttpServer {
|
public class TestThriftHttpServer {
|
||||||
|
|
||||||
@ClassRule
|
@ClassRule
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
HBaseClassTestRule.forClass(TestThriftHttpServer.class);
|
HBaseClassTestRule.forClass(TestThriftHttpServer.class);
|
||||||
|
@ -66,8 +65,7 @@ public class TestThriftHttpServer {
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(TestThriftHttpServer.class);
|
LoggerFactory.getLogger(TestThriftHttpServer.class);
|
||||||
|
|
||||||
private static final HBaseTestingUtility TEST_UTIL =
|
static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
new HBaseTestingUtility();
|
|
||||||
|
|
||||||
private Thread httpServerThread;
|
private Thread httpServerThread;
|
||||||
private volatile Exception httpServerException;
|
private volatile Exception httpServerException;
|
||||||
|
@ -75,7 +73,7 @@ public class TestThriftHttpServer {
|
||||||
private Exception clientSideException;
|
private Exception clientSideException;
|
||||||
|
|
||||||
private ThriftServer thriftServer;
|
private ThriftServer thriftServer;
|
||||||
private int port;
|
int port;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpBeforeClass() throws Exception {
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
@ -152,11 +150,14 @@ public class TestThriftHttpServer {
|
||||||
runThriftServer(0);
|
runThriftServer(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void runThriftServer(int customHeaderSize) throws Exception {
|
void runThriftServer(int customHeaderSize) throws Exception {
|
||||||
List<String> args = new ArrayList<>(3);
|
List<String> args = new ArrayList<>(3);
|
||||||
port = HBaseTestingUtility.randomFreePort();
|
port = HBaseTestingUtility.randomFreePort();
|
||||||
args.add("-" + ThriftServer.PORT_OPTION);
|
args.add("-" + ThriftServer.PORT_OPTION);
|
||||||
args.add(String.valueOf(port));
|
args.add(String.valueOf(port));
|
||||||
|
args.add("-" + ThriftServer.INFOPORT_OPTION);
|
||||||
|
int infoPort = HBaseTestingUtility.randomFreePort();
|
||||||
|
args.add(String.valueOf(infoPort));
|
||||||
args.add("start");
|
args.add("start");
|
||||||
|
|
||||||
thriftServer = new ThriftServer(TEST_UTIL.getConfiguration());
|
thriftServer = new ThriftServer(TEST_UTIL.getConfiguration());
|
||||||
|
@ -194,9 +195,9 @@ public class TestThriftHttpServer {
|
||||||
Assert.assertEquals(HttpURLConnection.HTTP_FORBIDDEN, conn.getResponseCode());
|
Assert.assertEquals(HttpURLConnection.HTTP_FORBIDDEN, conn.getResponseCode());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static volatile boolean tableCreated = false;
|
static volatile boolean tableCreated = false;
|
||||||
|
|
||||||
private void talkToThriftServer(String url, int customHeaderSize) throws Exception {
|
void talkToThriftServer(String url, int customHeaderSize) throws Exception {
|
||||||
THttpClient httpClient = new THttpClient(url);
|
THttpClient httpClient = new THttpClient(url);
|
||||||
httpClient.open();
|
httpClient.open();
|
||||||
|
|
||||||
|
|
|
@ -164,7 +164,7 @@ public class TestThriftServerCmdLine {
|
||||||
port = HBaseTestingUtility.randomFreePort();
|
port = HBaseTestingUtility.randomFreePort();
|
||||||
args.add("-" + ThriftServer.PORT_OPTION);
|
args.add("-" + ThriftServer.PORT_OPTION);
|
||||||
args.add(String.valueOf(port));
|
args.add(String.valueOf(port));
|
||||||
args.add("-infoport");
|
args.add("-" + ThriftServer.INFOPORT_OPTION);
|
||||||
int infoPort = HBaseTestingUtility.randomFreePort();
|
int infoPort = HBaseTestingUtility.randomFreePort();
|
||||||
args.add(String.valueOf(infoPort));
|
args.add(String.valueOf(infoPort));
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,280 @@
|
||||||
|
/*
|
||||||
|
* Copyright The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.thrift;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.THRIFT_KERBEROS_PRINCIPAL_KEY;
|
||||||
|
import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.THRIFT_KEYTAB_FILE_KEY;
|
||||||
|
import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.THRIFT_SPNEGO_KEYTAB_FILE_KEY;
|
||||||
|
import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.THRIFT_SPNEGO_PRINCIPAL_KEY;
|
||||||
|
import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.THRIFT_SUPPORT_PROXYUSER_KEY;
|
||||||
|
import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.USE_HTTP_CONF_KEY;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.security.Principal;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import javax.security.auth.Subject;
|
||||||
|
import javax.security.auth.kerberos.KerberosTicket;
|
||||||
|
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.apache.hadoop.hbase.thrift.generated.Hbase;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.security.authentication.util.KerberosName;
|
||||||
|
import org.apache.http.HttpHeaders;
|
||||||
|
import org.apache.http.auth.AuthSchemeProvider;
|
||||||
|
import org.apache.http.auth.AuthScope;
|
||||||
|
import org.apache.http.auth.KerberosCredentials;
|
||||||
|
import org.apache.http.client.config.AuthSchemes;
|
||||||
|
import org.apache.http.config.Lookup;
|
||||||
|
import org.apache.http.config.RegistryBuilder;
|
||||||
|
import org.apache.http.impl.auth.SPNegoSchemeFactory;
|
||||||
|
import org.apache.http.impl.client.BasicCredentialsProvider;
|
||||||
|
import org.apache.http.impl.client.CloseableHttpClient;
|
||||||
|
import org.apache.http.impl.client.HttpClients;
|
||||||
|
import org.apache.kerby.kerberos.kerb.KrbException;
|
||||||
|
import org.apache.kerby.kerberos.kerb.client.JaasKrbUtil;
|
||||||
|
import org.apache.kerby.kerberos.kerb.server.SimpleKdcServer;
|
||||||
|
import org.apache.thrift.protocol.TBinaryProtocol;
|
||||||
|
import org.apache.thrift.protocol.TProtocol;
|
||||||
|
import org.apache.thrift.transport.THttpClient;
|
||||||
|
import org.ietf.jgss.GSSCredential;
|
||||||
|
import org.ietf.jgss.GSSManager;
|
||||||
|
import org.ietf.jgss.GSSName;
|
||||||
|
import org.ietf.jgss.Oid;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start the HBase Thrift HTTP server on a random port through the command-line
|
||||||
|
* interface and talk to it from client side with SPNEGO security enabled.
|
||||||
|
*/
|
||||||
|
@Category({ClientTests.class, LargeTests.class})
|
||||||
|
public class TestThriftSpnegoHttpServer extends TestThriftHttpServer {
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestThriftSpnegoHttpServer.class);
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(TestThriftSpnegoHttpServer.class);
|
||||||
|
|
||||||
|
private static SimpleKdcServer kdc;
|
||||||
|
private static File serverKeytab;
|
||||||
|
private static File spnegoServerKeytab;
|
||||||
|
private static File clientKeytab;
|
||||||
|
|
||||||
|
private static String clientPrincipal;
|
||||||
|
private static String serverPrincipal;
|
||||||
|
private static String spnegoServerPrincipal;
|
||||||
|
|
||||||
|
private static void setupUser(SimpleKdcServer kdc, File keytab, String principal)
|
||||||
|
throws KrbException {
|
||||||
|
kdc.createPrincipal(principal);
|
||||||
|
kdc.exportPrincipal(principal, keytab);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static SimpleKdcServer buildMiniKdc() throws Exception {
|
||||||
|
SimpleKdcServer kdc = new SimpleKdcServer();
|
||||||
|
|
||||||
|
final File target = new File(System.getProperty("user.dir"), "target");
|
||||||
|
File kdcDir = new File(target, TestThriftSpnegoHttpServer.class.getSimpleName());
|
||||||
|
if (kdcDir.exists()) {
|
||||||
|
FileUtils.deleteDirectory(kdcDir);
|
||||||
|
}
|
||||||
|
kdcDir.mkdirs();
|
||||||
|
kdc.setWorkDir(kdcDir);
|
||||||
|
|
||||||
|
kdc.setKdcHost(HConstants.LOCALHOST);
|
||||||
|
int kdcPort = HBaseTestingUtility.randomFreePort();
|
||||||
|
kdc.setAllowTcp(true);
|
||||||
|
kdc.setAllowUdp(false);
|
||||||
|
kdc.setKdcTcpPort(kdcPort);
|
||||||
|
|
||||||
|
LOG.info("Starting KDC server at " + HConstants.LOCALHOST + ":" + kdcPort);
|
||||||
|
|
||||||
|
kdc.init();
|
||||||
|
|
||||||
|
return kdc;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void addSecurityConfigurations(Configuration conf) {
|
||||||
|
KerberosName.setRules("DEFAULT");
|
||||||
|
|
||||||
|
HBaseKerberosUtils.setKeytabFileForTesting(serverKeytab.getAbsolutePath());
|
||||||
|
HBaseKerberosUtils.setPrincipalForTesting(serverPrincipal);
|
||||||
|
HBaseKerberosUtils.setSecuredConfiguration(conf);
|
||||||
|
|
||||||
|
// if we drop support for hadoop-2.4.0 and hadoop-2.4.1,
|
||||||
|
// the following key should be changed.
|
||||||
|
// 1) DFS_NAMENODE_USER_NAME_KEY -> DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY
|
||||||
|
// 2) DFS_DATANODE_USER_NAME_KEY -> DFS_DATANODE_KERBEROS_PRINCIPAL_KEY
|
||||||
|
conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, serverPrincipal);
|
||||||
|
conf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, serverKeytab.getAbsolutePath());
|
||||||
|
conf.set(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, serverPrincipal);
|
||||||
|
conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, serverKeytab.getAbsolutePath());
|
||||||
|
|
||||||
|
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
|
||||||
|
|
||||||
|
conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoServerPrincipal);
|
||||||
|
conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY,
|
||||||
|
spnegoServerKeytab.getAbsolutePath());
|
||||||
|
|
||||||
|
conf.setBoolean("ignore.secure.ports.for.testing", true);
|
||||||
|
|
||||||
|
conf.setBoolean(THRIFT_SUPPORT_PROXYUSER_KEY, true);
|
||||||
|
conf.setBoolean(USE_HTTP_CONF_KEY, true);
|
||||||
|
conf.set("hadoop.proxyuser.hbase.hosts", "*");
|
||||||
|
conf.set("hadoop.proxyuser.hbase.groups", "*");
|
||||||
|
|
||||||
|
conf.set(THRIFT_KERBEROS_PRINCIPAL_KEY, serverPrincipal);
|
||||||
|
conf.set(THRIFT_KEYTAB_FILE_KEY, serverKeytab.getAbsolutePath());
|
||||||
|
conf.set(THRIFT_SPNEGO_PRINCIPAL_KEY, spnegoServerPrincipal);
|
||||||
|
conf.set(THRIFT_SPNEGO_KEYTAB_FILE_KEY, spnegoServerKeytab.getAbsolutePath());
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
final File target = new File(System.getProperty("user.dir"), "target");
|
||||||
|
assertTrue(target.exists());
|
||||||
|
|
||||||
|
File keytabDir = new File(target, TestThriftSpnegoHttpServer.class.getSimpleName() +
|
||||||
|
"_keytabs");
|
||||||
|
if (keytabDir.exists()) {
|
||||||
|
FileUtils.deleteDirectory(keytabDir);
|
||||||
|
}
|
||||||
|
keytabDir.mkdirs();
|
||||||
|
|
||||||
|
kdc = buildMiniKdc();
|
||||||
|
kdc.start();
|
||||||
|
|
||||||
|
clientPrincipal = "client@" + kdc.getKdcConfig().getKdcRealm();
|
||||||
|
clientKeytab = new File(keytabDir, clientPrincipal + ".keytab");
|
||||||
|
setupUser(kdc, clientKeytab, clientPrincipal);
|
||||||
|
|
||||||
|
serverPrincipal = "hbase/" + HConstants.LOCALHOST + "@" + kdc.getKdcConfig().getKdcRealm();
|
||||||
|
serverKeytab = new File(keytabDir, serverPrincipal.replace('/', '_') + ".keytab");
|
||||||
|
setupUser(kdc, serverKeytab, serverPrincipal);
|
||||||
|
|
||||||
|
spnegoServerPrincipal = "HTTP/" + HConstants.LOCALHOST + "@" + kdc.getKdcConfig().getKdcRealm();
|
||||||
|
spnegoServerKeytab = new File(keytabDir, spnegoServerPrincipal.replace('/', '_') + ".keytab");
|
||||||
|
setupUser(kdc, spnegoServerKeytab, spnegoServerPrincipal);
|
||||||
|
|
||||||
|
TEST_UTIL.getConfiguration().setBoolean(USE_HTTP_CONF_KEY, true);
|
||||||
|
TEST_UTIL.getConfiguration().setBoolean("hbase.table.sanity.checks", false);
|
||||||
|
addSecurityConfigurations(TEST_UTIL.getConfiguration());
|
||||||
|
|
||||||
|
TestThriftHttpServer.setUpBeforeClass();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
TestThriftHttpServer.tearDownAfterClass();
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (null != kdc) {
|
||||||
|
kdc.stop();
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.info("Failed to stop mini KDC", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void talkToThriftServer(String url, int customHeaderSize) throws Exception {
|
||||||
|
// Close httpClient and THttpClient automatically on any failures
|
||||||
|
try (
|
||||||
|
CloseableHttpClient httpClient = createHttpClient();
|
||||||
|
THttpClient tHttpClient = new THttpClient(url, httpClient)
|
||||||
|
) {
|
||||||
|
tHttpClient.open();
|
||||||
|
if (customHeaderSize > 0) {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
for (int i = 0; i < customHeaderSize; i++) {
|
||||||
|
sb.append("a");
|
||||||
|
}
|
||||||
|
tHttpClient.setCustomHeader(HttpHeaders.USER_AGENT, sb.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
TProtocol prot = new TBinaryProtocol(tHttpClient);
|
||||||
|
Hbase.Client client = new Hbase.Client(prot);
|
||||||
|
if (!tableCreated) {
|
||||||
|
TestThriftServer.createTestTables(client);
|
||||||
|
tableCreated = true;
|
||||||
|
}
|
||||||
|
TestThriftServer.checkTableList(client);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private CloseableHttpClient createHttpClient() throws Exception {
|
||||||
|
final Subject clientSubject = JaasKrbUtil.loginUsingKeytab(clientPrincipal, clientKeytab);
|
||||||
|
final Set<Principal> clientPrincipals = clientSubject.getPrincipals();
|
||||||
|
// Make sure the subject has a principal
|
||||||
|
assertFalse(clientPrincipals.isEmpty());
|
||||||
|
|
||||||
|
// Get a TGT for the subject (might have many, different encryption types). The first should
|
||||||
|
// be the default encryption type.
|
||||||
|
Set<KerberosTicket> privateCredentials =
|
||||||
|
clientSubject.getPrivateCredentials(KerberosTicket.class);
|
||||||
|
assertFalse(privateCredentials.isEmpty());
|
||||||
|
KerberosTicket tgt = privateCredentials.iterator().next();
|
||||||
|
assertNotNull(tgt);
|
||||||
|
|
||||||
|
// The name of the principal
|
||||||
|
final String clientPrincipalName = clientPrincipals.iterator().next().getName();
|
||||||
|
|
||||||
|
return Subject.doAs(clientSubject, new PrivilegedExceptionAction<CloseableHttpClient>() {
|
||||||
|
@Override
|
||||||
|
public CloseableHttpClient run() throws Exception {
|
||||||
|
// Logs in with Kerberos via GSS
|
||||||
|
GSSManager gssManager = GSSManager.getInstance();
|
||||||
|
// jGSS Kerberos login constant
|
||||||
|
Oid oid = new Oid("1.2.840.113554.1.2.2");
|
||||||
|
GSSName gssClient = gssManager.createName(clientPrincipalName, GSSName.NT_USER_NAME);
|
||||||
|
GSSCredential credential = gssManager.createCredential(gssClient,
|
||||||
|
GSSCredential.DEFAULT_LIFETIME, oid, GSSCredential.INITIATE_ONLY);
|
||||||
|
|
||||||
|
Lookup<AuthSchemeProvider> authRegistry = RegistryBuilder.<AuthSchemeProvider>create()
|
||||||
|
.register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory(true, true))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
|
||||||
|
credentialsProvider.setCredentials(AuthScope.ANY, new KerberosCredentials(credential));
|
||||||
|
|
||||||
|
return HttpClients.custom()
|
||||||
|
.setDefaultAuthSchemeRegistry(authRegistry)
|
||||||
|
.setDefaultCredentialsProvider(credentialsProvider)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue