HDFS-2527. WebHdfs: remove the use of "Range" header in Open; use ugi username if renewer parameter is null in GetDelegationToken; response OK when setting replication for non-files; rename GETFILEBLOCKLOCATIONS to GET_BLOCK_LOCATIONS and state that it is a private unstable API; replace isDirectory and isSymlink with enum {FILE, DIRECTORY, SYMLINK} in HdfsFileStatus JSON object.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1197329 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
dae055bb70
commit
6afe3e0d22
|
@ -91,8 +91,6 @@ Trunk (unreleased changes)
|
|||
HDFS-2526. (Client)NamenodeProtocolTranslatorR23 do not need to keep a
|
||||
reference to rpcProxyWithoutRetry (atm)
|
||||
|
||||
HDFS-2416. distcp with a webhdfs uri on a secure cluster fails. (jitendra)
|
||||
|
||||
Release 0.23.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -113,6 +111,15 @@ Release 0.23.1 - UNRELEASED
|
|||
|
||||
BUG FIXES
|
||||
|
||||
HDFS-2416. distcp with a webhdfs uri on a secure cluster fails. (jitendra)
|
||||
|
||||
HDFS-2527. WebHdfs: remove the use of "Range" header in Open; use ugi
|
||||
username if renewer parameter is null in GetDelegationToken; response OK
|
||||
when setting replication for non-files; rename GETFILEBLOCKLOCATIONS to
|
||||
GET_BLOCK_LOCATIONS and state that it is a private unstable API; replace
|
||||
isDirectory and isSymlink with enum {FILE, DIRECTORY, SYMLINK} in
|
||||
HdfsFileStatus JSON object. (szetszwo)
|
||||
|
||||
Release 0.23.0 - 2011-11-01
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -18,17 +18,13 @@
|
|||
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.util.StringTokenizer;
|
||||
|
||||
import org.apache.hadoop.fs.FSInputStream;
|
||||
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
|
||||
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
||||
|
||||
/**
|
||||
* To support HTTP byte streams, a new connection to an HTTP server needs to be
|
||||
|
@ -37,16 +33,14 @@ import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
|||
* is made on the successive read(). The normal input stream functions are
|
||||
* connected to the currently active input stream.
|
||||
*/
|
||||
public class ByteRangeInputStream extends FSInputStream {
|
||||
public abstract class ByteRangeInputStream extends FSInputStream {
|
||||
|
||||
/**
|
||||
* This class wraps a URL and provides method to open connection.
|
||||
* It can be overridden to change how a connection is opened.
|
||||
*/
|
||||
public static class URLOpener {
|
||||
public static abstract class URLOpener {
|
||||
protected URL url;
|
||||
/** The url with offset parameter */
|
||||
protected URL offsetUrl;
|
||||
|
||||
public URLOpener(URL u) {
|
||||
url = u;
|
||||
|
@ -60,52 +54,9 @@ public class ByteRangeInputStream extends FSInputStream {
|
|||
return url;
|
||||
}
|
||||
|
||||
protected HttpURLConnection openConnection() throws IOException {
|
||||
return (HttpURLConnection)offsetUrl.openConnection();
|
||||
}
|
||||
protected abstract HttpURLConnection openConnection() throws IOException;
|
||||
|
||||
private HttpURLConnection openConnection(final long offset) throws IOException {
|
||||
offsetUrl = offset == 0L? url: new URL(url + "&" + new OffsetParam(offset));
|
||||
final HttpURLConnection conn = openConnection();
|
||||
conn.setRequestMethod("GET");
|
||||
if (offset != 0L) {
|
||||
conn.setRequestProperty("Range", "bytes=" + offset + "-");
|
||||
}
|
||||
return conn;
|
||||
}
|
||||
}
|
||||
|
||||
static private final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "=";
|
||||
|
||||
/** Remove offset parameter, if there is any, from the url */
|
||||
static URL removeOffsetParam(final URL url) throws MalformedURLException {
|
||||
String query = url.getQuery();
|
||||
if (query == null) {
|
||||
return url;
|
||||
}
|
||||
final String lower = query.toLowerCase();
|
||||
if (!lower.startsWith(OFFSET_PARAM_PREFIX)
|
||||
&& !lower.contains("&" + OFFSET_PARAM_PREFIX)) {
|
||||
return url;
|
||||
}
|
||||
|
||||
//rebuild query
|
||||
StringBuilder b = null;
|
||||
for(final StringTokenizer st = new StringTokenizer(query, "&");
|
||||
st.hasMoreTokens();) {
|
||||
final String token = st.nextToken();
|
||||
if (!token.toLowerCase().startsWith(OFFSET_PARAM_PREFIX)) {
|
||||
if (b == null) {
|
||||
b = new StringBuilder("?").append(token);
|
||||
} else {
|
||||
b.append('&').append(token);
|
||||
}
|
||||
}
|
||||
}
|
||||
query = b == null? "": b.toString();
|
||||
|
||||
final String urlStr = url.toString();
|
||||
return new URL(urlStr.substring(0, urlStr.indexOf('?')) + query);
|
||||
protected abstract HttpURLConnection openConnection(final long offset) throws IOException;
|
||||
}
|
||||
|
||||
enum StreamStatus {
|
||||
|
@ -120,11 +71,6 @@ public class ByteRangeInputStream extends FSInputStream {
|
|||
|
||||
StreamStatus status = StreamStatus.SEEK;
|
||||
|
||||
/** Create an input stream with the URL. */
|
||||
public ByteRangeInputStream(final URL url) {
|
||||
this(new URLOpener(url), new URLOpener(null));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create with the specified URLOpeners. Original url is used to open the
|
||||
* stream for the first time. Resolved url is used in subsequent requests.
|
||||
|
@ -136,6 +82,12 @@ public class ByteRangeInputStream extends FSInputStream {
|
|||
this.resolvedURL = r;
|
||||
}
|
||||
|
||||
protected abstract void checkResponseCode(final HttpURLConnection connection
|
||||
) throws IOException;
|
||||
|
||||
protected abstract URL getResolvedUrl(final HttpURLConnection connection
|
||||
) throws IOException;
|
||||
|
||||
private InputStream getInputStream() throws IOException {
|
||||
if (status != StreamStatus.NORMAL) {
|
||||
|
||||
|
@ -150,32 +102,14 @@ public class ByteRangeInputStream extends FSInputStream {
|
|||
(resolvedURL.getURL() == null) ? originalURL : resolvedURL;
|
||||
|
||||
final HttpURLConnection connection = opener.openConnection(startPos);
|
||||
try {
|
||||
connection.connect();
|
||||
final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
|
||||
filelength = (cl == null) ? -1 : Long.parseLong(cl);
|
||||
if (HftpFileSystem.LOG.isDebugEnabled()) {
|
||||
HftpFileSystem.LOG.debug("filelength = " + filelength);
|
||||
}
|
||||
in = connection.getInputStream();
|
||||
} catch (FileNotFoundException fnfe) {
|
||||
throw fnfe;
|
||||
} catch (IOException ioe) {
|
||||
HftpFileSystem.throwIOExceptionFromConnection(connection, ioe);
|
||||
}
|
||||
|
||||
int respCode = connection.getResponseCode();
|
||||
if (startPos != 0 && respCode != HttpURLConnection.HTTP_PARTIAL) {
|
||||
// We asked for a byte range but did not receive a partial content
|
||||
// response...
|
||||
throw new IOException("HTTP_PARTIAL expected, received " + respCode);
|
||||
} else if (startPos == 0 && respCode != HttpURLConnection.HTTP_OK) {
|
||||
// We asked for all bytes from the beginning but didn't receive a 200
|
||||
// response (none of the other 2xx codes are valid here)
|
||||
throw new IOException("HTTP_OK expected, received " + respCode);
|
||||
}
|
||||
connection.connect();
|
||||
checkResponseCode(connection);
|
||||
|
||||
resolvedURL.setURL(removeOffsetParam(connection.getURL()));
|
||||
final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
|
||||
filelength = (cl == null) ? -1 : Long.parseLong(cl);
|
||||
in = connection.getInputStream();
|
||||
|
||||
resolvedURL.setURL(getResolvedUrl(connection));
|
||||
status = StreamStatus.NORMAL;
|
||||
}
|
||||
|
||||
|
|
|
@ -372,13 +372,66 @@ public class HftpFileSystem extends FileSystem
|
|||
return query;
|
||||
}
|
||||
|
||||
static class RangeHeaderUrlOpener extends ByteRangeInputStream.URLOpener {
|
||||
RangeHeaderUrlOpener(final URL url) {
|
||||
super(url);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HttpURLConnection openConnection() throws IOException {
|
||||
return (HttpURLConnection)url.openConnection();
|
||||
}
|
||||
|
||||
/** Use HTTP Range header for specifying offset. */
|
||||
@Override
|
||||
protected HttpURLConnection openConnection(final long offset) throws IOException {
|
||||
final HttpURLConnection conn = openConnection();
|
||||
conn.setRequestMethod("GET");
|
||||
if (offset != 0L) {
|
||||
conn.setRequestProperty("Range", "bytes=" + offset + "-");
|
||||
}
|
||||
return conn;
|
||||
}
|
||||
}
|
||||
|
||||
static class RangeHeaderInputStream extends ByteRangeInputStream {
|
||||
RangeHeaderInputStream(RangeHeaderUrlOpener o, RangeHeaderUrlOpener r) {
|
||||
super(o, r);
|
||||
}
|
||||
|
||||
RangeHeaderInputStream(final URL url) {
|
||||
this(new RangeHeaderUrlOpener(url), new RangeHeaderUrlOpener(null));
|
||||
}
|
||||
|
||||
/** Expects HTTP_OK and HTTP_PARTIAL response codes. */
|
||||
@Override
|
||||
protected void checkResponseCode(final HttpURLConnection connection
|
||||
) throws IOException {
|
||||
final int code = connection.getResponseCode();
|
||||
if (startPos != 0 && code != HttpURLConnection.HTTP_PARTIAL) {
|
||||
// We asked for a byte range but did not receive a partial content
|
||||
// response...
|
||||
throw new IOException("HTTP_PARTIAL expected, received " + code);
|
||||
} else if (startPos == 0 && code != HttpURLConnection.HTTP_OK) {
|
||||
// We asked for all bytes from the beginning but didn't receive a 200
|
||||
// response (none of the other 2xx codes are valid here)
|
||||
throw new IOException("HTTP_OK expected, received " + code);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected URL getResolvedUrl(final HttpURLConnection connection) {
|
||||
return connection.getURL();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataInputStream open(Path f, int buffersize) throws IOException {
|
||||
f = f.makeQualified(getUri(), getWorkingDirectory());
|
||||
String path = "/data" + ServletUtil.encodePath(f.toUri().getPath());
|
||||
String query = addDelegationTokenParam("ugi=" + getEncodedUgiParameter());
|
||||
URL u = getNamenodeURL(path, query);
|
||||
return new FSDataInputStream(new ByteRangeInputStream(u));
|
||||
return new FSDataInputStream(new RangeHeaderInputStream(u));
|
||||
}
|
||||
|
||||
/** Class to parse and store a listing reply from the server. */
|
||||
|
|
|
@ -350,9 +350,7 @@ public class DatanodeWebHdfsMethods {
|
|||
}
|
||||
};
|
||||
|
||||
final int status = offset.getValue() == 0?
|
||||
HttpServletResponse.SC_OK: HttpServletResponse.SC_PARTIAL_CONTENT;
|
||||
return Response.status(status).entity(streaming).type(
|
||||
return Response.ok(streaming).type(
|
||||
MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
case GETFILECHECKSUM:
|
||||
|
|
|
@ -42,8 +42,6 @@ import javax.ws.rs.QueryParam;
|
|||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.Response.ResponseBuilder;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
import javax.ws.rs.core.StreamingOutput;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -68,7 +66,6 @@ import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
|
|||
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.DelegationParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.DestinationParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
||||
|
@ -87,6 +84,7 @@ import org.apache.hadoop.hdfs.web.resources.RecursiveParam;
|
|||
import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.RenewerParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.UserParam;
|
||||
import org.apache.hadoop.net.NodeBase;
|
||||
|
@ -153,8 +151,7 @@ public class NamenodeWebHdfsMethods {
|
|||
final NameNode namenode, final UserGroupInformation ugi,
|
||||
final String renewer) throws IOException {
|
||||
final Credentials c = DelegationTokenSecretManager.createCredentials(
|
||||
namenode, ugi,
|
||||
renewer != null? renewer: request.getUserPrincipal().getName());
|
||||
namenode, ugi, renewer != null? renewer: ugi.getShortUserName());
|
||||
final Token<? extends TokenIdentifier> t = c.getAllTokens().iterator().next();
|
||||
t.setKind(WebHdfsFileSystem.TOKEN_KIND);
|
||||
SecurityUtil.setTokenService(t, namenode.getNameNodeAddress());
|
||||
|
@ -325,8 +322,7 @@ public class NamenodeWebHdfsMethods {
|
|||
{
|
||||
final boolean b = np.setReplication(fullpath, replication.getValue(conf));
|
||||
final String js = JsonUtil.toJsonString("boolean", b);
|
||||
final ResponseBuilder r = b? Response.ok(): Response.status(Status.FORBIDDEN);
|
||||
return r.entity(js).type(MediaType.APPLICATION_JSON).build();
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
case SETOWNER:
|
||||
{
|
||||
|
@ -509,7 +505,7 @@ public class NamenodeWebHdfsMethods {
|
|||
op.getValue(), offset.getValue(), offset, length, bufferSize);
|
||||
return Response.temporaryRedirect(uri).build();
|
||||
}
|
||||
case GETFILEBLOCKLOCATIONS:
|
||||
case GET_BLOCK_LOCATIONS:
|
||||
{
|
||||
final long offsetValue = offset.getValue();
|
||||
final Long lengthValue = length.getValue();
|
||||
|
|
|
@ -134,6 +134,14 @@ public class JsonUtil {
|
|||
return new FsPermission(Short.parseShort(s, 8));
|
||||
}
|
||||
|
||||
static enum PathType {
|
||||
FILE, DIRECTORY, SYMLINK;
|
||||
|
||||
static PathType valueOf(HdfsFileStatus status) {
|
||||
return status.isDir()? DIRECTORY: status.isSymlink()? SYMLINK: FILE;
|
||||
}
|
||||
}
|
||||
|
||||
/** Convert a HdfsFileStatus object to a Json string. */
|
||||
public static String toJsonString(final HdfsFileStatus status,
|
||||
boolean includeType) {
|
||||
|
@ -142,13 +150,12 @@ public class JsonUtil {
|
|||
}
|
||||
final Map<String, Object> m = new TreeMap<String, Object>();
|
||||
m.put("localName", status.getLocalName());
|
||||
m.put("isDir", status.isDir());
|
||||
m.put("isSymlink", status.isSymlink());
|
||||
m.put("type", PathType.valueOf(status));
|
||||
if (status.isSymlink()) {
|
||||
m.put("symlink", status.getSymlink());
|
||||
}
|
||||
|
||||
m.put("len", status.getLen());
|
||||
m.put("length", status.getLen());
|
||||
m.put("owner", status.getOwner());
|
||||
m.put("group", status.getGroup());
|
||||
m.put("permission", toString(status.getPermission()));
|
||||
|
@ -169,12 +176,11 @@ public class JsonUtil {
|
|||
final Map<?, ?> m = includesType ?
|
||||
(Map<?, ?>)json.get(HdfsFileStatus.class.getSimpleName()) : json;
|
||||
final String localName = (String) m.get("localName");
|
||||
final boolean isDir = (Boolean) m.get("isDir");
|
||||
final boolean isSymlink = (Boolean) m.get("isSymlink");
|
||||
final byte[] symlink = isSymlink?
|
||||
DFSUtil.string2Bytes((String)m.get("symlink")): null;
|
||||
final PathType type = PathType.valueOf((String) m.get("type"));
|
||||
final byte[] symlink = type != PathType.SYMLINK? null
|
||||
: DFSUtil.string2Bytes((String)m.get("symlink"));
|
||||
|
||||
final long len = (Long) m.get("len");
|
||||
final long len = (Long) m.get("length");
|
||||
final String owner = (String) m.get("owner");
|
||||
final String group = (String) m.get("group");
|
||||
final FsPermission permission = toFsPermission((String) m.get("permission"));
|
||||
|
@ -182,8 +188,8 @@ public class JsonUtil {
|
|||
final long mTime = (Long) m.get("modificationTime");
|
||||
final long blockSize = (Long) m.get("blockSize");
|
||||
final short replication = (short) (long) (Long) m.get("replication");
|
||||
return new HdfsFileStatus(len, isDir, replication, blockSize, mTime, aTime,
|
||||
permission, owner, group,
|
||||
return new HdfsFileStatus(len, type == PathType.DIRECTORY, replication,
|
||||
blockSize, mTime, aTime, permission, owner, group,
|
||||
symlink, DFSUtil.string2Bytes(localName));
|
||||
}
|
||||
|
||||
|
|
|
@ -25,12 +25,14 @@ import java.io.InputStream;
|
|||
import java.io.InputStreamReader;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.StringTokenizer;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -62,7 +64,6 @@ import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
|||
import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.DestinationParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
||||
|
@ -81,6 +82,7 @@ import org.apache.hadoop.hdfs.web.resources.RecursiveParam;
|
|||
import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.RenewerParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.UserParam;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
@ -388,9 +390,9 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
|
||||
private FileStatus makeQualified(HdfsFileStatus f, Path parent) {
|
||||
return new FileStatus(f.getLen(), f.isDir(), f.getReplication(),
|
||||
f.getBlockSize(), f.getModificationTime(),
|
||||
f.getAccessTime(),
|
||||
f.getBlockSize(), f.getModificationTime(), f.getAccessTime(),
|
||||
f.getPermission(), f.getOwner(), f.getGroup(),
|
||||
f.isSymlink() ? new Path(f.getSymlink()) : null,
|
||||
f.getFullPath(parent).makeQualified(getUri(), getWorkingDirectory()));
|
||||
}
|
||||
|
||||
|
@ -532,24 +534,84 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
statistics.incrementReadOps(1);
|
||||
final HttpOpParam.Op op = GetOpParam.Op.OPEN;
|
||||
final URL url = toUrl(op, f, new BufferSizeParam(buffersize));
|
||||
ByteRangeInputStream str = getByteRangeInputStream(url);
|
||||
return new FSDataInputStream(str);
|
||||
return new FSDataInputStream(new OffsetUrlInputStream(
|
||||
new OffsetUrlOpener(url), new OffsetUrlOpener(null)));
|
||||
}
|
||||
|
||||
private class URLOpener extends ByteRangeInputStream.URLOpener {
|
||||
|
||||
public URLOpener(URL u) {
|
||||
super(u);
|
||||
class OffsetUrlOpener extends ByteRangeInputStream.URLOpener {
|
||||
/** The url with offset parameter */
|
||||
private URL offsetUrl;
|
||||
|
||||
OffsetUrlOpener(final URL url) {
|
||||
super(url);
|
||||
}
|
||||
|
||||
/** Open connection with offset url. */
|
||||
@Override
|
||||
public HttpURLConnection openConnection() throws IOException {
|
||||
protected HttpURLConnection openConnection() throws IOException {
|
||||
return getHttpUrlConnection(offsetUrl);
|
||||
}
|
||||
|
||||
/** Setup offset url before open connection. */
|
||||
@Override
|
||||
protected HttpURLConnection openConnection(final long offset) throws IOException {
|
||||
offsetUrl = offset == 0L? url: new URL(url + "&" + new OffsetParam(offset));
|
||||
final HttpURLConnection conn = openConnection();
|
||||
conn.setRequestMethod("GET");
|
||||
return conn;
|
||||
}
|
||||
}
|
||||
|
||||
private ByteRangeInputStream getByteRangeInputStream(URL url) {
|
||||
return new ByteRangeInputStream(new URLOpener(url), new URLOpener(null));
|
||||
|
||||
private static final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "=";
|
||||
|
||||
/** Remove offset parameter, if there is any, from the url */
|
||||
static URL removeOffsetParam(final URL url) throws MalformedURLException {
|
||||
String query = url.getQuery();
|
||||
if (query == null) {
|
||||
return url;
|
||||
}
|
||||
final String lower = query.toLowerCase();
|
||||
if (!lower.startsWith(OFFSET_PARAM_PREFIX)
|
||||
&& !lower.contains("&" + OFFSET_PARAM_PREFIX)) {
|
||||
return url;
|
||||
}
|
||||
|
||||
//rebuild query
|
||||
StringBuilder b = null;
|
||||
for(final StringTokenizer st = new StringTokenizer(query, "&");
|
||||
st.hasMoreTokens();) {
|
||||
final String token = st.nextToken();
|
||||
if (!token.toLowerCase().startsWith(OFFSET_PARAM_PREFIX)) {
|
||||
if (b == null) {
|
||||
b = new StringBuilder("?").append(token);
|
||||
} else {
|
||||
b.append('&').append(token);
|
||||
}
|
||||
}
|
||||
}
|
||||
query = b == null? "": b.toString();
|
||||
|
||||
final String urlStr = url.toString();
|
||||
return new URL(urlStr.substring(0, urlStr.indexOf('?')) + query);
|
||||
}
|
||||
|
||||
static class OffsetUrlInputStream extends ByteRangeInputStream {
|
||||
OffsetUrlInputStream(URLOpener o, URLOpener r) {
|
||||
super(o, r);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void checkResponseCode(final HttpURLConnection connection
|
||||
) throws IOException {
|
||||
validateResponse(GetOpParam.Op.OPEN, connection);
|
||||
}
|
||||
|
||||
/** Remove offset parameter before returning the resolved url. */
|
||||
@Override
|
||||
protected URL getResolvedUrl(final HttpURLConnection connection
|
||||
) throws MalformedURLException {
|
||||
return removeOffsetParam(connection.getURL());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -641,7 +703,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
final long offset, final long length) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
|
||||
final HttpOpParam.Op op = GetOpParam.Op.GETFILEBLOCKLOCATIONS;
|
||||
final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS;
|
||||
final Map<?, ?> m = run(op, p, new OffsetParam(offset),
|
||||
new LengthParam(length));
|
||||
return DFSUtil.locatedBlocks2Locations(JsonUtil.toLocatedBlocks(m));
|
||||
|
|
|
@ -24,7 +24,6 @@ public class GetOpParam extends HttpOpParam<GetOpParam.Op> {
|
|||
/** Get operations. */
|
||||
public static enum Op implements HttpOpParam.Op {
|
||||
OPEN(HttpURLConnection.HTTP_OK),
|
||||
GETFILEBLOCKLOCATIONS(HttpURLConnection.HTTP_OK),
|
||||
|
||||
GETFILESTATUS(HttpURLConnection.HTTP_OK),
|
||||
LISTSTATUS(HttpURLConnection.HTTP_OK),
|
||||
|
@ -33,6 +32,9 @@ public class GetOpParam extends HttpOpParam<GetOpParam.Op> {
|
|||
|
||||
GETDELEGATIONTOKEN(HttpURLConnection.HTTP_OK),
|
||||
|
||||
/** GET_BLOCK_LOCATIONS is a private unstable op. */
|
||||
GET_BLOCK_LOCATIONS(HttpURLConnection.HTTP_OK),
|
||||
|
||||
NULL(HttpURLConnection.HTTP_NOT_IMPLEMENTED);
|
||||
|
||||
final int expectedHttpResponseCode;
|
||||
|
|
|
@ -31,10 +31,10 @@ import java.io.InputStream;
|
|||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
|
||||
import org.apache.hadoop.hdfs.ByteRangeInputStream.URLOpener;
|
||||
import org.junit.Test;
|
||||
|
||||
class MockHttpURLConnection extends HttpURLConnection {
|
||||
public class TestByteRangeInputStream {
|
||||
public static class MockHttpURLConnection extends HttpURLConnection {
|
||||
public MockHttpURLConnection(URL u) {
|
||||
super(u);
|
||||
}
|
||||
|
@ -85,54 +85,18 @@ class MockHttpURLConnection extends HttpURLConnection {
|
|||
responseCode = resCode;
|
||||
}
|
||||
}
|
||||
|
||||
public class TestByteRangeInputStream {
|
||||
@Test
|
||||
public void testRemoveOffset() throws IOException {
|
||||
{ //no offset
|
||||
String s = "http://test/Abc?Length=99";
|
||||
assertEquals(s, ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
|
||||
}
|
||||
|
||||
{ //no parameters
|
||||
String s = "http://test/Abc";
|
||||
assertEquals(s, ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
|
||||
}
|
||||
|
||||
{ //offset as first parameter
|
||||
String s = "http://test/Abc?offset=10&Length=99";
|
||||
assertEquals("http://test/Abc?Length=99",
|
||||
ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
|
||||
}
|
||||
|
||||
{ //offset as second parameter
|
||||
String s = "http://test/Abc?op=read&OFFset=10&Length=99";
|
||||
assertEquals("http://test/Abc?op=read&Length=99",
|
||||
ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
|
||||
}
|
||||
|
||||
{ //offset as last parameter
|
||||
String s = "http://test/Abc?Length=99&offset=10";
|
||||
assertEquals("http://test/Abc?Length=99",
|
||||
ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
|
||||
}
|
||||
|
||||
{ //offset as the only parameter
|
||||
String s = "http://test/Abc?offset=10";
|
||||
assertEquals("http://test/Abc",
|
||||
ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testByteRange() throws IOException {
|
||||
URLOpener ospy = spy(new URLOpener(new URL("http://test/")));
|
||||
HftpFileSystem.RangeHeaderUrlOpener ospy = spy(
|
||||
new HftpFileSystem.RangeHeaderUrlOpener(new URL("http://test/")));
|
||||
doReturn(new MockHttpURLConnection(ospy.getURL())).when(ospy)
|
||||
.openConnection();
|
||||
URLOpener rspy = spy(new URLOpener((URL) null));
|
||||
HftpFileSystem.RangeHeaderUrlOpener rspy = spy(
|
||||
new HftpFileSystem.RangeHeaderUrlOpener((URL) null));
|
||||
doReturn(new MockHttpURLConnection(rspy.getURL())).when(rspy)
|
||||
.openConnection();
|
||||
ByteRangeInputStream is = new ByteRangeInputStream(ospy, rspy);
|
||||
ByteRangeInputStream is = new HftpFileSystem.RangeHeaderInputStream(ospy, rspy);
|
||||
|
||||
assertEquals("getPos wrong", 0, is.getPos());
|
||||
|
||||
|
|
|
@ -0,0 +1,137 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hdfs.TestByteRangeInputStream.MockHttpURLConnection;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.OffsetUrlInputStream;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.OffsetUrlOpener;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestOffsetUrlInputStream {
|
||||
@Test
|
||||
public void testRemoveOffset() throws IOException {
|
||||
{ //no offset
|
||||
String s = "http://test/Abc?Length=99";
|
||||
assertEquals(s, WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString());
|
||||
}
|
||||
|
||||
{ //no parameters
|
||||
String s = "http://test/Abc";
|
||||
assertEquals(s, WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString());
|
||||
}
|
||||
|
||||
{ //offset as first parameter
|
||||
String s = "http://test/Abc?offset=10&Length=99";
|
||||
assertEquals("http://test/Abc?Length=99",
|
||||
WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString());
|
||||
}
|
||||
|
||||
{ //offset as second parameter
|
||||
String s = "http://test/Abc?op=read&OFFset=10&Length=99";
|
||||
assertEquals("http://test/Abc?op=read&Length=99",
|
||||
WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString());
|
||||
}
|
||||
|
||||
{ //offset as last parameter
|
||||
String s = "http://test/Abc?Length=99&offset=10";
|
||||
assertEquals("http://test/Abc?Length=99",
|
||||
WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString());
|
||||
}
|
||||
|
||||
{ //offset as the only parameter
|
||||
String s = "http://test/Abc?offset=10";
|
||||
assertEquals("http://test/Abc",
|
||||
WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testByteRange() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
final String uri = WebHdfsFileSystem.SCHEME + "://localhost:50070/";
|
||||
final WebHdfsFileSystem webhdfs = (WebHdfsFileSystem)FileSystem.get(new URI(uri), conf);
|
||||
|
||||
OffsetUrlOpener ospy = spy(webhdfs.new OffsetUrlOpener(new URL("http://test/")));
|
||||
doReturn(new MockHttpURLConnection(ospy.getURL())).when(ospy)
|
||||
.openConnection();
|
||||
OffsetUrlOpener rspy = spy(webhdfs.new OffsetUrlOpener((URL) null));
|
||||
doReturn(new MockHttpURLConnection(rspy.getURL())).when(rspy)
|
||||
.openConnection();
|
||||
final OffsetUrlInputStream is = new OffsetUrlInputStream(ospy, rspy);
|
||||
|
||||
assertEquals("getPos wrong", 0, is.getPos());
|
||||
|
||||
is.read();
|
||||
|
||||
assertNull("Initial call made incorrectly (Range Check)", ospy
|
||||
.openConnection().getRequestProperty("Range"));
|
||||
|
||||
assertEquals("getPos should be 1 after reading one byte", 1, is.getPos());
|
||||
|
||||
is.read();
|
||||
|
||||
assertEquals("getPos should be 2 after reading two bytes", 2, is.getPos());
|
||||
|
||||
// No additional connections should have been made (no seek)
|
||||
|
||||
rspy.setURL(new URL("http://resolvedurl/"));
|
||||
|
||||
is.seek(100);
|
||||
is.read();
|
||||
|
||||
assertEquals("getPos should be 101 after reading one byte", 101,
|
||||
is.getPos());
|
||||
|
||||
verify(rspy, times(1)).openConnection();
|
||||
|
||||
is.seek(101);
|
||||
is.read();
|
||||
|
||||
verify(rspy, times(1)).openConnection();
|
||||
|
||||
// Seek to 101 should not result in another request"
|
||||
|
||||
is.seek(2500);
|
||||
is.read();
|
||||
|
||||
((MockHttpURLConnection) rspy.openConnection()).setResponseCode(206);
|
||||
is.seek(0);
|
||||
|
||||
try {
|
||||
is.read();
|
||||
fail("Exception should be thrown when 206 response is given "
|
||||
+ "but 200 is expected");
|
||||
} catch (IOException e) {
|
||||
WebHdfsFileSystem.LOG.info(e.toString());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -280,7 +280,7 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
|
|||
final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
|
||||
conn.setRequestMethod(op.getType().toString());
|
||||
conn.connect();
|
||||
assertEquals(HttpServletResponse.SC_FORBIDDEN, conn.getResponseCode());
|
||||
assertEquals(HttpServletResponse.SC_OK, conn.getResponseCode());
|
||||
|
||||
assertFalse(webhdfs.setReplication(dir, (short)1));
|
||||
conn.disconnect();
|
||||
|
|
Loading…
Reference in New Issue