diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 1e0440e62f7..d8873b42b35 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -209,6 +209,9 @@ Release 2.5.0 - UNRELEASED HDFS-6367. EnumSetParam$Domain#parse fails for parameter containing more than one enum. (Yi Liu via umamahesh) + HDFS-6305. WebHdfs response decoding may throw RuntimeExceptions (Daryn + Sharp via jeagles) + Release 2.4.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index 7951253a7fc..5d9ce3d36f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -58,34 +58,8 @@ import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; -import org.apache.hadoop.hdfs.web.resources.AccessTimeParam; -import org.apache.hadoop.hdfs.web.resources.AclPermissionParam; -import org.apache.hadoop.hdfs.web.resources.BlockSizeParam; -import org.apache.hadoop.hdfs.web.resources.BufferSizeParam; -import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam; -import org.apache.hadoop.hdfs.web.resources.CreateParentParam; -import org.apache.hadoop.hdfs.web.resources.DelegationParam; -import org.apache.hadoop.hdfs.web.resources.DeleteOpParam; -import org.apache.hadoop.hdfs.web.resources.DestinationParam; -import org.apache.hadoop.hdfs.web.resources.DoAsParam; -import org.apache.hadoop.hdfs.web.resources.GetOpParam; -import org.apache.hadoop.hdfs.web.resources.GroupParam; -import org.apache.hadoop.hdfs.web.resources.HttpOpParam; -import org.apache.hadoop.hdfs.web.resources.LengthParam; -import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam; -import org.apache.hadoop.hdfs.web.resources.OffsetParam; -import org.apache.hadoop.hdfs.web.resources.OverwriteParam; -import org.apache.hadoop.hdfs.web.resources.OwnerParam; -import org.apache.hadoop.hdfs.web.resources.Param; -import org.apache.hadoop.hdfs.web.resources.PermissionParam; -import org.apache.hadoop.hdfs.web.resources.PostOpParam; -import org.apache.hadoop.hdfs.web.resources.PutOpParam; -import org.apache.hadoop.hdfs.web.resources.RecursiveParam; -import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam; -import org.apache.hadoop.hdfs.web.resources.RenewerParam; -import org.apache.hadoop.hdfs.web.resources.ReplicationParam; -import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam; -import org.apache.hadoop.hdfs.web.resources.UserParam; +import org.apache.hadoop.hdfs.web.resources.*; +import org.apache.hadoop.hdfs.web.resources.HttpOpParam.Op; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; @@ -425,41 +399,24 @@ public class WebHdfsFileSystem extends FileSystem return url; } - /** - * Run a http operation. - * Connect to the http server, validate response, and obtain the JSON output. - * - * @param op http operation - * @param fspath file system path - * @param parameters parameters for the operation - * @return a JSON object, e.g. Object[], Map, etc. - * @throws IOException - */ - private Map run(final HttpOpParam.Op op, final Path fspath, - final Param... parameters) throws IOException { - return new FsPathRunner(op, fspath, parameters).run().json; - } - /** * This class is for initialing a HTTP connection, connecting to server, * obtaining a response, and also handling retry on failures. */ - abstract class AbstractRunner { + abstract class AbstractRunner { abstract protected URL getUrl() throws IOException; protected final HttpOpParam.Op op; private final boolean redirected; private boolean checkRetry; - protected HttpURLConnection conn = null; - private Map json = null; protected AbstractRunner(final HttpOpParam.Op op, boolean redirected) { this.op = op; this.redirected = redirected; } - AbstractRunner run() throws IOException { + T run() throws IOException { UserGroupInformation connectUgi = ugi.getRealUser(); if (connectUgi == null) { connectUgi = ugi; @@ -471,9 +428,9 @@ public class WebHdfsFileSystem extends FileSystem // the entire lifecycle of the connection must be run inside the // doAs to ensure authentication is performed correctly return connectUgi.doAs( - new PrivilegedExceptionAction() { + new PrivilegedExceptionAction() { @Override - public AbstractRunner run() throws IOException { + public T run() throws IOException { return runWithRetry(); } }); @@ -481,18 +438,51 @@ public class WebHdfsFileSystem extends FileSystem throw new IOException(e); } } - - private void init() throws IOException { - checkRetry = !redirected; - URL url = getUrl(); - conn = (HttpURLConnection) connectionFactory.openConnection(url); - } - - private void connect() throws IOException { - connect(op.getDoOutput()); + + /** + * Two-step requests redirected to a DN + * + * Create/Append: + * Step 1) Submit a Http request with neither auto-redirect nor data. + * Step 2) Submit another Http request with the URL from the Location header with data. + * + * The reason of having two-step create/append is for preventing clients to + * send out the data before the redirect. This issue is addressed by the + * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3. + * Unfortunately, there are software library bugs (e.g. Jetty 6 http server + * and Java 6 http client), which do not correctly implement "Expect: + * 100-continue". The two-step create/append is a temporary workaround for + * the software library bugs. + * + * Open/Checksum + * Also implements two-step connects for other operations redirected to + * a DN such as open and checksum + */ + private HttpURLConnection connect(URL url) throws IOException { + // resolve redirects for a DN operation unless already resolved + if (op.getRedirect() && !redirected) { + final HttpOpParam.Op redirectOp = + HttpOpParam.TemporaryRedirectOp.valueOf(op); + final HttpURLConnection conn = connect(redirectOp, url); + // application level proxy like httpfs might not issue a redirect + if (conn.getResponseCode() == op.getExpectedHttpResponseCode()) { + return conn; + } + try { + validateResponse(redirectOp, conn, false); + url = new URL(conn.getHeaderField("Location")); + } finally { + conn.disconnect(); + } + } + return connect(op, url); } - private void connect(boolean doOutput) throws IOException { + private HttpURLConnection connect(final HttpOpParam.Op op, final URL url) + throws IOException { + final HttpURLConnection conn = + (HttpURLConnection)connectionFactory.openConnection(url); + final boolean doOutput = op.getDoOutput(); conn.setRequestMethod(op.getType().toString()); conn.setInstanceFollowRedirects(false); switch (op.getType()) { @@ -505,6 +495,10 @@ public class WebHdfsFileSystem extends FileSystem // explicitly setting content-length to 0 won't do spnego!! // opening and closing the stream will send "Content-Length: 0" conn.getOutputStream().close(); + } else { + conn.setRequestProperty("Content-Type", + MediaType.APPLICATION_OCTET_STREAM); + conn.setChunkedStreamingMode(32 << 10); //32kB-chunk } break; } @@ -514,16 +508,10 @@ public class WebHdfsFileSystem extends FileSystem } } conn.connect(); + return conn; } - private void disconnect() { - if (conn != null) { - conn.disconnect(); - conn = null; - } - } - - private AbstractRunner runWithRetry() throws IOException { + private T runWithRetry() throws IOException { /** * Do the real work. * @@ -541,15 +529,16 @@ public class WebHdfsFileSystem extends FileSystem * examines the exception and swallows it if it decides to rerun the work. */ for(int retry = 0; ; retry++) { + checkRetry = !redirected; + final URL url = getUrl(); try { - init(); - if (op.getDoOutput()) { - twoStepWrite(); - } else { - getResponse(op != GetOpParam.Op.OPEN); + final HttpURLConnection conn = connect(url); + // output streams will validate on close + if (!op.getDoOutput()) { + validateResponse(op, conn, false); } - return this; - } catch(IOException ioe) { + return getResponse(conn); + } catch (IOException ioe) { Throwable cause = ioe.getCause(); if (cause != null && cause instanceof AuthenticationException) { throw ioe; // no retries for auth failures @@ -591,87 +580,129 @@ public class WebHdfsFileSystem extends FileSystem throw toIOException(ioe); } - /** - * Two-step Create/Append: - * Step 1) Submit a Http request with neither auto-redirect nor data. - * Step 2) Submit another Http request with the URL from the Location header with data. - * - * The reason of having two-step create/append is for preventing clients to - * send out the data before the redirect. This issue is addressed by the - * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3. - * Unfortunately, there are software library bugs (e.g. Jetty 6 http server - * and Java 6 http client), which do not correctly implement "Expect: - * 100-continue". The two-step create/append is a temporary workaround for - * the software library bugs. - */ - HttpURLConnection twoStepWrite() throws IOException { - //Step 1) Submit a Http request with neither auto-redirect nor data. - connect(false); - validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn, false); - final String redirect = conn.getHeaderField("Location"); - disconnect(); - checkRetry = false; - - //Step 2) Submit another Http request with the URL from the Location header with data. - conn = (HttpURLConnection) connectionFactory.openConnection(new URL( - redirect)); - conn.setRequestProperty("Content-Type", - MediaType.APPLICATION_OCTET_STREAM); - conn.setChunkedStreamingMode(32 << 10); //32kB-chunk - connect(); - return conn; - } - - FSDataOutputStream write(final int bufferSize) throws IOException { - return WebHdfsFileSystem.this.write(op, conn, bufferSize); - } - - void getResponse(boolean getJsonAndDisconnect) throws IOException { - try { - connect(); - final int code = conn.getResponseCode(); - if (!redirected && op.getRedirect() - && code != op.getExpectedHttpResponseCode()) { - final String redirect = conn.getHeaderField("Location"); - json = validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), - conn, false); - disconnect(); - - checkRetry = false; - conn = (HttpURLConnection) connectionFactory.openConnection(new URL( - redirect)); - connect(); - } - - json = validateResponse(op, conn, false); - if (json == null && getJsonAndDisconnect) { - json = jsonParse(conn, false); - } - } finally { - if (getJsonAndDisconnect) { - disconnect(); - } - } - } + abstract T getResponse(HttpURLConnection conn) throws IOException; } - final class FsPathRunner extends AbstractRunner { + /** + * Abstract base class to handle path-based operations with params + */ + abstract class AbstractFsPathRunner extends AbstractRunner { private final Path fspath; - private final Param[] parameters; - - FsPathRunner(final HttpOpParam.Op op, final Path fspath, final Param... parameters) { + private final Param[] parameters; + + AbstractFsPathRunner(final HttpOpParam.Op op, final Path fspath, + Param... parameters) { super(op, false); this.fspath = fspath; this.parameters = parameters; } - + @Override protected URL getUrl() throws IOException { return toUrl(op, fspath, parameters); } } - final class URLRunner extends AbstractRunner { + /** + * Default path-based implementation expects no json response + */ + class FsPathRunner extends AbstractFsPathRunner { + FsPathRunner(Op op, Path fspath, Param... parameters) { + super(op, fspath, parameters); + } + + @Override + Void getResponse(HttpURLConnection conn) throws IOException { + return null; + } + } + + /** + * Handle path-based operations with a json response + */ + abstract class FsPathResponseRunner extends AbstractFsPathRunner { + FsPathResponseRunner(final HttpOpParam.Op op, final Path fspath, + Param... parameters) { + super(op, fspath, parameters); + } + + @Override + final T getResponse(HttpURLConnection conn) throws IOException { + try { + final Map json = jsonParse(conn, false); + if (json == null) { + // match exception class thrown by parser + throw new IllegalStateException("Missing response"); + } + return decodeResponse(json); + } catch (IOException ioe) { + throw ioe; + } catch (Exception e) { // catch json parser errors + final IOException ioe = + new IOException("Response decoding failure: "+e.toString(), e); + if (LOG.isDebugEnabled()) { + LOG.debug(ioe); + } + throw ioe; + } finally { + conn.disconnect(); + } + } + + abstract T decodeResponse(Map json) throws IOException; + } + + /** + * Handle path-based operations with json boolean response + */ + class FsPathBooleanRunner extends FsPathResponseRunner { + FsPathBooleanRunner(Op op, Path fspath, Param... parameters) { + super(op, fspath, parameters); + } + + @Override + Boolean decodeResponse(Map json) throws IOException { + return (Boolean)json.get("boolean"); + } + } + + /** + * Handle create/append output streams + */ + class FsPathOutputStreamRunner extends AbstractFsPathRunner { + private final int bufferSize; + + FsPathOutputStreamRunner(Op op, Path fspath, int bufferSize, + Param... parameters) { + super(op, fspath, parameters); + this.bufferSize = bufferSize; + } + + @Override + FSDataOutputStream getResponse(final HttpURLConnection conn) + throws IOException { + return new FSDataOutputStream(new BufferedOutputStream( + conn.getOutputStream(), bufferSize), statistics) { + @Override + public void close() throws IOException { + try { + super.close(); + } finally { + try { + validateResponse(op, conn, true); + } finally { + conn.disconnect(); + } + } + } + }; + } + } + + /** + * Used by open() which tracks the resolved url itself + */ + final class URLRunner extends AbstractRunner { private final URL url; @Override protected URL getUrl() { @@ -682,6 +713,11 @@ public class WebHdfsFileSystem extends FileSystem super(op, redirected); this.url = url; } + + @Override + HttpURLConnection getResponse(HttpURLConnection conn) throws IOException { + return conn; + } } private FsPermission applyUMask(FsPermission permission) { @@ -693,8 +729,12 @@ public class WebHdfsFileSystem extends FileSystem private HdfsFileStatus getHdfsFileStatus(Path f) throws IOException { final HttpOpParam.Op op = GetOpParam.Op.GETFILESTATUS; - final Map json = run(op, f); - final HdfsFileStatus status = JsonUtil.toFileStatus(json, true); + HdfsFileStatus status = new FsPathResponseRunner(op, f) { + @Override + HdfsFileStatus decodeResponse(Map json) { + return JsonUtil.toFileStatus(json, true); + } + }.run(); if (status == null) { throw new FileNotFoundException("File does not exist: " + f); } @@ -718,8 +758,12 @@ public class WebHdfsFileSystem extends FileSystem @Override public AclStatus getAclStatus(Path f) throws IOException { final HttpOpParam.Op op = GetOpParam.Op.GETACLSTATUS; - final Map json = run(op, f); - AclStatus status = JsonUtil.toAclStatus(json); + AclStatus status = new FsPathResponseRunner(op, f) { + @Override + AclStatus decodeResponse(Map json) { + return JsonUtil.toAclStatus(json); + } + }.run(); if (status == null) { throw new FileNotFoundException("File does not exist: " + f); } @@ -730,9 +774,9 @@ public class WebHdfsFileSystem extends FileSystem public boolean mkdirs(Path f, FsPermission permission) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.MKDIRS; - final Map json = run(op, f, - new PermissionParam(applyUMask(permission))); - return (Boolean)json.get("boolean"); + return new FsPathBooleanRunner(op, f, + new PermissionParam(applyUMask(permission)) + ).run(); } /** @@ -743,17 +787,19 @@ public class WebHdfsFileSystem extends FileSystem ) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.CREATESYMLINK; - run(op, f, new DestinationParam(makeQualified(destination).toUri().getPath()), - new CreateParentParam(createParent)); + new FsPathRunner(op, f, + new DestinationParam(makeQualified(destination).toUri().getPath()), + new CreateParentParam(createParent) + ).run(); } @Override public boolean rename(final Path src, final Path dst) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.RENAME; - final Map json = run(op, src, - new DestinationParam(makeQualified(dst).toUri().getPath())); - return (Boolean)json.get("boolean"); + return new FsPathBooleanRunner(op, src, + new DestinationParam(makeQualified(dst).toUri().getPath()) + ).run(); } @SuppressWarnings("deprecation") @@ -762,8 +808,10 @@ public class WebHdfsFileSystem extends FileSystem final Options.Rename... options) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.RENAME; - run(op, src, new DestinationParam(makeQualified(dst).toUri().getPath()), - new RenameOptionSetParam(options)); + new FsPathRunner(op, src, + new DestinationParam(makeQualified(dst).toUri().getPath()), + new RenameOptionSetParam(options) + ).run(); } @Override @@ -775,7 +823,9 @@ public class WebHdfsFileSystem extends FileSystem statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.SETOWNER; - run(op, p, new OwnerParam(owner), new GroupParam(group)); + new FsPathRunner(op, p, + new OwnerParam(owner), new GroupParam(group) + ).run(); } @Override @@ -783,7 +833,7 @@ public class WebHdfsFileSystem extends FileSystem ) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.SETPERMISSION; - run(op, p, new PermissionParam(permission)); + new FsPathRunner(op, p,new PermissionParam(permission)).run(); } @Override @@ -791,7 +841,7 @@ public class WebHdfsFileSystem extends FileSystem throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.MODIFYACLENTRIES; - run(op, path, new AclPermissionParam(aclSpec)); + new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run(); } @Override @@ -799,21 +849,21 @@ public class WebHdfsFileSystem extends FileSystem throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.REMOVEACLENTRIES; - run(op, path, new AclPermissionParam(aclSpec)); + new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run(); } @Override public void removeDefaultAcl(Path path) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.REMOVEDEFAULTACL; - run(op, path); + new FsPathRunner(op, path).run(); } @Override public void removeAcl(Path path) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.REMOVEACL; - run(op, path); + new FsPathRunner(op, path).run(); } @Override @@ -821,7 +871,7 @@ public class WebHdfsFileSystem extends FileSystem throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.SETACL; - run(op, p, new AclPermissionParam(aclSpec)); + new FsPathRunner(op, p, new AclPermissionParam(aclSpec)).run(); } @Override @@ -829,8 +879,9 @@ public class WebHdfsFileSystem extends FileSystem ) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.SETREPLICATION; - final Map json = run(op, p, new ReplicationParam(replication)); - return (Boolean)json.get("boolean"); + return new FsPathBooleanRunner(op, p, + new ReplicationParam(replication) + ).run(); } @Override @@ -838,7 +889,10 @@ public class WebHdfsFileSystem extends FileSystem ) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.SETTIMES; - run(op, p, new ModificationTimeParam(mtime), new AccessTimeParam(atime)); + new FsPathRunner(op, p, + new ModificationTimeParam(mtime), + new AccessTimeParam(atime) + ).run(); } @Override @@ -853,32 +907,11 @@ public class WebHdfsFileSystem extends FileSystem DFSConfigKeys.DFS_REPLICATION_DEFAULT); } - FSDataOutputStream write(final HttpOpParam.Op op, - final HttpURLConnection conn, final int bufferSize) throws IOException { - return new FSDataOutputStream(new BufferedOutputStream( - conn.getOutputStream(), bufferSize), statistics) { - @Override - public void close() throws IOException { - try { - super.close(); - } finally { - try { - validateResponse(op, conn, true); - } finally { - conn.disconnect(); - } - } - } - }; - } - @Override public void concat(final Path trg, final Path [] srcs) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PostOpParam.Op.CONCAT; - - ConcatSourcesParam param = new ConcatSourcesParam(srcs); - run(op, trg, param); + new FsPathRunner(op, trg, new ConcatSourcesParam(srcs)).run(); } @Override @@ -888,14 +921,13 @@ public class WebHdfsFileSystem extends FileSystem statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.CREATE; - return new FsPathRunner(op, f, + return new FsPathOutputStreamRunner(op, f, bufferSize, new PermissionParam(applyUMask(permission)), new OverwriteParam(overwrite), new BufferSizeParam(bufferSize), new ReplicationParam(replication), - new BlockSizeParam(blockSize)) - .run() - .write(bufferSize); + new BlockSizeParam(blockSize) + ).run(); } @Override @@ -904,16 +936,17 @@ public class WebHdfsFileSystem extends FileSystem statistics.incrementWriteOps(1); final HttpOpParam.Op op = PostOpParam.Op.APPEND; - return new FsPathRunner(op, f, new BufferSizeParam(bufferSize)) - .run() - .write(bufferSize); + return new FsPathOutputStreamRunner(op, f, bufferSize, + new BufferSizeParam(bufferSize) + ).run(); } @Override public boolean delete(Path f, boolean recursive) throws IOException { final HttpOpParam.Op op = DeleteOpParam.Op.DELETE; - final Map json = run(op, f, new RecursiveParam(recursive)); - return (Boolean)json.get("boolean"); + return new FsPathBooleanRunner(op, f, + new RecursiveParam(recursive) + ).run(); } @Override @@ -945,7 +978,7 @@ public class WebHdfsFileSystem extends FileSystem final boolean resolved) throws IOException { final URL offsetUrl = offset == 0L? url : new URL(url + "&" + new OffsetParam(offset)); - return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run().conn; + return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run(); } } @@ -1001,25 +1034,36 @@ public class WebHdfsFileSystem extends FileSystem statistics.incrementReadOps(1); final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS; - final Map json = run(op, f); - final Map rootmap = (Map)json.get(FileStatus.class.getSimpleName() + "es"); - final Object[] array = (Object[])rootmap.get(FileStatus.class.getSimpleName()); + return new FsPathResponseRunner(op, f) { + @Override + FileStatus[] decodeResponse(Map json) { + final Map rootmap = (Map)json.get(FileStatus.class.getSimpleName() + "es"); + final Object[] array = (Object[])rootmap.get(FileStatus.class.getSimpleName()); - //convert FileStatus - final FileStatus[] statuses = new FileStatus[array.length]; - for(int i = 0; i < array.length; i++) { - final Map m = (Map)array[i]; - statuses[i] = makeQualified(JsonUtil.toFileStatus(m, false), f); - } - return statuses; + //convert FileStatus + final FileStatus[] statuses = new FileStatus[array.length]; + for (int i = 0; i < array.length; i++) { + final Map m = (Map)array[i]; + statuses[i] = makeQualified(JsonUtil.toFileStatus(m, false), f); + } + return statuses; + } + }.run(); } @Override public Token getDelegationToken( final String renewer) throws IOException { final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN; - final Map m = run(op, null, new RenewerParam(renewer)); - final Token token = JsonUtil.toDelegationToken(m); + Token token = + new FsPathResponseRunner>( + op, null, new RenewerParam(renewer)) { + @Override + Token decodeResponse(Map json) + throws IOException { + return JsonUtil.toDelegationToken(json); + } + }.run(); token.setService(tokenServiceName); return token; } @@ -1041,19 +1085,22 @@ public class WebHdfsFileSystem extends FileSystem public synchronized long renewDelegationToken(final Token token ) throws IOException { final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN; - TokenArgumentParam dtargParam = new TokenArgumentParam( - token.encodeToUrlString()); - final Map m = run(op, null, dtargParam); - return (Long) m.get("long"); + return new FsPathResponseRunner(op, null, + new TokenArgumentParam(token.encodeToUrlString())) { + @Override + Long decodeResponse(Map json) throws IOException { + return (Long) json.get("long"); + } + }.run(); } @Override public synchronized void cancelDelegationToken(final Token token ) throws IOException { final HttpOpParam.Op op = PutOpParam.Op.CANCELDELEGATIONTOKEN; - TokenArgumentParam dtargParam = new TokenArgumentParam( - token.encodeToUrlString()); - run(op, null, dtargParam); + new FsPathRunner(op, null, + new TokenArgumentParam(token.encodeToUrlString()) + ).run(); } @Override @@ -1071,9 +1118,14 @@ public class WebHdfsFileSystem extends FileSystem statistics.incrementReadOps(1); final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS; - final Map m = run(op, p, new OffsetParam(offset), - new LengthParam(length)); - return DFSUtil.locatedBlocks2Locations(JsonUtil.toLocatedBlocks(m)); + return new FsPathResponseRunner(op, p, + new OffsetParam(offset), new LengthParam(length)) { + @Override + BlockLocation[] decodeResponse(Map json) throws IOException { + return DFSUtil.locatedBlocks2Locations( + JsonUtil.toLocatedBlocks(json)); + } + }.run(); } @Override @@ -1081,8 +1133,12 @@ public class WebHdfsFileSystem extends FileSystem statistics.incrementReadOps(1); final HttpOpParam.Op op = GetOpParam.Op.GETCONTENTSUMMARY; - final Map m = run(op, p); - return JsonUtil.toContentSummary(m); + return new FsPathResponseRunner(op, p) { + @Override + ContentSummary decodeResponse(Map json) { + return JsonUtil.toContentSummary(json); + } + }.run(); } @Override @@ -1091,8 +1147,12 @@ public class WebHdfsFileSystem extends FileSystem statistics.incrementReadOps(1); final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM; - final Map m = run(op, p); - return JsonUtil.toMD5MD5CRC32FileChecksum(m); + return new FsPathResponseRunner(op, p) { + @Override + MD5MD5CRC32FileChecksum decodeResponse(Map json) throws IOException { + return JsonUtil.toMD5MD5CRC32FileChecksum(json); + } + }.run(); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java index 50b30862555..f4c24ffda90 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java @@ -102,7 +102,7 @@ public abstract class HttpOpParam & HttpOpParam.Op> @Override public boolean getDoOutput() { - return op.getDoOutput(); + return false; } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java index 2f092cc2cf0..369285dc2b2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java @@ -94,10 +94,4 @@ public class WebHdfsTestUtil { Assert.assertEquals(expectedResponseCode, conn.getResponseCode()); return WebHdfsFileSystem.jsonParse(conn, false); } - - public static FSDataOutputStream write(final WebHdfsFileSystem webhdfs, - final HttpOpParam.Op op, final HttpURLConnection conn, - final int bufferSize) throws IOException { - return webhdfs.write(op, conn, bufferSize); - } }