diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index b6a71f0b03d..462e8c0a2ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -335,6 +335,9 @@ Release 2.3.0 - UNRELEASED HDFS-4885. Improve the verifyBlockPlacement() API in BlockPlacementPolicy. (Junping Du via szetszwo) + HDFS-5363. Refactor WebHdfsFileSystem: move SPENGO-authenticated connection + creation to URLConnectionFactory. (Haohui Mai via jing9) + OPTIMIZATIONS HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java index 54aab04e58c..25227db074c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java @@ -19,49 +19,114 @@ package org.apache.hadoop.hdfs.web; import java.io.IOException; +import java.net.HttpURLConnection; import java.net.URL; import java.net.URLConnection; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.web.resources.HttpOpParam; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.client.AuthenticatedURL; +import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; /** * Utilities for handling URLs */ -@InterfaceAudience.LimitedPrivate({"HDFS"}) +@InterfaceAudience.LimitedPrivate({ "HDFS" }) @InterfaceStability.Unstable public class URLConnectionFactory { + private static final Log LOG = LogFactory.getLog(URLConnectionFactory.class); + + /** SPNEGO authenticator */ + private static final KerberosUgiAuthenticator AUTH = new KerberosUgiAuthenticator(); + /** * Timeout for socket connects and reads */ - public final static int DEFAULT_SOCKET_TIMEOUT = 1*60*1000; // 1 minute + public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute + + public static final URLConnectionFactory DEFAULT_CONNECTION_FACTORY = new URLConnectionFactory( + DEFAULT_SOCKET_TIMEOUT); - public static final URLConnectionFactory DEFAULT_CONNECTION_FACTORY = new URLConnectionFactory(DEFAULT_SOCKET_TIMEOUT); - private int socketTimeout; + /** Configure connections for AuthenticatedURL */ + private ConnectionConfigurator connConfigurator = new ConnectionConfigurator() { + @Override + public HttpURLConnection configure(HttpURLConnection conn) + throws IOException { + URLConnectionFactory.setTimeouts(conn, socketTimeout); + return conn; + } + }; + public URLConnectionFactory(int socketTimeout) { this.socketTimeout = socketTimeout; } - + /** * Opens a url with read and connect timeouts - * @param url to open + * + * @param url + * to open * @return URLConnection * @throws IOException */ public URLConnection openConnection(URL url) throws IOException { URLConnection connection = url.openConnection(); - setTimeouts(connection); - return connection; + if (connection instanceof HttpURLConnection) { + connConfigurator.configure((HttpURLConnection) connection); + } + return connection; + } + + /** + * Opens a url with read and connect timeouts + * + * @param url URL to open + * @return URLConnection + * @throws IOException + * @throws AuthenticationException + */ + public URLConnection openConnection(HttpOpParam.Op op, URL url) + throws IOException, AuthenticationException { + if (op.getRequireAuth()) { + if (LOG.isDebugEnabled()) { + LOG.debug("open AuthenticatedURL connection" + url); + } + UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab(); + final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token(); + return new AuthenticatedURL(AUTH, connConfigurator).openConnection(url, + authToken); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("open URL connection"); + } + return openConnection(url); + } + } + + public ConnectionConfigurator getConnConfigurator() { + return connConfigurator; + } + + public void setConnConfigurator(ConnectionConfigurator connConfigurator) { + this.connConfigurator = connConfigurator; } /** * Sets timeout parameters on the given URLConnection. * - * @param connection URLConnection to set + * @param connection + * URLConnection to set + * @param socketTimeout + * the connection and read timeout of the connection. */ - public void setTimeouts(URLConnection connection) { + static void setTimeouts(URLConnection connection, int socketTimeout) { connection.setConnectTimeout(socketTimeout); connection.setReadTimeout(socketTimeout); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index 71c78496a2c..1a2b1926d41 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -94,9 +94,7 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authentication.client.AuthenticatedURL; import org.apache.hadoop.security.authentication.client.AuthenticationException; -import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenRenewer; @@ -119,20 +117,9 @@ public class WebHdfsFileSystem extends FileSystem /** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */ public static final String PATH_PREFIX = "/" + SCHEME + "/v" + VERSION; - /** SPNEGO authenticator */ - private static final KerberosUgiAuthenticator AUTH = new KerberosUgiAuthenticator(); /** Default connection factory may be overridden in tests to use smaller timeout values */ URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY; - /** Configures connections for AuthenticatedURL */ - private final ConnectionConfigurator CONN_CONFIGURATOR = - new ConnectionConfigurator() { - @Override - public HttpURLConnection configure(HttpURLConnection conn) - throws IOException { - connectionFactory.setTimeouts(conn); - return conn; - } - }; + /** Delegation token kind */ public static final Text TOKEN_KIND = new Text("WEBHDFS delegation"); /** Token selector */ @@ -504,16 +491,7 @@ public class WebHdfsFileSystem extends FileSystem throws IOException { final HttpURLConnection conn; try { - if (op.getRequireAuth()) { - LOG.debug("open AuthenticatedURL connection"); - UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab(); - final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token(); - conn = new AuthenticatedURL(AUTH, CONN_CONFIGURATOR).openConnection( - url, authToken); - } else { - LOG.debug("open URL connection"); - conn = (HttpURLConnection)connectionFactory.openConnection(url); - } + conn = (HttpURLConnection) connectionFactory.openConnection(op, url); } catch (AuthenticationException e) { throw new IOException(e); } @@ -635,8 +613,10 @@ public class WebHdfsFileSystem extends FileSystem checkRetry = false; //Step 2) Submit another Http request with the URL from the Location header with data. - conn = (HttpURLConnection)connectionFactory.openConnection(new URL(redirect)); - conn.setRequestProperty("Content-Type", MediaType.APPLICATION_OCTET_STREAM); + conn = (HttpURLConnection) connectionFactory.openConnection(new URL( + redirect)); + conn.setRequestProperty("Content-Type", + MediaType.APPLICATION_OCTET_STREAM); conn.setChunkedStreamingMode(32 << 10); //32kB-chunk connect(); return conn; @@ -658,7 +638,8 @@ public class WebHdfsFileSystem extends FileSystem disconnect(); checkRetry = false; - conn = (HttpURLConnection)connectionFactory.openConnection(new URL(redirect)); + conn = (HttpURLConnection) connectionFactory.openConnection(new URL( + redirect)); connect(); } @@ -892,12 +873,6 @@ public class WebHdfsFileSystem extends FileSystem .write(bufferSize); } - @SuppressWarnings("deprecation") - @Override - public boolean delete(final Path f) throws IOException { - return delete(f, true); - } - @Override public boolean delete(Path f, boolean recursive) throws IOException { final HttpOpParam.Op op = DeleteOpParam.Op.DELETE; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestURLConnectionFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestURLConnectionFactory.java new file mode 100644 index 00000000000..a8f8b2f7e2f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestURLConnectionFactory.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public final class TestURLConnectionFactory { + + @Test + public void testConnConfiguratior() throws IOException { + final URL u = new URL("http://localhost"); + final List conns = Lists.newArrayList(); + URLConnectionFactory fc = new URLConnectionFactory( + URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT); + + fc.setConnConfigurator(new ConnectionConfigurator() { + @Override + public HttpURLConnection configure(HttpURLConnection conn) + throws IOException { + Assert.assertEquals(u, conn.getURL()); + conns.add(conn); + return conn; + } + }); + + fc.openConnection(u); + Assert.assertEquals(1, conns.size()); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTimeouts.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTimeouts.java index 7a007a05928..9a85cf97cbe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTimeouts.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTimeouts.java @@ -25,9 +25,11 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; +import java.net.SocketAddress; import java.net.SocketTimeoutException; import java.nio.channels.SocketChannel; import java.util.ArrayList; @@ -41,6 +43,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; @@ -71,8 +74,9 @@ public class TestWebHdfsTimeouts { @Before public void setUp() throws Exception { Configuration conf = WebHdfsTestUtil.createConf(); - nnHttpAddress = NameNode.getHttpAddress(conf); - serverSocket = new ServerSocket(nnHttpAddress.getPort(), CONNECTION_BACKLOG); + serverSocket = new ServerSocket(0, CONNECTION_BACKLOG); + nnHttpAddress = new InetSocketAddress("localhost", serverSocket.getLocalPort()); + conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "localhost:" + serverSocket.getLocalPort()); fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf); fs.connectionFactory = connectionFactory; clients = new ArrayList();