From e4eec269d91ae541a321ae2f28ff03310682b3fe Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Sat, 28 Jul 2012 05:57:47 +0000 Subject: [PATCH] HDFS-3667. Add retry support to WebHdfsFileSystem. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1366601 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../hadoop/hdfs/ByteRangeInputStream.java | 18 +- .../apache/hadoop/hdfs/HftpFileSystem.java | 29 +- .../apache/hadoop/hdfs/NameNodeProxies.java | 11 +- .../hadoop/hdfs/web/WebHdfsFileSystem.java | 316 ++++++++++++------ .../hdfs/web/resources/DeleteOpParam.java | 5 + .../hadoop/hdfs/web/resources/GetOpParam.java | 29 +- .../hdfs/web/resources/HttpOpParam.java | 34 +- .../hdfs/web/resources/PostOpParam.java | 5 + .../hadoop/hdfs/web/resources/PutOpParam.java | 13 +- .../hadoop/hdfs/TestDFSClientRetries.java | 60 +++- .../TestDelegationTokenForProxyUser.java | 9 +- .../hdfs/web/TestOffsetUrlInputStream.java | 73 ---- .../hadoop/hdfs/web/TestWebHdfsRetries.java | 37 ++ .../hadoop/hdfs/web/WebHdfsTestUtil.java | 10 +- 15 files changed, 396 insertions(+), 255 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsRetries.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 1f9ce6f58d3..5b5a503c8f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -373,6 +373,8 @@ Branch-2 ( Unreleased changes ) HDFS-3697. Enable fadvise readahead by default. (todd) + HDFS-3667. Add retry support to WebHdfsFileSystem. (szetszwo) + BUG FIXES HDFS-3385. The last block of INodeFileUnderConstruction is not diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java index 62c67f9e9d0..e745714c9be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java @@ -57,9 +57,9 @@ public abstract class ByteRangeInputStream extends FSInputStream { return url; } - protected abstract HttpURLConnection openConnection() throws IOException; - - protected abstract HttpURLConnection openConnection(final long offset) throws IOException; + /** Connect to server with a data offset. */ + protected abstract HttpURLConnection connect(final long offset, + final boolean resolved) throws IOException; } enum StreamStatus { @@ -85,9 +85,6 @@ public abstract class ByteRangeInputStream extends FSInputStream { this.resolvedURL = r; } - protected abstract void checkResponseCode(final HttpURLConnection connection - ) throws IOException; - protected abstract URL getResolvedUrl(final HttpURLConnection connection ) throws IOException; @@ -113,13 +110,10 @@ public abstract class ByteRangeInputStream extends FSInputStream { protected InputStream openInputStream() throws IOException { // Use the original url if no resolved url exists, eg. if // it's the first time a request is made. - final URLOpener opener = - (resolvedURL.getURL() == null) ? originalURL : resolvedURL; - - final HttpURLConnection connection = opener.openConnection(startPos); - connection.connect(); - checkResponseCode(connection); + final boolean resolved = resolvedURL.getURL() != null; + final URLOpener opener = resolved? 
resolvedURL: originalURL; + final HttpURLConnection connection = opener.connect(startPos, resolved); final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH); if (cl == null) { throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java index 6df5f2dd9b0..8c73e2a6bee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java @@ -342,19 +342,28 @@ public class HftpFileSystem extends FileSystem super(url); } - @Override protected HttpURLConnection openConnection() throws IOException { return (HttpURLConnection)URLUtils.openConnection(url); } /** Use HTTP Range header for specifying offset. */ @Override - protected HttpURLConnection openConnection(final long offset) throws IOException { + protected HttpURLConnection connect(final long offset, + final boolean resolved) throws IOException { final HttpURLConnection conn = openConnection(); conn.setRequestMethod("GET"); if (offset != 0L) { conn.setRequestProperty("Range", "bytes=" + offset + "-"); } + conn.connect(); + + //Expects HTTP_OK or HTTP_PARTIAL response codes. + final int code = conn.getResponseCode(); + if (offset != 0L && code != HttpURLConnection.HTTP_PARTIAL) { + throw new IOException("HTTP_PARTIAL expected, received " + code); + } else if (offset == 0L && code != HttpURLConnection.HTTP_OK) { + throw new IOException("HTTP_OK expected, received " + code); + } return conn; } } @@ -368,22 +377,6 @@ public class HftpFileSystem extends FileSystem this(new RangeHeaderUrlOpener(url), new RangeHeaderUrlOpener(null)); } - /** Expects HTTP_OK and HTTP_PARTIAL response codes. */ - @Override - protected void checkResponseCode(final HttpURLConnection connection - ) throws IOException { - final int code = connection.getResponseCode(); - if (startPos != 0 && code != HttpURLConnection.HTTP_PARTIAL) { - // We asked for a byte range but did not receive a partial content - // response... - throw new IOException("HTTP_PARTIAL expected, received " + code); - } else if (startPos == 0 && code != HttpURLConnection.HTTP_OK) { - // We asked for all bytes from the beginning but didn't receive a 200 - // response (none of the other 2xx codes are valid here) - throw new IOException("HTTP_OK expected, received " + code); - } - } - @Override protected URL getResolvedUrl(final HttpURLConnection connection) { return connection.getURL(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java index cc6517daa52..12ec985fb25 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java @@ -259,7 +259,7 @@ public class NameNodeProxies { * * Note that dfs.client.retry.max < 0 is not allowed. 
*/ - private static RetryPolicy getDefaultRpcRetryPolicy(Configuration conf) { + public static RetryPolicy getDefaultRetryPolicy(Configuration conf) { final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf); if (LOG.isDebugEnabled()) { LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry); @@ -300,6 +300,13 @@ public class NameNodeProxies { + p.getClass().getSimpleName() + ", exception=" + e); return p.shouldRetry(e, retries, failovers, isMethodIdempotent); } + + @Override + public String toString() { + return "RetryPolicy[" + multipleLinearRandomRetry + ", " + + RetryPolicies.TRY_ONCE_THEN_FAIL.getClass().getSimpleName() + + "]"; + } }; } } @@ -335,7 +342,7 @@ public class NameNodeProxies { boolean withRetries) throws IOException { RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class); - final RetryPolicy defaultPolicy = getDefaultRpcRetryPolicy(conf); + final RetryPolicy defaultPolicy = getDefaultRetryPolicy(conf); final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class); ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy( ClientNamenodeProtocolPB.class, version, address, ugi, conf, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index 420d74d5829..44e70e8636a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -55,6 +55,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.ByteRangeInputStream; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; @@ -88,6 +89,7 @@ import org.apache.hadoop.hdfs.web.resources.ReplicationParam; import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam; import org.apache.hadoop.hdfs.web.resources.UserParam; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; @@ -147,6 +149,7 @@ public class WebHdfsFileSystem extends FileSystem private URI uri; private Token delegationToken; private final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token(); + private RetryPolicy retryPolicy = null; private Path workingDir; { @@ -179,6 +182,7 @@ public class WebHdfsFileSystem extends FileSystem throw new IllegalArgumentException(e); } this.nnAddr = NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort()); + this.retryPolicy = NameNodeProxies.getDefaultRetryPolicy(conf); this.workingDir = getHomeDirectory(); if (UserGroupInformation.isSecurityEnabled()) { @@ -276,13 +280,13 @@ public class WebHdfsFileSystem extends FileSystem } private static Map validateResponse(final HttpOpParam.Op op, - final HttpURLConnection conn) throws IOException { + final HttpURLConnection conn, boolean unwrapException) throws IOException { final int code = conn.getResponseCode(); if (code != op.getExpectedHttpResponseCode()) { final Map m; try { m = jsonParse(conn, true); - } catch(IOException e) { + } 
catch(Exception e) {
         throw new IOException("Unexpected HTTP response: code=" + code + " != "
             + op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
             + ", message=" + conn.getResponseMessage(), e);
@@ -293,21 +297,42 @@
       }
 
       final RemoteException re = JsonUtil.toRemoteException(m);
-      throw re.unwrapRemoteException(AccessControlException.class,
-          InvalidToken.class,
-          AuthenticationException.class,
-          AuthorizationException.class,
-          FileAlreadyExistsException.class,
-          FileNotFoundException.class,
-          ParentNotDirectoryException.class,
-          UnresolvedPathException.class,
-          SafeModeException.class,
-          DSQuotaExceededException.class,
-          NSQuotaExceededException.class);
+      throw unwrapException? toIOException(re): re;
     }
     return null;
   }
 
