HDFS-3667. Add retry support to WebHdfsFileSystem.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1366601 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2012-07-28 05:57:47 +00:00
parent 0c4e670eff
commit e4eec269d9
15 changed files with 396 additions and 255 deletions
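
With this patch, WebHdfsFileSystem consults the same client retry policy machinery as the RPC client (see NameNodeProxies.getDefaultRetryPolicy below). A minimal sketch of how a client might opt in; the two configuration keys mirror the DFSConfigKeys constants exercised by the tests in this patch, and the host name and spec values are illustrative only:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class WebHdfsRetryExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
        conf.setBoolean("dfs.client.retry.policy.enabled", true);
        // Pairs of "sleepMillis,numRetries": ~10s x 6 attempts, then ~60s x 10.
        conf.set("dfs.client.retry.policy.spec", "10000,6,60000,10");

        // With the patch, WebHDFS calls retry across a namenode restart
        // instead of failing on the first connect error.
        FileSystem fs = FileSystem.get(URI.create("webhdfs://namenode:50070/"), conf);
        System.out.println(fs.getFileStatus(new Path("/")));
      }
    }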

File: CHANGES.txt

@@ -373,6 +373,8 @@ Branch-2 ( Unreleased changes )
     HDFS-3697. Enable fadvise readahead by default. (todd)
 
+    HDFS-3667. Add retry support to WebHdfsFileSystem.  (szetszwo)
+
   BUG FIXES
 
     HDFS-3385. The last block of INodeFileUnderConstruction is not

File: ByteRangeInputStream.java

@@ -57,9 +57,9 @@ public abstract class ByteRangeInputStream extends FSInputStream {
       return url;
     }
 
-    protected abstract HttpURLConnection openConnection() throws IOException;
-
-    protected abstract HttpURLConnection openConnection(final long offset) throws IOException;
+    /** Connect to server with a data offset. */
+    protected abstract HttpURLConnection connect(final long offset,
+        final boolean resolved) throws IOException;
   }
 
   enum StreamStatus {
@@ -85,9 +85,6 @@ public abstract class ByteRangeInputStream extends FSInputStream {
     this.resolvedURL = r;
   }
 
-  protected abstract void checkResponseCode(final HttpURLConnection connection
-      ) throws IOException;
-
   protected abstract URL getResolvedUrl(final HttpURLConnection connection
       ) throws IOException;
@@ -113,13 +110,10 @@ public abstract class ByteRangeInputStream extends FSInputStream {
   protected InputStream openInputStream() throws IOException {
     // Use the original url if no resolved url exists, eg. if
     // it's the first time a request is made.
-    final URLOpener opener =
-        (resolvedURL.getURL() == null) ? originalURL : resolvedURL;
-
-    final HttpURLConnection connection = opener.openConnection(startPos);
-    connection.connect();
-    checkResponseCode(connection);
+    final boolean resolved = resolvedURL.getURL() != null;
+    final URLOpener opener = resolved? resolvedURL: originalURL;
 
+    final HttpURLConnection connection = opener.connect(startPos, resolved);
     final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
     if (cl == null) {
       throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing");

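The refactoring above folds open-connect-validate into a single abstract connect(offset, resolved) call, so a retry can re-drive the whole sequence at once; resolved is true once a redirected URL is already known. A minimal sketch of a conforming opener, assuming a plain HTTP endpoint (the class name is hypothetical; the real implementations are HftpFileSystem's RangeHeaderUrlOpener and WebHdfsFileSystem's OffsetUrlOpener):

    import java.io.IOException;
    import java.net.HttpURLConnection;
    import java.net.URL;

    import org.apache.hadoop.hdfs.ByteRangeInputStream;

    class SimpleRangeOpener extends ByteRangeInputStream.URLOpener {
      SimpleRangeOpener(URL url) {
        super(url);
      }

      @Override
      protected HttpURLConnection connect(final long offset,
          final boolean resolved) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        if (offset != 0L) {
          // Resume mid-file with a standard HTTP Range header.
          conn.setRequestProperty("Range", "bytes=" + offset + "-");
        }
        conn.connect();
        return conn;
      }
    }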
File: HftpFileSystem.java

@@ -342,19 +342,28 @@ public class HftpFileSystem extends FileSystem
       super(url);
     }
 
-    @Override
     protected HttpURLConnection openConnection() throws IOException {
       return (HttpURLConnection)URLUtils.openConnection(url);
     }
 
     /** Use HTTP Range header for specifying offset. */
     @Override
-    protected HttpURLConnection openConnection(final long offset) throws IOException {
+    protected HttpURLConnection connect(final long offset,
+        final boolean resolved) throws IOException {
       final HttpURLConnection conn = openConnection();
       conn.setRequestMethod("GET");
       if (offset != 0L) {
         conn.setRequestProperty("Range", "bytes=" + offset + "-");
       }
+      conn.connect();
+
+      //Expects HTTP_OK or HTTP_PARTIAL response codes.
+      final int code = conn.getResponseCode();
+      if (offset != 0L && code != HttpURLConnection.HTTP_PARTIAL) {
+        throw new IOException("HTTP_PARTIAL expected, received " + code);
+      } else if (offset == 0L && code != HttpURLConnection.HTTP_OK) {
+        throw new IOException("HTTP_OK expected, received " + code);
+      }
       return conn;
     }
   }
@@ -368,22 +377,6 @@ public class HftpFileSystem extends FileSystem
       this(new RangeHeaderUrlOpener(url), new RangeHeaderUrlOpener(null));
     }
 
-    /** Expects HTTP_OK and HTTP_PARTIAL response codes. */
-    @Override
-    protected void checkResponseCode(final HttpURLConnection connection
-        ) throws IOException {
-      final int code = connection.getResponseCode();
-      if (startPos != 0 && code != HttpURLConnection.HTTP_PARTIAL) {
-        // We asked for a byte range but did not receive a partial content
-        // response...
-        throw new IOException("HTTP_PARTIAL expected, received " + code);
-      } else if (startPos == 0 && code != HttpURLConnection.HTTP_OK) {
-        // We asked for all bytes from the beginning but didn't receive a 200
-        // response (none of the other 2xx codes are valid here)
-        throw new IOException("HTTP_OK expected, received " + code);
-      }
-    }
-
     @Override
     protected URL getResolvedUrl(final HttpURLConnection connection) {
       return connection.getURL();

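Moving the response-code check into connect() means a bad status surfaces as an IOException during the connect step, where the new retry logic can see it. The check itself is standard HTTP semantics: a ranged request must be answered with 206, an unranged one with 200. A standalone illustration against a hypothetical URL:

    import java.io.IOException;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class RangeCheckExample {
      public static void main(String[] args) throws IOException {
        URL url = new URL("http://datanode:50075/streamFile/foo"); // hypothetical
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestProperty("Range", "bytes=1024-"); // start at byte 1024
        conn.connect();

        int code = conn.getResponseCode();
        if (code != HttpURLConnection.HTTP_PARTIAL) {
          // A 200 here would mean the server ignored the Range header and
          // is replaying the file from offset 0.
          throw new IOException("HTTP_PARTIAL expected, received " + code);
        }
      }
    }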
File: NameNodeProxies.java

@@ -259,7 +259,7 @@ public class NameNodeProxies {
    *
    * Note that dfs.client.retry.max < 0 is not allowed.
    */
-  private static RetryPolicy getDefaultRpcRetryPolicy(Configuration conf) {
+  public static RetryPolicy getDefaultRetryPolicy(Configuration conf) {
     final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf);
     if (LOG.isDebugEnabled()) {
       LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry);
@@ -300,6 +300,13 @@ public class NameNodeProxies {
               + p.getClass().getSimpleName() + ", exception=" + e);
           return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
         }
+
+        @Override
+        public String toString() {
+          return "RetryPolicy[" + multipleLinearRandomRetry + ", "
+              + RetryPolicies.TRY_ONCE_THEN_FAIL.getClass().getSimpleName()
+              + "]";
+        }
       };
     }
   }
@@ -335,7 +342,7 @@ public class NameNodeProxies {
       boolean withRetries) throws IOException {
     RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
 
-    final RetryPolicy defaultPolicy = getDefaultRpcRetryPolicy(conf);
+    final RetryPolicy defaultPolicy = getDefaultRetryPolicy(conf);
     final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
     ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
         ClientNamenodeProtocolPB.class, version, address, ugi, conf,

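getMultipleLinearRandomRetry builds the underlying policy from a comma-separated spec of sleep-time/retry-count pairs; the toString() added above is what makes the active policy readable in the new WebHDFS retry log message. A sketch of the spec format; the exact entry point is an assumption on my part, though testMultipleLinearRandomRetry below exercises the same parsing:

    import org.apache.hadoop.io.retry.RetryPolicies;
    import org.apache.hadoop.io.retry.RetryPolicy;

    public class RetrySpecExample {
      public static void main(String[] args) {
        // "10000,6,60000,10": retry 6 times sleeping about 10s each,
        // then 10 more times sleeping about 60s each, then give up.
        RetryPolicy p = RetryPolicies.MultipleLinearRandomRetry
            .parseCommaSeparatedString("10000,6,60000,10");
        System.out.println(p); // prints the configured pairs
      }
    }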
File: WebHdfsFileSystem.java

@@ -55,6 +55,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.ByteRangeInputStream;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
@@ -88,6 +89,7 @@ import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
 import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
 import org.apache.hadoop.hdfs.web.resources.UserParam;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
@@ -147,6 +149,7 @@ public class WebHdfsFileSystem extends FileSystem
   private URI uri;
   private Token<?> delegationToken;
   private final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
+  private RetryPolicy retryPolicy = null;
   private Path workingDir;
 
   {
@@ -179,6 +182,7 @@ public class WebHdfsFileSystem extends FileSystem
       throw new IllegalArgumentException(e);
     }
     this.nnAddr = NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort());
+    this.retryPolicy = NameNodeProxies.getDefaultRetryPolicy(conf);
     this.workingDir = getHomeDirectory();
 
     if (UserGroupInformation.isSecurityEnabled()) {
@@ -276,13 +280,13 @@ public class WebHdfsFileSystem extends FileSystem
   }
 
   private static Map<?, ?> validateResponse(final HttpOpParam.Op op,
-      final HttpURLConnection conn) throws IOException {
+      final HttpURLConnection conn, boolean unwrapException) throws IOException {
     final int code = conn.getResponseCode();
     if (code != op.getExpectedHttpResponseCode()) {
       final Map<?, ?> m;
       try {
         m = jsonParse(conn, true);
-      } catch(IOException e) {
+      } catch(Exception e) {
         throw new IOException("Unexpected HTTP response: code=" + code + " != "
             + op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
             + ", message=" + conn.getResponseMessage(), e);
@@ -293,7 +297,30 @@ public class WebHdfsFileSystem extends FileSystem
       }
 
       final RemoteException re = JsonUtil.toRemoteException(m);
-      throw re.unwrapRemoteException(AccessControlException.class,
+      throw unwrapException? toIOException(re): re;
+    }
+    return null;
+  }
+
+  /**
+   * Convert an exception to an IOException.
+   *
+   * For a non-IOException, wrap it with IOException.
+   * For a RemoteException, unwrap it.
+   * For an IOException which is not a RemoteException, return it.
+   */
+  private static IOException toIOException(Exception e) {
+    if (!(e instanceof IOException)) {
+      return new IOException(e);
+    }
+
+    final IOException ioe = (IOException)e;
+    if (!(ioe instanceof RemoteException)) {
+      return ioe;
+    }
+
+    final RemoteException re = (RemoteException)ioe;
+    return re.unwrapRemoteException(AccessControlException.class,
         InvalidToken.class,
         AuthenticationException.class,
         AuthorizationException.class,
@@ -305,8 +332,6 @@ public class WebHdfsFileSystem extends FileSystem
         DSQuotaExceededException.class,
         NSQuotaExceededException.class);
   }
-    return null;
-  }
 
   /**
    * Return a URL pointing to given path on the namenode.
@@ -362,68 +387,13 @@ public class WebHdfsFileSystem extends FileSystem
   }
 
   private HttpURLConnection getHttpUrlConnection(URL url)
-      throws IOException {
+      throws IOException, AuthenticationException {
     final HttpURLConnection conn;
-    try {
-      if (ugi.hasKerberosCredentials()) {
-        conn = new AuthenticatedURL(AUTH).openConnection(url, authToken);
-      } else {
-        conn = (HttpURLConnection)url.openConnection();
-      }
-    } catch (AuthenticationException e) {
-      throw new IOException("Authentication failed, url=" + url, e);
+    if (ugi.hasKerberosCredentials()) {
+      conn = new AuthenticatedURL(AUTH).openConnection(url, authToken);
+    } else {
+      conn = (HttpURLConnection)url.openConnection();
     }
-    return conn;
-  }
-
-  private HttpURLConnection httpConnect(final HttpOpParam.Op op, final Path fspath,
-      final Param<?,?>... parameters) throws IOException {
-    final URL url = toUrl(op, fspath, parameters);
-
-    //connect and get response
-    HttpURLConnection conn = getHttpUrlConnection(url);
-    try {
-      conn.setRequestMethod(op.getType().toString());
-      if (op.getDoOutput()) {
-        conn = twoStepWrite(conn, op);
-        conn.setRequestProperty("Content-Type", "application/octet-stream");
-      }
-      conn.setDoOutput(op.getDoOutput());
-      conn.connect();
-      return conn;
-    } catch (IOException e) {
-      conn.disconnect();
-      throw e;
-    }
-  }
-
-  /**
-   * Two-step Create/Append:
-   * Step 1) Submit a Http request with neither auto-redirect nor data.
-   * Step 2) Submit another Http request with the URL from the Location header with data.
-   *
-   * The reason of having two-step create/append is for preventing clients to
-   * send out the data before the redirect. This issue is addressed by the
-   * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
-   * Unfortunately, there are software library bugs (e.g. Jetty 6 http server
-   * and Java 6 http client), which do not correctly implement "Expect:
-   * 100-continue". The two-step create/append is a temporary workaround for
-   * the software library bugs.
-   */
-  static HttpURLConnection twoStepWrite(HttpURLConnection conn,
-      final HttpOpParam.Op op) throws IOException {
-    //Step 1) Submit a Http request with neither auto-redirect nor data.
-    conn.setInstanceFollowRedirects(false);
-    conn.setDoOutput(false);
-    conn.connect();
-    validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn);
-    final String redirect = conn.getHeaderField("Location");
-    conn.disconnect();
-
-    //Step 2) Submit another Http request with the URL from the Location header with data.
-    conn = (HttpURLConnection)new URL(redirect).openConnection();
-    conn.setRequestMethod(op.getType().toString());
-    conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
     return conn;
   }
@@ -439,12 +409,158 @@ public class WebHdfsFileSystem extends FileSystem
    */
   private Map<?, ?> run(final HttpOpParam.Op op, final Path fspath,
       final Param<?,?>... parameters) throws IOException {
-    final HttpURLConnection conn = httpConnect(op, fspath, parameters);
-    try {
-      final Map<?, ?> m = validateResponse(op, conn);
-      return m != null? m: jsonParse(conn, false);
-    } finally {
-      conn.disconnect();
-    }
+    return new Runner(op, fspath, parameters).run().json;
+  }
+
+  /**
+   * This class is for initializing an HTTP connection, connecting to the
+   * server, obtaining a response, and also handling retry on failures.
+   */
+  class Runner {
+    private final HttpOpParam.Op op;
+    private final URL url;
+    private final boolean redirected;
+
+    private boolean checkRetry;
+    private HttpURLConnection conn = null;
+    private Map<?, ?> json = null;
+
+    Runner(final HttpOpParam.Op op, final URL url, final boolean redirected) {
+      this.op = op;
+      this.url = url;
+      this.redirected = redirected;
+    }
+
+    Runner(final HttpOpParam.Op op, final Path fspath,
+        final Param<?,?>... parameters) throws IOException {
+      this(op, toUrl(op, fspath, parameters), false);
+    }
+
+    Runner(final HttpOpParam.Op op, final HttpURLConnection conn) {
+      this(op, null, false);
+      this.conn = conn;
+    }
+
+    private void init() throws IOException {
+      checkRetry = !redirected;
+      try {
+        conn = getHttpUrlConnection(url);
+      } catch(AuthenticationException ae) {
+        checkRetry = false;
+        throw new IOException("Authentication failed, url=" + url, ae);
+      }
+    }
+
+    private void connect() throws IOException {
+      connect(op.getDoOutput());
+    }
+
+    private void connect(boolean doOutput) throws IOException {
+      conn.setRequestMethod(op.getType().toString());
+      conn.setDoOutput(doOutput);
+      conn.setInstanceFollowRedirects(false);
+      conn.connect();
+    }
+
+    private void disconnect() {
+      if (conn != null) {
+        conn.disconnect();
+        conn = null;
+      }
+    }
+
+    Runner run() throws IOException {
+      for(int retry = 0; ; retry++) {
+        try {
+          init();
+          if (op.getDoOutput()) {
+            twoStepWrite();
+          } else {
+            getResponse(op != GetOpParam.Op.OPEN);
+          }
+          return this;
+        } catch(IOException ioe) {
+          shouldRetry(ioe, retry);
+        }
+      }
+    }
+
+    private void shouldRetry(final IOException ioe, final int retry
+        ) throws IOException {
+      if (checkRetry) {
+        try {
+          final RetryPolicy.RetryAction a = retryPolicy.shouldRetry(
+              ioe, retry, 0, true);
+          if (a.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
+            LOG.info("Retrying connect to namenode: " + nnAddr
+                + ". Already tried " + retry + " time(s); retry policy is "
+                + retryPolicy + ", delay " + a.delayMillis + "ms.");
+            Thread.sleep(a.delayMillis);
+            return;
+          }
+        } catch(Exception e) {
+          LOG.warn("Original exception is ", ioe);
+          throw toIOException(e);
+        }
+      }
+      throw toIOException(ioe);
+    }
+
+    /**
+     * Two-step Create/Append:
+     * Step 1) Submit a Http request with neither auto-redirect nor data.
+     * Step 2) Submit another Http request with the URL from the Location
+     * header with data.
+     *
+     * The reason for having two-step create/append is to prevent clients
+     * from sending out the data before the redirect. This issue is addressed
+     * by the "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section
+     * 8.2.3. Unfortunately, there are software library bugs (e.g. Jetty 6
+     * http server and Java 6 http client), which do not correctly implement
+     * "Expect: 100-continue". The two-step create/append is a temporary
+     * workaround for the software library bugs.
+     */
+    HttpURLConnection twoStepWrite() throws IOException {
+      //Step 1) Submit a Http request with neither auto-redirect nor data.
+      connect(false);
+      validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn, false);
+      final String redirect = conn.getHeaderField("Location");
+      disconnect();
+      checkRetry = false;
+
+      //Step 2) Submit another Http request with the URL from the Location
+      //header with data.
+      conn = (HttpURLConnection)new URL(redirect).openConnection();
+      conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
+      connect();
+      return conn;
+    }
+
+    FSDataOutputStream write(final int bufferSize) throws IOException {
+      return WebHdfsFileSystem.this.write(op, conn, bufferSize);
+    }
+
+    void getResponse(boolean getJsonAndDisconnect) throws IOException {
+      try {
+        connect();
+        if (!redirected && op.getRedirect()) {
+          final String redirect = conn.getHeaderField("Location");
+          json = validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op),
+              conn, false);
+          disconnect();
+          checkRetry = false;
+
+          conn = (HttpURLConnection)new URL(redirect).openConnection();
+          connect();
+        }
+
+        json = validateResponse(op, conn, false);
+        if (json == null && getJsonAndDisconnect) {
+          json = jsonParse(conn, false);
+        }
+      } finally {
+        if (getJsonAndDisconnect) {
+          disconnect();
+        }
+      }
+    }
   }
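
Stripped of the WebHDFS details, Runner.run() is a policy-driven retry loop: attempt, ask the policy, sleep, repeat. A minimal sketch of the same control flow under an injected RetryPolicy; request() is a hypothetical stand-in for init() + connect() + getResponse():

    import java.io.IOException;

    import org.apache.hadoop.io.retry.RetryPolicy;
    import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;

    public class RetryLoopSketch {
      private final RetryPolicy retryPolicy;

      RetryLoopSketch(RetryPolicy retryPolicy) {
        this.retryPolicy = retryPolicy;
      }

      // Hypothetical request helper standing in for one HTTP round trip.
      private String request() throws IOException {
        throw new IOException("namenode unreachable");
      }

      String runWithRetries() throws IOException, InterruptedException {
        for (int retry = 0; ; retry++) {
          try {
            return request();
          } catch (IOException ioe) {
            final RetryAction a;
            try {
              // failovers=0, isIdempotent=true, matching Runner.shouldRetry.
              a = retryPolicy.shouldRetry(ioe, retry, 0, true);
            } catch (Exception e) {
              throw new IOException("retry policy itself failed", e);
            }
            if (a.action != RetryAction.RetryDecision.RETRY) {
              throw ioe; // policy exhausted: surface the last failure
            }
            Thread.sleep(a.delayMillis); // back off before the next attempt
          }
        }
      }
    }

Note how Runner only consults the policy while checkRetry is true: once a redirect has been followed or data may have been sent, a blind retry would no longer be safe.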
@@ -578,7 +694,7 @@ public class WebHdfsFileSystem extends FileSystem
       super.close();
     } finally {
       try {
-        validateResponse(op, conn);
+        validateResponse(op, conn, true);
       } finally {
         conn.disconnect();
       }
@@ -594,13 +710,14 @@ public class WebHdfsFileSystem extends FileSystem
     statistics.incrementWriteOps(1);
 
     final HttpOpParam.Op op = PutOpParam.Op.CREATE;
-    final HttpURLConnection conn = httpConnect(op, f,
+    return new Runner(op, f,
         new PermissionParam(applyUMask(permission)),
         new OverwriteParam(overwrite),
         new BufferSizeParam(bufferSize),
         new ReplicationParam(replication),
-        new BlockSizeParam(blockSize));
-    return write(op, conn, bufferSize);
+        new BlockSizeParam(blockSize))
+      .run()
+      .write(bufferSize);
   }
 
   @Override
@@ -609,9 +726,9 @@ public class WebHdfsFileSystem extends FileSystem
     statistics.incrementWriteOps(1);
 
     final HttpOpParam.Op op = PostOpParam.Op.APPEND;
-    final HttpURLConnection conn = httpConnect(op, f,
-        new BufferSizeParam(bufferSize));
-    return write(op, conn, bufferSize);
+    return new Runner(op, f, new BufferSizeParam(bufferSize))
+      .run()
+      .write(bufferSize);
   }
 
   @SuppressWarnings("deprecation")
@@ -638,26 +755,17 @@ public class WebHdfsFileSystem extends FileSystem
   }
 
   class OffsetUrlOpener extends ByteRangeInputStream.URLOpener {
-    /** The url with offset parameter */
-    private URL offsetUrl;
-
     OffsetUrlOpener(final URL url) {
       super(url);
     }
 
-    /** Open connection with offset url. */
-    @Override
-    protected HttpURLConnection openConnection() throws IOException {
-      return getHttpUrlConnection(offsetUrl);
-    }
-
-    /** Setup offset url before open connection. */
+    /** Setup offset url and connect. */
     @Override
-    protected HttpURLConnection openConnection(final long offset) throws IOException {
-      offsetUrl = offset == 0L? url: new URL(url + "&" + new OffsetParam(offset));
-      final HttpURLConnection conn = openConnection();
-      conn.setRequestMethod("GET");
-      return conn;
+    protected HttpURLConnection connect(final long offset,
+        final boolean resolved) throws IOException {
+      final URL offsetUrl = offset == 0L? url
+          : new URL(url + "&" + new OffsetParam(offset));
+      return new Runner(GetOpParam.Op.OPEN, offsetUrl, resolved).run().conn;
     }
   }
@@ -699,12 +807,6 @@ public class WebHdfsFileSystem extends FileSystem
       super(o, r);
     }
 
-    @Override
-    protected void checkResponseCode(final HttpURLConnection connection
-        ) throws IOException {
-      validateResponse(GetOpParam.Op.OPEN, connection);
-    }
-
     /** Remove offset parameter before returning the resolved url. */
     @Override
     protected URL getResolvedUrl(final HttpURLConnection connection

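Unlike Hftp's Range header, OffsetUrlOpener carries the read position as an offset query parameter, and the input stream strips it again once the redirected datanode URL is resolved. A small round-trip sketch; the URL is hypothetical, and the class sits in the same package on the assumption that removeOffsetParam may not be public:

    package org.apache.hadoop.hdfs.web;

    import java.net.URL;

    public class OffsetParamExample {
      public static void main(String[] args) throws Exception {
        URL base = new URL("http://namenode:50070/webhdfs/v1/foo?op=OPEN");
        // What connect(100, false) would request:
        URL withOffset = new URL(base + "&offset=100");
        // After the redirect is resolved, the offset is removed so a later
        // seek can append a fresh one:
        System.out.println(WebHdfsFileSystem.removeOffsetParam(withOffset));
      }
    }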
File: DeleteOpParam.java

@@ -43,6 +43,11 @@ public class DeleteOpParam extends HttpOpParam<DeleteOpParam.Op> {
       return false;
     }
 
+    @Override
+    public boolean getRedirect() {
+      return false;
+    }
+
     @Override
     public int getExpectedHttpResponseCode() {
       return expectedHttpResponseCode;

File: GetOpParam.java

@@ -23,25 +23,27 @@ import java.net.HttpURLConnection;
 public class GetOpParam extends HttpOpParam<GetOpParam.Op> {
   /** Get operations. */
   public static enum Op implements HttpOpParam.Op {
-    OPEN(HttpURLConnection.HTTP_OK),
+    OPEN(true, HttpURLConnection.HTTP_OK),
 
-    GETFILESTATUS(HttpURLConnection.HTTP_OK),
-    LISTSTATUS(HttpURLConnection.HTTP_OK),
-    GETCONTENTSUMMARY(HttpURLConnection.HTTP_OK),
-    GETFILECHECKSUM(HttpURLConnection.HTTP_OK),
+    GETFILESTATUS(false, HttpURLConnection.HTTP_OK),
+    LISTSTATUS(false, HttpURLConnection.HTTP_OK),
+    GETCONTENTSUMMARY(false, HttpURLConnection.HTTP_OK),
+    GETFILECHECKSUM(true, HttpURLConnection.HTTP_OK),
 
-    GETHOMEDIRECTORY(HttpURLConnection.HTTP_OK),
-    GETDELEGATIONTOKEN(HttpURLConnection.HTTP_OK),
-    GETDELEGATIONTOKENS(HttpURLConnection.HTTP_OK),
+    GETHOMEDIRECTORY(false, HttpURLConnection.HTTP_OK),
+    GETDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK),
+    GETDELEGATIONTOKENS(false, HttpURLConnection.HTTP_OK),
 
     /** GET_BLOCK_LOCATIONS is a private unstable op. */
-    GET_BLOCK_LOCATIONS(HttpURLConnection.HTTP_OK),
+    GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK),
 
-    NULL(HttpURLConnection.HTTP_NOT_IMPLEMENTED);
+    NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);
 
+    final boolean redirect;
     final int expectedHttpResponseCode;
 
-    Op(final int expectedHttpResponseCode) {
+    Op(final boolean redirect, final int expectedHttpResponseCode) {
+      this.redirect = redirect;
       this.expectedHttpResponseCode = expectedHttpResponseCode;
     }
@@ -55,6 +57,11 @@ public class GetOpParam extends HttpOpParam<GetOpParam.Op> {
       return false;
     }
 
+    @Override
+    public boolean getRedirect() {
+      return redirect;
+    }
+
     @Override
     public int getExpectedHttpResponseCode() {
       return expectedHttpResponseCode;

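Only OPEN and GETFILECHECKSUM gain redirect=true above; they are the GET operations the namenode answers with a 307 pointing at a datanode, which is exactly the case Runner.getResponse() now handles in two hops. A tiny check of the flag wiring, as a sketch:

    import org.apache.hadoop.hdfs.web.resources.GetOpParam;

    public class RedirectFlagExample {
      public static void main(String[] args) {
        for (GetOpParam.Op op : GetOpParam.Op.values()) {
          // Expected: true only for OPEN and GETFILECHECKSUM.
          System.out.println(op + " redirect=" + op.getRedirect());
        }
      }
    }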
File: HttpOpParam.java

@ -17,6 +17,10 @@
*/ */
package org.apache.hadoop.hdfs.web.resources; package org.apache.hadoop.hdfs.web.resources;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
@ -42,6 +46,9 @@ public abstract class HttpOpParam<E extends Enum<E> & HttpOpParam.Op>
/** @return true if the operation will do output. */ /** @return true if the operation will do output. */
public boolean getDoOutput(); public boolean getDoOutput();
/** @return true if the operation will be redirected. */
public boolean getRedirect();
/** @return true the expected http response code. */ /** @return true the expected http response code. */
public int getExpectedHttpResponseCode(); public int getExpectedHttpResponseCode();
@ -51,15 +58,25 @@ public abstract class HttpOpParam<E extends Enum<E> & HttpOpParam.Op>
/** Expects HTTP response 307 "Temporary Redirect". */ /** Expects HTTP response 307 "Temporary Redirect". */
public static class TemporaryRedirectOp implements Op { public static class TemporaryRedirectOp implements Op {
static final TemporaryRedirectOp CREATE = new TemporaryRedirectOp(PutOpParam.Op.CREATE); static final TemporaryRedirectOp CREATE = new TemporaryRedirectOp(
static final TemporaryRedirectOp APPEND = new TemporaryRedirectOp(PostOpParam.Op.APPEND); PutOpParam.Op.CREATE);
static final TemporaryRedirectOp APPEND = new TemporaryRedirectOp(
PostOpParam.Op.APPEND);
static final TemporaryRedirectOp OPEN = new TemporaryRedirectOp(
GetOpParam.Op.OPEN);
static final TemporaryRedirectOp GETFILECHECKSUM = new TemporaryRedirectOp(
GetOpParam.Op.GETFILECHECKSUM);
static final List<TemporaryRedirectOp> values
= Collections.unmodifiableList(Arrays.asList(
new TemporaryRedirectOp[]{CREATE, APPEND, OPEN, GETFILECHECKSUM}));
/** Get an object for the given op. */ /** Get an object for the given op. */
public static TemporaryRedirectOp valueOf(final Op op) { public static TemporaryRedirectOp valueOf(final Op op) {
if (op == CREATE.op) { for(TemporaryRedirectOp t : values) {
return CREATE; if (op == t.op) {
} else if (op == APPEND.op) { return t;
return APPEND; }
} }
throw new IllegalArgumentException(op + " not found."); throw new IllegalArgumentException(op + " not found.");
} }
@ -80,6 +97,11 @@ public abstract class HttpOpParam<E extends Enum<E> & HttpOpParam.Op>
return op.getDoOutput(); return op.getDoOutput();
} }
@Override
public boolean getRedirect() {
return false;
}
/** Override the original expected response with "Temporary Redirect". */ /** Override the original expected response with "Temporary Redirect". */
@Override @Override
public int getExpectedHttpResponseCode() { public int getExpectedHttpResponseCode() {

File: PostOpParam.java

@@ -43,6 +43,11 @@ public class PostOpParam extends HttpOpParam<PostOpParam.Op> {
       return true;
     }
 
+    @Override
+    public boolean getRedirect() {
+      return true;
+    }
+
     @Override
     public int getExpectedHttpResponseCode() {
       return expectedHttpResponseCode;

File: PutOpParam.java

@@ -39,11 +39,11 @@ public class PutOpParam extends HttpOpParam<PutOpParam.Op> {
 
     NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);
 
-    final boolean doOutput;
+    final boolean doOutputAndRedirect;
     final int expectedHttpResponseCode;
 
-    Op(final boolean doOutput, final int expectedHttpResponseCode) {
-      this.doOutput = doOutput;
+    Op(final boolean doOutputAndRedirect, final int expectedHttpResponseCode) {
+      this.doOutputAndRedirect = doOutputAndRedirect;
       this.expectedHttpResponseCode = expectedHttpResponseCode;
     }
@@ -54,7 +54,12 @@ public class PutOpParam extends HttpOpParam<PutOpParam.Op> {
     @Override
     public boolean getDoOutput() {
-      return doOutput;
+      return doOutputAndRedirect;
+    }
+
+    @Override
+    public boolean getRedirect() {
+      return doOutputAndRedirect;
     }
 
     @Override

File: TestDFSClientRetries.java

@@ -47,6 +47,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
@@ -66,6 +67,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
@@ -74,6 +76,7 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
@@ -825,13 +828,17 @@ public class TestDFSClientRetries {
   /** Test client retry with namenode restarting. */
   @Test
   public void testNamenodeRestart() throws Exception {
+    namenodeRestartTest(new Configuration(), false);
+  }
+
+  public static void namenodeRestartTest(final Configuration conf,
+      final boolean isWebHDFS) throws Exception {
     ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
 
     final List<Exception> exceptions = new ArrayList<Exception>();
 
     final Path dir = new Path("/testNamenodeRestart");
 
-    final Configuration conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
     final short numDatanodes = 3;
@@ -841,16 +848,18 @@ public class TestDFSClientRetries {
     try {
       cluster.waitActive();
       final DistributedFileSystem dfs = cluster.getFileSystem();
+      final FileSystem fs = isWebHDFS?
+          WebHdfsTestUtil.getWebHdfsFileSystem(conf): dfs;
       final URI uri = dfs.getUri();
       assertTrue(HdfsUtils.isHealthy(uri));
 
       //create a file
       final long length = 1L << 20;
       final Path file1 = new Path(dir, "foo");
-      DFSTestUtil.createFile(dfs, file1, length, numDatanodes, 20120406L);
+      DFSTestUtil.createFile(fs, file1, length, numDatanodes, 20120406L);
 
       //get file status
-      final FileStatus s1 = dfs.getFileStatus(file1);
+      final FileStatus s1 = fs.getFileStatus(file1);
       assertEquals(length, s1.getLen());
 
       //shutdown namenode
@@ -858,6 +867,25 @@ public class TestDFSClientRetries {
       cluster.shutdownNameNode(0);
       assertFalse(HdfsUtils.isHealthy(uri));
 
+      //namenode is down, read the file in a thread
+      final Thread reader = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            //it should retry till namenode is up.
+            final FileSystem fs = createFsWithDifferentUsername(conf, isWebHDFS);
+            final FSDataInputStream in = fs.open(file1);
+            int count = 0;
+            for(; in.read() != -1; count++);
+            in.close();
+            assertEquals(s1.getLen(), count);
+          } catch (Exception e) {
+            exceptions.add(e);
+          }
+        }
+      });
+      reader.start();
+
       //namenode is down, create another file in a thread
       final Path file3 = new Path(dir, "file");
       final Thread thread = new Thread(new Runnable() {
@@ -865,7 +893,7 @@ public class TestDFSClientRetries {
         public void run() {
           try {
            //it should retry till namenode is up.
-            final FileSystem fs = AppendTestUtil.createHdfsWithDifferentUsername(conf);
+            final FileSystem fs = createFsWithDifferentUsername(conf, isWebHDFS);
             DFSTestUtil.createFile(fs, file3, length, numDatanodes, 20120406L);
           } catch (Exception e) {
             exceptions.add(e);
@@ -892,12 +920,15 @@ public class TestDFSClientRetries {
       }).start();
 
       //namenode is down, it should retry until namenode is up again.
-      final FileStatus s2 = dfs.getFileStatus(file1);
+      final FileStatus s2 = fs.getFileStatus(file1);
       assertEquals(s1, s2);
 
       //check file1 and file3
       thread.join();
-      assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file3));
+      assertEquals(s1.getLen(), fs.getFileStatus(file3).getLen());
+      assertEquals(fs.getFileChecksum(file1), fs.getFileChecksum(file3));
+
+      reader.join();
 
       //enter safe mode
       assertTrue(HdfsUtils.isHealthy(uri));
@@ -922,8 +953,8 @@ public class TestDFSClientRetries {
 
       //namenode is in safe mode, create should retry until it leaves safe mode.
       final Path file2 = new Path(dir, "bar");
-      DFSTestUtil.createFile(dfs, file2, length, numDatanodes, 20120406L);
-      assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file2));
+      DFSTestUtil.createFile(fs, file2, length, numDatanodes, 20120406L);
+      assertEquals(fs.getFileChecksum(file1), fs.getFileChecksum(file2));
 
       assertTrue(HdfsUtils.isHealthy(uri));
@@ -931,7 +962,7 @@ public class TestDFSClientRetries {
       final Path nonExisting = new Path(dir, "nonExisting");
       LOG.info("setPermission: " + nonExisting);
       try {
-        dfs.setPermission(nonExisting, new FsPermission((short)0));
+        fs.setPermission(nonExisting, new FsPermission((short)0));
         fail();
       } catch(FileNotFoundException fnfe) {
         LOG.info("GOOD!", fnfe);
@@ -949,6 +980,17 @@ public class TestDFSClientRetries {
     }
   }
 
+  public static FileSystem createFsWithDifferentUsername(
+      final Configuration conf, final boolean isWebHDFS
+      ) throws IOException, InterruptedException {
+    final String username = UserGroupInformation.getCurrentUser()
+        .getShortUserName() + "_XXX";
+    final UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
+        username, new String[]{"supergroup"});
+
+    return isWebHDFS? WebHdfsTestUtil.getWebHdfsFileSystemAs(ugi, conf)
+        : DFSTestUtil.getFileSystemAs(ugi, conf);
+  }
+
   @Test
   public void testMultipleLinearRandomRetry() {
     parseMultipleLinearRandomRetry(null, "");

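createFsWithDifferentUsername gives each background thread its own client identity, so it gets a distinct FileSystem instance (the FileSystem cache is keyed in part by the user) whose retries are independent of the main test's handle. The underlying pattern, using only public UserGroupInformation APIs; the username and group here are illustrative:

    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.UserGroupInformation;

    public class AsOtherUserExample {
      public static FileSystem open(final Configuration conf) throws Exception {
        // Fabricate a test-only identity; no real login is performed.
        UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
            "someuser_XXX", new String[]{"supergroup"});
        return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
          @Override
          public FileSystem run() throws Exception {
            // A FileSystem bound to the fabricated user.
            return FileSystem.get(conf);
          }
        });
      }
    }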
File: TestDelegationTokenForProxyUser.java

@@ -44,7 +44,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -140,9 +139,7 @@ public class TestDelegationTokenForProxyUser {
         .doAs(new PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
           @Override
           public Token<DelegationTokenIdentifier> run() throws IOException {
-            DistributedFileSystem dfs = (DistributedFileSystem) cluster
-                .getFileSystem();
-            return dfs.getDelegationToken("RenewerUser");
+            return cluster.getFileSystem().getDelegationToken("RenewerUser");
           }
         });
     DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
@@ -206,7 +203,7 @@ public class TestDelegationTokenForProxyUser {
       final PutOpParam.Op op = PutOpParam.Op.CREATE;
       final URL url = WebHdfsTestUtil.toUrl(webhdfs, op, f, new DoAsParam(PROXY_USER));
       HttpURLConnection conn = (HttpURLConnection) url.openConnection();
-      conn = WebHdfsTestUtil.twoStepWrite(conn, op);
+      conn = WebHdfsTestUtil.twoStepWrite(webhdfs, op, conn);
       final FSDataOutputStream out = WebHdfsTestUtil.write(webhdfs, op, conn, 4096);
       out.write("Hello, webhdfs user!".getBytes());
       out.close();
@@ -221,7 +218,7 @@ public class TestDelegationTokenForProxyUser {
       final PostOpParam.Op op = PostOpParam.Op.APPEND;
       final URL url = WebHdfsTestUtil.toUrl(webhdfs, op, f, new DoAsParam(PROXY_USER));
       HttpURLConnection conn = (HttpURLConnection) url.openConnection();
-      conn = WebHdfsTestUtil.twoStepWrite(conn, op);
+      conn = WebHdfsTestUtil.twoStepWrite(webhdfs, op, conn);
       final FSDataOutputStream out = WebHdfsTestUtil.write(webhdfs, op, conn, 4096);
       out.write("\nHello again!".getBytes());
       out.close();

File: TestOffsetUrlInputStream.java

@@ -18,22 +18,10 @@
  */
 package org.apache.hadoop.hdfs.web;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
-import java.net.URI;
 import java.net.URL;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.TestByteRangeInputStream.MockHttpURLConnection;
-import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.OffsetUrlInputStream;
-import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.OffsetUrlOpener;
 import org.junit.Test;
 
 public class TestOffsetUrlInputStream {
@@ -73,65 +61,4 @@ public class TestOffsetUrlInputStream {
           WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString());
     }
   }
-
-  @Test
-  public void testByteRange() throws Exception {
-    final Configuration conf = new Configuration();
-    final String uri = WebHdfsFileSystem.SCHEME + "://localhost:50070/";
-    final WebHdfsFileSystem webhdfs = (WebHdfsFileSystem)FileSystem.get(new URI(uri), conf);
-
-    OffsetUrlOpener ospy = spy(webhdfs.new OffsetUrlOpener(new URL("http://test/")));
-    doReturn(new MockHttpURLConnection(ospy.getURL())).when(ospy)
-        .openConnection();
-    OffsetUrlOpener rspy = spy(webhdfs.new OffsetUrlOpener((URL) null));
-    doReturn(new MockHttpURLConnection(rspy.getURL())).when(rspy)
-        .openConnection();
-    final OffsetUrlInputStream is = new OffsetUrlInputStream(ospy, rspy);
-
-    assertEquals("getPos wrong", 0, is.getPos());
-
-    is.read();
-
-    assertNull("Initial call made incorrectly (Range Check)", ospy
-        .openConnection().getRequestProperty("Range"));
-
-    assertEquals("getPos should be 1 after reading one byte", 1, is.getPos());
-
-    is.read();
-
-    assertEquals("getPos should be 2 after reading two bytes", 2, is.getPos());
-
-    // No additional connections should have been made (no seek)
-
-    rspy.setURL(new URL("http://resolvedurl/"));
-
-    is.seek(100);
-    is.read();
-
-    assertEquals("getPos should be 101 after reading one byte", 101,
-        is.getPos());
-
-    verify(rspy, times(1)).openConnection();
-
-    is.seek(101);
-    is.read();
-
-    verify(rspy, times(1)).openConnection();
-
-    // Seek to 101 should not result in another request
-
-    is.seek(2500);
-    is.read();
-
-    ((MockHttpURLConnection) rspy.openConnection()).setResponseCode(206);
-    is.seek(0);
-
-    try {
-      is.read();
-      fail("Exception should be thrown when 206 response is given "
-          + "but 200 is expected");
-    } catch (IOException e) {
-      WebHdfsFileSystem.LOG.info(e.toString());
-    }
-  }
 }

File: TestWebHdfsRetries.java (new file)

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.TestDFSClientRetries;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.log4j.Level;
import org.junit.Test;
/** Test WebHdfsFileSystem retry on failures. */
public class TestWebHdfsRetries {
/** Test client retry with namenode restarting. */
@Test
public void testNamenodeRestart() throws Exception {
((Log4JLogger)NamenodeWebHdfsMethods.LOG).getLogger().setLevel(Level.ALL);
final Configuration conf = WebHdfsTestUtil.createConf();
TestDFSClientRetries.namenodeRestartTest(conf, true);
}
}

File: WebHdfsTestUtil.java

@@ -79,13 +79,9 @@ public class WebHdfsTestUtil {
     return WebHdfsFileSystem.jsonParse(conn, false);
   }
 
-  public static HttpURLConnection twoStepWrite(HttpURLConnection conn,
-      final HttpOpParam.Op op) throws IOException {
-    conn.setRequestMethod(op.getType().toString());
-    conn = WebHdfsFileSystem.twoStepWrite(conn, op);
-    conn.setDoOutput(true);
-    conn.connect();
-    return conn;
+  public static HttpURLConnection twoStepWrite(final WebHdfsFileSystem webhdfs,
+      final HttpOpParam.Op op, HttpURLConnection conn) throws IOException {
+    return webhdfs.new Runner(op, conn).twoStepWrite();
   }
 
   public static FSDataOutputStream write(final WebHdfsFileSystem webhdfs,