svn merge -c 1186508 from trunk for HDFS-2453.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1189492 13f79535-47bb-0310-9956-ffa450edef68
parent 01671cd290
commit 4f9ea3cc2e
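In WebHDFS, an OPEN with a nonzero offset is served by the datanode as a ranged read, but before this patch the response code was always 200 OK. The patch makes the datanode answer 206 Partial Content for nonzero offsets and teaches the client side to produce and tolerate the offset parameter. A minimal client-side sketch of the corrected contract (the host, port, and file path below are invented for illustration, not taken from the patch):

import java.net.HttpURLConnection;
import java.net.URL;

public class PartialContentCheck {
  public static void main(String[] args) throws Exception {
    final long offset = 1024L;
    // Hypothetical datanode URL; "&offset=..." mirrors what ByteRangeInputStream appends.
    final URL url = new URL(
        "http://datanode:50075/webhdfs/v1/tmp/f?op=OPEN&offset=" + offset);
    final HttpURLConnection conn = (HttpURLConnection)url.openConnection();
    conn.setRequestMethod("GET");
    conn.setRequestProperty("Range", "bytes=" + offset + "-");
    conn.connect();
    // With this patch a ranged read returns HTTP_PARTIAL (206) instead of HTTP_OK (200).
    System.out.println("response code = " + conn.getResponseCode());
  }
}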
CHANGES.txt
@@ -1111,6 +1111,10 @@ Release 0.23.0 - Unreleased
     file or creating a file without specifying the replication parameter.
     (szetszwo)
 
+    HDFS-2453. Fix http response code for partial content in webhdfs, added
+    getDefaultBlockSize() and getDefaultReplication() in WebHdfsFileSystem
+    and cleared content type in ExceptionHandler.  (szetszwo)
+
 BREAKDOWN OF HDFS-1073 SUBTASKS
 
     HDFS-1521. Persist transaction ID on disk between NN restarts.
ByteRangeInputStream.java
@@ -22,10 +22,13 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.StringTokenizer;
 
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
+import org.apache.hadoop.hdfs.web.resources.OffsetParam;
 
 /**
  * To support HTTP byte streams, a new connection to an HTTP server needs to be
@@ -42,6 +45,8 @@ public class ByteRangeInputStream extends FSInputStream {
    */
   static class URLOpener {
     protected URL url;
+    /** The url with offset parameter */
+    private URL offsetUrl;
 
     public URLOpener(URL u) {
       url = u;
@@ -54,12 +59,55 @@ public class ByteRangeInputStream extends FSInputStream {
     public URL getURL() {
       return url;
     }
 
-    public HttpURLConnection openConnection() throws IOException {
-      return (HttpURLConnection)url.openConnection();
+    HttpURLConnection openConnection() throws IOException {
+      return (HttpURLConnection)offsetUrl.openConnection();
+    }
+
+    private HttpURLConnection openConnection(final long offset) throws IOException {
+      offsetUrl = offset == 0L? url: new URL(url + "&" + new OffsetParam(offset));
+      final HttpURLConnection conn = openConnection();
+      conn.setRequestMethod("GET");
+      if (offset != 0L) {
+        conn.setRequestProperty("Range", "bytes=" + offset + "-");
+      }
+      return conn;
     }
   }
+
+  static private final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "=";
+
+  /** Remove offset parameter, if there is any, from the url */
+  static URL removeOffsetParam(final URL url) throws MalformedURLException {
+    String query = url.getQuery();
+    if (query == null) {
+      return url;
+    }
+    final String lower = query.toLowerCase();
+    if (!lower.startsWith(OFFSET_PARAM_PREFIX)
+        && !lower.contains("&" + OFFSET_PARAM_PREFIX)) {
+      return url;
+    }
+
+    //rebuild query
+    StringBuilder b = null;
+    for(final StringTokenizer st = new StringTokenizer(query, "&");
+        st.hasMoreTokens();) {
+      final String token = st.nextToken();
+      if (!token.toLowerCase().startsWith(OFFSET_PARAM_PREFIX)) {
+        if (b == null) {
+          b = new StringBuilder("?").append(token);
+        } else {
+          b.append('&').append(token);
+        }
+      }
+    }
+    query = b == null? "": b.toString();
+
+    final String urlStr = url.toString();
+    return new URL(urlStr.substring(0, urlStr.indexOf('?')) + query);
+  }
 
   enum StreamStatus {
     NORMAL, SEEK
   }
@@ -95,12 +143,8 @@ public class ByteRangeInputStream extends FSInputStream {
       final URLOpener opener =
           (resolvedURL.getURL() == null) ? originalURL : resolvedURL;
 
-      final HttpURLConnection connection = opener.openConnection();
+      final HttpURLConnection connection = opener.openConnection(startPos);
       try {
-        connection.setRequestMethod("GET");
-        if (startPos != 0) {
-          connection.setRequestProperty("Range", "bytes="+startPos+"-");
-        }
         connection.connect();
         final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
         filelength = (cl == null) ? -1 : Long.parseLong(cl);
@@ -125,7 +169,7 @@ public class ByteRangeInputStream extends FSInputStream {
         throw new IOException("HTTP_OK expected, received " + respCode);
       }
 
-      resolvedURL.setURL(connection.getURL());
+      resolvedURL.setURL(removeOffsetParam(connection.getURL()));
       status = StreamStatus.NORMAL;
     }
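Two details above are worth spelling out. First, openConnection(offset) sends the position both ways: as an offset query parameter (understood by webhdfs) and as an HTTP Range header. Second, the namenode redirects OPEN to a datanode URL that embeds that offset, so removeOffsetParam must strip it from the resolved URL before the URL is cached for reuse; otherwise a later seek would silently re-read from the old position. A sketch of the rewriting (the redirect URL is made up; removeOffsetParam is package-private, so this fragment assumes the caller sits in org.apache.hadoop.hdfs, as the test at the bottom of this commit does):

// Suppose the namenode redirected a read at offset 1024 to this datanode URL:
URL redirected = new URL("http://dn:50075/webhdfs/v1/tmp/f?op=OPEN&offset=1024");
// removeOffsetParam drops only the offset pair (case-insensitively) and keeps the rest:
// -> http://dn:50075/webhdfs/v1/tmp/f?op=OPEN
URL cached = ByteRangeInputStream.removeOffsetParam(redirected);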
DatanodeWebHdfsMethods.java
@@ -102,7 +102,7 @@ public class DatanodeWebHdfsMethods {
       final ReplicationParam replication,
       @QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
           final BlockSizeParam blockSize
-      ) throws IOException, URISyntaxException, InterruptedException {
+      ) throws IOException, InterruptedException {
 
     if (LOG.isTraceEnabled()) {
       LOG.trace(op + ": " + path + ", ugi=" + ugi
@@ -162,7 +162,7 @@ public class DatanodeWebHdfsMethods {
       final PostOpParam op,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
-      ) throws IOException, URISyntaxException, InterruptedException {
+      ) throws IOException, InterruptedException {
 
     if (LOG.isTraceEnabled()) {
       LOG.trace(op + ": " + path + ", ugi=" + ugi
@@ -216,7 +216,7 @@ public class DatanodeWebHdfsMethods {
       final LengthParam length,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
-      ) throws IOException, URISyntaxException, InterruptedException {
+      ) throws IOException, InterruptedException {
 
     if (LOG.isTraceEnabled()) {
       LOG.trace(op + ": " + path + ", ugi=" + ugi
@@ -255,7 +255,11 @@ public class DatanodeWebHdfsMethods {
           }
         }
       };
-      return Response.ok(streaming).type(MediaType.APPLICATION_OCTET_STREAM).build();
+
+      final int status = offset.getValue() == 0?
+          HttpServletResponse.SC_OK: HttpServletResponse.SC_PARTIAL_CONTENT;
+      return Response.status(status).entity(streaming).type(
+          MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GETFILECHECKSUM:
     {
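Besides dropping the now-unthrown URISyntaxException from three method signatures, the key change here is in the OPEN return path: Response.ok(entity) always pins the status to 200, so the response is now built with an explicit status, letting a ranged read report 206. Schematically (javax.ws.rs.core.Response, with streaming being the StreamingOutput declared just above):

final int status = offset.getValue() == 0 ?
    HttpServletResponse.SC_OK :              // 200: read from the start of the file
    HttpServletResponse.SC_PARTIAL_CONTENT;  // 206: read beginning at a nonzero offset
return Response.status(status)
    .entity(streaming)                       // body is still streamed lazily
    .type(MediaType.APPLICATION_OCTET_STREAM)
    .build();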
WebHdfsFileSystem.java
@@ -31,6 +31,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.ContentSummary;
@@ -44,6 +46,7 @@ import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.ByteRangeInputStream;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HftpFileSystem;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
@@ -86,6 +89,7 @@ import org.mortbay.util.ajax.JSON;
 
 /** A FileSystem for HDFS over the web. */
 public class WebHdfsFileSystem extends HftpFileSystem {
+  public static final Log LOG = LogFactory.getLog(WebHdfsFileSystem.class);
   /** File System URI: {SCHEME}://namenode:port/path/to/file */
   public static final String SCHEME = "webhdfs";
   /** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */
@@ -340,6 +344,18 @@ public class WebHdfsFileSystem extends HftpFileSystem {
     run(op, p, new ModificationTimeParam(mtime), new AccessTimeParam(atime));
   }
 
+  @Override
+  public long getDefaultBlockSize() {
+    return getConf().getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
+        DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
+  }
+
+  @Override
+  public short getDefaultReplication() {
+    return (short)getConf().getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+        DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+  }
+
   private FSDataOutputStream write(final HttpOpParam.Op op,
       final HttpURLConnection conn, final int bufferSize) throws IOException {
     return new FSDataOutputStream(new BufferedOutputStream(
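The defaults inherited from the FileSystem base class are not HDFS-aware, so create() calls that omit blockSize or replication could send wrong values over webhdfs; the two overrides read the HDFS keys from the client's own configuration instead. A usage sketch (the namenode address and sizes here are invented, not part of the patch):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class DefaultsCheck {
  public static void main(String[] args) throws Exception {
    final Configuration conf = new Configuration();
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 64L << 20);  // 64 MB
    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
    final FileSystem fs = FileSystem.get(URI.create("webhdfs://namenode:50070/"), conf);
    // With the overrides above, these come from the client conf and are used
    // whenever create() is called without explicit values.
    System.out.println(fs.getDefaultBlockSize());    // 67108864
    System.out.println(fs.getDefaultReplication()); // 2
  }
}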
ExceptionHandler.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.web.resources;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.ext.ExceptionMapper;
@@ -36,12 +38,17 @@ import com.sun.jersey.api.ParamException;
 public class ExceptionHandler implements ExceptionMapper<Exception> {
   public static final Log LOG = LogFactory.getLog(ExceptionHandler.class);
 
+  private @Context HttpServletResponse response;
+
   @Override
   public Response toResponse(Exception e) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("GOT EXCEPITION", e);
     }
 
+    //clear content type
+    response.setContentType(null);
+
     //Convert exception
     if (e instanceof ParamException) {
       final ParamException paramexception = (ParamException)e;
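The @Context-injected HttpServletResponse exists only so the mapper can clear any content type set before the exception surfaced; without the reset, the JSON error entity could be served under a stale type such as application/octet-stream left over from an aborted read. A trimmed, hypothetical mapper showing the same pattern (this is a sketch, not the patch's full ExceptionHandler):

import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;

@Provider
public class JsonErrorMapper implements ExceptionMapper<Exception> {
  private @Context HttpServletResponse response;

  @Override
  public Response toResponse(Exception e) {
    response.setContentType(null);  // forget any previously set content type
    return Response.status(HttpServletResponse.SC_INTERNAL_SERVER_ERROR)
        .type(MediaType.APPLICATION_JSON)  // error body is a JSON object
        .entity("{\"RemoteException\":{\"message\":\"" + e + "\"}}")
        .build();
  }
}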
TestByteRangeInputStream.java
@@ -35,28 +35,29 @@ import org.apache.hadoop.hdfs.ByteRangeInputStream.URLOpener;
 import org.junit.Test;
 
 class MockHttpURLConnection extends HttpURLConnection {
-  private int responseCode = -1;
-  URL m;
 
   public MockHttpURLConnection(URL u) {
     super(u);
-    m = u;
   }
 
+  @Override
   public boolean usingProxy(){
     return false;
   }
 
+  @Override
   public void disconnect() {
   }
 
-  public void connect() throws IOException {
+  @Override
+  public void connect() {
   }
 
+  @Override
   public InputStream getInputStream() throws IOException {
     return new ByteArrayInputStream("asdf".getBytes());
   }
 
+  @Override
   public URL getURL() {
     URL u = null;
     try {
@@ -67,6 +68,7 @@ class MockHttpURLConnection extends HttpURLConnection {
     return u;
   }
 
+  @Override
   public int getResponseCode() {
     if (responseCode != -1) {
       return responseCode;
@@ -82,10 +84,45 @@ class MockHttpURLConnection extends HttpURLConnection {
   public void setResponseCode(int resCode) {
     responseCode = resCode;
   }
 
 }
 
 public class TestByteRangeInputStream {
+  @Test
+  public void testRemoveOffset() throws IOException {
+    { //no offset
+      String s = "http://test/Abc?Length=99";
+      assertEquals(s, ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //no parameters
+      String s = "http://test/Abc";
+      assertEquals(s, ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //offset as first parameter
+      String s = "http://test/Abc?offset=10&Length=99";
+      assertEquals("http://test/Abc?Length=99",
+          ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //offset as second parameter
+      String s = "http://test/Abc?op=read&OFFset=10&Length=99";
+      assertEquals("http://test/Abc?op=read&Length=99",
+          ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //offset as last parameter
+      String s = "http://test/Abc?Length=99&offset=10";
+      assertEquals("http://test/Abc?Length=99",
+          ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //offset as the only parameter
+      String s = "http://test/Abc?offset=10";
+      assertEquals("http://test/Abc",
+          ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+  }
+
   @Test
   public void testByteRange() throws IOException {
TestWebHdfsFileSystemContract.java
@@ -123,6 +123,8 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
     }
   }
 
+  //the following are new tests (i.e. not over-riding the super class methods)
+
   public void testGetFileBlockLocations() throws IOException {
     final String f = "/test/testGetFileBlockLocations";
     createFile(path(f));
@@ -172,4 +174,45 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
       WebHdfsFileSystem.LOG.info("This is expected.", fnfe);
     }
   }
+
+  public void testSeek() throws IOException {
+    final Path p = new Path("/test/testSeek");
+    createFile(p);
+
+    final int one_third = data.length/3;
+    final int two_third = one_third*2;
+
+    { //test seek
+      final int offset = one_third;
+      final int len = data.length - offset;
+      final byte[] buf = new byte[len];
+
+      final FSDataInputStream in = fs.open(p);
+      in.seek(offset);
+
+      //read all remaining data
+      in.readFully(buf);
+      in.close();
+
+      for (int i = 0; i < buf.length; i++) {
+        assertEquals("Position " + i + ", offset=" + offset + ", length=" + len,
+            data[i + offset], buf[i]);
+      }
+    }
+
+    { //test position read (read the data after the two_third location)
+      final int offset = two_third;
+      final int len = data.length - offset;
+      final byte[] buf = new byte[len];
+
+      final FSDataInputStream in = fs.open(p);
+      in.readFully(offset, buf);
+      in.close();
+
+      for (int i = 0; i < buf.length; i++) {
+        assertEquals("Position " + i + ", offset=" + offset + ", length=" + len,
+            data[i + offset], buf[i]);
+      }
+    }
+  }
 }