+  /**
+   * Convert an exception to an IOException.
+   *
+   * For a non-IOException, wrap it with IOException.
+   * For a RemoteException, unwrap it.
+   * For an IOException which is not a RemoteException, return it.
+   */
+  private static IOException toIOException(Exception e) {
+    if (!(e instanceof IOException)) {
+      return new IOException(e);
+    }
+
+    final IOException ioe = (IOException)e;
+    if (!(ioe instanceof RemoteException)) {
+      return ioe;
+    }
+
+    final RemoteException re = (RemoteException)ioe;
+    return re.unwrapRemoteException(AccessControlException.class,
+        InvalidToken.class,
+        AuthenticationException.class,
+        AuthorizationException.class,
+        FileAlreadyExistsException.class,
+        FileNotFoundException.class,
+        ParentNotDirectoryException.class,
+        UnresolvedPathException.class,
+        SafeModeException.class,
+        DSQuotaExceededException.class,
+        NSQuotaExceededException.class);
+  }
+
   /**
    * Return a URL pointing to given path on the namenode.
    *
@@ -362,70 +387,15 @@
   }
 
   private HttpURLConnection getHttpUrlConnection(URL url)
-      throws IOException {
+      throws IOException, AuthenticationException {
     final HttpURLConnection conn;
-    try {
-      if (ugi.hasKerberosCredentials()) {
-        conn = new AuthenticatedURL(AUTH).openConnection(url, authToken);
-      } else {
-        conn = (HttpURLConnection)url.openConnection();
-      }
-    } catch (AuthenticationException e) {
-      throw new IOException("Authentication failed, url=" + url, e);
+    if (ugi.hasKerberosCredentials()) {
+      conn = new AuthenticatedURL(AUTH).openConnection(url, authToken);
+    } else {
+      conn = (HttpURLConnection)url.openConnection();
     }
     return conn;
   }
-
-  private HttpURLConnection httpConnect(final HttpOpParam.Op op, final Path fspath,
-      final Param... parameters) throws IOException {
-    final URL url = toUrl(op, fspath, parameters);
-
-    //connect and get response
-    HttpURLConnection conn = getHttpUrlConnection(url);
-    try {
-      conn.setRequestMethod(op.getType().toString());
-      if (op.getDoOutput()) {
-        conn = twoStepWrite(conn, op);
-        conn.setRequestProperty("Content-Type", "application/octet-stream");
-      }
-      conn.setDoOutput(op.getDoOutput());
-      conn.connect();
-      return conn;
-    } catch (IOException e) {
-      conn.disconnect();
-      throw e;
-    }
-  }
-
-  /**
-   * Two-step Create/Append:
-   * Step 1) Submit a Http request with neither auto-redirect nor data.
-   * Step 2) Submit another Http request with the URL from the Location header with data.
-   *
-   * The reason of having two-step create/append is for preventing clients to
-   * send out the data before the redirect. This issue is addressed by the
-   * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
-   * Unfortunately, there are software library bugs (e.g. Jetty 6 http server
-   * and Java 6 http client), which do not correctly implement "Expect:
-   * 100-continue". The two-step create/append is a temporary workaround for
-   * the software library bugs.
-   */
-  static HttpURLConnection twoStepWrite(HttpURLConnection conn,
-      final HttpOpParam.Op op) throws IOException {
-    //Step 1) Submit a Http request with neither auto-redirect nor data.
-    conn.setInstanceFollowRedirects(false);
-    conn.setDoOutput(false);
-    conn.connect();
-    validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn);
-    final String redirect = conn.getHeaderField("Location");
-    conn.disconnect();
-
-    //Step 2) Submit another Http request with the URL from the Location header with data.
-    conn = (HttpURLConnection)new URL(redirect).openConnection();
-    conn.setRequestMethod(op.getType().toString());
-    conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
-    return conn;
-  }
 
   /**
    * Run a http operation.
@@ -439,12 +409,158 @@
    */
   private Map run(final HttpOpParam.Op op, final Path fspath,
       final Param... parameters) throws IOException {
-    final HttpURLConnection conn = httpConnect(op, fspath, parameters);
-    try {
-      final Map m = validateResponse(op, conn);
-      return m != null? m: jsonParse(conn, false);
-    } finally {
-      conn.disconnect();
+    return new Runner(op, fspath, parameters).run().json;
+  }
+
+  /**
+   * This class is for initializing an HTTP connection, connecting to the
+   * server, obtaining a response, and handling retries on failure.
+   */
+  class Runner {
+    private final HttpOpParam.Op op;
+    private final URL url;
+    private final boolean redirected;
+
+    private boolean checkRetry;
+    private HttpURLConnection conn = null;
+    private Map json = null;
+
+    Runner(final HttpOpParam.Op op, final URL url, final boolean redirected) {
+      this.op = op;
+      this.url = url;
+      this.redirected = redirected;
+    }
+
+    Runner(final HttpOpParam.Op op, final Path fspath,
+        final Param... parameters) throws IOException {
+      this(op, toUrl(op, fspath, parameters), false);
+    }
+
+    Runner(final HttpOpParam.Op op, final HttpURLConnection conn) {
+      this(op, null, false);
+      this.conn = conn;
+    }
+
+    private void init() throws IOException {
+      checkRetry = !redirected;
+      try {
+        conn = getHttpUrlConnection(url);
+      } catch(AuthenticationException ae) {
+        checkRetry = false;
+        throw new IOException("Authentication failed, url=" + url, ae);
+      }
+    }
+
+    private void connect() throws IOException {
+      connect(op.getDoOutput());
+    }
+
+    private void connect(boolean doOutput) throws IOException {
+      conn.setRequestMethod(op.getType().toString());
+      conn.setDoOutput(doOutput);
+      conn.setInstanceFollowRedirects(false);
+      conn.connect();
+    }
+
+    private void disconnect() {
+      if (conn != null) {
+        conn.disconnect();
+        conn = null;
+      }
+    }
+
+    Runner run() throws IOException {
+      for(int retry = 0; ; retry++) {
+        try {
+          init();
+          if (op.getDoOutput()) {
+            twoStepWrite();
+          } else {
+            getResponse(op != GetOpParam.Op.OPEN);
+          }
+          return this;
+        } catch(IOException ioe) {
+          shouldRetry(ioe, retry);
+        }
+      }
+    }
+
+    private void shouldRetry(final IOException ioe, final int retry
+        ) throws IOException {
+      if (checkRetry) {
+        try {
+          final RetryPolicy.RetryAction a = retryPolicy.shouldRetry(
+              ioe, retry, 0, true);
+          if (a.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
+            LOG.info("Retrying connect to namenode: " + nnAddr
+                + ". Already tried " + retry + " time(s); retry policy is "
+                + retryPolicy + ", delay " + a.delayMillis + "ms.");
+            Thread.sleep(a.delayMillis);
+            return;
+          }
+        } catch(Exception e) {
+          LOG.warn("Original exception is ", ioe);
+          throw toIOException(e);
+        }
+      }
+      throw toIOException(ioe);
+    }
+
+    /**
+     * Two-step Create/Append:
+     * Step 1) Submit an HTTP request with neither auto-redirect nor data.
+     * Step 2) Submit another HTTP request with the URL from the Location header with data.
+     *
+     * The reason for the two-step create/append is to prevent clients from
+     * sending out data before the redirect. This issue is addressed by the
+     * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
+     * Unfortunately, there are software library bugs (e.g. Jetty 6 http server
+     * and Java 6 http client), which do not correctly implement "Expect:
+     * 100-continue". The two-step create/append is a temporary workaround for
+     * the software library bugs.
+     */
+    HttpURLConnection twoStepWrite() throws IOException {
+      //Step 1) Submit an HTTP request with neither auto-redirect nor data.
+      connect(false);
+      validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn, false);
+      final String redirect = conn.getHeaderField("Location");
+      disconnect();
+      checkRetry = false;
+
+      //Step 2) Submit another HTTP request with the URL from the Location header with data.
+      conn = (HttpURLConnection)new URL(redirect).openConnection();
+      conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
+      connect();
+      return conn;
+    }
+
+    FSDataOutputStream write(final int bufferSize) throws IOException {
+      return WebHdfsFileSystem.this.write(op, conn, bufferSize);
+    }
+
+    void getResponse(boolean getJsonAndDisconnect) throws IOException {
+      try {
+        connect();
+        if (!redirected && op.getRedirect()) {
+          final String redirect = conn.getHeaderField("Location");
+          json = validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op),
+              conn, false);
+          disconnect();
+
+          checkRetry = false;
+          conn = (HttpURLConnection)new URL(redirect).openConnection();
+          connect();
+        }
+
+        json = validateResponse(op, conn, false);
+        if (json == null && getJsonAndDisconnect) {
+          json = jsonParse(conn, false);
+        }
+      } finally {
+        if (getJsonAndDisconnect) {
+          disconnect();
+        }
+      }
     }
   }
 
