diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 19ba987faf9..2bde9a5dfd4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1111,6 +1111,10 @@ Release 0.23.0 - Unreleased file or creating a file without specifying the replication parameter. (szetszwo) + HDFS-2453. Fix http response code for partial content in webhdfs, added + getDefaultBlockSize() and getDefaultReplication() in WebHdfsFileSystem + and cleared content type in ExceptionHandler. (szetszwo) + BREAKDOWN OF HDFS-1073 SUBTASKS HDFS-1521. Persist transaction ID on disk between NN restarts. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java index e2e8bf37c42..b364e6c25db 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java @@ -22,10 +22,13 @@ import java.io.IOException; import java.io.InputStream; import java.net.HttpURLConnection; +import java.net.MalformedURLException; import java.net.URL; +import java.util.StringTokenizer; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.hdfs.server.namenode.StreamFile; +import org.apache.hadoop.hdfs.web.resources.OffsetParam; /** * To support HTTP byte streams, a new connection to an HTTP server needs to be @@ -42,6 +45,8 @@ public class ByteRangeInputStream extends FSInputStream { */ static class URLOpener { protected URL url; + /** The url with offset parameter */ + private URL offsetUrl; public URLOpener(URL u) { url = u; @@ -54,12 +59,55 @@ public void setURL(URL u) { public URL getURL() { return url; } - - public HttpURLConnection openConnection() throws IOException { - return (HttpURLConnection)url.openConnection(); + + HttpURLConnection openConnection() throws IOException { + return (HttpURLConnection)offsetUrl.openConnection(); + } + + private HttpURLConnection openConnection(final long offset) throws IOException { + offsetUrl = offset == 0L? url: new URL(url + "&" + new OffsetParam(offset)); + final HttpURLConnection conn = openConnection(); + conn.setRequestMethod("GET"); + if (offset != 0L) { + conn.setRequestProperty("Range", "bytes=" + offset + "-"); + } + return conn; } } + static private final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "="; + + /** Remove offset parameter, if there is any, from the url */ + static URL removeOffsetParam(final URL url) throws MalformedURLException { + String query = url.getQuery(); + if (query == null) { + return url; + } + final String lower = query.toLowerCase(); + if (!lower.startsWith(OFFSET_PARAM_PREFIX) + && !lower.contains("&" + OFFSET_PARAM_PREFIX)) { + return url; + } + + //rebuild query + StringBuilder b = null; + for(final StringTokenizer st = new StringTokenizer(query, "&"); + st.hasMoreTokens();) { + final String token = st.nextToken(); + if (!token.toLowerCase().startsWith(OFFSET_PARAM_PREFIX)) { + if (b == null) { + b = new StringBuilder("?").append(token); + } else { + b.append('&').append(token); + } + } + } + query = b == null? "": b.toString(); + + final String urlStr = url.toString(); + return new URL(urlStr.substring(0, urlStr.indexOf('?')) + query); + } + enum StreamStatus { NORMAL, SEEK } @@ -95,12 +143,8 @@ private InputStream getInputStream() throws IOException { final URLOpener opener = (resolvedURL.getURL() == null) ? originalURL : resolvedURL; - final HttpURLConnection connection = opener.openConnection(); + final HttpURLConnection connection = opener.openConnection(startPos); try { - connection.setRequestMethod("GET"); - if (startPos != 0) { - connection.setRequestProperty("Range", "bytes="+startPos+"-"); - } connection.connect(); final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH); filelength = (cl == null) ? -1 : Long.parseLong(cl); @@ -125,7 +169,7 @@ private InputStream getInputStream() throws IOException { throw new IOException("HTTP_OK expected, received " + respCode); } - resolvedURL.setURL(connection.getURL()); + resolvedURL.setURL(removeOffsetParam(connection.getURL())); status = StreamStatus.NORMAL; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java index aaa1fd0173b..3d3a208fcea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java @@ -102,7 +102,7 @@ public Response put( final ReplicationParam replication, @QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT) final BlockSizeParam blockSize - ) throws IOException, URISyntaxException, InterruptedException { + ) throws IOException, InterruptedException { if (LOG.isTraceEnabled()) { LOG.trace(op + ": " + path + ", ugi=" + ugi @@ -162,7 +162,7 @@ public Response post( final PostOpParam op, @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT) final BufferSizeParam bufferSize - ) throws IOException, URISyntaxException, InterruptedException { + ) throws IOException, InterruptedException { if (LOG.isTraceEnabled()) { LOG.trace(op + ": " + path + ", ugi=" + ugi @@ -216,7 +216,7 @@ public Response get( final LengthParam length, @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT) final BufferSizeParam bufferSize - ) throws IOException, URISyntaxException, InterruptedException { + ) throws IOException, InterruptedException { if (LOG.isTraceEnabled()) { LOG.trace(op + ": " + path + ", ugi=" + ugi @@ -255,7 +255,11 @@ public void write(final OutputStream out) throws IOException { } } }; - return Response.ok(streaming).type(MediaType.APPLICATION_OCTET_STREAM).build(); + + final int status = offset.getValue() == 0? + HttpServletResponse.SC_OK: HttpServletResponse.SC_PARTIAL_CONTENT; + return Response.status(status).entity(streaming).type( + MediaType.APPLICATION_OCTET_STREAM).build(); } case GETFILECHECKSUM: { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index a039a97fb82..b8e7e7e174f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -31,6 +31,8 @@ import java.util.List; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.ContentSummary; @@ -44,6 +46,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.ByteRangeInputStream; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HftpFileSystem; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; @@ -86,6 +89,7 @@ /** A FileSystem for HDFS over the web. */ public class WebHdfsFileSystem extends HftpFileSystem { + public static final Log LOG = LogFactory.getLog(WebHdfsFileSystem.class); /** File System URI: {SCHEME}://namenode:port/path/to/file */ public static final String SCHEME = "webhdfs"; /** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */ @@ -340,6 +344,18 @@ public void setTimes(final Path p, final long mtime, final long atime run(op, p, new ModificationTimeParam(mtime), new AccessTimeParam(atime)); } + @Override + public long getDefaultBlockSize() { + return getConf().getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, + DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT); + } + + @Override + public short getDefaultReplication() { + return (short)getConf().getInt(DFSConfigKeys.DFS_REPLICATION_KEY, + DFSConfigKeys.DFS_REPLICATION_DEFAULT); + } + private FSDataOutputStream write(final HttpOpParam.Op op, final HttpURLConnection conn, final int bufferSize) throws IOException { return new FSDataOutputStream(new BufferedOutputStream( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java index fdc66f3af2e..bd0003ef0e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java @@ -20,6 +20,8 @@ import java.io.FileNotFoundException; import java.io.IOException; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.ext.ExceptionMapper; @@ -36,12 +38,17 @@ public class ExceptionHandler implements ExceptionMapper { public static final Log LOG = LogFactory.getLog(ExceptionHandler.class); + private @Context HttpServletResponse response; + @Override public Response toResponse(Exception e) { if (LOG.isTraceEnabled()) { LOG.trace("GOT EXCEPITION", e); } + //clear content type + response.setContentType(null); + //Convert exception if (e instanceof ParamException) { final ParamException paramexception = (ParamException)e; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java index e5846abe797..3be88d3e5cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java @@ -35,28 +35,29 @@ import org.junit.Test; class MockHttpURLConnection extends HttpURLConnection { - private int responseCode = -1; - URL m; - public MockHttpURLConnection(URL u) { super(u); - m = u; } + @Override public boolean usingProxy(){ return false; } + @Override public void disconnect() { } - public void connect() throws IOException { + @Override + public void connect() { } + @Override public InputStream getInputStream() throws IOException { return new ByteArrayInputStream("asdf".getBytes()); } + @Override public URL getURL() { URL u = null; try { @@ -67,6 +68,7 @@ public URL getURL() { return u; } + @Override public int getResponseCode() { if (responseCode != -1) { return responseCode; @@ -82,10 +84,45 @@ public int getResponseCode() { public void setResponseCode(int resCode) { responseCode = resCode; } - } public class TestByteRangeInputStream { + @Test + public void testRemoveOffset() throws IOException { + { //no offset + String s = "http://test/Abc?Length=99"; + assertEquals(s, ByteRangeInputStream.removeOffsetParam(new URL(s)).toString()); + } + + { //no parameters + String s = "http://test/Abc"; + assertEquals(s, ByteRangeInputStream.removeOffsetParam(new URL(s)).toString()); + } + + { //offset as first parameter + String s = "http://test/Abc?offset=10&Length=99"; + assertEquals("http://test/Abc?Length=99", + ByteRangeInputStream.removeOffsetParam(new URL(s)).toString()); + } + + { //offset as second parameter + String s = "http://test/Abc?op=read&OFFset=10&Length=99"; + assertEquals("http://test/Abc?op=read&Length=99", + ByteRangeInputStream.removeOffsetParam(new URL(s)).toString()); + } + + { //offset as last parameter + String s = "http://test/Abc?Length=99&offset=10"; + assertEquals("http://test/Abc?Length=99", + ByteRangeInputStream.removeOffsetParam(new URL(s)).toString()); + } + + { //offset as the only parameter + String s = "http://test/Abc?offset=10"; + assertEquals("http://test/Abc", + ByteRangeInputStream.removeOffsetParam(new URL(s)).toString()); + } + } @Test public void testByteRange() throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java index 7cc938709b1..f60d8f1939e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java @@ -123,6 +123,8 @@ public void testMkdirsFailsForSubdirectoryOfExistingFile() throws Exception { } } + //the following are new tests (i.e. not over-riding the super class methods) + public void testGetFileBlockLocations() throws IOException { final String f = "/test/testGetFileBlockLocations"; createFile(path(f)); @@ -172,4 +174,45 @@ public void testOpenNonExistFile() throws IOException { WebHdfsFileSystem.LOG.info("This is expected.", fnfe); } } + + public void testSeek() throws IOException { + final Path p = new Path("/test/testSeek"); + createFile(p); + + final int one_third = data.length/3; + final int two_third = one_third*2; + + { //test seek + final int offset = one_third; + final int len = data.length - offset; + final byte[] buf = new byte[len]; + + final FSDataInputStream in = fs.open(p); + in.seek(offset); + + //read all remaining data + in.readFully(buf); + in.close(); + + for (int i = 0; i < buf.length; i++) { + assertEquals("Position " + i + ", offset=" + offset + ", length=" + len, + data[i + offset], buf[i]); + } + } + + { //test position read (read the data after the two_third location) + final int offset = two_third; + final int len = data.length - offset; + final byte[] buf = new byte[len]; + + final FSDataInputStream in = fs.open(p); + in.readFully(offset, buf); + in.close(); + + for (int i = 0; i < buf.length; i++) { + assertEquals("Position " + i + ", offset=" + offset + ", length=" + len, + data[i + offset], buf[i]); + } + } + } }