HDFS-9914. Fix configurable WebhDFS connect/read timeout. Contributed by Xiaoyu Yao.

This commit is contained in:
Arpit Agarwal 2017-10-27 07:43:54 -07:00
parent ce10c1ef05
commit 2510fc1801
2 changed files with 50 additions and 27 deletions

View File

@ -23,7 +23,6 @@ import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import java.net.URLConnection; import java.net.URLConnection;
import java.security.GeneralSecurityException; import java.security.GeneralSecurityException;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.HttpsURLConnection;
@ -32,7 +31,6 @@ import javax.net.ssl.SSLSocketFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.web.oauth2.OAuth2ConnectionConfigurator; import org.apache.hadoop.hdfs.web.oauth2.OAuth2ConnectionConfigurator;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL; import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
@ -84,22 +82,48 @@ public class URLConnectionFactory {
*/ */
public static URLConnectionFactory newDefaultURLConnectionFactory( public static URLConnectionFactory newDefaultURLConnectionFactory(
Configuration conf) { Configuration conf) {
ConnectionConfigurator conn = getSSLConnectionConfiguration(conf); ConnectionConfigurator conn = getSSLConnectionConfiguration(
DEFAULT_SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, conf);
return new URLConnectionFactory(conn); return new URLConnectionFactory(conn);
} }
/**
* Construct a new URLConnectionFactory based on the configuration. It will
* hornor connecTimeout and readTimeout when they are specified.
*/
public static URLConnectionFactory newDefaultURLConnectionFactory(
int connectTimeout, int readTimeout, Configuration conf) {
ConnectionConfigurator conn = getSSLConnectionConfiguration(
connectTimeout, readTimeout, conf);
return new URLConnectionFactory(conn);
}
private static ConnectionConfigurator getSSLConnectionConfiguration( private static ConnectionConfigurator getSSLConnectionConfiguration(
Configuration conf) { final int connectTimeout, final int readTimeout, Configuration conf) {
ConnectionConfigurator conn; ConnectionConfigurator conn;
try { try {
conn = newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf); conn = newSslConnConfigurator(connectTimeout, readTimeout, conf);
} catch (Exception e) { } catch (Exception e) {
LOG.warn( LOG.warn(
"Cannot load customized ssl related configuration. Fallback to" + "Cannot load customized ssl related configuration. Fallback to" +
" system-generic settings.", " system-generic settings.",
e); e);
conn = DEFAULT_TIMEOUT_CONN_CONFIGURATOR; if (connectTimeout == DEFAULT_SOCKET_TIMEOUT &&
readTimeout == DEFAULT_SOCKET_TIMEOUT) {
conn = DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
} else {
conn = new ConnectionConfigurator() {
@Override
public HttpURLConnection configure(HttpURLConnection connection)
throws IOException {
URLConnectionFactory.setTimeouts(connection,
connectTimeout,
readTimeout);
return connection;
}
};
}
} }
return conn; return conn;
@ -110,11 +134,12 @@ public class URLConnectionFactory {
* It will also try to load the SSL configuration when they are specified. * It will also try to load the SSL configuration when they are specified.
*/ */
public static URLConnectionFactory newOAuth2URLConnectionFactory( public static URLConnectionFactory newOAuth2URLConnectionFactory(
Configuration conf) throws IOException { int connectTimeout, int readTimeout, Configuration conf)
throws IOException {
ConnectionConfigurator conn; ConnectionConfigurator conn;
try { try {
ConnectionConfigurator sslConnConfigurator ConnectionConfigurator sslConnConfigurator
= newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf); = newSslConnConfigurator(connectTimeout, readTimeout, conf);
conn = new OAuth2ConnectionConfigurator(conf, sslConnConfigurator); conn = new OAuth2ConnectionConfigurator(conf, sslConnConfigurator);
} catch (Exception e) { } catch (Exception e) {
@ -128,33 +153,18 @@ public class URLConnectionFactory {
this.connConfigurator = connConfigurator; this.connConfigurator = connConfigurator;
} }
/**
* Create a new ConnectionConfigurator for SSL connections
*/
private static ConnectionConfigurator newSslConnConfigurator( private static ConnectionConfigurator newSslConnConfigurator(
final int defaultTimeout, Configuration conf) final int connectTimeout, final int readTimeout, Configuration conf)
throws IOException, GeneralSecurityException { throws IOException, GeneralSecurityException {
final SSLFactory factory; final SSLFactory factory;
final SSLSocketFactory sf; final SSLSocketFactory sf;
final HostnameVerifier hv; final HostnameVerifier hv;
final int connectTimeout;
final int readTimeout;
factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf); factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
factory.init(); factory.init();
sf = factory.createSSLSocketFactory(); sf = factory.createSSLSocketFactory();
hv = factory.getHostnameVerifier(); hv = factory.getHostnameVerifier();
connectTimeout = (int) conf.getTimeDuration(
HdfsClientConfigKeys.DFS_WEBHDFS_SOCKET_CONNECT_TIMEOUT_KEY,
defaultTimeout,
TimeUnit.MILLISECONDS);
readTimeout = (int) conf.getTimeDuration(
HdfsClientConfigKeys.DFS_WEBHDFS_SOCKET_READ_TIMEOUT_KEY,
defaultTimeout,
TimeUnit.MILLISECONDS);
return new ConnectionConfigurator() { return new ConnectionConfigurator() {
@Override @Override
public HttpURLConnection configure(HttpURLConnection conn) public HttpURLConnection configure(HttpURLConnection conn)
@ -222,7 +232,8 @@ public class URLConnectionFactory {
* *
* @param connection * @param connection
* URLConnection to set * URLConnection to set
* @param socketTimeout * @param connectTimeout
* @param readTimeout
* the connection and read timeout of the connection. * the connection and read timeout of the connection.
*/ */
private static void setTimeouts(URLConnection connection, private static void setTimeouts(URLConnection connection,

View File

@ -46,6 +46,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.StringTokenizer; import java.util.StringTokenizer;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
@ -193,6 +194,17 @@ public class WebHdfsFileSystem extends FileSystem
HdfsClientConfigKeys.DFS_WEBHDFS_ACL_PERMISSION_PATTERN_KEY, HdfsClientConfigKeys.DFS_WEBHDFS_ACL_PERMISSION_PATTERN_KEY,
HdfsClientConfigKeys.DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT)); HdfsClientConfigKeys.DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT));
int connectTimeout = (int) conf.getTimeDuration(
HdfsClientConfigKeys.DFS_WEBHDFS_SOCKET_CONNECT_TIMEOUT_KEY,
URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
TimeUnit.MILLISECONDS);
int readTimeout = (int) conf.getTimeDuration(
HdfsClientConfigKeys.DFS_WEBHDFS_SOCKET_READ_TIMEOUT_KEY,
URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
TimeUnit.MILLISECONDS);
boolean isOAuth = conf.getBoolean( boolean isOAuth = conf.getBoolean(
HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_KEY, HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_KEY,
HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_DEFAULT); HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_DEFAULT);
@ -200,11 +212,11 @@ public class WebHdfsFileSystem extends FileSystem
if(isOAuth) { if(isOAuth) {
LOG.debug("Enabling OAuth2 in WebHDFS"); LOG.debug("Enabling OAuth2 in WebHDFS");
connectionFactory = URLConnectionFactory connectionFactory = URLConnectionFactory
.newOAuth2URLConnectionFactory(conf); .newOAuth2URLConnectionFactory(connectTimeout, readTimeout, conf);
} else { } else {
LOG.debug("Not enabling OAuth2 in WebHDFS"); LOG.debug("Not enabling OAuth2 in WebHDFS");
connectionFactory = URLConnectionFactory connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(conf); .newDefaultURLConnectionFactory(connectTimeout, readTimeout, conf);
} }