@@ -578,7 +694,7 @@
           super.close();
         } finally {
           try {
-            validateResponse(op, conn);
+            validateResponse(op, conn, true);
           } finally {
             conn.disconnect();
           }
@@ -594,13 +710,14 @@
     statistics.incrementWriteOps(1);
 
     final HttpOpParam.Op op = PutOpParam.Op.CREATE;
-    final HttpURLConnection conn = httpConnect(op, f,
+    return new Runner(op, f,
         new PermissionParam(applyUMask(permission)),
         new OverwriteParam(overwrite),
         new BufferSizeParam(bufferSize),
         new ReplicationParam(replication),
-        new BlockSizeParam(blockSize));
-    return write(op, conn, bufferSize);
+        new BlockSizeParam(blockSize))
+      .run()
+      .write(bufferSize);
   }
 
   @Override
@@ -609,9 +726,9 @@
     statistics.incrementWriteOps(1);
 
     final HttpOpParam.Op op = PostOpParam.Op.APPEND;
-    final HttpURLConnection conn = httpConnect(op, f,
-        new BufferSizeParam(bufferSize));
-    return write(op, conn, bufferSize);
+    return new Runner(op, f, new BufferSizeParam(bufferSize))
+      .run()
+      .write(bufferSize);
   }
 
   @SuppressWarnings("deprecation")
