From bfd2074a5732f78b6a1271f5cd45a2b5c7cd49cf Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Wed, 15 May 2013 02:31:48 +0000 Subject: [PATCH] svn merge -c 1482661 from trunk for HDFS-3180. Add socket timeouts to WebHdfsFileSystem. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1482662 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/web/URLUtils.java | 13 +- .../hadoop/hdfs/web/WebHdfsFileSystem.java | 21 +- .../hadoop/hdfs/web/TestWebHdfsTimeouts.java | 322 ++++++++++++++++++ 4 files changed, 353 insertions(+), 6 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTimeouts.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 4964c0edec3..02dbea7ca2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -258,6 +258,9 @@ Release 2.0.5-beta - UNRELEASED HDFS-4813. Add volatile to BlocksMap.blocks so that the replication thread can see the updated value. (Jing Zhao via szetszwo) + HDFS-3180. Add socket timeouts to WebHdfsFileSystem. (Chris Nauroth via + szetszwo) + Release 2.0.4-alpha - 2013-04-25 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/URLUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/URLUtils.java index 7e4edd2c81a..09feaf5bec4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/URLUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/URLUtils.java @@ -44,8 +44,17 @@ public class URLUtils { */ public static URLConnection openConnection(URL url) throws IOException { URLConnection connection = url.openConnection(); - connection.setConnectTimeout(SOCKET_TIMEOUT); - connection.setReadTimeout(SOCKET_TIMEOUT); + setTimeouts(connection); return connection; } + + /** + * Sets timeout parameters on the given URLConnection. + * + * @param connection URLConnection to set + */ + static void setTimeouts(URLConnection connection) { + connection.setConnectTimeout(SOCKET_TIMEOUT); + connection.setReadTimeout(SOCKET_TIMEOUT); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index 2487538db03..54f4cf5d886 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -101,6 +101,7 @@ import org.apache.hadoop.security.authentication.client.AuthenticatedURL; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.security.authorize.AuthorizationException; +import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -126,6 +127,16 @@ public class WebHdfsFileSystem extends FileSystem /** SPNEGO authenticator */ private static final KerberosUgiAuthenticator AUTH = new KerberosUgiAuthenticator(); + /** Configures connections for AuthenticatedURL */ + private static final ConnectionConfigurator CONN_CONFIGURATOR = + new ConnectionConfigurator() { + @Override + public HttpURLConnection configure(HttpURLConnection conn) + throws IOException { + URLUtils.setTimeouts(conn); + return conn; + } + }; /** Delegation token kind */ public static final Text TOKEN_KIND = new Text("WEBHDFS delegation"); /** Token selector */ @@ -485,10 +496,12 @@ private HttpURLConnection openHttpUrlConnection(final URL url) LOG.debug("open AuthenticatedURL connection"); UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab(); final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token(); - conn = new AuthenticatedURL(AUTH).openConnection(url, authToken); + conn = new AuthenticatedURL(AUTH, CONN_CONFIGURATOR).openConnection( + url, authToken); + URLUtils.setTimeouts(conn); } else { LOG.debug("open URL connection"); - conn = (HttpURLConnection)url.openConnection(); + conn = (HttpURLConnection)URLUtils.openConnection(url); } } catch (AuthenticationException e) { throw new IOException(e); @@ -583,7 +596,7 @@ HttpURLConnection twoStepWrite() throws IOException { checkRetry = false; //Step 2) Submit another Http request with the URL from the Location header with data. - conn = (HttpURLConnection)new URL(redirect).openConnection(); + conn = (HttpURLConnection)URLUtils.openConnection(new URL(redirect)); conn.setRequestProperty("Content-Type", MediaType.APPLICATION_OCTET_STREAM); conn.setChunkedStreamingMode(32 << 10); //32kB-chunk connect(); @@ -606,7 +619,7 @@ void getResponse(boolean getJsonAndDisconnect) throws IOException { disconnect(); checkRetry = false; - conn = (HttpURLConnection)new URL(redirect).openConnection(); + conn = (HttpURLConnection)URLUtils.openConnection(new URL(redirect)); connect(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTimeouts.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTimeouts.java new file mode 100644 index 00000000000..2071f6feb87 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTimeouts.java @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.web; + +import static org.junit.Assert.*; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.List; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.net.NetUtils; + +/** + * This test suite checks that WebHdfsFileSystem sets connection timeouts and + * read timeouts on its sockets, thus preventing threads from hanging + * indefinitely on an undefined/infinite timeout. The tests work by starting a + * bogus server on the namenode HTTP port, which is rigged to not accept new + * connections or to accept connections but not send responses. + */ +public class TestWebHdfsTimeouts { + + private static final Log LOG = LogFactory.getLog(TestWebHdfsTimeouts.class); + + private static final int CLIENTS_TO_CONSUME_BACKLOG = 100; + private static final int CONNECTION_BACKLOG = 1; + private static final int INITIAL_SOCKET_TIMEOUT = URLUtils.SOCKET_TIMEOUT; + private static final int SHORT_SOCKET_TIMEOUT = 5; + private static final int TEST_TIMEOUT = 10000; + + private List clients; + private WebHdfsFileSystem fs; + private InetSocketAddress nnHttpAddress; + private ServerSocket serverSocket; + private Thread serverThread; + + @Before + public void setUp() throws Exception { + URLUtils.SOCKET_TIMEOUT = SHORT_SOCKET_TIMEOUT; + Configuration conf = WebHdfsTestUtil.createConf(); + nnHttpAddress = NameNode.getHttpAddress(conf); + serverSocket = new ServerSocket(nnHttpAddress.getPort(), CONNECTION_BACKLOG); + fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf); + clients = new ArrayList(); + serverThread = null; + } + + @After + public void tearDown() throws Exception { + IOUtils.cleanup(LOG, clients.toArray(new SocketChannel[clients.size()])); + IOUtils.cleanup(LOG, fs); + if (serverSocket != null) { + try { + serverSocket.close(); + } catch (IOException e) { + LOG.debug("Exception in closing " + serverSocket, e); + } + } + if (serverThread != null) { + serverThread.join(); + } + } + + /** + * Expect connect timeout, because the connection backlog is consumed. + */ + @Test(timeout=TEST_TIMEOUT) + public void testConnectTimeout() throws Exception { + consumeConnectionBacklog(); + try { + fs.listFiles(new Path("/"), false); + fail("expected timeout"); + } catch (SocketTimeoutException e) { + assertEquals("connect timed out", e.getMessage()); + } + } + + /** + * Expect read timeout, because the bogus server never sends a reply. + */ + @Test(timeout=TEST_TIMEOUT) + public void testReadTimeout() throws Exception { + try { + fs.listFiles(new Path("/"), false); + fail("expected timeout"); + } catch (SocketTimeoutException e) { + assertEquals("Read timed out", e.getMessage()); + } + } + + /** + * Expect connect timeout on a URL that requires auth, because the connection + * backlog is consumed. + */ + @Test(timeout=TEST_TIMEOUT) + public void testAuthUrlConnectTimeout() throws Exception { + consumeConnectionBacklog(); + try { + fs.getDelegationToken("renewer"); + fail("expected timeout"); + } catch (SocketTimeoutException e) { + assertEquals("connect timed out", e.getMessage()); + } + } + + /** + * Expect read timeout on a URL that requires auth, because the bogus server + * never sends a reply. + */ + @Test(timeout=TEST_TIMEOUT) + public void testAuthUrlReadTimeout() throws Exception { + try { + fs.getDelegationToken("renewer"); + fail("expected timeout"); + } catch (SocketTimeoutException e) { + assertEquals("Read timed out", e.getMessage()); + } + } + + /** + * After a redirect, expect connect timeout accessing the redirect location, + * because the connection backlog is consumed. + */ + @Test(timeout=TEST_TIMEOUT) + public void testRedirectConnectTimeout() throws Exception { + startSingleTemporaryRedirectResponseThread(true); + try { + fs.getFileChecksum(new Path("/file")); + fail("expected timeout"); + } catch (SocketTimeoutException e) { + assertEquals("connect timed out", e.getMessage()); + } + } + + /** + * After a redirect, expect read timeout accessing the redirect location, + * because the bogus server never sends a reply. + */ + @Test(timeout=TEST_TIMEOUT) + public void testRedirectReadTimeout() throws Exception { + startSingleTemporaryRedirectResponseThread(false); + try { + fs.getFileChecksum(new Path("/file")); + fail("expected timeout"); + } catch (SocketTimeoutException e) { + assertEquals("Read timed out", e.getMessage()); + } + } + + /** + * On the second step of two-step write, expect connect timeout accessing the + * redirect location, because the connection backlog is consumed. + */ + @Test(timeout=TEST_TIMEOUT) + public void testTwoStepWriteConnectTimeout() throws Exception { + startSingleTemporaryRedirectResponseThread(true); + OutputStream os = null; + try { + os = fs.create(new Path("/file")); + fail("expected timeout"); + } catch (SocketTimeoutException e) { + assertEquals("connect timed out", e.getMessage()); + } finally { + IOUtils.cleanup(LOG, os); + } + } + + /** + * On the second step of two-step write, expect read timeout accessing the + * redirect location, because the bogus server never sends a reply. + */ + @Test(timeout=TEST_TIMEOUT) + public void testTwoStepWriteReadTimeout() throws Exception { + startSingleTemporaryRedirectResponseThread(false); + OutputStream os = null; + try { + os = fs.create(new Path("/file")); + os.close(); // must close stream to force reading the HTTP response + os = null; + fail("expected timeout"); + } catch (SocketTimeoutException e) { + assertEquals("Read timed out", e.getMessage()); + } finally { + IOUtils.cleanup(LOG, os); + } + } + + /** + * Starts a background thread that accepts one and only one client connection + * on the server socket, sends an HTTP 307 Temporary Redirect response, and + * then exits. This is useful for testing timeouts on the second step of + * methods that issue 2 HTTP requests (request 1, redirect, request 2). + * + * For handling the first request, this method sets socket timeout to use the + * initial values defined in URLUtils. Afterwards, it guarantees that the + * second request will use a very short timeout. + * + * Optionally, the thread may consume the connection backlog immediately after + * receiving its one and only client connection. This is useful for forcing a + * connection timeout on the second request. + * + * On tearDown, open client connections are closed, and the thread is joined. + * + * @param consumeConnectionBacklog boolean whether or not to consume connection + * backlog and thus force a connection timeout on the second request + */ + private void startSingleTemporaryRedirectResponseThread( + final boolean consumeConnectionBacklog) { + URLUtils.SOCKET_TIMEOUT = INITIAL_SOCKET_TIMEOUT; + serverThread = new Thread() { + @Override + public void run() { + Socket clientSocket = null; + OutputStream out = null; + InputStream in = null; + InputStreamReader isr = null; + BufferedReader br = null; + try { + // Accept one and only one client connection. + clientSocket = serverSocket.accept(); + + // Immediately setup conditions for subsequent connections. + URLUtils.SOCKET_TIMEOUT = SHORT_SOCKET_TIMEOUT; + if (consumeConnectionBacklog) { + consumeConnectionBacklog(); + } + + // Consume client's HTTP request by reading until EOF or empty line. + in = clientSocket.getInputStream(); + isr = new InputStreamReader(in); + br = new BufferedReader(isr); + for (;;) { + String line = br.readLine(); + if (line == null || line.isEmpty()) { + break; + } + } + + // Write response. + out = clientSocket.getOutputStream(); + out.write(temporaryRedirect().getBytes("UTF-8")); + } catch (IOException e) { + // Fail the test on any I/O error in the server thread. + LOG.error("unexpected IOException in server thread", e); + fail("unexpected IOException in server thread: " + e); + } finally { + // Clean it all up. + IOUtils.cleanup(LOG, br, isr, in, out); + IOUtils.closeSocket(clientSocket); + } + } + }; + serverThread.start(); + } + + /** + * Consumes the test server's connection backlog by spamming non-blocking + * SocketChannel client connections. We never do anything with these sockets + * beyond just initiaing the connections. The method saves a reference to each + * new SocketChannel so that it can be closed during tearDown. We define a + * very small connection backlog, but the OS may silently enforce a larger + * minimum backlog than requested. To work around this, we create far more + * client connections than our defined backlog. + * + * @throws IOException thrown for any I/O error + */ + private void consumeConnectionBacklog() throws IOException { + for (int i = 0; i < CLIENTS_TO_CONSUME_BACKLOG; ++i) { + SocketChannel client = SocketChannel.open(); + client.configureBlocking(false); + client.connect(nnHttpAddress); + clients.add(client); + } + } + + /** + * Creates an HTTP 307 response with the redirect location set back to the + * test server's address. HTTP is supposed to terminate newlines with CRLF, so + * we hard-code that instead of using the line separator property. + * + * @return String HTTP 307 response + */ + private String temporaryRedirect() { + return "HTTP/1.1 307 Temporary Redirect\r\n" + + "Location: http://" + NetUtils.getHostPortString(nnHttpAddress) + "\r\n" + + "\r\n"; + } +}