HDFS-5538. Merge change r1545491 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1551692 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jing Zhao 2013-12-17 20:45:27 +00:00
parent 42d48b02df
commit 1b00e40a30
16 changed files with 168 additions and 210 deletions

View File

@ -22,19 +22,14 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.net.UnknownHostException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.ServiceLoader;
import java.util.Set;
import javax.security.auth.Subject;
import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.auth.kerberos.KerberosTicket;
@ -44,22 +39,19 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenInfo;
import com.google.common.annotations.VisibleForTesting;
//this will need to be replaced someday when there is a suitable replacement
import sun.net.dns.ResolverConfiguration;
import sun.net.util.IPAddressUtil;
import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public class SecurityUtil {
@ -73,22 +65,12 @@ public class SecurityUtil {
@VisibleForTesting
static HostResolver hostResolver;
private static SSLFactory sslFactory;
static {
Configuration conf = new Configuration();
boolean useIp = conf.getBoolean(
CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP,
CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP_DEFAULT);
setTokenServiceUseIp(useIp);
if (HttpConfig.isSecure()) {
sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
try {
sslFactory.init();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}
/**
@ -102,29 +84,6 @@ public class SecurityUtil {
: new StandardHostResolver();
}
/**
* Find the original TGT within the current subject's credentials. Cross-realm
* TGT's of the form "krbtgt/TWO.COM@ONE.COM" may be present.
*
* @return The TGT from the current subject
* @throws IOException
* if TGT can't be found
*/
private static KerberosTicket getTgtFromSubject() throws IOException {
Subject current = Subject.getSubject(AccessController.getContext());
if (current == null) {
throw new IOException(
"Can't get TGT from current Subject, because it is null");
}
Set<KerberosTicket> tickets = current
.getPrivateCredentials(KerberosTicket.class);
for (KerberosTicket t : tickets) {
if (isOriginalTGT(t))
return t;
}
throw new IOException("Failed to find TGT from current Subject:"+current);
}
/**
* TGS must have the server principal of the form "krbtgt/FOO@FOO".
* @param principal
@ -492,30 +451,6 @@ public class SecurityUtil {
}
}
/**
* Open a (if need be) secure connection to a URL in a secure environment
* that is using SPNEGO to authenticate its URLs. All Namenode and Secondary
* Namenode URLs that are protected via SPNEGO should be accessed via this
* method.
*
* @param url to authenticate via SPNEGO.
* @return A connection that has been authenticated via SPNEGO
* @throws IOException If unable to authenticate via SPNEGO
*/
public static URLConnection openSecureHttpConnection(URL url) throws IOException {
if (!HttpConfig.isSecure() && !UserGroupInformation.isSecurityEnabled()) {
return url.openConnection();
}
AuthenticatedURL.Token token = new AuthenticatedURL.Token();
try {
return new AuthenticatedURL(null, sslFactory).openConnection(url, token);
} catch (AuthenticationException e) {
throw new IOException("Exception trying to open authenticated connection to "
+ url, e);
}
}
/**
* Resolves a host subject to the security requirements determined by
* hadoop.security.token.service.use_ip.

View File

@ -159,6 +159,9 @@ Release 2.4.0 - UNRELEASED
HDFS-5350. Name Node should report fsimage transfer time as a metric.
(Jimmy Xiang via wang)
HDFS-5538. URLConnectionFactory should pick up the SSL related configuration
by default. (Haohui Mai via jing9)
OPTIMIZATIONS
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.server.namenode.JournalSet;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
@ -87,6 +88,7 @@ public class QuorumJournalManager implements JournalManager {
private final AsyncLoggerSet loggers;
private int outputBufferCapacity = 512 * 1024;
private final URLConnectionFactory connectionFactory;
public QuorumJournalManager(Configuration conf,
URI uri, NamespaceInfo nsInfo) throws IOException {
@ -102,6 +104,8 @@ public class QuorumJournalManager implements JournalManager {
this.uri = uri;
this.nsInfo = nsInfo;
this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory));
this.connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(conf);
// Configure timeouts.
this.startSegmentTimeoutMs = conf.getInt(
@ -470,8 +474,8 @@ public class QuorumJournalManager implements JournalManager {
URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId());
EditLogInputStream elis = EditLogFileInputStream.fromUrl(
url, remoteLog.getStartTxId(), remoteLog.getEndTxId(),
remoteLog.isInProgress());
connectionFactory, url, remoteLog.getStartTxId(),
remoteLog.getEndTxId(), remoteLog.isInProgress());
allStreams.add(elis);
}
}

View File

@ -36,9 +36,12 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage.HttpGetFailedException;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@ -101,15 +104,22 @@ public class EditLogFileInputStream extends EditLogInputStream {
/**
* Open an EditLogInputStream for the given URL.
*
* @param url the url hosting the log
* @param startTxId the expected starting txid
* @param endTxId the expected ending txid
* @param inProgress whether the log is in-progress
* @param connectionFactory
* the URLConnectionFactory used to create the connection.
* @param url
* the url hosting the log
* @param startTxId
* the expected starting txid
* @param endTxId
* the expected ending txid
* @param inProgress
* whether the log is in-progress
* @return a stream from which edits may be read
*/
public static EditLogInputStream fromUrl(URL url, long startTxId,
public static EditLogInputStream fromUrl(
URLConnectionFactory connectionFactory, URL url, long startTxId,
long endTxId, boolean inProgress) {
return new EditLogFileInputStream(new URLLog(url),
return new EditLogFileInputStream(new URLLog(connectionFactory, url),
startTxId, endTxId, inProgress);
}
@ -366,8 +376,12 @@ public class EditLogFileInputStream extends EditLogInputStream {
private long advertisedSize = -1;
private final static String CONTENT_LENGTH = "Content-Length";
private final URLConnectionFactory connectionFactory;
private final boolean isSpnegoEnabled;
public URLLog(URL url) {
public URLLog(URLConnectionFactory connectionFactory, URL url) {
this.connectionFactory = connectionFactory;
this.isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
this.url = url;
}
@ -377,8 +391,13 @@ public class EditLogFileInputStream extends EditLogInputStream {
new PrivilegedExceptionAction<InputStream>() {
@Override
public InputStream run() throws IOException {
HttpURLConnection connection = (HttpURLConnection)
SecurityUtil.openSecureHttpConnection(url);
HttpURLConnection connection;
try {
connection = (HttpURLConnection)
connectionFactory.openConnection(url, isSpnegoEnabled);
} catch (AuthenticationException e) {
throw new IOException(e);
}
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
throw new HttpGetFailedException(

View File

@ -35,7 +35,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -46,6 +47,7 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.MD5Hash;
import com.google.common.annotations.VisibleForTesting;
@ -62,6 +64,15 @@ public class TransferFsImage {
public final static String MD5_HEADER = "X-MD5-Digest";
@VisibleForTesting
static int timeout = 0;
private static URLConnectionFactory connectionFactory;
private static boolean isSpnegoEnabled;
static {
Configuration conf = new Configuration();
connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(conf);
isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
}
private static final Log LOG = LogFactory.getLog(TransferFsImage.class);
@ -250,9 +261,13 @@ public class TransferFsImage {
public static MD5Hash doGetUrl(URL url, List<File> localPaths,
Storage dstStorage, boolean getChecksum) throws IOException {
long startTime = Time.monotonicNow();
HttpURLConnection connection = (HttpURLConnection)
SecurityUtil.openSecureHttpConnection(url);
HttpURLConnection connection;
try {
connection = (HttpURLConnection)
connectionFactory.openConnection(url, isSpnegoEnabled);
} catch (AuthenticationException e) {
throw new IOException(e);
}
if (timeout <= 0) {
Configuration conf = new HdfsConfiguration();

View File

@ -36,9 +36,10 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@ -102,6 +103,8 @@ public class DFSck extends Configured implements Tool {
private final UserGroupInformation ugi;
private final PrintStream out;
private final URLConnectionFactory connectionFactory;
private final boolean isSpnegoEnabled;
/**
* Filesystem checker.
@ -115,6 +118,9 @@ public class DFSck extends Configured implements Tool {
super(conf);
this.ugi = UserGroupInformation.getCurrentUser();
this.out = out;
this.connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(conf);
this.isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
}
/**
@ -166,7 +172,12 @@ public class DFSck extends Configured implements Tool {
url.append("&startblockafter=").append(String.valueOf(cookie));
}
URL path = new URL(url.toString());
URLConnection connection = SecurityUtil.openSecureHttpConnection(path);
URLConnection connection;
try {
connection = connectionFactory.openConnection(path, isSpnegoEnabled);
} catch (AuthenticationException e) {
throw new IOException(e);
}
InputStream stream = connection.getInputStream();
BufferedReader input = new BufferedReader(new InputStreamReader(
stream, "UTF-8"));
@ -288,7 +299,12 @@ public class DFSck extends Configured implements Tool {
return listCorruptFileBlocks(dir, url.toString());
}
URL path = new URL(url.toString());
URLConnection connection = SecurityUtil.openSecureHttpConnection(path);
URLConnection connection;
try {
connection = connectionFactory.openConnection(path, isSpnegoEnabled);
} catch (AuthenticationException e) {
throw new IOException(e);
}
InputStream stream = connection.getInputStream();
BufferedReader input = new BufferedReader(new InputStreamReader(
stream, "UTF-8"));

View File

@ -145,7 +145,7 @@ public class DelegationTokenFetcher {
// default to using the local file system
FileSystem local = FileSystem.getLocal(conf);
final Path tokenFile = new Path(local.getWorkingDirectory(), remaining[0]);
final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY;
// Login the current user
UserGroupInformation.getCurrentUser().doAs(

View File

@ -176,10 +176,9 @@ public class HftpFileSystem extends FileSystem
* Initialize connectionFactory and tokenAspect. This function is intended to
* be overridden by HsFtpFileSystem.
*/
protected void initConnectionFactoryAndTokenAspect(Configuration conf)
protected void initTokenAspect(Configuration conf)
throws IOException {
tokenAspect = new TokenAspect<HftpFileSystem>(this, TOKEN_KIND);
connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
}
@Override
@ -187,6 +186,8 @@ public class HftpFileSystem extends FileSystem
throws IOException {
super.initialize(name, conf);
setConf(conf);
this.connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(conf);
this.ugi = UserGroupInformation.getCurrentUser();
this.nnUri = getNamenodeUri(name);
@ -197,7 +198,7 @@ public class HftpFileSystem extends FileSystem
throw new IllegalArgumentException(e);
}
initConnectionFactoryAndTokenAspect(conf);
initTokenAspect(conf);
if (UserGroupInformation.isSecurityEnabled()) {
tokenAspect.initDelegationToken(ugi);
}
@ -338,7 +339,7 @@ public class HftpFileSystem extends FileSystem
}
static class RangeHeaderUrlOpener extends ByteRangeInputStream.URLOpener {
URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY;
RangeHeaderUrlOpener(final URL url) {
super(url);

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.hdfs.web;
import java.io.IOException;
import java.security.GeneralSecurityException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -61,18 +60,8 @@ public class HsftpFileSystem extends HftpFileSystem {
}
@Override
protected void initConnectionFactoryAndTokenAspect(Configuration conf) throws IOException {
protected void initTokenAspect(Configuration conf) throws IOException {
tokenAspect = new TokenAspect<HftpFileSystem>(this, TOKEN_KIND);
connectionFactory = new URLConnectionFactory(
URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT);
try {
connectionFactory.setConnConfigurator(URLConnectionFactory
.newSslConnConfigurator(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
conf));
} catch (GeneralSecurityException e) {
throw new IOException(e);
}
}
@Override

View File

@ -17,10 +17,6 @@
*/
package org.apache.hadoop.hdfs.web;
import java.io.IOException;
import java.security.GeneralSecurityException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.Text;
@ -44,20 +40,6 @@ public class SWebHdfsFileSystem extends WebHdfsFileSystem {
tokenAspect = new TokenAspect<WebHdfsFileSystem>(this, TOKEN_KIND);
}
@Override
protected void initializeConnectionFactory(Configuration conf)
throws IOException {
connectionFactory = new URLConnectionFactory(
URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT);
try {
connectionFactory.setConnConfigurator(URLConnectionFactory
.newSslConnConfigurator(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
conf));
} catch (GeneralSecurityException e) {
throw new IOException(e);
}
}
@Override
protected int getDefaultPort() {
return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY,

View File

@ -39,6 +39,8 @@ import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.security.ssl.SSLFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* Utilities for handling URLs
*/
@ -54,26 +56,50 @@ public class URLConnectionFactory {
* Timeout for socket connects and reads
*/
public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
private final ConnectionConfigurator connConfigurator;
public static final URLConnectionFactory DEFAULT_CONNECTION_FACTORY = new URLConnectionFactory(
DEFAULT_SOCKET_TIMEOUT);
private int socketTimeout;
/** Configure connections for AuthenticatedURL */
private ConnectionConfigurator connConfigurator = new ConnectionConfigurator() {
private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR = new ConnectionConfigurator() {
@Override
public HttpURLConnection configure(HttpURLConnection conn)
throws IOException {
URLConnectionFactory.setTimeouts(conn, socketTimeout);
URLConnectionFactory.setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
return conn;
}
};
/**
* The URLConnectionFactory that sets the default timeout and it only trusts
* Java's SSL certificates.
*/
public static final URLConnectionFactory DEFAULT_SYSTEM_CONNECTION_FACTORY = new URLConnectionFactory(
DEFAULT_TIMEOUT_CONN_CONFIGURATOR);
/**
* Construct a new URLConnectionFactory based on the configuration. It will
* try to load SSL certificates when it is specified.
*/
public static URLConnectionFactory newDefaultURLConnectionFactory(Configuration conf) {
ConnectionConfigurator conn = null;
try {
conn = newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
} catch (Exception e) {
LOG.debug(
"Cannot load customized ssl related configuration. Fallback to system-generic settings.",
e);
conn = DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
}
return new URLConnectionFactory(conn);
}
@VisibleForTesting
URLConnectionFactory(ConnectionConfigurator connConfigurator) {
this.connConfigurator = connConfigurator;
}
/**
* Create a new ConnectionConfigurator for SSL connections
*/
static ConnectionConfigurator newSslConnConfigurator(final int timeout,
private static ConnectionConfigurator newSslConnConfigurator(final int timeout,
Configuration conf) throws IOException, GeneralSecurityException {
final SSLFactory factory;
final SSLSocketFactory sf;
@ -99,10 +125,6 @@ public class URLConnectionFactory {
};
}
public URLConnectionFactory(int socketTimeout) {
this.socketTimeout = socketTimeout;
}
/**
* Opens a url with read and connect timeouts
*
@ -153,14 +175,6 @@ public class URLConnectionFactory {
}
}
public ConnectionConfigurator getConnConfigurator() {
return connConfigurator;
}
public void setConnConfigurator(ConnectionConfigurator connConfigurator) {
this.connConfigurator = connConfigurator;
}
/**
* Sets timeout parameters on the given URLConnection.
*
@ -169,7 +183,7 @@ public class URLConnectionFactory {
* @param socketTimeout
* the connection and read timeout of the connection.
*/
static void setTimeouts(URLConnection connection, int socketTimeout) {
private static void setTimeouts(URLConnection connection, int socketTimeout) {
connection.setConnectTimeout(socketTimeout);
connection.setReadTimeout(socketTimeout);
}

View File

@ -113,7 +113,7 @@ public class WebHdfsFileSystem extends FileSystem
public static final String PATH_PREFIX = "/" + SCHEME + "/v" + VERSION;
/** Default connection factory may be overridden in tests to use smaller timeout values */
URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
protected URLConnectionFactory connectionFactory;
/** Delegation token kind */
public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
@ -153,15 +153,6 @@ public class WebHdfsFileSystem extends FileSystem
tokenAspect = new TokenAspect<WebHdfsFileSystem>(this, TOKEN_KIND);
}
/**
* Initialize connectionFactory. This function is intended to
* be overridden by SWebHdfsFileSystem.
*/
protected void initializeConnectionFactory(Configuration conf)
throws IOException {
connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
}
@Override
public synchronized void initialize(URI uri, Configuration conf
) throws IOException {
@ -169,8 +160,9 @@ public class WebHdfsFileSystem extends FileSystem
setConf(conf);
/** set user pattern based on configuration file */
UserParam.setUserPattern(conf.get(DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY, DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(conf);
initializeTokenAspect();
initializeConnectionFactory(conf);
ugi = UserGroupInformation.getCurrentUser();

View File

@ -19,40 +19,41 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.*;
import static org.hamcrest.CoreMatchers.is;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.io.OutputStream;
import java.io.ByteArrayInputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.EnumMap;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.util.Holder;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.junit.Test;
import org.mockito.Mockito;
public class TestEditLogFileInputStream {
private static final byte[] FAKE_LOG_DATA = TestEditLog.HADOOP20_SOME_EDITS;
@Test
public void testReadURL() throws Exception {
// Start a simple web server which hosts the log data.
HttpServer server = new HttpServer.Builder().setName("test")
.setBindAddress("0.0.0.0").setPort(0).setFindPort(true).build();
server.start();
try {
server.addServlet("fakeLog", "/fakeLog", FakeLogServlet.class);
URL url = new URL("http://localhost:" + server.getPort() + "/fakeLog");
EditLogInputStream elis = EditLogFileInputStream.fromUrl(
url, HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID,
false);
HttpURLConnection conn = mock(HttpURLConnection.class);
doReturn(new ByteArrayInputStream(FAKE_LOG_DATA)).when(conn).getInputStream();
doReturn(HttpURLConnection.HTTP_OK).when(conn).getResponseCode();
doReturn(Integer.toString(FAKE_LOG_DATA.length)).when(conn).getHeaderField("Content-Length");
URLConnectionFactory factory = mock(URLConnectionFactory.class);
doReturn(conn).when(factory).openConnection(Mockito.<URL> any(),
anyBoolean());
URL url = new URL("http://localhost/fakeLog");
EditLogInputStream elis = EditLogFileInputStream.fromUrl(factory, url,
HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID, false);
// Read the edit log and verify that we got all of the data.
EnumMap<FSEditLogOpCodes, Holder<Integer>> counts =
FSImageTestUtil.countEditLogOpTypes(elis);
EnumMap<FSEditLogOpCodes, Holder<Integer>> counts = FSImageTestUtil
.countEditLogOpTypes(elis);
assertThat(counts.get(FSEditLogOpCodes.OP_ADD).held, is(1));
assertThat(counts.get(FSEditLogOpCodes.OP_SET_GENSTAMP_V1).held, is(1));
assertThat(counts.get(FSEditLogOpCodes.OP_CLOSE).held, is(1));
@ -60,23 +61,5 @@ public class TestEditLogFileInputStream {
// Check that length header was picked up.
assertEquals(FAKE_LOG_DATA.length, elis.length());
elis.close();
} finally {
server.stop();
}
}
@SuppressWarnings("serial")
public static class FakeLogServlet extends HttpServlet {
@Override
public void doGet(HttpServletRequest request,
HttpServletResponse response
) throws ServletException, IOException {
response.setHeader("Content-Length",
String.valueOf(FAKE_LOG_DATA.length));
OutputStream out = response.getOutputStream();
out.write(FAKE_LOG_DATA);
out.close();
}
}
}

View File

@ -34,10 +34,7 @@ public final class TestURLConnectionFactory {
public void testConnConfiguratior() throws IOException {
final URL u = new URL("http://localhost");
final List<HttpURLConnection> conns = Lists.newArrayList();
URLConnectionFactory fc = new URLConnectionFactory(
URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT);
fc.setConnConfigurator(new ConnectionConfigurator() {
URLConnectionFactory fc = new URLConnectionFactory(new ConnectionConfigurator() {
@Override
public HttpURLConnection configure(HttpURLConnection conn)
throws IOException {

View File

@ -26,6 +26,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
@ -41,6 +42,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -66,7 +68,14 @@ public class TestWebHdfsTimeouts {
private InetSocketAddress nnHttpAddress;
private ServerSocket serverSocket;
private Thread serverThread;
private URLConnectionFactory connectionFactory = new URLConnectionFactory(SHORT_SOCKET_TIMEOUT);
private URLConnectionFactory connectionFactory = new URLConnectionFactory(new ConnectionConfigurator() {
@Override
public HttpURLConnection configure(HttpURLConnection conn) throws IOException {
conn.setReadTimeout(SHORT_SOCKET_TIMEOUT);
conn.setConnectTimeout(SHORT_SOCKET_TIMEOUT);
return conn;
}
});
@Before
public void setUp() throws Exception {
@ -82,7 +91,6 @@ public class TestWebHdfsTimeouts {
@After
public void tearDown() throws Exception {
fs.connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
IOUtils.cleanup(LOG, clients.toArray(new SocketChannel[clients.size()]));
IOUtils.cleanup(LOG, fs);
if (serverSocket != null) {
@ -242,7 +250,7 @@ public class TestWebHdfsTimeouts {
*/
private void startSingleTemporaryRedirectResponseThread(
final boolean consumeConnectionBacklog) {
fs.connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
fs.connectionFactory = URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY;
serverThread = new Thread() {
@Override
public void run() {

View File

@ -83,7 +83,7 @@ public class TestDelegationTokenRemoteFetcher {
private static final String EXP_DATE = "124123512361236";
private static final String tokenFile = "http.file.dta";
private static final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
private static final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY;
private int httpPort;
private URI serviceUrl;