diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c64a31096d6..24ffd66f429 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -191,8 +191,6 @@ Release 2.0.1-alpha - UNRELEASED
 
     HDFS-3697. Enable fadvise readahead by default. (todd)
 
-    HDFS-3667. Add retry support to WebHdfsFileSystem. (szetszwo)
-
   BUG FIXES
 
     HDFS-3385. The last block of INodeFileUnderConstruction is not
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
index e745714c9be..62c67f9e9d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
@@ -57,9 +57,9 @@ public URL getURL() {
       return url;
     }
 
-    /** Connect to server with a data offset. */
-    protected abstract HttpURLConnection connect(final long offset,
-        final boolean resolved) throws IOException;
+    protected abstract HttpURLConnection openConnection() throws IOException;
+
+    protected abstract HttpURLConnection openConnection(final long offset) throws IOException;
   }
 
   enum StreamStatus {
@@ -85,6 +85,9 @@ public ByteRangeInputStream(URLOpener o, URLOpener r) {
     this.resolvedURL = r;
   }
 
+  protected abstract void checkResponseCode(final HttpURLConnection connection
+      ) throws IOException;
+
   protected abstract URL getResolvedUrl(final HttpURLConnection connection
       ) throws IOException;
 
@@ -110,10 +113,13 @@ protected InputStream getInputStream() throws IOException {
   protected InputStream openInputStream() throws IOException {
     // Use the original url if no resolved url exists, eg. if
     // it's the first time a request is made.
-    final boolean resolved = resolvedURL.getURL() != null;
-    final URLOpener opener = resolved? resolvedURL: originalURL;
+    final URLOpener opener =
+        (resolvedURL.getURL() == null) ? originalURL : resolvedURL;
+
+    final HttpURLConnection connection = opener.openConnection(startPos);
+    connection.connect();
+    checkResponseCode(connection);
 
-    final HttpURLConnection connection = opener.connect(startPos, resolved);
     final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
     if (cl == null) {
       throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java
index 8c73e2a6bee..6df5f2dd9b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java
@@ -342,28 +342,19 @@ static class RangeHeaderUrlOpener extends ByteRangeInputStream.URLOpener {
       super(url);
     }
 
+    @Override
     protected HttpURLConnection openConnection() throws IOException {
       return (HttpURLConnection)URLUtils.openConnection(url);
     }
 
     /** Use HTTP Range header for specifying offset. */
     @Override
-    protected HttpURLConnection connect(final long offset,
-        final boolean resolved) throws IOException {
+    protected HttpURLConnection openConnection(final long offset) throws IOException {
       final HttpURLConnection conn = openConnection();
       conn.setRequestMethod("GET");
       if (offset != 0L) {
         conn.setRequestProperty("Range", "bytes=" + offset + "-");
       }
-      conn.connect();
-
-      //Expects HTTP_OK or HTTP_PARTIAL response codes.
-      final int code = conn.getResponseCode();
-      if (offset != 0L && code != HttpURLConnection.HTTP_PARTIAL) {
-        throw new IOException("HTTP_PARTIAL expected, received " + code);
-      } else if (offset == 0L && code != HttpURLConnection.HTTP_OK) {
-        throw new IOException("HTTP_OK expected, received " + code);
-      }
       return conn;
     }
   }
@@ -377,6 +368,22 @@ static class RangeHeaderInputStream extends ByteRangeInputStream {
       this(new RangeHeaderUrlOpener(url), new RangeHeaderUrlOpener(null));
     }
 
+    /** Expects HTTP_OK and HTTP_PARTIAL response codes. */
+    @Override
+    protected void checkResponseCode(final HttpURLConnection connection
+        ) throws IOException {
+      final int code = connection.getResponseCode();
+      if (startPos != 0 && code != HttpURLConnection.HTTP_PARTIAL) {
+        // We asked for a byte range but did not receive a partial content
+        // response...
+        throw new IOException("HTTP_PARTIAL expected, received " + code);
+      } else if (startPos == 0 && code != HttpURLConnection.HTTP_OK) {
+        // We asked for all bytes from the beginning but didn't receive a 200
+        // response (none of the other 2xx codes are valid here)
+        throw new IOException("HTTP_OK expected, received " + code);
+      }
+    }
+
     @Override
     protected URL getResolvedUrl(final HttpURLConnection connection) {
       return connection.getURL();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
index 1f582ebe602..1de42a5c366 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
@@ -259,7 +259,7 @@ private static NamenodeProtocol createNNProxyWithNamenodeProtocol(
    *
    * Note that dfs.client.retry.max < 0 is not allowed.
    */
-  public static RetryPolicy getDefaultRetryPolicy(Configuration conf) {
+  private static RetryPolicy getDefaultRpcRetryPolicy(Configuration conf) {
     final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf);
     if (LOG.isDebugEnabled()) {
       LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry);
@@ -300,13 +300,6 @@ public RetryAction shouldRetry(Exception e, int retries, int failovers,
               + p.getClass().getSimpleName() + ", exception=" + e);
           return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
         }
-
-        @Override
-        public String toString() {
-          return "RetryPolicy[" + multipleLinearRandomRetry + ", "
-              + RetryPolicies.TRY_ONCE_THEN_FAIL.getClass().getSimpleName()
-              + "]";
-        }
       };
     }
   }
@@ -342,7 +335,7 @@ private static ClientProtocol createNNProxyWithClientProtocol(
       boolean withRetries) throws IOException {
     RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
         ProtobufRpcEngine.class);
-    final RetryPolicy defaultPolicy = getDefaultRetryPolicy(conf);
+    final RetryPolicy defaultPolicy = getDefaultRpcRetryPolicy(conf);
     final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
     ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
         ClientNamenodeProtocolPB.class, version, address, ugi, conf,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
index 44e70e8636a..420d74d5829 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -55,7 +55,6 @@
 import org.apache.hadoop.hdfs.ByteRangeInputStream;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
@@ -89,7 +88,6 @@
 import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
 import org.apache.hadoop.hdfs.web.resources.UserParam;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
@@ -149,7 +147,6 @@ public static boolean isEnabled(final Configuration conf, final Log log) {
   private URI uri;
   private Token delegationToken;
   private final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
-  private RetryPolicy retryPolicy = null;
   private Path workingDir;
 
   {
@@ -182,7 +179,6 @@ public synchronized void initialize(URI uri, Configuration conf
       throw new IllegalArgumentException(e);
     }
     this.nnAddr = NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort());
-    this.retryPolicy = NameNodeProxies.getDefaultRetryPolicy(conf);
     this.workingDir = getHomeDirectory();
 
     if (UserGroupInformation.isSecurityEnabled()) {
@@ -280,13 +276,13 @@ private Path makeAbsolute(Path f) {
   }
 
   private static Map validateResponse(final HttpOpParam.Op op,
-      final HttpURLConnection conn, boolean unwrapException) throws IOException {
+      final HttpURLConnection conn) throws IOException {
     final int code = conn.getResponseCode();
     if (code != op.getExpectedHttpResponseCode()) {
       final Map m;
       try {
         m = jsonParse(conn, true);
-      } catch(Exception e) {
+      } catch(IOException e) {
throw new IOException("Unexpected HTTP response: code=" + code + " != " + op.getExpectedHttpResponseCode() + ", " + op.toQueryString() + ", message=" + conn.getResponseMessage(), e); @@ -297,42 +293,21 @@ private Path makeAbsolute(Path f) { } final RemoteException re = JsonUtil.toRemoteException(m); - throw unwrapException? toIOException(re): re; + throw re.unwrapRemoteException(AccessControlException.class, + InvalidToken.class, + AuthenticationException.class, + AuthorizationException.class, + FileAlreadyExistsException.class, + FileNotFoundException.class, + ParentNotDirectoryException.class, + UnresolvedPathException.class, + SafeModeException.class, + DSQuotaExceededException.class, + NSQuotaExceededException.class); } return null; } - /** - * Covert an exception to an IOException. - * - * For a non-IOException, wrap it with IOException. - * For a RemoteException, unwrap it. - * For an IOException which is not a RemoteException, return it. - */ - private static IOException toIOException(Exception e) { - if (!(e instanceof IOException)) { - return new IOException(e); - } - - final IOException ioe = (IOException)e; - if (!(ioe instanceof RemoteException)) { - return ioe; - } - - final RemoteException re = (RemoteException)ioe; - return re.unwrapRemoteException(AccessControlException.class, - InvalidToken.class, - AuthenticationException.class, - AuthorizationException.class, - FileAlreadyExistsException.class, - FileNotFoundException.class, - ParentNotDirectoryException.class, - UnresolvedPathException.class, - SafeModeException.class, - DSQuotaExceededException.class, - NSQuotaExceededException.class); - } - /** * Return a URL pointing to given path on the namenode. * @@ -387,15 +362,70 @@ URL toUrl(final HttpOpParam.Op op, final Path fspath, } private HttpURLConnection getHttpUrlConnection(URL url) - throws IOException, AuthenticationException { + throws IOException { final HttpURLConnection conn; - if (ugi.hasKerberosCredentials()) { - conn = new AuthenticatedURL(AUTH).openConnection(url, authToken); - } else { - conn = (HttpURLConnection)url.openConnection(); + try { + if (ugi.hasKerberosCredentials()) { + conn = new AuthenticatedURL(AUTH).openConnection(url, authToken); + } else { + conn = (HttpURLConnection)url.openConnection(); + } + } catch (AuthenticationException e) { + throw new IOException("Authentication failed, url=" + url, e); } return conn; } + + private HttpURLConnection httpConnect(final HttpOpParam.Op op, final Path fspath, + final Param... parameters) throws IOException { + final URL url = toUrl(op, fspath, parameters); + + //connect and get response + HttpURLConnection conn = getHttpUrlConnection(url); + try { + conn.setRequestMethod(op.getType().toString()); + if (op.getDoOutput()) { + conn = twoStepWrite(conn, op); + conn.setRequestProperty("Content-Type", "application/octet-stream"); + } + conn.setDoOutput(op.getDoOutput()); + conn.connect(); + return conn; + } catch (IOException e) { + conn.disconnect(); + throw e; + } + } + + /** + * Two-step Create/Append: + * Step 1) Submit a Http request with neither auto-redirect nor data. + * Step 2) Submit another Http request with the URL from the Location header with data. + * + * The reason of having two-step create/append is for preventing clients to + * send out the data before the redirect. This issue is addressed by the + * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3. + * Unfortunately, there are software library bugs (e.g. 
Jetty 6 http server + * and Java 6 http client), which do not correctly implement "Expect: + * 100-continue". The two-step create/append is a temporary workaround for + * the software library bugs. + */ + static HttpURLConnection twoStepWrite(HttpURLConnection conn, + final HttpOpParam.Op op) throws IOException { + //Step 1) Submit a Http request with neither auto-redirect nor data. + conn.setInstanceFollowRedirects(false); + conn.setDoOutput(false); + conn.connect(); + validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn); + final String redirect = conn.getHeaderField("Location"); + conn.disconnect(); + + //Step 2) Submit another Http request with the URL from the Location header with data. + conn = (HttpURLConnection)new URL(redirect).openConnection(); + conn.setRequestMethod(op.getType().toString()); + conn.setChunkedStreamingMode(32 << 10); //32kB-chunk + return conn; + } /** * Run a http operation. @@ -409,158 +439,12 @@ private HttpURLConnection getHttpUrlConnection(URL url) */ private Map run(final HttpOpParam.Op op, final Path fspath, final Param... parameters) throws IOException { - return new Runner(op, fspath, parameters).run().json; - } - - /** - * This class is for initialing a HTTP connection, connecting to server, - * obtaining a response, and also handling retry on failures. - */ - class Runner { - private final HttpOpParam.Op op; - private final URL url; - private final boolean redirected; - - private boolean checkRetry; - private HttpURLConnection conn = null; - private Map json = null; - - Runner(final HttpOpParam.Op op, final URL url, final boolean redirected) { - this.op = op; - this.url = url; - this.redirected = redirected; - } - - Runner(final HttpOpParam.Op op, final Path fspath, - final Param... parameters) throws IOException { - this(op, toUrl(op, fspath, parameters), false); - } - - Runner(final HttpOpParam.Op op, final HttpURLConnection conn) { - this(op, null, false); - this.conn = conn; - } - - private void init() throws IOException { - checkRetry = !redirected; - try { - conn = getHttpUrlConnection(url); - } catch(AuthenticationException ae) { - checkRetry = false; - throw new IOException("Authentication failed, url=" + url, ae); - } - } - - private void connect() throws IOException { - connect(op.getDoOutput()); - } - - private void connect(boolean doOutput) throws IOException { - conn.setRequestMethod(op.getType().toString()); - conn.setDoOutput(doOutput); - conn.setInstanceFollowRedirects(false); - conn.connect(); - } - - private void disconnect() { - if (conn != null) { - conn.disconnect(); - conn = null; - } - } - - Runner run() throws IOException { - for(int retry = 0; ; retry++) { - try { - init(); - if (op.getDoOutput()) { - twoStepWrite(); - } else { - getResponse(op != GetOpParam.Op.OPEN); - } - return this; - } catch(IOException ioe) { - shouldRetry(ioe, retry); - } - } - } - - private void shouldRetry(final IOException ioe, final int retry - ) throws IOException { - if (checkRetry) { - try { - final RetryPolicy.RetryAction a = retryPolicy.shouldRetry( - ioe, retry, 0, true); - if (a.action == RetryPolicy.RetryAction.RetryDecision.RETRY) { - LOG.info("Retrying connect to namenode: " + nnAddr - + ". 
Already tried " + retry + " time(s); retry policy is " - + retryPolicy + ", delay " + a.delayMillis + "ms."); - Thread.sleep(a.delayMillis); - return; - } - } catch(Exception e) { - LOG.warn("Original exception is ", ioe); - throw toIOException(e); - } - } - throw toIOException(ioe); - } - - /** - * Two-step Create/Append: - * Step 1) Submit a Http request with neither auto-redirect nor data. - * Step 2) Submit another Http request with the URL from the Location header with data. - * - * The reason of having two-step create/append is for preventing clients to - * send out the data before the redirect. This issue is addressed by the - * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3. - * Unfortunately, there are software library bugs (e.g. Jetty 6 http server - * and Java 6 http client), which do not correctly implement "Expect: - * 100-continue". The two-step create/append is a temporary workaround for - * the software library bugs. - */ - HttpURLConnection twoStepWrite() throws IOException { - //Step 1) Submit a Http request with neither auto-redirect nor data. - connect(false); - validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn, false); - final String redirect = conn.getHeaderField("Location"); - disconnect(); - checkRetry = false; - - //Step 2) Submit another Http request with the URL from the Location header with data. - conn = (HttpURLConnection)new URL(redirect).openConnection(); - conn.setChunkedStreamingMode(32 << 10); //32kB-chunk - connect(); - return conn; - } - - FSDataOutputStream write(final int bufferSize) throws IOException { - return WebHdfsFileSystem.this.write(op, conn, bufferSize); - } - - void getResponse(boolean getJsonAndDisconnect) throws IOException { - try { - connect(); - if (!redirected && op.getRedirect()) { - final String redirect = conn.getHeaderField("Location"); - json = validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), - conn, false); - disconnect(); - - checkRetry = false; - conn = (HttpURLConnection)new URL(redirect).openConnection(); - connect(); - } - - json = validateResponse(op, conn, false); - if (json == null && getJsonAndDisconnect) { - json = jsonParse(conn, false); - } - } finally { - if (getJsonAndDisconnect) { - disconnect(); - } - } + final HttpURLConnection conn = httpConnect(op, fspath, parameters); + try { + final Map m = validateResponse(op, conn); + return m != null? 
m: jsonParse(conn, false); + } finally { + conn.disconnect(); } } @@ -694,7 +578,7 @@ public void close() throws IOException { super.close(); } finally { try { - validateResponse(op, conn, true); + validateResponse(op, conn); } finally { conn.disconnect(); } @@ -710,14 +594,13 @@ public FSDataOutputStream create(final Path f, final FsPermission permission, statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.CREATE; - return new Runner(op, f, + final HttpURLConnection conn = httpConnect(op, f, new PermissionParam(applyUMask(permission)), new OverwriteParam(overwrite), new BufferSizeParam(bufferSize), new ReplicationParam(replication), - new BlockSizeParam(blockSize)) - .run() - .write(bufferSize); + new BlockSizeParam(blockSize)); + return write(op, conn, bufferSize); } @Override @@ -726,9 +609,9 @@ public FSDataOutputStream append(final Path f, final int bufferSize, statistics.incrementWriteOps(1); final HttpOpParam.Op op = PostOpParam.Op.APPEND; - return new Runner(op, f, new BufferSizeParam(bufferSize)) - .run() - .write(bufferSize); + final HttpURLConnection conn = httpConnect(op, f, + new BufferSizeParam(bufferSize)); + return write(op, conn, bufferSize); } @SuppressWarnings("deprecation") @@ -755,17 +638,26 @@ public FSDataInputStream open(final Path f, final int buffersize } class OffsetUrlOpener extends ByteRangeInputStream.URLOpener { + /** The url with offset parameter */ + private URL offsetUrl; + OffsetUrlOpener(final URL url) { super(url); } - /** Setup offset url and connect. */ + /** Open connection with offset url. */ @Override - protected HttpURLConnection connect(final long offset, - final boolean resolved) throws IOException { - final URL offsetUrl = offset == 0L? url - : new URL(url + "&" + new OffsetParam(offset)); - return new Runner(GetOpParam.Op.OPEN, offsetUrl, resolved).run().conn; + protected HttpURLConnection openConnection() throws IOException { + return getHttpUrlConnection(offsetUrl); + } + + /** Setup offset url before open connection. */ + @Override + protected HttpURLConnection openConnection(final long offset) throws IOException { + offsetUrl = offset == 0L? url: new URL(url + "&" + new OffsetParam(offset)); + final HttpURLConnection conn = openConnection(); + conn.setRequestMethod("GET"); + return conn; } } @@ -806,6 +698,12 @@ static class OffsetUrlInputStream extends ByteRangeInputStream { OffsetUrlInputStream(OffsetUrlOpener o, OffsetUrlOpener r) { super(o, r); } + + @Override + protected void checkResponseCode(final HttpURLConnection connection + ) throws IOException { + validateResponse(GetOpParam.Op.OPEN, connection); + } /** Remove offset parameter before returning the resolved url. 
*/ @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java index a82b8a72c8e..12962b4a4ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java @@ -43,11 +43,6 @@ public boolean getDoOutput() { return false; } - @Override - public boolean getRedirect() { - return false; - } - @Override public int getExpectedHttpResponseCode() { return expectedHttpResponseCode; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java index 6bbd9e21693..34d6e12ecf9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java @@ -23,27 +23,25 @@ public class GetOpParam extends HttpOpParam { /** Get operations. */ public static enum Op implements HttpOpParam.Op { - OPEN(true, HttpURLConnection.HTTP_OK), + OPEN(HttpURLConnection.HTTP_OK), - GETFILESTATUS(false, HttpURLConnection.HTTP_OK), - LISTSTATUS(false, HttpURLConnection.HTTP_OK), - GETCONTENTSUMMARY(false, HttpURLConnection.HTTP_OK), - GETFILECHECKSUM(true, HttpURLConnection.HTTP_OK), + GETFILESTATUS(HttpURLConnection.HTTP_OK), + LISTSTATUS(HttpURLConnection.HTTP_OK), + GETCONTENTSUMMARY(HttpURLConnection.HTTP_OK), + GETFILECHECKSUM(HttpURLConnection.HTTP_OK), - GETHOMEDIRECTORY(false, HttpURLConnection.HTTP_OK), - GETDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK), - GETDELEGATIONTOKENS(false, HttpURLConnection.HTTP_OK), + GETHOMEDIRECTORY(HttpURLConnection.HTTP_OK), + GETDELEGATIONTOKEN(HttpURLConnection.HTTP_OK), + GETDELEGATIONTOKENS(HttpURLConnection.HTTP_OK), /** GET_BLOCK_LOCATIONS is a private unstable op. 
*/ - GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK), + GET_BLOCK_LOCATIONS(HttpURLConnection.HTTP_OK), - NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED); + NULL(HttpURLConnection.HTTP_NOT_IMPLEMENTED); - final boolean redirect; final int expectedHttpResponseCode; - Op(final boolean redirect, final int expectedHttpResponseCode) { - this.redirect = redirect; + Op(final int expectedHttpResponseCode) { this.expectedHttpResponseCode = expectedHttpResponseCode; } @@ -57,11 +55,6 @@ public boolean getDoOutput() { return false; } - @Override - public boolean getRedirect() { - return redirect; - } - @Override public int getExpectedHttpResponseCode() { return expectedHttpResponseCode; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java index 9765f67996c..ab32ab59aa3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java @@ -17,10 +17,6 @@ */ package org.apache.hadoop.hdfs.web.resources; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - import javax.ws.rs.core.Response; @@ -46,9 +42,6 @@ public static interface Op { /** @return true if the operation will do output. */ public boolean getDoOutput(); - /** @return true if the operation will be redirected. */ - public boolean getRedirect(); - /** @return true the expected http response code. */ public int getExpectedHttpResponseCode(); @@ -58,25 +51,15 @@ public static interface Op { /** Expects HTTP response 307 "Temporary Redirect". */ public static class TemporaryRedirectOp implements Op { - static final TemporaryRedirectOp CREATE = new TemporaryRedirectOp( - PutOpParam.Op.CREATE); - static final TemporaryRedirectOp APPEND = new TemporaryRedirectOp( - PostOpParam.Op.APPEND); - static final TemporaryRedirectOp OPEN = new TemporaryRedirectOp( - GetOpParam.Op.OPEN); - static final TemporaryRedirectOp GETFILECHECKSUM = new TemporaryRedirectOp( - GetOpParam.Op.GETFILECHECKSUM); + static final TemporaryRedirectOp CREATE = new TemporaryRedirectOp(PutOpParam.Op.CREATE); + static final TemporaryRedirectOp APPEND = new TemporaryRedirectOp(PostOpParam.Op.APPEND); - static final List values - = Collections.unmodifiableList(Arrays.asList( - new TemporaryRedirectOp[]{CREATE, APPEND, OPEN, GETFILECHECKSUM})); - /** Get an object for the given op. */ public static TemporaryRedirectOp valueOf(final Op op) { - for(TemporaryRedirectOp t : values) { - if (op == t.op) { - return t; - } + if (op == CREATE.op) { + return CREATE; + } else if (op == APPEND.op) { + return APPEND; } throw new IllegalArgumentException(op + " not found."); } @@ -97,11 +80,6 @@ public boolean getDoOutput() { return op.getDoOutput(); } - @Override - public boolean getRedirect() { - return false; - } - /** Override the original expected response with "Temporary Redirect". 
*/ @Override public int getExpectedHttpResponseCode() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java index 83f2cd19155..89c02fe0d79 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java @@ -43,11 +43,6 @@ public boolean getDoOutput() { return true; } - @Override - public boolean getRedirect() { - return true; - } - @Override public int getExpectedHttpResponseCode() { return expectedHttpResponseCode; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java index 77bad214225..b6fc9198801 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java @@ -39,11 +39,11 @@ public static enum Op implements HttpOpParam.Op { NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED); - final boolean doOutputAndRedirect; + final boolean doOutput; final int expectedHttpResponseCode; - Op(final boolean doOutputAndRedirect, final int expectedHttpResponseCode) { - this.doOutputAndRedirect = doOutputAndRedirect; + Op(final boolean doOutput, final int expectedHttpResponseCode) { + this.doOutput = doOutput; this.expectedHttpResponseCode = expectedHttpResponseCode; } @@ -54,12 +54,7 @@ public HttpOpParam.Type getType() { @Override public boolean getDoOutput() { - return doOutputAndRedirect; - } - - @Override - public boolean getRedirect() { - return doOutputAndRedirect; + return doOutput; } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index f36f4708f4e..520f46382cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -47,7 +47,6 @@ import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; @@ -67,7 +66,6 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; -import org.apache.hadoop.hdfs.web.WebHdfsTestUtil; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; @@ -76,7 +74,6 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; import org.apache.log4j.Level; @@ -775,17 +772,13 @@ public void testClientDNProtocolTimeout() throws IOException { /** Test client retry with namenode 
restarting. */ @Test public void testNamenodeRestart() throws Exception { - namenodeRestartTest(new Configuration(), false); - } - - public static void namenodeRestartTest(final Configuration conf, - final boolean isWebHDFS) throws Exception { ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL); final List exceptions = new ArrayList(); final Path dir = new Path("/testNamenodeRestart"); + final Configuration conf = new Configuration(); conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true); final short numDatanodes = 3; @@ -795,18 +788,16 @@ public static void namenodeRestartTest(final Configuration conf, try { cluster.waitActive(); final DistributedFileSystem dfs = cluster.getFileSystem(); - final FileSystem fs = isWebHDFS? - WebHdfsTestUtil.getWebHdfsFileSystem(conf): dfs; final URI uri = dfs.getUri(); assertTrue(HdfsUtils.isHealthy(uri)); //create a file final long length = 1L << 20; final Path file1 = new Path(dir, "foo"); - DFSTestUtil.createFile(fs, file1, length, numDatanodes, 20120406L); + DFSTestUtil.createFile(dfs, file1, length, numDatanodes, 20120406L); //get file status - final FileStatus s1 = fs.getFileStatus(file1); + final FileStatus s1 = dfs.getFileStatus(file1); assertEquals(length, s1.getLen()); //shutdown namenode @@ -814,25 +805,6 @@ public static void namenodeRestartTest(final Configuration conf, cluster.shutdownNameNode(0); assertFalse(HdfsUtils.isHealthy(uri)); - //namenode is down, read the file in a thread - final Thread reader = new Thread(new Runnable() { - @Override - public void run() { - try { - //it should retry till namenode is up. - final FileSystem fs = createFsWithDifferentUsername(conf, isWebHDFS); - final FSDataInputStream in = fs.open(file1); - int count = 0; - for(; in.read() != -1; count++); - in.close(); - assertEquals(s1.getLen(), count); - } catch (Exception e) { - exceptions.add(e); - } - } - }); - reader.start(); - //namenode is down, create another file in a thread final Path file3 = new Path(dir, "file"); final Thread thread = new Thread(new Runnable() { @@ -840,7 +812,7 @@ public void run() { public void run() { try { //it should retry till namenode is up. - final FileSystem fs = createFsWithDifferentUsername(conf, isWebHDFS); + final FileSystem fs = AppendTestUtil.createHdfsWithDifferentUsername(conf); DFSTestUtil.createFile(fs, file3, length, numDatanodes, 20120406L); } catch (Exception e) { exceptions.add(e); @@ -867,15 +839,12 @@ public void run() { }).start(); //namenode is down, it should retry until namenode is up again. - final FileStatus s2 = fs.getFileStatus(file1); + final FileStatus s2 = dfs.getFileStatus(file1); assertEquals(s1, s2); //check file1 and file3 thread.join(); - assertEquals(s1.getLen(), fs.getFileStatus(file3).getLen()); - assertEquals(fs.getFileChecksum(file1), fs.getFileChecksum(file3)); - - reader.join(); + assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file3)); //enter safe mode assertTrue(HdfsUtils.isHealthy(uri)); @@ -900,8 +869,8 @@ public void run() { //namenode is in safe mode, create should retry until it leaves safe mode. 
final Path file2 = new Path(dir, "bar"); - DFSTestUtil.createFile(fs, file2, length, numDatanodes, 20120406L); - assertEquals(fs.getFileChecksum(file1), fs.getFileChecksum(file2)); + DFSTestUtil.createFile(dfs, file2, length, numDatanodes, 20120406L); + assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file2)); assertTrue(HdfsUtils.isHealthy(uri)); @@ -909,7 +878,7 @@ public void run() { final Path nonExisting = new Path(dir, "nonExisting"); LOG.info("setPermission: " + nonExisting); try { - fs.setPermission(nonExisting, new FsPermission((short)0)); + dfs.setPermission(nonExisting, new FsPermission((short)0)); fail(); } catch(FileNotFoundException fnfe) { LOG.info("GOOD!", fnfe); @@ -927,17 +896,6 @@ public void run() { } } - public static FileSystem createFsWithDifferentUsername( - final Configuration conf, final boolean isWebHDFS - ) throws IOException, InterruptedException { - String username = UserGroupInformation.getCurrentUser().getShortUserName()+"_XXX"; - UserGroupInformation ugi = - UserGroupInformation.createUserForTesting(username, new String[]{"supergroup"}); - - return isWebHDFS? WebHdfsTestUtil.getWebHdfsFileSystemAs(ugi, conf) - : DFSTestUtil.getFileSystemAs(ugi, conf); - } - @Test public void testMultipleLinearRandomRetry() { parseMultipleLinearRandomRetry(null, ""); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java index fe32fcdfbe5..02c04141050 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java @@ -44,6 +44,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; @@ -139,7 +140,9 @@ public void testDelegationTokenWithRealUser() throws IOException { .doAs(new PrivilegedExceptionAction>() { @Override public Token run() throws IOException { - return cluster.getFileSystem().getDelegationToken("RenewerUser"); + DistributedFileSystem dfs = (DistributedFileSystem) cluster + .getFileSystem(); + return dfs.getDelegationToken("RenewerUser"); } }); DelegationTokenIdentifier identifier = new DelegationTokenIdentifier(); @@ -203,7 +206,7 @@ public String getName() { final PutOpParam.Op op = PutOpParam.Op.CREATE; final URL url = WebHdfsTestUtil.toUrl(webhdfs, op, f, new DoAsParam(PROXY_USER)); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - conn = WebHdfsTestUtil.twoStepWrite(webhdfs, op, conn); + conn = WebHdfsTestUtil.twoStepWrite(conn, op); final FSDataOutputStream out = WebHdfsTestUtil.write(webhdfs, op, conn, 4096); out.write("Hello, webhdfs user!".getBytes()); out.close(); @@ -218,7 +221,7 @@ public String getName() { final PostOpParam.Op op = PostOpParam.Op.APPEND; final URL url = WebHdfsTestUtil.toUrl(webhdfs, op, f, new DoAsParam(PROXY_USER)); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - conn = WebHdfsTestUtil.twoStepWrite(webhdfs, op, conn); + conn = WebHdfsTestUtil.twoStepWrite(conn, op); final FSDataOutputStream out = 
WebHdfsTestUtil.write(webhdfs, op, conn, 4096); out.write("\nHello again!".getBytes()); out.close(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java index 1a95fc8c327..4ef0dd680e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java @@ -18,10 +18,22 @@ package org.apache.hadoop.hdfs.web; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.io.IOException; +import java.net.URI; import java.net.URL; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.TestByteRangeInputStream.MockHttpURLConnection; +import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.OffsetUrlInputStream; +import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.OffsetUrlOpener; import org.junit.Test; public class TestOffsetUrlInputStream { @@ -61,4 +73,65 @@ public void testRemoveOffset() throws IOException { WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString()); } } + + @Test + public void testByteRange() throws Exception { + final Configuration conf = new Configuration(); + final String uri = WebHdfsFileSystem.SCHEME + "://localhost:50070/"; + final WebHdfsFileSystem webhdfs = (WebHdfsFileSystem)FileSystem.get(new URI(uri), conf); + + OffsetUrlOpener ospy = spy(webhdfs.new OffsetUrlOpener(new URL("http://test/"))); + doReturn(new MockHttpURLConnection(ospy.getURL())).when(ospy) + .openConnection(); + OffsetUrlOpener rspy = spy(webhdfs.new OffsetUrlOpener((URL) null)); + doReturn(new MockHttpURLConnection(rspy.getURL())).when(rspy) + .openConnection(); + final OffsetUrlInputStream is = new OffsetUrlInputStream(ospy, rspy); + + assertEquals("getPos wrong", 0, is.getPos()); + + is.read(); + + assertNull("Initial call made incorrectly (Range Check)", ospy + .openConnection().getRequestProperty("Range")); + + assertEquals("getPos should be 1 after reading one byte", 1, is.getPos()); + + is.read(); + + assertEquals("getPos should be 2 after reading two bytes", 2, is.getPos()); + + // No additional connections should have been made (no seek) + + rspy.setURL(new URL("http://resolvedurl/")); + + is.seek(100); + is.read(); + + assertEquals("getPos should be 101 after reading one byte", 101, + is.getPos()); + + verify(rspy, times(1)).openConnection(); + + is.seek(101); + is.read(); + + verify(rspy, times(1)).openConnection(); + + // Seek to 101 should not result in another request" + + is.seek(2500); + is.read(); + + ((MockHttpURLConnection) rspy.openConnection()).setResponseCode(206); + is.seek(0); + + try { + is.read(); + fail("Exception should be thrown when 206 response is given " + + "but 200 is expected"); + } catch (IOException e) { + WebHdfsFileSystem.LOG.info(e.toString()); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsRetries.java deleted file mode 100644 index 27ff56d8395..00000000000 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsRetries.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.web;
-
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.TestDFSClientRetries;
-import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
-import org.apache.log4j.Level;
-import org.junit.Test;
-
-/** Test WebHdfsFileSystem retry on failures. */
-public class TestWebHdfsRetries {
-  /** Test client retry with namenode restarting. */
-  @Test
-  public void testNamenodeRestart() throws Exception {
-    ((Log4JLogger)NamenodeWebHdfsMethods.LOG).getLogger().setLevel(Level.ALL);
-    final Configuration conf = WebHdfsTestUtil.createConf();
-    TestDFSClientRetries.namenodeRestartTest(conf, true);
-  }
-}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
index e51a2524ecd..9ae0fb28c21 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
@@ -79,9 +79,13 @@ public static URL toUrl(final WebHdfsFileSystem webhdfs,
     return WebHdfsFileSystem.jsonParse(conn, false);
   }
 
-  public static HttpURLConnection twoStepWrite(final WebHdfsFileSystem webhdfs,
-      final HttpOpParam.Op op, HttpURLConnection conn) throws IOException {
-    return webhdfs.new Runner(op, conn).twoStepWrite();
+  public static HttpURLConnection twoStepWrite(HttpURLConnection conn,
+      final HttpOpParam.Op op) throws IOException {
+    conn.setRequestMethod(op.getType().toString());
+    conn = WebHdfsFileSystem.twoStepWrite(conn, op);
+    conn.setDoOutput(true);
+    conn.connect();
+    return conn;
   }
 
   public static FSDataOutputStream write(final WebHdfsFileSystem webhdfs,
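
Reviewer note (not part of the patch): the ByteRangeInputStream/HftpFileSystem hunks above restore a split of responsibilities where openConnection(offset) only builds the ranged GET request and checkResponseCode() validates the server's reply afterwards. The following is a minimal standalone Java sketch of that pattern using only java.net; the class name RangeRequestSketch, the helper names, and the localhost:50075 URL are illustrative assumptions, not HDFS code.

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class RangeRequestSketch {
  /** Build a GET request, adding a Range header only when reading from a non-zero offset. */
  static HttpURLConnection openConnection(URL url, long offset) throws IOException {
    final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");
    if (offset != 0L) {
      conn.setRequestProperty("Range", "bytes=" + offset + "-");
    }
    return conn;
  }

  /** Expect 206 (partial content) for a ranged read and 200 for a full read. */
  static void checkResponseCode(HttpURLConnection conn, long offset) throws IOException {
    final int code = conn.getResponseCode();
    if (offset != 0L && code != HttpURLConnection.HTTP_PARTIAL) {
      throw new IOException("HTTP_PARTIAL expected, received " + code);
    } else if (offset == 0L && code != HttpURLConnection.HTTP_OK) {
      throw new IOException("HTTP_OK expected, received " + code);
    }
  }

  public static void main(String[] args) throws IOException {
    // Example: read the remainder of a resource starting at byte 1024,
    // mirroring the connect-then-validate order used by openInputStream().
    final long offset = 1024L;
    final HttpURLConnection conn =
        openConnection(new URL("http://localhost:50075/streamFile/foo"), offset);
    conn.connect();
    checkResponseCode(conn, offset);
    try (InputStream in = conn.getInputStream()) {
      System.out.println("first byte after offset: " + in.read());
    } finally {
      conn.disconnect();
    }
  }
}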