@@ -638,26 +755,17 @@
   }
 
   class OffsetUrlOpener extends ByteRangeInputStream.URLOpener {
-    /** The
url with offset parameter */ - private URL offsetUrl; - OffsetUrlOpener(final URL url) { super(url); } - /** Open connection with offset url. */ + /** Setup offset url and connect. */ @Override - protected HttpURLConnection openConnection() throws IOException { - return getHttpUrlConnection(offsetUrl); - } - - /** Setup offset url before open connection. */ - @Override - protected HttpURLConnection openConnection(final long offset) throws IOException { - offsetUrl = offset == 0L? url: new URL(url + "&" + new OffsetParam(offset)); - final HttpURLConnection conn = openConnection(); - conn.setRequestMethod("GET"); - return conn; + protected HttpURLConnection connect(final long offset, + final boolean resolved) throws IOException { + final URL offsetUrl = offset == 0L? url + : new URL(url + "&" + new OffsetParam(offset)); + return new Runner(GetOpParam.Op.OPEN, offsetUrl, resolved).run().conn; } } @@ -698,12 +806,6 @@ public class WebHdfsFileSystem extends FileSystem OffsetUrlInputStream(OffsetUrlOpener o, OffsetUrlOpener r) { super(o, r); } - - @Override - protected void checkResponseCode(final HttpURLConnection connection - ) throws IOException { - validateResponse(GetOpParam.Op.OPEN, connection); - } /** Remove offset parameter before returning the resolved url. */ @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java index 12962b4a4ee..a82b8a72c8e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java @@ -43,6 +43,11 @@ public class DeleteOpParam extends HttpOpParam { return false; } + @Override + public boolean getRedirect() { + return false; + } + @Override public int getExpectedHttpResponseCode() { return expectedHttpResponseCode; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java index 34d6e12ecf9..6bbd9e21693 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java @@ -23,25 +23,27 @@ import java.net.HttpURLConnection; public class GetOpParam extends HttpOpParam { /** Get operations. */ public static enum Op implements HttpOpParam.Op { - OPEN(HttpURLConnection.HTTP_OK), + OPEN(true, HttpURLConnection.HTTP_OK), - GETFILESTATUS(HttpURLConnection.HTTP_OK), - LISTSTATUS(HttpURLConnection.HTTP_OK), - GETCONTENTSUMMARY(HttpURLConnection.HTTP_OK), - GETFILECHECKSUM(HttpURLConnection.HTTP_OK), + GETFILESTATUS(false, HttpURLConnection.HTTP_OK), + LISTSTATUS(false, HttpURLConnection.HTTP_OK), + GETCONTENTSUMMARY(false, HttpURLConnection.HTTP_OK), + GETFILECHECKSUM(true, HttpURLConnection.HTTP_OK), - GETHOMEDIRECTORY(HttpURLConnection.HTTP_OK), - GETDELEGATIONTOKEN(HttpURLConnection.HTTP_OK), - GETDELEGATIONTOKENS(HttpURLConnection.HTTP_OK), + GETHOMEDIRECTORY(false, HttpURLConnection.HTTP_OK), + GETDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK), + GETDELEGATIONTOKENS(false, HttpURLConnection.HTTP_OK), /** GET_BLOCK_LOCATIONS is a private unstable op. 
*/ - GET_BLOCK_LOCATIONS(HttpURLConnection.HTTP_OK), + GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK), - NULL(HttpURLConnection.HTTP_NOT_IMPLEMENTED); + NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED); + final boolean redirect; final int expectedHttpResponseCode; - Op(final int expectedHttpResponseCode) { + Op(final boolean redirect, final int expectedHttpResponseCode) { + this.redirect = redirect; this.expectedHttpResponseCode = expectedHttpResponseCode; } @@ -55,6 +57,11 @@ public class GetOpParam extends HttpOpParam { return false; } + @Override + public boolean getRedirect() { + return redirect; + } + @Override public int getExpectedHttpResponseCode() { return expectedHttpResponseCode; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java index ab32ab59aa3..9765f67996c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hdfs.web.resources; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + import javax.ws.rs.core.Response; @@ -42,6 +46,9 @@ public abstract class HttpOpParam & HttpOpParam.Op> /** @return true if the operation will do output. */ public boolean getDoOutput(); + /** @return true if the operation will be redirected. */ + public boolean getRedirect(); + /** @return true the expected http response code. */ public int getExpectedHttpResponseCode(); @@ -51,15 +58,25 @@ public abstract class HttpOpParam & HttpOpParam.Op> /** Expects HTTP response 307 "Temporary Redirect". */ public static class TemporaryRedirectOp implements Op { - static final TemporaryRedirectOp CREATE = new TemporaryRedirectOp(PutOpParam.Op.CREATE); - static final TemporaryRedirectOp APPEND = new TemporaryRedirectOp(PostOpParam.Op.APPEND); + static final TemporaryRedirectOp CREATE = new TemporaryRedirectOp( + PutOpParam.Op.CREATE); + static final TemporaryRedirectOp APPEND = new TemporaryRedirectOp( + PostOpParam.Op.APPEND); + static final TemporaryRedirectOp OPEN = new TemporaryRedirectOp( + GetOpParam.Op.OPEN); + static final TemporaryRedirectOp GETFILECHECKSUM = new TemporaryRedirectOp( + GetOpParam.Op.GETFILECHECKSUM); + static final List values + = Collections.unmodifiableList(Arrays.asList( + new TemporaryRedirectOp[]{CREATE, APPEND, OPEN, GETFILECHECKSUM})); + /** Get an object for the given op. */ public static TemporaryRedirectOp valueOf(final Op op) { - if (op == CREATE.op) { - return CREATE; - } else if (op == APPEND.op) { - return APPEND; + for(TemporaryRedirectOp t : values) { + if (op == t.op) { + return t; + } } throw new IllegalArgumentException(op + " not found."); } @@ -80,6 +97,11 @@ public abstract class HttpOpParam & HttpOpParam.Op> return op.getDoOutput(); } + @Override + public boolean getRedirect() { + return false; + } + /** Override the original expected response with "Temporary Redirect". 
*/ @Override public int getExpectedHttpResponseCode() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java index 89c02fe0d79..83f2cd19155 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java @@ -43,6 +43,11 @@ public class PostOpParam extends HttpOpParam { return true; } + @Override + public boolean getRedirect() { + return true; + } + @Override public int getExpectedHttpResponseCode() { return expectedHttpResponseCode; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java index b6fc9198801..77bad214225 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java @@ -39,11 +39,11 @@ public class PutOpParam extends HttpOpParam { NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED); - final boolean doOutput; + final boolean doOutputAndRedirect; final int expectedHttpResponseCode; - Op(final boolean doOutput, final int expectedHttpResponseCode) { - this.doOutput = doOutput; + Op(final boolean doOutputAndRedirect, final int expectedHttpResponseCode) { + this.doOutputAndRedirect = doOutputAndRedirect; this.expectedHttpResponseCode = expectedHttpResponseCode; } @@ -54,7 +54,12 @@ public class PutOpParam extends HttpOpParam { @Override public boolean getDoOutput() { - return doOutput; + return doOutputAndRedirect; + } + + @Override + public boolean getRedirect() { + return doOutputAndRedirect; } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index 5c5de9db5ec..03ba1e96ba5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -47,6 +47,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; @@ -66,6 +67,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.hdfs.web.WebHdfsTestUtil; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; @@ -74,6 +76,7 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; 
import org.apache.log4j.Level; @@ -825,13 +828,17 @@ public class TestDFSClientRetries { /** Test client retry with namenode restarting. */ @Test public void testNamenodeRestart() throws Exception { + namenodeRestartTest(new Configuration(), false); + } + + public static void namenodeRestartTest(final Configuration conf, + final boolean isWebHDFS) throws Exception { ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL); final List exceptions = new ArrayList(); final Path dir = new Path("/testNamenodeRestart"); - final Configuration conf = new Configuration(); conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true); final short numDatanodes = 3; @@ -841,16 +848,18 @@ public class TestDFSClientRetries { try { cluster.waitActive(); final DistributedFileSystem dfs = cluster.getFileSystem(); + final FileSystem fs = isWebHDFS? + WebHdfsTestUtil.getWebHdfsFileSystem(conf): dfs; final URI uri = dfs.getUri(); assertTrue(HdfsUtils.isHealthy(uri)); //create a file final long length = 1L << 20; final Path file1 = new Path(dir, "foo"); - DFSTestUtil.createFile(dfs, file1, length, numDatanodes, 20120406L); + DFSTestUtil.createFile(fs, file1, length, numDatanodes, 20120406L); //get file status - final FileStatus s1 = dfs.getFileStatus(file1); + final FileStatus s1 = fs.getFileStatus(file1); assertEquals(length, s1.getLen()); //shutdown namenode @@ -858,6 +867,25 @@ public class TestDFSClientRetries { cluster.shutdownNameNode(0); assertFalse(HdfsUtils.isHealthy(uri)); + //namenode is down, read the file in a thread + final Thread reader = new Thread(new Runnable() { + @Override + public void run() { + try { + //it should retry till namenode is up. + final FileSystem fs = createFsWithDifferentUsername(conf, isWebHDFS); + final FSDataInputStream in = fs.open(file1); + int count = 0; + for(; in.read() != -1; count++); + in.close(); + assertEquals(s1.getLen(), count); + } catch (Exception e) { + exceptions.add(e); + } + } + }); + reader.start(); + //namenode is down, create another file in a thread final Path file3 = new Path(dir, "file"); final Thread thread = new Thread(new Runnable() { @@ -865,7 +893,7 @@ public class TestDFSClientRetries { public void run() { try { //it should retry till namenode is up. - final FileSystem fs = AppendTestUtil.createHdfsWithDifferentUsername(conf); + final FileSystem fs = createFsWithDifferentUsername(conf, isWebHDFS); DFSTestUtil.createFile(fs, file3, length, numDatanodes, 20120406L); } catch (Exception e) { exceptions.add(e); @@ -892,12 +920,15 @@ public class TestDFSClientRetries { }).start(); //namenode is down, it should retry until namenode is up again. - final FileStatus s2 = dfs.getFileStatus(file1); + final FileStatus s2 = fs.getFileStatus(file1); assertEquals(s1, s2); //check file1 and file3 thread.join(); - assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file3)); + assertEquals(s1.getLen(), fs.getFileStatus(file3).getLen()); + assertEquals(fs.getFileChecksum(file1), fs.getFileChecksum(file3)); + + reader.join(); //enter safe mode assertTrue(HdfsUtils.isHealthy(uri)); @@ -922,8 +953,8 @@ public class TestDFSClientRetries { //namenode is in safe mode, create should retry until it leaves safe mode. 
final Path file2 = new Path(dir, "bar"); - DFSTestUtil.createFile(dfs, file2, length, numDatanodes, 20120406L); - assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file2)); + DFSTestUtil.createFile(fs, file2, length, numDatanodes, 20120406L); + assertEquals(fs.getFileChecksum(file1), fs.getFileChecksum(file2)); assertTrue(HdfsUtils.isHealthy(uri)); @@ -931,7 +962,7 @@ public class TestDFSClientRetries { final Path nonExisting = new Path(dir, "nonExisting"); LOG.info("setPermission: " + nonExisting); try { - dfs.setPermission(nonExisting, new FsPermission((short)0)); + fs.setPermission(nonExisting, new FsPermission((short)0)); fail(); } catch(FileNotFoundException fnfe) { LOG.info("GOOD!", fnfe); @@ -949,6 +980,17 @@ public class TestDFSClientRetries { } } + public static FileSystem createFsWithDifferentUsername( + final Configuration conf, final boolean isWebHDFS + ) throws IOException, InterruptedException { + String username = UserGroupInformation.getCurrentUser().getShortUserName()+"_XXX"; + UserGroupInformation ugi = + UserGroupInformation.createUserForTesting(username, new String[]{"supergroup"}); + + return isWebHDFS? WebHdfsTestUtil.getWebHdfsFileSystemAs(ugi, conf) + : DFSTestUtil.getFileSystemAs(ugi, conf); + } + @Test public void testMultipleLinearRandomRetry() { parseMultipleLinearRandomRetry(null, ""); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java index 02c04141050..fe32fcdfbe5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java @@ -44,7 +44,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; @@ -140,9 +139,7 @@ public class TestDelegationTokenForProxyUser { .doAs(new PrivilegedExceptionAction>() { @Override public Token run() throws IOException { - DistributedFileSystem dfs = (DistributedFileSystem) cluster - .getFileSystem(); - return dfs.getDelegationToken("RenewerUser"); + return cluster.getFileSystem().getDelegationToken("RenewerUser"); } }); DelegationTokenIdentifier identifier = new DelegationTokenIdentifier(); @@ -206,7 +203,7 @@ public class TestDelegationTokenForProxyUser { final PutOpParam.Op op = PutOpParam.Op.CREATE; final URL url = WebHdfsTestUtil.toUrl(webhdfs, op, f, new DoAsParam(PROXY_USER)); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - conn = WebHdfsTestUtil.twoStepWrite(conn, op); + conn = WebHdfsTestUtil.twoStepWrite(webhdfs, op, conn); final FSDataOutputStream out = WebHdfsTestUtil.write(webhdfs, op, conn, 4096); out.write("Hello, webhdfs user!".getBytes()); out.close(); @@ -221,7 +218,7 @@ public class TestDelegationTokenForProxyUser { final PostOpParam.Op op = PostOpParam.Op.APPEND; final URL url = WebHdfsTestUtil.toUrl(webhdfs, op, f, new DoAsParam(PROXY_USER)); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - conn = WebHdfsTestUtil.twoStepWrite(conn, op); + 
conn = WebHdfsTestUtil.twoStepWrite(webhdfs, op, conn); final FSDataOutputStream out = WebHdfsTestUtil.write(webhdfs, op, conn, 4096); out.write("\nHello again!".getBytes()); out.close(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java index 4ef0dd680e7..1a95fc8c327 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java @@ -18,22 +18,10 @@ package org.apache.hadoop.hdfs.web; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import java.io.IOException; -import java.net.URI; import java.net.URL; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hdfs.TestByteRangeInputStream.MockHttpURLConnection; -import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.OffsetUrlInputStream; -import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.OffsetUrlOpener; import org.junit.Test; public class TestOffsetUrlInputStream { @@ -73,65 +61,4 @@ public class TestOffsetUrlInputStream { WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString()); } } - - @Test - public void testByteRange() throws Exception { - final Configuration conf = new Configuration(); - final String uri = WebHdfsFileSystem.SCHEME + "://localhost:50070/"; - final WebHdfsFileSystem webhdfs = (WebHdfsFileSystem)FileSystem.get(new URI(uri), conf); - - OffsetUrlOpener ospy = spy(webhdfs.new OffsetUrlOpener(new URL("http://test/"))); - doReturn(new MockHttpURLConnection(ospy.getURL())).when(ospy) - .openConnection(); - OffsetUrlOpener rspy = spy(webhdfs.new OffsetUrlOpener((URL) null)); - doReturn(new MockHttpURLConnection(rspy.getURL())).when(rspy) - .openConnection(); - final OffsetUrlInputStream is = new OffsetUrlInputStream(ospy, rspy); - - assertEquals("getPos wrong", 0, is.getPos()); - - is.read(); - - assertNull("Initial call made incorrectly (Range Check)", ospy - .openConnection().getRequestProperty("Range")); - - assertEquals("getPos should be 1 after reading one byte", 1, is.getPos()); - - is.read(); - - assertEquals("getPos should be 2 after reading two bytes", 2, is.getPos()); - - // No additional connections should have been made (no seek) - - rspy.setURL(new URL("http://resolvedurl/")); - - is.seek(100); - is.read(); - - assertEquals("getPos should be 101 after reading one byte", 101, - is.getPos()); - - verify(rspy, times(1)).openConnection(); - - is.seek(101); - is.read(); - - verify(rspy, times(1)).openConnection(); - - // Seek to 101 should not result in another request" - - is.seek(2500); - is.read(); - - ((MockHttpURLConnection) rspy.openConnection()).setResponseCode(206); - is.seek(0); - - try { - is.read(); - fail("Exception should be thrown when 206 response is given " - + "but 200 is expected"); - } catch (IOException e) { - WebHdfsFileSystem.LOG.info(e.toString()); - } - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsRetries.java new 
file mode 100644 index 00000000000..27ff56d8395 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsRetries.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.web; + +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.TestDFSClientRetries; +import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods; +import org.apache.log4j.Level; +import org.junit.Test; + +/** Test WebHdfsFileSystem retry on failures. */ +public class TestWebHdfsRetries { + /** Test client retry with namenode restarting. */ + @Test + public void testNamenodeRestart() throws Exception { + ((Log4JLogger)NamenodeWebHdfsMethods.LOG).getLogger().setLevel(Level.ALL); + final Configuration conf = WebHdfsTestUtil.createConf(); + TestDFSClientRetries.namenodeRestartTest(conf, true); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java index 9ae0fb28c21..e51a2524ecd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java @@ -79,13 +79,9 @@ public class WebHdfsTestUtil { return WebHdfsFileSystem.jsonParse(conn, false); } - public static HttpURLConnection twoStepWrite(HttpURLConnection conn, - final HttpOpParam.Op op) throws IOException { - conn.setRequestMethod(op.getType().toString()); - conn = WebHdfsFileSystem.twoStepWrite(conn, op); - conn.setDoOutput(true); - conn.connect(); - return conn; + public static HttpURLConnection twoStepWrite(final WebHdfsFileSystem webhdfs, + final HttpOpParam.Op op, HttpURLConnection conn) throws IOException { + return webhdfs.new Runner(op, conn).twoStepWrite(); } public static FSDataOutputStream write(final WebHdfsFileSystem webhdfs,