HDFS-5538. URLConnectionFactory should pick up the SSL related configuration by default. Contributed by Haohui Mai.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1545491 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c4bdddeab5
commit
d8a2383461
|
@ -22,19 +22,14 @@ import java.io.IOException;
|
|||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.net.URLConnection;
|
||||
import java.net.UnknownHostException;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.security.auth.Subject;
|
||||
import javax.security.auth.kerberos.KerberosPrincipal;
|
||||
import javax.security.auth.kerberos.KerberosTicket;
|
||||
|
||||
|
@ -44,22 +39,19 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.security.ssl.SSLFactory;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenInfo;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
//this will need to be replaced someday when there is a suitable replacement
|
||||
import sun.net.dns.ResolverConfiguration;
|
||||
import sun.net.util.IPAddressUtil;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
|
||||
@InterfaceStability.Evolving
|
||||
public class SecurityUtil {
|
||||
|
@ -73,22 +65,12 @@ public class SecurityUtil {
|
|||
@VisibleForTesting
|
||||
static HostResolver hostResolver;
|
||||
|
||||
private static SSLFactory sslFactory;
|
||||
|
||||
static {
|
||||
Configuration conf = new Configuration();
|
||||
boolean useIp = conf.getBoolean(
|
||||
CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP,
|
||||
CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP_DEFAULT);
|
||||
setTokenServiceUseIp(useIp);
|
||||
if (HttpConfig.isSecure()) {
|
||||
sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
|
||||
try {
|
||||
sslFactory.init();
|
||||
} catch (Exception ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -102,29 +84,6 @@ public class SecurityUtil {
|
|||
: new StandardHostResolver();
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the original TGT within the current subject's credentials. Cross-realm
|
||||
* TGT's of the form "krbtgt/TWO.COM@ONE.COM" may be present.
|
||||
*
|
||||
* @return The TGT from the current subject
|
||||
* @throws IOException
|
||||
* if TGT can't be found
|
||||
*/
|
||||
private static KerberosTicket getTgtFromSubject() throws IOException {
|
||||
Subject current = Subject.getSubject(AccessController.getContext());
|
||||
if (current == null) {
|
||||
throw new IOException(
|
||||
"Can't get TGT from current Subject, because it is null");
|
||||
}
|
||||
Set<KerberosTicket> tickets = current
|
||||
.getPrivateCredentials(KerberosTicket.class);
|
||||
for (KerberosTicket t : tickets) {
|
||||
if (isOriginalTGT(t))
|
||||
return t;
|
||||
}
|
||||
throw new IOException("Failed to find TGT from current Subject:"+current);
|
||||
}
|
||||
|
||||
/**
|
||||
* TGS must have the server principal of the form "krbtgt/FOO@FOO".
|
||||
* @param principal
|
||||
|
@ -492,30 +451,6 @@ public class SecurityUtil {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Open a (if need be) secure connection to a URL in a secure environment
|
||||
* that is using SPNEGO to authenticate its URLs. All Namenode and Secondary
|
||||
* Namenode URLs that are protected via SPNEGO should be accessed via this
|
||||
* method.
|
||||
*
|
||||
* @param url to authenticate via SPNEGO.
|
||||
* @return A connection that has been authenticated via SPNEGO
|
||||
* @throws IOException If unable to authenticate via SPNEGO
|
||||
*/
|
||||
public static URLConnection openSecureHttpConnection(URL url) throws IOException {
|
||||
if (!HttpConfig.isSecure() && !UserGroupInformation.isSecurityEnabled()) {
|
||||
return url.openConnection();
|
||||
}
|
||||
|
||||
AuthenticatedURL.Token token = new AuthenticatedURL.Token();
|
||||
try {
|
||||
return new AuthenticatedURL(null, sslFactory).openConnection(url, token);
|
||||
} catch (AuthenticationException e) {
|
||||
throw new IOException("Exception trying to open authenticated connection to "
|
||||
+ url, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves a host subject to the security requirements determined by
|
||||
* hadoop.security.token.service.use_ip.
|
||||
|
|
|
@ -212,6 +212,9 @@ Trunk (Unreleased)
|
|||
and INodeFileUnderConstructionWithSnapshot with FileUnderContructionFeature.
|
||||
(jing9 via szetszwo)
|
||||
|
||||
HDFS-5538. URLConnectionFactory should pick up the SSL related configuration
|
||||
by default. (Haohui Mai via jing9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
|
||||
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.server.namenode.JournalSet;
|
|||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
||||
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
|
@ -87,6 +88,7 @@ public class QuorumJournalManager implements JournalManager {
|
|||
private final AsyncLoggerSet loggers;
|
||||
|
||||
private int outputBufferCapacity = 512 * 1024;
|
||||
private final URLConnectionFactory connectionFactory;
|
||||
|
||||
public QuorumJournalManager(Configuration conf,
|
||||
URI uri, NamespaceInfo nsInfo) throws IOException {
|
||||
|
@ -102,6 +104,8 @@ public class QuorumJournalManager implements JournalManager {
|
|||
this.uri = uri;
|
||||
this.nsInfo = nsInfo;
|
||||
this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory));
|
||||
this.connectionFactory = URLConnectionFactory
|
||||
.newDefaultURLConnectionFactory(conf);
|
||||
|
||||
// Configure timeouts.
|
||||
this.startSegmentTimeoutMs = conf.getInt(
|
||||
|
@ -475,8 +479,8 @@ public class QuorumJournalManager implements JournalManager {
|
|||
URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId());
|
||||
|
||||
EditLogInputStream elis = EditLogFileInputStream.fromUrl(
|
||||
url, remoteLog.getStartTxId(), remoteLog.getEndTxId(),
|
||||
remoteLog.isInProgress());
|
||||
connectionFactory, url, remoteLog.getStartTxId(),
|
||||
remoteLog.getEndTxId(), remoteLog.isInProgress());
|
||||
allStreams.add(elis);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,8 +36,11 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage.HttpGetFailedException;
|
||||
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
@ -100,15 +103,22 @@ public class EditLogFileInputStream extends EditLogInputStream {
|
|||
/**
|
||||
* Open an EditLogInputStream for the given URL.
|
||||
*
|
||||
* @param url the url hosting the log
|
||||
* @param startTxId the expected starting txid
|
||||
* @param endTxId the expected ending txid
|
||||
* @param inProgress whether the log is in-progress
|
||||
* @param connectionFactory
|
||||
* the URLConnectionFactory used to create the connection.
|
||||
* @param url
|
||||
* the url hosting the log
|
||||
* @param startTxId
|
||||
* the expected starting txid
|
||||
* @param endTxId
|
||||
* the expected ending txid
|
||||
* @param inProgress
|
||||
* whether the log is in-progress
|
||||
* @return a stream from which edits may be read
|
||||
*/
|
||||
public static EditLogInputStream fromUrl(URL url, long startTxId,
|
||||
public static EditLogInputStream fromUrl(
|
||||
URLConnectionFactory connectionFactory, URL url, long startTxId,
|
||||
long endTxId, boolean inProgress) {
|
||||
return new EditLogFileInputStream(new URLLog(url),
|
||||
return new EditLogFileInputStream(new URLLog(connectionFactory, url),
|
||||
startTxId, endTxId, inProgress);
|
||||
}
|
||||
|
||||
|
@ -365,8 +375,12 @@ public class EditLogFileInputStream extends EditLogInputStream {
|
|||
private long advertisedSize = -1;
|
||||
|
||||
private final static String CONTENT_LENGTH = "Content-Length";
|
||||
private final URLConnectionFactory connectionFactory;
|
||||
private final boolean isSpnegoEnabled;
|
||||
|
||||
public URLLog(URL url) {
|
||||
public URLLog(URLConnectionFactory connectionFactory, URL url) {
|
||||
this.connectionFactory = connectionFactory;
|
||||
this.isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
|
||||
this.url = url;
|
||||
}
|
||||
|
||||
|
@ -376,8 +390,13 @@ public class EditLogFileInputStream extends EditLogInputStream {
|
|||
new PrivilegedExceptionAction<InputStream>() {
|
||||
@Override
|
||||
public InputStream run() throws IOException {
|
||||
HttpURLConnection connection = (HttpURLConnection)
|
||||
SecurityUtil.openSecureHttpConnection(url);
|
||||
HttpURLConnection connection;
|
||||
try {
|
||||
connection = (HttpURLConnection)
|
||||
connectionFactory.openConnection(url, isSpnegoEnabled);
|
||||
} catch (AuthenticationException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
||||
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
|
||||
throw new HttpGetFailedException(
|
||||
|
|
|
@ -35,7 +35,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
|
@ -46,6 +47,7 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
|||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
||||
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
||||
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||
import org.apache.hadoop.io.MD5Hash;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
@ -62,6 +64,15 @@ public class TransferFsImage {
|
|||
public final static String MD5_HEADER = "X-MD5-Digest";
|
||||
@VisibleForTesting
|
||||
static int timeout = 0;
|
||||
private static URLConnectionFactory connectionFactory;
|
||||
private static boolean isSpnegoEnabled;
|
||||
|
||||
static {
|
||||
Configuration conf = new Configuration();
|
||||
connectionFactory = URLConnectionFactory
|
||||
.newDefaultURLConnectionFactory(conf);
|
||||
isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
|
||||
}
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TransferFsImage.class);
|
||||
|
||||
|
@ -250,8 +261,13 @@ public class TransferFsImage {
|
|||
public static MD5Hash doGetUrl(URL url, List<File> localPaths,
|
||||
Storage dstStorage, boolean getChecksum) throws IOException {
|
||||
long startTime = Time.monotonicNow();
|
||||
HttpURLConnection connection = (HttpURLConnection)
|
||||
SecurityUtil.openSecureHttpConnection(url);
|
||||
HttpURLConnection connection;
|
||||
try {
|
||||
connection = (HttpURLConnection)
|
||||
connectionFactory.openConnection(url, isSpnegoEnabled);
|
||||
} catch (AuthenticationException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
||||
if (timeout <= 0) {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
|
|
|
@ -36,9 +36,10 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
|
||||
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
@ -94,6 +95,8 @@ public class DFSck extends Configured implements Tool {
|
|||
|
||||
private final UserGroupInformation ugi;
|
||||
private final PrintStream out;
|
||||
private final URLConnectionFactory connectionFactory;
|
||||
private final boolean isSpnegoEnabled;
|
||||
|
||||
/**
|
||||
* Filesystem checker.
|
||||
|
@ -107,6 +110,9 @@ public class DFSck extends Configured implements Tool {
|
|||
super(conf);
|
||||
this.ugi = UserGroupInformation.getCurrentUser();
|
||||
this.out = out;
|
||||
this.connectionFactory = URLConnectionFactory
|
||||
.newDefaultURLConnectionFactory(conf);
|
||||
this.isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -158,7 +164,12 @@ public class DFSck extends Configured implements Tool {
|
|||
url.append("&startblockafter=").append(String.valueOf(cookie));
|
||||
}
|
||||
URL path = new URL(url.toString());
|
||||
URLConnection connection = SecurityUtil.openSecureHttpConnection(path);
|
||||
URLConnection connection;
|
||||
try {
|
||||
connection = connectionFactory.openConnection(path, isSpnegoEnabled);
|
||||
} catch (AuthenticationException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
InputStream stream = connection.getInputStream();
|
||||
BufferedReader input = new BufferedReader(new InputStreamReader(
|
||||
stream, "UTF-8"));
|
||||
|
@ -278,7 +289,12 @@ public class DFSck extends Configured implements Tool {
|
|||
return listCorruptFileBlocks(dir, url.toString());
|
||||
}
|
||||
URL path = new URL(url.toString());
|
||||
URLConnection connection = SecurityUtil.openSecureHttpConnection(path);
|
||||
URLConnection connection;
|
||||
try {
|
||||
connection = connectionFactory.openConnection(path, isSpnegoEnabled);
|
||||
} catch (AuthenticationException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
InputStream stream = connection.getInputStream();
|
||||
BufferedReader input = new BufferedReader(new InputStreamReader(
|
||||
stream, "UTF-8"));
|
||||
|
|
|
@ -145,7 +145,7 @@ public class DelegationTokenFetcher {
|
|||
// default to using the local file system
|
||||
FileSystem local = FileSystem.getLocal(conf);
|
||||
final Path tokenFile = new Path(local.getWorkingDirectory(), remaining[0]);
|
||||
final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
|
||||
final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY;
|
||||
|
||||
// Login the current user
|
||||
UserGroupInformation.getCurrentUser().doAs(
|
||||
|
|
|
@ -176,10 +176,9 @@ public class HftpFileSystem extends FileSystem
|
|||
* Initialize connectionFactory and tokenAspect. This function is intended to
|
||||
* be overridden by HsFtpFileSystem.
|
||||
*/
|
||||
protected void initConnectionFactoryAndTokenAspect(Configuration conf)
|
||||
protected void initTokenAspect(Configuration conf)
|
||||
throws IOException {
|
||||
tokenAspect = new TokenAspect<HftpFileSystem>(this, TOKEN_KIND);
|
||||
connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -187,6 +186,8 @@ public class HftpFileSystem extends FileSystem
|
|||
throws IOException {
|
||||
super.initialize(name, conf);
|
||||
setConf(conf);
|
||||
this.connectionFactory = URLConnectionFactory
|
||||
.newDefaultURLConnectionFactory(conf);
|
||||
this.ugi = UserGroupInformation.getCurrentUser();
|
||||
this.nnUri = getNamenodeUri(name);
|
||||
|
||||
|
@ -197,7 +198,7 @@ public class HftpFileSystem extends FileSystem
|
|||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
|
||||
initConnectionFactoryAndTokenAspect(conf);
|
||||
initTokenAspect(conf);
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
tokenAspect.initDelegationToken(ugi);
|
||||
}
|
||||
|
@ -338,7 +339,7 @@ public class HftpFileSystem extends FileSystem
|
|||
}
|
||||
|
||||
static class RangeHeaderUrlOpener extends ByteRangeInputStream.URLOpener {
|
||||
URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
|
||||
URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY;
|
||||
|
||||
RangeHeaderUrlOpener(final URL url) {
|
||||
super(url);
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
package org.apache.hadoop.hdfs.web;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.GeneralSecurityException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
@ -61,18 +60,8 @@ public class HsftpFileSystem extends HftpFileSystem {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void initConnectionFactoryAndTokenAspect(Configuration conf) throws IOException {
|
||||
protected void initTokenAspect(Configuration conf) throws IOException {
|
||||
tokenAspect = new TokenAspect<HftpFileSystem>(this, TOKEN_KIND);
|
||||
|
||||
connectionFactory = new URLConnectionFactory(
|
||||
URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT);
|
||||
try {
|
||||
connectionFactory.setConnConfigurator(URLConnectionFactory
|
||||
.newSslConnConfigurator(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
|
||||
conf));
|
||||
} catch (GeneralSecurityException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -17,10 +17,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.web;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.GeneralSecurityException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
||||
|
@ -44,20 +40,6 @@ public class SWebHdfsFileSystem extends WebHdfsFileSystem {
|
|||
tokenAspect = new TokenAspect<WebHdfsFileSystem>(this, TOKEN_KIND);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initializeConnectionFactory(Configuration conf)
|
||||
throws IOException {
|
||||
connectionFactory = new URLConnectionFactory(
|
||||
URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT);
|
||||
try {
|
||||
connectionFactory.setConnConfigurator(URLConnectionFactory
|
||||
.newSslConnConfigurator(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
|
||||
conf));
|
||||
} catch (GeneralSecurityException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getDefaultPort() {
|
||||
return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY,
|
||||
|
|
|
@ -39,6 +39,8 @@ import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
|||
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
|
||||
import org.apache.hadoop.security.ssl.SSLFactory;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Utilities for handling URLs
|
||||
*/
|
||||
|
@ -54,26 +56,50 @@ public class URLConnectionFactory {
|
|||
* Timeout for socket connects and reads
|
||||
*/
|
||||
public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
|
||||
private final ConnectionConfigurator connConfigurator;
|
||||
|
||||
public static final URLConnectionFactory DEFAULT_CONNECTION_FACTORY = new URLConnectionFactory(
|
||||
DEFAULT_SOCKET_TIMEOUT);
|
||||
|
||||
private int socketTimeout;
|
||||
|
||||
/** Configure connections for AuthenticatedURL */
|
||||
private ConnectionConfigurator connConfigurator = new ConnectionConfigurator() {
|
||||
private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR = new ConnectionConfigurator() {
|
||||
@Override
|
||||
public HttpURLConnection configure(HttpURLConnection conn)
|
||||
throws IOException {
|
||||
URLConnectionFactory.setTimeouts(conn, socketTimeout);
|
||||
URLConnectionFactory.setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
|
||||
return conn;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* The URLConnectionFactory that sets the default timeout and it only trusts
|
||||
* Java's SSL certificates.
|
||||
*/
|
||||
public static final URLConnectionFactory DEFAULT_SYSTEM_CONNECTION_FACTORY = new URLConnectionFactory(
|
||||
DEFAULT_TIMEOUT_CONN_CONFIGURATOR);
|
||||
|
||||
/**
|
||||
* Construct a new URLConnectionFactory based on the configuration. It will
|
||||
* try to load SSL certificates when it is specified.
|
||||
*/
|
||||
public static URLConnectionFactory newDefaultURLConnectionFactory(Configuration conf) {
|
||||
ConnectionConfigurator conn = null;
|
||||
try {
|
||||
conn = newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
|
||||
} catch (Exception e) {
|
||||
LOG.debug(
|
||||
"Cannot load customized ssl related configuration. Fallback to system-generic settings.",
|
||||
e);
|
||||
conn = DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
|
||||
}
|
||||
return new URLConnectionFactory(conn);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
URLConnectionFactory(ConnectionConfigurator connConfigurator) {
|
||||
this.connConfigurator = connConfigurator;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new ConnectionConfigurator for SSL connections
|
||||
*/
|
||||
static ConnectionConfigurator newSslConnConfigurator(final int timeout,
|
||||
private static ConnectionConfigurator newSslConnConfigurator(final int timeout,
|
||||
Configuration conf) throws IOException, GeneralSecurityException {
|
||||
final SSLFactory factory;
|
||||
final SSLSocketFactory sf;
|
||||
|
@ -99,10 +125,6 @@ public class URLConnectionFactory {
|
|||
};
|
||||
}
|
||||
|
||||
public URLConnectionFactory(int socketTimeout) {
|
||||
this.socketTimeout = socketTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Opens a url with read and connect timeouts
|
||||
*
|
||||
|
@ -153,14 +175,6 @@ public class URLConnectionFactory {
|
|||
}
|
||||
}
|
||||
|
||||
public ConnectionConfigurator getConnConfigurator() {
|
||||
return connConfigurator;
|
||||
}
|
||||
|
||||
public void setConnConfigurator(ConnectionConfigurator connConfigurator) {
|
||||
this.connConfigurator = connConfigurator;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets timeout parameters on the given URLConnection.
|
||||
*
|
||||
|
@ -169,7 +183,7 @@ public class URLConnectionFactory {
|
|||
* @param socketTimeout
|
||||
* the connection and read timeout of the connection.
|
||||
*/
|
||||
static void setTimeouts(URLConnection connection, int socketTimeout) {
|
||||
private static void setTimeouts(URLConnection connection, int socketTimeout) {
|
||||
connection.setConnectTimeout(socketTimeout);
|
||||
connection.setReadTimeout(socketTimeout);
|
||||
}
|
||||
|
|
|
@ -112,7 +112,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
public static final String PATH_PREFIX = "/" + SCHEME + "/v" + VERSION;
|
||||
|
||||
/** Default connection factory may be overridden in tests to use smaller timeout values */
|
||||
URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
|
||||
protected URLConnectionFactory connectionFactory;
|
||||
|
||||
/** Delegation token kind */
|
||||
public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
|
||||
|
@ -152,22 +152,15 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
tokenAspect = new TokenAspect<WebHdfsFileSystem>(this, TOKEN_KIND);
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize connectionFactory. This function is intended to
|
||||
* be overridden by SWebHdfsFileSystem.
|
||||
*/
|
||||
protected void initializeConnectionFactory(Configuration conf)
|
||||
throws IOException {
|
||||
connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void initialize(URI uri, Configuration conf
|
||||
) throws IOException {
|
||||
super.initialize(uri, conf);
|
||||
setConf(conf);
|
||||
connectionFactory = URLConnectionFactory
|
||||
.newDefaultURLConnectionFactory(conf);
|
||||
initializeTokenAspect();
|
||||
initializeConnectionFactory(conf);
|
||||
|
||||
|
||||
ugi = UserGroupInformation.getCurrentUser();
|
||||
|
||||
|
|
|
@ -20,40 +20,41 @@ package org.apache.hadoop.hdfs.server.namenode;
|
|||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
import java.util.EnumMap;
|
||||
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.util.Holder;
|
||||
import org.apache.hadoop.http.HttpServer;
|
||||
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class TestEditLogFileInputStream {
|
||||
private static final byte[] FAKE_LOG_DATA = TestEditLog.HADOOP20_SOME_EDITS;
|
||||
|
||||
@Test
|
||||
public void testReadURL() throws Exception {
|
||||
// Start a simple web server which hosts the log data.
|
||||
HttpServer server = new HttpServer.Builder().setName("test")
|
||||
.setBindAddress("0.0.0.0").setPort(0).setFindPort(true).build();
|
||||
server.start();
|
||||
try {
|
||||
server.addServlet("fakeLog", "/fakeLog", FakeLogServlet.class);
|
||||
URL url = new URL("http://localhost:" + server.getPort() + "/fakeLog");
|
||||
EditLogInputStream elis = EditLogFileInputStream.fromUrl(
|
||||
url, HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID,
|
||||
false);
|
||||
HttpURLConnection conn = mock(HttpURLConnection.class);
|
||||
doReturn(new ByteArrayInputStream(FAKE_LOG_DATA)).when(conn).getInputStream();
|
||||
doReturn(HttpURLConnection.HTTP_OK).when(conn).getResponseCode();
|
||||
doReturn(Integer.toString(FAKE_LOG_DATA.length)).when(conn).getHeaderField("Content-Length");
|
||||
|
||||
URLConnectionFactory factory = mock(URLConnectionFactory.class);
|
||||
doReturn(conn).when(factory).openConnection(Mockito.<URL> any(),
|
||||
anyBoolean());
|
||||
|
||||
URL url = new URL("http://localhost/fakeLog");
|
||||
EditLogInputStream elis = EditLogFileInputStream.fromUrl(factory, url,
|
||||
HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID, false);
|
||||
// Read the edit log and verify that we got all of the data.
|
||||
EnumMap<FSEditLogOpCodes, Holder<Integer>> counts =
|
||||
FSImageTestUtil.countEditLogOpTypes(elis);
|
||||
EnumMap<FSEditLogOpCodes, Holder<Integer>> counts = FSImageTestUtil
|
||||
.countEditLogOpTypes(elis);
|
||||
assertThat(counts.get(FSEditLogOpCodes.OP_ADD).held, is(1));
|
||||
assertThat(counts.get(FSEditLogOpCodes.OP_SET_GENSTAMP_V1).held, is(1));
|
||||
assertThat(counts.get(FSEditLogOpCodes.OP_CLOSE).held, is(1));
|
||||
|
@ -61,23 +62,5 @@ public class TestEditLogFileInputStream {
|
|||
// Check that length header was picked up.
|
||||
assertEquals(FAKE_LOG_DATA.length, elis.length());
|
||||
elis.close();
|
||||
} finally {
|
||||
server.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("serial")
|
||||
public static class FakeLogServlet extends HttpServlet {
|
||||
@Override
|
||||
public void doGet(HttpServletRequest request,
|
||||
HttpServletResponse response
|
||||
) throws ServletException, IOException {
|
||||
response.setHeader("Content-Length",
|
||||
String.valueOf(FAKE_LOG_DATA.length));
|
||||
OutputStream out = response.getOutputStream();
|
||||
out.write(FAKE_LOG_DATA);
|
||||
out.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -34,10 +34,7 @@ public final class TestURLConnectionFactory {
|
|||
public void testConnConfiguratior() throws IOException {
|
||||
final URL u = new URL("http://localhost");
|
||||
final List<HttpURLConnection> conns = Lists.newArrayList();
|
||||
URLConnectionFactory fc = new URLConnectionFactory(
|
||||
URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT);
|
||||
|
||||
fc.setConnConfigurator(new ConnectionConfigurator() {
|
||||
URLConnectionFactory fc = new URLConnectionFactory(new ConnectionConfigurator() {
|
||||
@Override
|
||||
public HttpURLConnection configure(HttpURLConnection conn)
|
||||
throws IOException {
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.Socket;
|
||||
|
@ -41,6 +42,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -66,7 +68,14 @@ public class TestWebHdfsTimeouts {
|
|||
private InetSocketAddress nnHttpAddress;
|
||||
private ServerSocket serverSocket;
|
||||
private Thread serverThread;
|
||||
private URLConnectionFactory connectionFactory = new URLConnectionFactory(SHORT_SOCKET_TIMEOUT);
|
||||
private URLConnectionFactory connectionFactory = new URLConnectionFactory(new ConnectionConfigurator() {
|
||||
@Override
|
||||
public HttpURLConnection configure(HttpURLConnection conn) throws IOException {
|
||||
conn.setReadTimeout(SHORT_SOCKET_TIMEOUT);
|
||||
conn.setConnectTimeout(SHORT_SOCKET_TIMEOUT);
|
||||
return conn;
|
||||
}
|
||||
});
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
@ -82,7 +91,6 @@ public class TestWebHdfsTimeouts {
|
|||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
fs.connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
|
||||
IOUtils.cleanup(LOG, clients.toArray(new SocketChannel[clients.size()]));
|
||||
IOUtils.cleanup(LOG, fs);
|
||||
if (serverSocket != null) {
|
||||
|
@ -242,7 +250,7 @@ public class TestWebHdfsTimeouts {
|
|||
*/
|
||||
private void startSingleTemporaryRedirectResponseThread(
|
||||
final boolean consumeConnectionBacklog) {
|
||||
fs.connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
|
||||
fs.connectionFactory = URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY;
|
||||
serverThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
|
|
@ -83,7 +83,7 @@ public class TestDelegationTokenRemoteFetcher {
|
|||
|
||||
private static final String EXP_DATE = "124123512361236";
|
||||
private static final String tokenFile = "http.file.dta";
|
||||
private static final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
|
||||
private static final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY;
|
||||
|
||||
private int httpPort;
|
||||
private URI serviceUrl;
|
||||
|
|
Loading…
Reference in New Issue