HDFS-6305. WebHdfs response decoding may throw RuntimeExceptions (Daryn Sharp via jeagles)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1594274 13f79535-47bb-0310-9956-ffa450edef68
Jonathan Turner Eagles 2014-05-13 16:43:30 +00:00
parent 8379c0c944
commit d389c8940e
4 changed files with 288 additions and 231 deletions
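The substance of the change is in WebHdfsFileSystem: JSON responses are now decoded inside a new FsPathResponseRunner, whose getResponse() catches any RuntimeException raised while parsing or casting the reply and rethrows it as an IOException, so malformed responses no longer surface as unchecked exceptions. Below is a minimal, self-contained sketch of that pattern; the names JsonResponseDecoder and decode are illustrative stand-ins, not Hadoop API.

import java.io.IOException;
import java.util.Map;

// Illustrative sketch only: mirrors the decode/rethrow pattern that
// FsPathResponseRunner#getResponse introduces in this patch.
abstract class JsonResponseDecoder<T> {

  /** Subclasses pull a typed value out of the parsed JSON map. */
  abstract T decodeResponse(Map<?, ?> json) throws IOException;

  /** Wraps any runtime failure from decoding in a checked IOException. */
  final T decode(Map<?, ?> json) throws IOException {
    try {
      if (json == null) {
        // match exception class thrown by parser
        throw new IllegalStateException("Missing response");
      }
      return decodeResponse(json);
    } catch (IOException ioe) {
      throw ioe;                      // already the declared failure type
    } catch (Exception e) {           // ClassCastException, NPE, parser errors, ...
      throw new IOException("Response decoding failure: " + e, e);
    }
  }
}

In the patch itself, operations such as getHdfsFileStatus(), getAclStatus(), and listStatus() supply only the decodeResponse() body and inherit this wrapping.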

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -209,6 +209,9 @@ Release 2.5.0 - UNRELEASED
HDFS-6367. EnumSetParam$Domain#parse fails for parameter containing more than one enum.
(Yi Liu via umamahesh)
+ HDFS-6305. WebHdfs response decoding may throw RuntimeExceptions (Daryn
+ Sharp via jeagles)
Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java

@@ -58,34 +58,8 @@ import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
- import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
- import org.apache.hadoop.hdfs.web.resources.AclPermissionParam;
- import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
- import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
- import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam;
- import org.apache.hadoop.hdfs.web.resources.CreateParentParam;
- import org.apache.hadoop.hdfs.web.resources.DelegationParam;
- import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
- import org.apache.hadoop.hdfs.web.resources.DestinationParam;
- import org.apache.hadoop.hdfs.web.resources.DoAsParam;
- import org.apache.hadoop.hdfs.web.resources.GetOpParam;
- import org.apache.hadoop.hdfs.web.resources.GroupParam;
- import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
- import org.apache.hadoop.hdfs.web.resources.LengthParam;
- import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
- import org.apache.hadoop.hdfs.web.resources.OffsetParam;
- import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
- import org.apache.hadoop.hdfs.web.resources.OwnerParam;
- import org.apache.hadoop.hdfs.web.resources.Param;
- import org.apache.hadoop.hdfs.web.resources.PermissionParam;
- import org.apache.hadoop.hdfs.web.resources.PostOpParam;
- import org.apache.hadoop.hdfs.web.resources.PutOpParam;
- import org.apache.hadoop.hdfs.web.resources.RecursiveParam;
- import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
- import org.apache.hadoop.hdfs.web.resources.RenewerParam;
- import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
- import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
- import org.apache.hadoop.hdfs.web.resources.UserParam;
+ import org.apache.hadoop.hdfs.web.resources.*;
+ import org.apache.hadoop.hdfs.web.resources.HttpOpParam.Op;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
@@ -425,41 +399,24 @@ public class WebHdfsFileSystem extends FileSystem
return url;
}
- /**
- * Run a http operation.
- * Connect to the http server, validate response, and obtain the JSON output.
- *
- * @param op http operation
- * @param fspath file system path
- * @param parameters parameters for the operation
- * @return a JSON object, e.g. Object[], Map<?, ?>, etc.
- * @throws IOException
- */
- private Map<?, ?> run(final HttpOpParam.Op op, final Path fspath,
- final Param<?,?>... parameters) throws IOException {
- return new FsPathRunner(op, fspath, parameters).run().json;
- }
/**
* This class is for initialing a HTTP connection, connecting to server,
* obtaining a response, and also handling retry on failures.
*/
- abstract class AbstractRunner {
+ abstract class AbstractRunner<T> {
abstract protected URL getUrl() throws IOException;
protected final HttpOpParam.Op op;
private final boolean redirected;
private boolean checkRetry;
- protected HttpURLConnection conn = null;
- private Map<?, ?> json = null;
protected AbstractRunner(final HttpOpParam.Op op, boolean redirected) {
this.op = op;
this.redirected = redirected;
}
- AbstractRunner run() throws IOException {
+ T run() throws IOException {
UserGroupInformation connectUgi = ugi.getRealUser();
if (connectUgi == null) {
connectUgi = ugi;
@@ -471,9 +428,9 @@ public class WebHdfsFileSystem extends FileSystem
// the entire lifecycle of the connection must be run inside the
// doAs to ensure authentication is performed correctly
return connectUgi.doAs(
- new PrivilegedExceptionAction<AbstractRunner>() {
+ new PrivilegedExceptionAction<T>() {
@Override
- public AbstractRunner run() throws IOException {
+ public T run() throws IOException {
return runWithRetry();
}
});
@@ -482,17 +439,50 @@ public class WebHdfsFileSystem extends FileSystem
}
}
- private void init() throws IOException {
- checkRetry = !redirected;
- URL url = getUrl();
- conn = (HttpURLConnection) connectionFactory.openConnection(url);
- }
+ /**
+ * Two-step requests redirected to a DN
+ *
+ * Create/Append:
+ * Step 1) Submit a Http request with neither auto-redirect nor data.
+ * Step 2) Submit another Http request with the URL from the Location header with data.
+ *
+ * The reason of having two-step create/append is for preventing clients to
+ * send out the data before the redirect. This issue is addressed by the
+ * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
+ * Unfortunately, there are software library bugs (e.g. Jetty 6 http server
+ * and Java 6 http client), which do not correctly implement "Expect:
+ * 100-continue". The two-step create/append is a temporary workaround for
+ * the software library bugs.
+ *
+ * Open/Checksum
+ * Also implements two-step connects for other operations redirected to
+ * a DN such as open and checksum
+ */
+ private HttpURLConnection connect(URL url) throws IOException {
+ // resolve redirects for a DN operation unless already resolved
+ if (op.getRedirect() && !redirected) {
+ final HttpOpParam.Op redirectOp =
+ HttpOpParam.TemporaryRedirectOp.valueOf(op);
+ final HttpURLConnection conn = connect(redirectOp, url);
+ // application level proxy like httpfs might not issue a redirect
+ if (conn.getResponseCode() == op.getExpectedHttpResponseCode()) {
+ return conn;
+ }
+ try {
+ validateResponse(redirectOp, conn, false);
+ url = new URL(conn.getHeaderField("Location"));
+ } finally {
+ conn.disconnect();
+ }
+ }
+ return connect(op, url);
+ }
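For orientation, here is a rough standalone sketch of the two-step write described in the javadoc above, written against plain java.net.HttpURLConnection. It is not WebHDFS client code; authentication, response validation, and retry are omitted, and the class and method names are invented for illustration.

import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

class TwoStepWriteSketch {
  // Step 1: probe without a body and without auto-redirect; the NameNode
  // answers with a redirect whose Location header names a DataNode URL.
  // Step 2: open a second connection to that URL and stream the data.
  static void put(URL namenodeUrl, byte[] data) throws IOException {
    HttpURLConnection probe = (HttpURLConnection) namenodeUrl.openConnection();
    probe.setRequestMethod("PUT");
    probe.setInstanceFollowRedirects(false);
    final URL datanodeUrl;
    try {
      datanodeUrl = new URL(probe.getHeaderField("Location"));
    } finally {
      probe.disconnect();
    }

    HttpURLConnection conn = (HttpURLConnection) datanodeUrl.openConnection();
    conn.setRequestMethod("PUT");
    conn.setDoOutput(true);
    conn.setChunkedStreamingMode(32 << 10); // 32 kB chunks, as in the patch
    try (OutputStream out = conn.getOutputStream()) {
      out.write(data);
    }
    conn.getResponseCode(); // surface any failure from the DataNode
    conn.disconnect();
  }
}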
- private void connect() throws IOException {
- connect(op.getDoOutput());
- }
- private void connect(boolean doOutput) throws IOException {
+ private HttpURLConnection connect(final HttpOpParam.Op op, final URL url)
+ throws IOException {
+ final HttpURLConnection conn =
+ (HttpURLConnection)connectionFactory.openConnection(url);
+ final boolean doOutput = op.getDoOutput();
conn.setRequestMethod(op.getType().toString());
conn.setInstanceFollowRedirects(false);
switch (op.getType()) {
@@ -505,6 +495,10 @@ public class WebHdfsFileSystem extends FileSystem
// explicitly setting content-length to 0 won't do spnego!!
// opening and closing the stream will send "Content-Length: 0"
conn.getOutputStream().close();
+ } else {
+ conn.setRequestProperty("Content-Type",
+ MediaType.APPLICATION_OCTET_STREAM);
+ conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
}
break;
}
@@ -514,16 +508,10 @@ public class WebHdfsFileSystem extends FileSystem
}
}
conn.connect();
+ return conn;
}
- private void disconnect() {
- if (conn != null) {
- conn.disconnect();
- conn = null;
- }
- }
- private AbstractRunner runWithRetry() throws IOException {
+ private T runWithRetry() throws IOException {
/**
* Do the real work.
*
@@ -541,14 +529,15 @@ public class WebHdfsFileSystem extends FileSystem
* examines the exception and swallows it if it decides to rerun the work.
*/
for(int retry = 0; ; retry++) {
+ checkRetry = !redirected;
+ final URL url = getUrl();
try {
- init();
- if (op.getDoOutput()) {
- twoStepWrite();
- } else {
- getResponse(op != GetOpParam.Op.OPEN);
+ final HttpURLConnection conn = connect(url);
+ // output streams will validate on close
+ if (!op.getDoOutput()) {
+ validateResponse(op, conn, false);
}
- return this;
+ return getResponse(conn);
} catch (IOException ioe) {
Throwable cause = ioe.getCause();
if (cause != null && cause instanceof AuthenticationException) {
@@ -591,75 +580,18 @@ public class WebHdfsFileSystem extends FileSystem
throw toIOException(ioe);
}
+ abstract T getResponse(HttpURLConnection conn) throws IOException;
+ }
/**
- * Two-step Create/Append:
- * Step 1) Submit a Http request with neither auto-redirect nor data.
- * Step 2) Submit another Http request with the URL from the Location header with data.
- *
- * The reason of having two-step create/append is for preventing clients to
- * send out the data before the redirect. This issue is addressed by the
- * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
- * Unfortunately, there are software library bugs (e.g. Jetty 6 http server
- * and Java 6 http client), which do not correctly implement "Expect:
- * 100-continue". The two-step create/append is a temporary workaround for
- * the software library bugs.
+ * Abstract base class to handle path-based operations with params
*/
- HttpURLConnection twoStepWrite() throws IOException {
- //Step 1) Submit a Http request with neither auto-redirect nor data.
- connect(false);
- validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn, false);
- final String redirect = conn.getHeaderField("Location");
- disconnect();
- checkRetry = false;
- //Step 2) Submit another Http request with the URL from the Location header with data.
- conn = (HttpURLConnection) connectionFactory.openConnection(new URL(
- redirect));
- conn.setRequestProperty("Content-Type",
- MediaType.APPLICATION_OCTET_STREAM);
- conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
- connect();
- return conn;
- }
- FSDataOutputStream write(final int bufferSize) throws IOException {
- return WebHdfsFileSystem.this.write(op, conn, bufferSize);
- }
- void getResponse(boolean getJsonAndDisconnect) throws IOException {
- try {
- connect();
- final int code = conn.getResponseCode();
- if (!redirected && op.getRedirect()
- && code != op.getExpectedHttpResponseCode()) {
- final String redirect = conn.getHeaderField("Location");
- json = validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op),
- conn, false);
- disconnect();
- checkRetry = false;
- conn = (HttpURLConnection) connectionFactory.openConnection(new URL(
- redirect));
- connect();
- }
- json = validateResponse(op, conn, false);
- if (json == null && getJsonAndDisconnect) {
- json = jsonParse(conn, false);
- }
- } finally {
- if (getJsonAndDisconnect) {
- disconnect();
- }
- }
- }
- }
- final class FsPathRunner extends AbstractRunner {
+ abstract class AbstractFsPathRunner<T> extends AbstractRunner<T> {
private final Path fspath;
private final Param<?,?>[] parameters;
- FsPathRunner(final HttpOpParam.Op op, final Path fspath, final Param<?,?>... parameters) {
+ AbstractFsPathRunner(final HttpOpParam.Op op, final Path fspath,
+ Param<?,?>... parameters) {
super(op, false);
this.fspath = fspath;
this.parameters = parameters;
@@ -671,7 +603,106 @@ public class WebHdfsFileSystem extends FileSystem
}
}
- final class URLRunner extends AbstractRunner {
+ /**
+ * Default path-based implementation expects no json response
+ */
+ class FsPathRunner extends AbstractFsPathRunner<Void> {
+ FsPathRunner(Op op, Path fspath, Param<?,?>... parameters) {
+ super(op, fspath, parameters);
+ }
+ @Override
+ Void getResponse(HttpURLConnection conn) throws IOException {
+ return null;
+ }
+ }
+ /**
+ * Handle path-based operations with a json response
+ */
+ abstract class FsPathResponseRunner<T> extends AbstractFsPathRunner<T> {
+ FsPathResponseRunner(final HttpOpParam.Op op, final Path fspath,
+ Param<?,?>... parameters) {
+ super(op, fspath, parameters);
+ }
+ @Override
+ final T getResponse(HttpURLConnection conn) throws IOException {
+ try {
+ final Map<?,?> json = jsonParse(conn, false);
+ if (json == null) {
+ // match exception class thrown by parser
+ throw new IllegalStateException("Missing response");
+ }
+ return decodeResponse(json);
+ } catch (IOException ioe) {
+ throw ioe;
+ } catch (Exception e) { // catch json parser errors
+ final IOException ioe =
+ new IOException("Response decoding failure: "+e.toString(), e);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(ioe);
+ }
+ throw ioe;
+ } finally {
+ conn.disconnect();
+ }
+ }
+ abstract T decodeResponse(Map<?,?> json) throws IOException;
+ }
+ /**
+ * Handle path-based operations with json boolean response
+ */
+ class FsPathBooleanRunner extends FsPathResponseRunner<Boolean> {
+ FsPathBooleanRunner(Op op, Path fspath, Param<?,?>... parameters) {
+ super(op, fspath, parameters);
+ }
+ @Override
+ Boolean decodeResponse(Map<?,?> json) throws IOException {
+ return (Boolean)json.get("boolean");
+ }
+ }
+ /**
+ * Handle create/append output streams
+ */
+ class FsPathOutputStreamRunner extends AbstractFsPathRunner<FSDataOutputStream> {
+ private final int bufferSize;
+ FsPathOutputStreamRunner(Op op, Path fspath, int bufferSize,
+ Param<?,?>... parameters) {
+ super(op, fspath, parameters);
+ this.bufferSize = bufferSize;
+ }
+ @Override
+ FSDataOutputStream getResponse(final HttpURLConnection conn)
+ throws IOException {
+ return new FSDataOutputStream(new BufferedOutputStream(
+ conn.getOutputStream(), bufferSize), statistics) {
+ @Override
+ public void close() throws IOException {
+ try {
+ super.close();
+ } finally {
+ try {
+ validateResponse(op, conn, true);
+ } finally {
+ conn.disconnect();
+ }
+ }
+ }
+ };
+ }
+ }
+ /**
+ * Used by open() which tracks the resolved url itself
+ */
+ final class URLRunner extends AbstractRunner<HttpURLConnection> {
private final URL url;
@Override
protected URL getUrl() {
@@ -682,6 +713,11 @@ public class WebHdfsFileSystem extends FileSystem
super(op, redirected);
this.url = url;
}
+ @Override
+ HttpURLConnection getResponse(HttpURLConnection conn) throws IOException {
+ return conn;
+ }
}
private FsPermission applyUMask(FsPermission permission) {
@@ -693,8 +729,12 @@ public class WebHdfsFileSystem extends FileSystem
private HdfsFileStatus getHdfsFileStatus(Path f) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETFILESTATUS;
- final Map<?, ?> json = run(op, f);
- final HdfsFileStatus status = JsonUtil.toFileStatus(json, true);
+ HdfsFileStatus status = new FsPathResponseRunner<HdfsFileStatus>(op, f) {
+ @Override
+ HdfsFileStatus decodeResponse(Map<?,?> json) {
+ return JsonUtil.toFileStatus(json, true);
+ }
+ }.run();
if (status == null) {
throw new FileNotFoundException("File does not exist: " + f);
}
@@ -718,8 +758,12 @@ public class WebHdfsFileSystem extends FileSystem
@Override
public AclStatus getAclStatus(Path f) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETACLSTATUS;
- final Map<?, ?> json = run(op, f);
- AclStatus status = JsonUtil.toAclStatus(json);
+ AclStatus status = new FsPathResponseRunner<AclStatus>(op, f) {
+ @Override
+ AclStatus decodeResponse(Map<?,?> json) {
+ return JsonUtil.toAclStatus(json);
+ }
+ }.run();
if (status == null) {
throw new FileNotFoundException("File does not exist: " + f);
}
@@ -730,9 +774,9 @@ public class WebHdfsFileSystem extends FileSystem
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.MKDIRS;
- final Map<?, ?> json = run(op, f,
- new PermissionParam(applyUMask(permission)));
- return (Boolean)json.get("boolean");
+ return new FsPathBooleanRunner(op, f,
+ new PermissionParam(applyUMask(permission))
+ ).run();
}
/**
@@ -743,17 +787,19 @@
) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.CREATESYMLINK;
- run(op, f, new DestinationParam(makeQualified(destination).toUri().getPath()),
- new CreateParentParam(createParent));
+ new FsPathRunner(op, f,
+ new DestinationParam(makeQualified(destination).toUri().getPath()),
+ new CreateParentParam(createParent)
+ ).run();
}
@Override
public boolean rename(final Path src, final Path dst) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.RENAME;
- final Map<?, ?> json = run(op, src,
- new DestinationParam(makeQualified(dst).toUri().getPath()));
- return (Boolean)json.get("boolean");
+ return new FsPathBooleanRunner(op, src,
+ new DestinationParam(makeQualified(dst).toUri().getPath())
+ ).run();
}
@SuppressWarnings("deprecation")
@@ -762,8 +808,10 @@
final Options.Rename... options) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.RENAME;
- run(op, src, new DestinationParam(makeQualified(dst).toUri().getPath()),
- new RenameOptionSetParam(options));
+ new FsPathRunner(op, src,
+ new DestinationParam(makeQualified(dst).toUri().getPath()),
+ new RenameOptionSetParam(options)
+ ).run();
}
@Override
@@ -775,7 +823,9 @@
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.SETOWNER;
- run(op, p, new OwnerParam(owner), new GroupParam(group));
+ new FsPathRunner(op, p,
+ new OwnerParam(owner), new GroupParam(group)
+ ).run();
}
@Override
@@ -783,7 +833,7 @@
) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.SETPERMISSION;
- run(op, p, new PermissionParam(permission));
+ new FsPathRunner(op, p,new PermissionParam(permission)).run();
}
@Override
@@ -791,7 +841,7 @@
throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.MODIFYACLENTRIES;
- run(op, path, new AclPermissionParam(aclSpec));
+ new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
}
@Override
@@ -799,21 +849,21 @@
throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.REMOVEACLENTRIES;
- run(op, path, new AclPermissionParam(aclSpec));
+ new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
}
@Override
public void removeDefaultAcl(Path path) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.REMOVEDEFAULTACL;
- run(op, path);
+ new FsPathRunner(op, path).run();
}
@Override
public void removeAcl(Path path) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.REMOVEACL;
- run(op, path);
+ new FsPathRunner(op, path).run();
}
@Override
@@ -821,7 +871,7 @@
throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.SETACL;
- run(op, p, new AclPermissionParam(aclSpec));
+ new FsPathRunner(op, p, new AclPermissionParam(aclSpec)).run();
}
@Override
@@ -829,8 +879,9 @@
) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.SETREPLICATION;
- final Map<?, ?> json = run(op, p, new ReplicationParam(replication));
- return (Boolean)json.get("boolean");
+ return new FsPathBooleanRunner(op, p,
+ new ReplicationParam(replication)
+ ).run();
}
@Override
@@ -838,7 +889,10 @@
) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.SETTIMES;
- run(op, p, new ModificationTimeParam(mtime), new AccessTimeParam(atime));
+ new FsPathRunner(op, p,
+ new ModificationTimeParam(mtime),
+ new AccessTimeParam(atime)
+ ).run();
}
@Override
@@ -853,32 +907,11 @@
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
}
- FSDataOutputStream write(final HttpOpParam.Op op,
- final HttpURLConnection conn, final int bufferSize) throws IOException {
- return new FSDataOutputStream(new BufferedOutputStream(
- conn.getOutputStream(), bufferSize), statistics) {
- @Override
- public void close() throws IOException {
- try {
- super.close();
- } finally {
- try {
- validateResponse(op, conn, true);
- } finally {
- conn.disconnect();
- }
- }
- }
- };
- }
@Override
public void concat(final Path trg, final Path [] srcs) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PostOpParam.Op.CONCAT;
- ConcatSourcesParam param = new ConcatSourcesParam(srcs);
- run(op, trg, param);
+ new FsPathRunner(op, trg, new ConcatSourcesParam(srcs)).run();
}
@Override
@@ -888,14 +921,13 @@
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.CREATE;
- return new FsPathRunner(op, f,
+ return new FsPathOutputStreamRunner(op, f, bufferSize,
new PermissionParam(applyUMask(permission)),
new OverwriteParam(overwrite),
new BufferSizeParam(bufferSize),
new ReplicationParam(replication),
- new BlockSizeParam(blockSize))
- .run()
- .write(bufferSize);
+ new BlockSizeParam(blockSize)
+ ).run();
}
@Override
@@ -904,16 +936,17 @@
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PostOpParam.Op.APPEND;
- return new FsPathRunner(op, f, new BufferSizeParam(bufferSize))
- .run()
- .write(bufferSize);
+ return new FsPathOutputStreamRunner(op, f, bufferSize,
+ new BufferSizeParam(bufferSize)
+ ).run();
}
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
final HttpOpParam.Op op = DeleteOpParam.Op.DELETE;
- final Map<?, ?> json = run(op, f, new RecursiveParam(recursive));
- return (Boolean)json.get("boolean");
+ return new FsPathBooleanRunner(op, f,
+ new RecursiveParam(recursive)
+ ).run();
}
@Override
@@ -945,7 +978,7 @@
final boolean resolved) throws IOException {
final URL offsetUrl = offset == 0L? url
: new URL(url + "&" + new OffsetParam(offset));
- return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run().conn;
+ return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run();
}
}
@@ -1001,7 +1034,9 @@
statistics.incrementReadOps(1);
final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS;
- final Map<?, ?> json = run(op, f);
+ return new FsPathResponseRunner<FileStatus[]>(op, f) {
+ @Override
+ FileStatus[] decodeResponse(Map<?,?> json) {
final Map<?, ?> rootmap = (Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es");
final Object[] array = (Object[])rootmap.get(FileStatus.class.getSimpleName());
@@ -1013,13 +1048,22 @@
}
return statuses;
}
+ }.run();
+ }
@Override
public Token<DelegationTokenIdentifier> getDelegationToken(
final String renewer) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
- final Map<?, ?> m = run(op, null, new RenewerParam(renewer));
- final Token<DelegationTokenIdentifier> token = JsonUtil.toDelegationToken(m);
+ Token<DelegationTokenIdentifier> token =
+ new FsPathResponseRunner<Token<DelegationTokenIdentifier>>(
+ op, null, new RenewerParam(renewer)) {
+ @Override
+ Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json)
+ throws IOException {
+ return JsonUtil.toDelegationToken(json);
+ }
+ }.run();
token.setService(tokenServiceName);
return token;
}
@@ -1041,19 +1085,22 @@
public synchronized long renewDelegationToken(final Token<?> token
) throws IOException {
final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN;
- TokenArgumentParam dtargParam = new TokenArgumentParam(
- token.encodeToUrlString());
- final Map<?, ?> m = run(op, null, dtargParam);
- return (Long) m.get("long");
+ return new FsPathResponseRunner<Long>(op, null,
+ new TokenArgumentParam(token.encodeToUrlString())) {
+ @Override
+ Long decodeResponse(Map<?,?> json) throws IOException {
+ return (Long) json.get("long");
+ }
+ }.run();
}
@Override
public synchronized void cancelDelegationToken(final Token<?> token
) throws IOException {
final HttpOpParam.Op op = PutOpParam.Op.CANCELDELEGATIONTOKEN;
- TokenArgumentParam dtargParam = new TokenArgumentParam(
- token.encodeToUrlString());
- run(op, null, dtargParam);
+ new FsPathRunner(op, null,
+ new TokenArgumentParam(token.encodeToUrlString())
+ ).run();
}
@Override
@@ -1071,9 +1118,14 @@
statistics.incrementReadOps(1);
final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS;
- final Map<?, ?> m = run(op, p, new OffsetParam(offset),
- new LengthParam(length));
- return DFSUtil.locatedBlocks2Locations(JsonUtil.toLocatedBlocks(m));
+ return new FsPathResponseRunner<BlockLocation[]>(op, p,
+ new OffsetParam(offset), new LengthParam(length)) {
+ @Override
+ BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
+ return DFSUtil.locatedBlocks2Locations(
+ JsonUtil.toLocatedBlocks(json));
+ }
+ }.run();
}
@Override
@@ -1081,8 +1133,12 @@
statistics.incrementReadOps(1);
final HttpOpParam.Op op = GetOpParam.Op.GETCONTENTSUMMARY;
- final Map<?, ?> m = run(op, p);
- return JsonUtil.toContentSummary(m);
+ return new FsPathResponseRunner<ContentSummary>(op, p) {
+ @Override
+ ContentSummary decodeResponse(Map<?,?> json) {
+ return JsonUtil.toContentSummary(json);
+ }
+ }.run();
}
@Override
@@ -1091,8 +1147,12 @@
statistics.incrementReadOps(1);
final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM;
- final Map<?, ?> m = run(op, p);
- return JsonUtil.toMD5MD5CRC32FileChecksum(m);
+ return new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(op, p) {
+ @Override
+ MD5MD5CRC32FileChecksum decodeResponse(Map<?,?> json) throws IOException {
+ return JsonUtil.toMD5MD5CRC32FileChecksum(json);
+ }
+ }.run();
}
/**

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java

@@ -102,7 +102,7 @@ public abstract class HttpOpParam<E extends Enum<E> & HttpOpParam.Op>
@Override
public boolean getDoOutput() {
- return op.getDoOutput();
+ return false;
}
@Override
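This one-line change appears to be in the TemporaryRedirectOp wrapper that AbstractRunner#connect uses for the step-1 probe: by always reporting getDoOutput() as false, the probe request carries no body even when the wrapped operation is CREATE or APPEND, whose data is sent only in step 2 after the redirect. A simplified stand-in for that relationship, not the real HttpOpParam.Op interface:

// Simplified stand-in; 307 is HTTP Temporary Redirect.
interface Op {
  boolean getDoOutput();
  int getExpectedHttpResponseCode();
}

final class TemporaryRedirectSketch implements Op {
  private final Op wrapped;                 // e.g. the real CREATE or APPEND op

  TemporaryRedirectSketch(Op wrapped) {
    this.wrapped = wrapped;
  }

  /** The operation actually carried out in step 2. */
  Op wrappedOp() {
    return wrapped;
  }

  @Override
  public boolean getDoOutput() {
    return false;                           // the probe must not send data
  }

  @Override
  public int getExpectedHttpResponseCode() {
    return 307;                             // expect the redirect, not the final status
  }
}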

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java

@@ -94,10 +94,4 @@ public class WebHdfsTestUtil {
Assert.assertEquals(expectedResponseCode, conn.getResponseCode());
return WebHdfsFileSystem.jsonParse(conn, false);
}
- public static FSDataOutputStream write(final WebHdfsFileSystem webhdfs,
- final HttpOpParam.Op op, final HttpURLConnection conn,
- final int bufferSize) throws IOException {
- return webhdfs.write(op, conn, bufferSize);
- }
}