HDFS-2317. Support read access to HDFS in webhdfs.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1170085 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b6d5d02fc0
commit
6c3b59505b
|
@ -8,6 +8,8 @@ Trunk (unreleased changes)
|
||||||
HDFS-2284. Add a new FileSystem, webhdfs://, for supporting write Http
|
HDFS-2284. Add a new FileSystem, webhdfs://, for supporting write Http
|
||||||
access to HDFS. (szetszwo)
|
access to HDFS. (szetszwo)
|
||||||
|
|
||||||
|
HDFS-2317. Support read access to HDFS in webhdfs. (szetszwo)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia)
|
HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia)
|
||||||
|
|
|
@ -33,7 +33,7 @@ import org.apache.hadoop.hdfs.server.namenode.StreamFile;
|
||||||
* is made on the successive read(). The normal input stream functions are
|
* is made on the successive read(). The normal input stream functions are
|
||||||
* connected to the currently active input stream.
|
* connected to the currently active input stream.
|
||||||
*/
|
*/
|
||||||
class ByteRangeInputStream extends FSInputStream {
|
public class ByteRangeInputStream extends FSInputStream {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class wraps a URL to allow easy mocking when testing. The URL class
|
* This class wraps a URL to allow easy mocking when testing. The URL class
|
||||||
|
@ -71,7 +71,8 @@ class ByteRangeInputStream extends FSInputStream {
|
||||||
|
|
||||||
StreamStatus status = StreamStatus.SEEK;
|
StreamStatus status = StreamStatus.SEEK;
|
||||||
|
|
||||||
ByteRangeInputStream(final URL url) {
|
/** Create an input stream with the URL. */
|
||||||
|
public ByteRangeInputStream(final URL url) {
|
||||||
this(new URLOpener(url), new URLOpener(null));
|
this(new URLOpener(url), new URLOpener(null));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -229,12 +229,11 @@ public class DistributedFileSystem extends FileSystem {
|
||||||
return dfs.recoverLease(getPathName(f));
|
return dfs.recoverLease(getPathName(f));
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
@Override
|
@Override
|
||||||
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
|
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
|
||||||
statistics.incrementReadOps(1);
|
statistics.incrementReadOps(1);
|
||||||
return new DFSClient.DFSDataInputStream(
|
return new DFSClient.DFSDataInputStream(
|
||||||
dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
|
dfs.open(getPathName(f), bufferSize, verifyChecksum));
|
||||||
}
|
}
|
||||||
|
|
||||||
/** This optional operation is not yet supported. */
|
/** This optional operation is not yet supported. */
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.web.resources;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
@ -27,6 +28,7 @@ import java.util.EnumSet;
|
||||||
import javax.servlet.ServletContext;
|
import javax.servlet.ServletContext;
|
||||||
import javax.ws.rs.Consumes;
|
import javax.ws.rs.Consumes;
|
||||||
import javax.ws.rs.DefaultValue;
|
import javax.ws.rs.DefaultValue;
|
||||||
|
import javax.ws.rs.GET;
|
||||||
import javax.ws.rs.POST;
|
import javax.ws.rs.POST;
|
||||||
import javax.ws.rs.PUT;
|
import javax.ws.rs.PUT;
|
||||||
import javax.ws.rs.Path;
|
import javax.ws.rs.Path;
|
||||||
|
@ -36,6 +38,7 @@ import javax.ws.rs.QueryParam;
|
||||||
import javax.ws.rs.core.Context;
|
import javax.ws.rs.core.Context;
|
||||||
import javax.ws.rs.core.MediaType;
|
import javax.ws.rs.core.MediaType;
|
||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
|
import javax.ws.rs.core.StreamingOutput;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -43,12 +46,16 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
|
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
|
||||||
import org.apache.hadoop.hdfs.DFSOutputStream;
|
import org.apache.hadoop.hdfs.DFSOutputStream;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||||
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
|
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
|
||||||
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
|
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.LengthParam;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
||||||
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
|
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
|
||||||
import org.apache.hadoop.hdfs.web.resources.Param;
|
import org.apache.hadoop.hdfs.web.resources.Param;
|
||||||
import org.apache.hadoop.hdfs.web.resources.PermissionParam;
|
import org.apache.hadoop.hdfs.web.resources.PermissionParam;
|
||||||
|
@ -61,7 +68,7 @@ import org.apache.hadoop.io.IOUtils;
|
||||||
/** Web-hdfs DataNode implementation. */
|
/** Web-hdfs DataNode implementation. */
|
||||||
@Path("")
|
@Path("")
|
||||||
public class DatanodeWebHdfsMethods {
|
public class DatanodeWebHdfsMethods {
|
||||||
private static final Log LOG = LogFactory.getLog(DatanodeWebHdfsMethods.class);
|
public static final Log LOG = LogFactory.getLog(DatanodeWebHdfsMethods.class);
|
||||||
|
|
||||||
private @Context ServletContext context;
|
private @Context ServletContext context;
|
||||||
|
|
||||||
|
@ -166,4 +173,56 @@ public class DatanodeWebHdfsMethods {
|
||||||
throw new UnsupportedOperationException(op + " is not supported");
|
throw new UnsupportedOperationException(op + " is not supported");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Handle HTTP GET request. */
|
||||||
|
@GET
|
||||||
|
@Path("{" + UriFsPathParam.NAME + ":.*}")
|
||||||
|
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
|
||||||
|
public Response get(
|
||||||
|
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
||||||
|
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
|
||||||
|
final GetOpParam op,
|
||||||
|
@QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
|
||||||
|
final OffsetParam offset,
|
||||||
|
@QueryParam(LengthParam.NAME) @DefaultValue(LengthParam.DEFAULT)
|
||||||
|
final LengthParam length,
|
||||||
|
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
||||||
|
final BufferSizeParam bufferSize
|
||||||
|
) throws IOException, URISyntaxException {
|
||||||
|
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace(op + ": " + path
|
||||||
|
+ Param.toSortedString(", ", offset, length, bufferSize));
|
||||||
|
}
|
||||||
|
|
||||||
|
final String fullpath = path.getAbsolutePath();
|
||||||
|
final DataNode datanode = (DataNode)context.getAttribute("datanode");
|
||||||
|
|
||||||
|
switch(op.getValue()) {
|
||||||
|
case OPEN:
|
||||||
|
{
|
||||||
|
final Configuration conf = new Configuration(datanode.getConf());
|
||||||
|
final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
|
||||||
|
final DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
|
||||||
|
final DFSDataInputStream in = new DFSClient.DFSDataInputStream(
|
||||||
|
dfsclient.open(fullpath, bufferSize.getValue(), true));
|
||||||
|
in.seek(offset.getValue());
|
||||||
|
|
||||||
|
final StreamingOutput streaming = new StreamingOutput() {
|
||||||
|
@Override
|
||||||
|
public void write(final OutputStream out) throws IOException {
|
||||||
|
final Long n = length.getValue();
|
||||||
|
if (n == null) {
|
||||||
|
IOUtils.copyBytes(in, out, bufferSize.getValue());
|
||||||
|
} else {
|
||||||
|
IOUtils.copyBytes(in, out, n, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
return Response.ok(streaming).type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
throw new UnsupportedOperationException(op + " is not supported");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,8 +17,11 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode.web.resources;
|
package org.apache.hadoop.hdfs.server.namenode.web.resources;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.io.PrintStream;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
@ -37,11 +40,13 @@ import javax.ws.rs.QueryParam;
|
||||||
import javax.ws.rs.core.Context;
|
import javax.ws.rs.core.Context;
|
||||||
import javax.ws.rs.core.MediaType;
|
import javax.ws.rs.core.MediaType;
|
||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
|
import javax.ws.rs.core.StreamingOutput;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.fs.Options;
|
import org.apache.hadoop.fs.Options;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||||
|
@ -58,7 +63,9 @@ import org.apache.hadoop.hdfs.web.resources.DstPathParam;
|
||||||
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
||||||
import org.apache.hadoop.hdfs.web.resources.GroupParam;
|
import org.apache.hadoop.hdfs.web.resources.GroupParam;
|
||||||
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
|
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.LengthParam;
|
||||||
import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
|
import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
||||||
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
|
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
|
||||||
import org.apache.hadoop.hdfs.web.resources.OwnerParam;
|
import org.apache.hadoop.hdfs.web.resources.OwnerParam;
|
||||||
import org.apache.hadoop.hdfs.web.resources.Param;
|
import org.apache.hadoop.hdfs.web.resources.Param;
|
||||||
|
@ -79,15 +86,23 @@ public class NamenodeWebHdfsMethods {
|
||||||
private @Context ServletContext context;
|
private @Context ServletContext context;
|
||||||
|
|
||||||
private static DatanodeInfo chooseDatanode(final NameNode namenode,
|
private static DatanodeInfo chooseDatanode(final NameNode namenode,
|
||||||
final String path, final HttpOpParam.Op op) throws IOException {
|
final String path, final HttpOpParam.Op op, final long openOffset
|
||||||
if (op == PostOpParam.Op.APPEND) {
|
) throws IOException {
|
||||||
final HdfsFileStatus status = namenode.getRpcServer().getFileInfo(path);
|
if (op == GetOpParam.Op.OPEN || op == PostOpParam.Op.APPEND) {
|
||||||
|
final NamenodeProtocols np = namenode.getRpcServer();
|
||||||
|
final HdfsFileStatus status = np.getFileInfo(path);
|
||||||
final long len = status.getLen();
|
final long len = status.getLen();
|
||||||
|
if (op == GetOpParam.Op.OPEN && (openOffset < 0L || openOffset >= len)) {
|
||||||
|
throw new IOException("Offset=" + openOffset + " out of the range [0, "
|
||||||
|
+ len + "); " + op + ", path=" + path);
|
||||||
|
}
|
||||||
|
|
||||||
if (len > 0) {
|
if (len > 0) {
|
||||||
final LocatedBlocks locations = namenode.getRpcServer().getBlockLocations(path, len-1, 1);
|
final long offset = op == GetOpParam.Op.OPEN? openOffset: len - 1;
|
||||||
|
final LocatedBlocks locations = np.getBlockLocations(path, offset, 1);
|
||||||
final int count = locations.locatedBlockCount();
|
final int count = locations.locatedBlockCount();
|
||||||
if (count > 0) {
|
if (count > 0) {
|
||||||
return JspHelper.bestNode(locations.get(count - 1));
|
return JspHelper.bestNode(locations.get(0));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -98,9 +113,9 @@ public class NamenodeWebHdfsMethods {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static URI redirectURI(final NameNode namenode,
|
private static URI redirectURI(final NameNode namenode,
|
||||||
final String path, final HttpOpParam.Op op,
|
final String path, final HttpOpParam.Op op, final long openOffset,
|
||||||
final Param<?, ?>... parameters) throws URISyntaxException, IOException {
|
final Param<?, ?>... parameters) throws URISyntaxException, IOException {
|
||||||
final DatanodeInfo dn = chooseDatanode(namenode, path, op);
|
final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset);
|
||||||
final String query = op.toQueryString() + Param.toSortedString("&", parameters);
|
final String query = op.toQueryString() + Param.toSortedString("&", parameters);
|
||||||
final String uripath = "/" + WebHdfsFileSystem.PATH_PREFIX + path;
|
final String uripath = "/" + WebHdfsFileSystem.PATH_PREFIX + path;
|
||||||
|
|
||||||
|
@ -148,8 +163,9 @@ public class NamenodeWebHdfsMethods {
|
||||||
|
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace(op + ": " + path
|
LOG.trace(op + ": " + path
|
||||||
+ Param.toSortedString(", ", dstPath, owner, group, permission,
|
+ Param.toSortedString(", ", dstPath, owner, group, permission,
|
||||||
overwrite, bufferSize, replication, blockSize));
|
overwrite, bufferSize, replication, blockSize,
|
||||||
|
modificationTime, accessTime, renameOptions));
|
||||||
}
|
}
|
||||||
|
|
||||||
final String fullpath = path.getAbsolutePath();
|
final String fullpath = path.getAbsolutePath();
|
||||||
|
@ -159,7 +175,7 @@ public class NamenodeWebHdfsMethods {
|
||||||
switch(op.getValue()) {
|
switch(op.getValue()) {
|
||||||
case CREATE:
|
case CREATE:
|
||||||
{
|
{
|
||||||
final URI uri = redirectURI(namenode, fullpath, op.getValue(),
|
final URI uri = redirectURI(namenode, fullpath, op.getValue(), -1L,
|
||||||
permission, overwrite, bufferSize, replication, blockSize);
|
permission, overwrite, bufferSize, replication, blockSize);
|
||||||
return Response.temporaryRedirect(uri).build();
|
return Response.temporaryRedirect(uri).build();
|
||||||
}
|
}
|
||||||
|
@ -234,7 +250,8 @@ public class NamenodeWebHdfsMethods {
|
||||||
switch(op.getValue()) {
|
switch(op.getValue()) {
|
||||||
case APPEND:
|
case APPEND:
|
||||||
{
|
{
|
||||||
final URI uri = redirectURI(namenode, fullpath, op.getValue(), bufferSize);
|
final URI uri = redirectURI(namenode, fullpath, op.getValue(), -1L,
|
||||||
|
bufferSize);
|
||||||
return Response.temporaryRedirect(uri).build();
|
return Response.temporaryRedirect(uri).build();
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
|
@ -250,9 +267,15 @@ public class NamenodeWebHdfsMethods {
|
||||||
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
|
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
|
||||||
public Response root(
|
public Response root(
|
||||||
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
|
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
|
||||||
final GetOpParam op
|
final GetOpParam op,
|
||||||
) throws IOException {
|
@QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
|
||||||
return get(ROOT, op);
|
final OffsetParam offset,
|
||||||
|
@QueryParam(LengthParam.NAME) @DefaultValue(LengthParam.DEFAULT)
|
||||||
|
final LengthParam length,
|
||||||
|
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
||||||
|
final BufferSizeParam bufferSize
|
||||||
|
) throws IOException, URISyntaxException {
|
||||||
|
return get(ROOT, op, offset, length, bufferSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Handle HTTP GET request. */
|
/** Handle HTTP GET request. */
|
||||||
|
@ -262,27 +285,89 @@ public class NamenodeWebHdfsMethods {
|
||||||
public Response get(
|
public Response get(
|
||||||
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
||||||
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
|
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
|
||||||
final GetOpParam op
|
final GetOpParam op,
|
||||||
) throws IOException {
|
@QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
|
||||||
|
final OffsetParam offset,
|
||||||
|
@QueryParam(LengthParam.NAME) @DefaultValue(LengthParam.DEFAULT)
|
||||||
|
final LengthParam length,
|
||||||
|
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
||||||
|
final BufferSizeParam bufferSize
|
||||||
|
) throws IOException, URISyntaxException {
|
||||||
|
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace(op + ", " + path
|
LOG.trace(op + ", " + path
|
||||||
+ Param.toSortedString(", "));
|
+ Param.toSortedString(", ", offset, length, bufferSize));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||||
|
final String fullpath = path.getAbsolutePath();
|
||||||
|
final NamenodeProtocols np = namenode.getRpcServer();
|
||||||
|
|
||||||
switch(op.getValue()) {
|
switch(op.getValue()) {
|
||||||
|
case OPEN:
|
||||||
|
{
|
||||||
|
final URI uri = redirectURI(namenode, fullpath, op.getValue(),
|
||||||
|
offset.getValue(), offset, length, bufferSize);
|
||||||
|
return Response.temporaryRedirect(uri).build();
|
||||||
|
}
|
||||||
case GETFILESTATUS:
|
case GETFILESTATUS:
|
||||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
{
|
||||||
final String fullpath = path.getAbsolutePath();
|
final HdfsFileStatus status = np.getFileInfo(fullpath);
|
||||||
final HdfsFileStatus status = namenode.getRpcServer().getFileInfo(fullpath);
|
|
||||||
final String js = JsonUtil.toJsonString(status);
|
final String js = JsonUtil.toJsonString(status);
|
||||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||||
|
}
|
||||||
|
case LISTSTATUS:
|
||||||
|
{
|
||||||
|
final StreamingOutput streaming = getListingStream(np, fullpath);
|
||||||
|
return Response.ok(streaming).type(MediaType.APPLICATION_JSON).build();
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
throw new UnsupportedOperationException(op + " is not supported");
|
throw new UnsupportedOperationException(op + " is not supported");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static DirectoryListing getDirectoryListing(final NamenodeProtocols np,
|
||||||
|
final String p, byte[] startAfter) throws IOException {
|
||||||
|
final DirectoryListing listing = np.getListing(p, startAfter, false);
|
||||||
|
if (listing == null) { // the directory does not exist
|
||||||
|
throw new FileNotFoundException("File " + p + " does not exist.");
|
||||||
|
}
|
||||||
|
return listing;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static StreamingOutput getListingStream(final NamenodeProtocols np,
|
||||||
|
final String p) throws IOException {
|
||||||
|
final DirectoryListing first = getDirectoryListing(np, p,
|
||||||
|
HdfsFileStatus.EMPTY_NAME);
|
||||||
|
|
||||||
|
return new StreamingOutput() {
|
||||||
|
@Override
|
||||||
|
public void write(final OutputStream outstream) throws IOException {
|
||||||
|
final PrintStream out = new PrintStream(outstream);
|
||||||
|
out.print('[');
|
||||||
|
|
||||||
|
final HdfsFileStatus[] partial = first.getPartialListing();
|
||||||
|
if (partial.length > 0) {
|
||||||
|
out.print(JsonUtil.toJsonString(partial[0]));
|
||||||
|
}
|
||||||
|
for(int i = 1; i < partial.length; i++) {
|
||||||
|
out.println(',');
|
||||||
|
out.print(JsonUtil.toJsonString(partial[i]));
|
||||||
|
}
|
||||||
|
|
||||||
|
for(DirectoryListing curr = first; curr.hasMore(); ) {
|
||||||
|
curr = getDirectoryListing(np, p, curr.getLastName());
|
||||||
|
for(HdfsFileStatus s : curr.getPartialListing()) {
|
||||||
|
out.println(',');
|
||||||
|
out.print(JsonUtil.toJsonString(s));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
out.println(']');
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
/** Handle HTTP DELETE request. */
|
/** Handle HTTP DELETE request. */
|
||||||
@DELETE
|
@DELETE
|
||||||
@Path("{path:.*}")
|
@Path("{path:.*}")
|
||||||
|
|
|
@ -30,6 +30,7 @@ import java.net.URL;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
@ -37,6 +38,7 @@ import org.apache.hadoop.fs.Options;
|
||||||
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.hdfs.ByteRangeInputStream;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.HftpFileSystem;
|
import org.apache.hadoop.hdfs.HftpFileSystem;
|
||||||
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
||||||
|
@ -122,12 +124,11 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private static Map<String, Object> jsonParse(final InputStream in
|
private static <T> T jsonParse(final InputStream in) throws IOException {
|
||||||
) throws IOException {
|
|
||||||
if (in == null) {
|
if (in == null) {
|
||||||
throw new IOException("The input stream is null.");
|
throw new IOException("The input stream is null.");
|
||||||
}
|
}
|
||||||
return (Map<String, Object>)JSON.parse(new InputStreamReader(in));
|
return (T)JSON.parse(new InputStreamReader(in));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void validateResponse(final HttpOpParam.Op op,
|
private static void validateResponse(final HttpOpParam.Op op,
|
||||||
|
@ -138,7 +139,7 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
||||||
try {
|
try {
|
||||||
m = jsonParse(conn.getErrorStream());
|
m = jsonParse(conn.getErrorStream());
|
||||||
} catch(IOException e) {
|
} catch(IOException e) {
|
||||||
throw new IOException("Unexpected HTTP response: code = " + code + " != "
|
throw new IOException("Unexpected HTTP response: code=" + code + " != "
|
||||||
+ op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
|
+ op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
|
||||||
+ ", message=" + conn.getResponseMessage(), e);
|
+ ", message=" + conn.getResponseMessage(), e);
|
||||||
}
|
}
|
||||||
|
@ -155,22 +156,26 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private URL toUrl(final HttpOpParam.Op op, final Path fspath,
|
||||||
protected HttpURLConnection openConnection(String path, String query)
|
final Param<?,?>... parameters) throws IOException {
|
||||||
throws IOException {
|
//initialize URI path and query
|
||||||
query = addDelegationTokenParam(query);
|
final String path = "/" + PATH_PREFIX
|
||||||
|
+ makeQualified(fspath).toUri().getPath();
|
||||||
|
final String query = op.toQueryString()
|
||||||
|
+ Param.toSortedString("&", parameters);
|
||||||
final URL url = getNamenodeURL(path, query);
|
final URL url = getNamenodeURL(path, query);
|
||||||
return (HttpURLConnection)url.openConnection();
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("url=" + url);
|
||||||
|
}
|
||||||
|
return url;
|
||||||
}
|
}
|
||||||
|
|
||||||
private HttpURLConnection httpConnect(final HttpOpParam.Op op, final Path fspath,
|
private HttpURLConnection httpConnect(final HttpOpParam.Op op, final Path fspath,
|
||||||
final Param<?,?>... parameters) throws IOException {
|
final Param<?,?>... parameters) throws IOException {
|
||||||
//initialize URI path and query
|
final URL url = toUrl(op, fspath, parameters);
|
||||||
final String uripath = "/" + PATH_PREFIX + makeQualified(fspath).toUri().getPath();
|
|
||||||
final String query = op.toQueryString() + Param.toSortedString("&", parameters);
|
|
||||||
|
|
||||||
//connect and get response
|
//connect and get response
|
||||||
final HttpURLConnection conn = openConnection(uripath, query);
|
final HttpURLConnection conn = (HttpURLConnection)url.openConnection();
|
||||||
try {
|
try {
|
||||||
conn.setRequestMethod(op.getType().toString());
|
conn.setRequestMethod(op.getType().toString());
|
||||||
conn.setDoOutput(op.getDoOutput());
|
conn.setDoOutput(op.getDoOutput());
|
||||||
|
@ -186,7 +191,17 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<String, Object> run(final HttpOpParam.Op op, final Path fspath,
|
/**
|
||||||
|
* Run a http operation.
|
||||||
|
* Connect to the http server, validate response, and obtain the JSON output.
|
||||||
|
*
|
||||||
|
* @param op http operation
|
||||||
|
* @param fspath file system path
|
||||||
|
* @param parameters parameters for the operation
|
||||||
|
* @return a JSON object, e.g. Object[], Map<String, Object>, etc.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private <T> T run(final HttpOpParam.Op op, final Path fspath,
|
||||||
final Param<?,?>... parameters) throws IOException {
|
final Param<?,?>... parameters) throws IOException {
|
||||||
final HttpURLConnection conn = httpConnect(op, fspath, parameters);
|
final HttpURLConnection conn = httpConnect(op, fspath, parameters);
|
||||||
validateResponse(op, conn);
|
validateResponse(op, conn);
|
||||||
|
@ -342,4 +357,30 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
||||||
final Map<String, Object> json = run(op, f, new RecursiveParam(recursive));
|
final Map<String, Object> json = run(op, f, new RecursiveParam(recursive));
|
||||||
return (Boolean)json.get(op.toString());
|
return (Boolean)json.get(op.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataInputStream open(final Path f, final int buffersize
|
||||||
|
) throws IOException {
|
||||||
|
statistics.incrementReadOps(1);
|
||||||
|
final HttpOpParam.Op op = GetOpParam.Op.OPEN;
|
||||||
|
final URL url = toUrl(op, f, new BufferSizeParam(buffersize));
|
||||||
|
return new FSDataInputStream(new ByteRangeInputStream(url));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FileStatus[] listStatus(final Path f) throws IOException {
|
||||||
|
statistics.incrementReadOps(1);
|
||||||
|
|
||||||
|
final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS;
|
||||||
|
final Object[] array = run(op, f);
|
||||||
|
|
||||||
|
//convert FileStatus
|
||||||
|
final FileStatus[] statuses = new FileStatus[array.length];
|
||||||
|
for(int i = 0; i < array.length; i++) {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
final Map<String, Object> m = (Map<String, Object>)array[i];
|
||||||
|
statuses[i] = makeQualified(JsonUtil.toFileStatus(m), f);
|
||||||
|
}
|
||||||
|
return statuses;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -26,7 +26,11 @@ public class GetOpParam extends HttpOpParam<GetOpParam.Op> {
|
||||||
|
|
||||||
/** Get operations. */
|
/** Get operations. */
|
||||||
public static enum Op implements HttpOpParam.Op {
|
public static enum Op implements HttpOpParam.Op {
|
||||||
|
OPEN(HttpURLConnection.HTTP_OK),
|
||||||
|
|
||||||
GETFILESTATUS(HttpURLConnection.HTTP_OK),
|
GETFILESTATUS(HttpURLConnection.HTTP_OK),
|
||||||
|
LISTSTATUS(HttpURLConnection.HTTP_OK),
|
||||||
|
|
||||||
NULL(HttpURLConnection.HTTP_NOT_IMPLEMENTED);
|
NULL(HttpURLConnection.HTTP_NOT_IMPLEMENTED);
|
||||||
|
|
||||||
final int expectedHttpResponseCode;
|
final int expectedHttpResponseCode;
|
||||||
|
|
|
@ -0,0 +1,49 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.web.resources;
|
||||||
|
|
||||||
|
/** Length parameter. */
|
||||||
|
public class LengthParam extends LongParam {
|
||||||
|
/** Parameter name. */
|
||||||
|
public static final String NAME = "length";
|
||||||
|
/** Default parameter value. */
|
||||||
|
public static final String DEFAULT = NULL;
|
||||||
|
|
||||||
|
private static final Domain DOMAIN = new Domain(NAME);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor.
|
||||||
|
* @param value the parameter value.
|
||||||
|
*/
|
||||||
|
public LengthParam(final Long value) {
|
||||||
|
super(DOMAIN, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor.
|
||||||
|
* @param str a string representation of the parameter value.
|
||||||
|
*/
|
||||||
|
public LengthParam(final String str) {
|
||||||
|
this(DOMAIN.parse(str));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getName() {
|
||||||
|
return NAME;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,49 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.web.resources;
|
||||||
|
|
||||||
|
/** Offset parameter. */
|
||||||
|
public class OffsetParam extends LongParam {
|
||||||
|
/** Parameter name. */
|
||||||
|
public static final String NAME = "offset";
|
||||||
|
/** Default parameter value. */
|
||||||
|
public static final String DEFAULT = "0";
|
||||||
|
|
||||||
|
private static final Domain DOMAIN = new Domain(NAME);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor.
|
||||||
|
* @param value the parameter value.
|
||||||
|
*/
|
||||||
|
public OffsetParam(final Long value) {
|
||||||
|
super(DOMAIN, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor.
|
||||||
|
* @param str a string representation of the parameter value.
|
||||||
|
*/
|
||||||
|
public OffsetParam(final String str) {
|
||||||
|
this(DOMAIN.parse(str));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getName() {
|
||||||
|
return NAME;
|
||||||
|
}
|
||||||
|
}
|
|
@ -39,7 +39,7 @@ public class UriFsPathParam extends StringParam {
|
||||||
|
|
||||||
/** @return the absolute path. */
|
/** @return the absolute path. */
|
||||||
public final String getAbsolutePath() {
|
public final String getAbsolutePath() {
|
||||||
final String path = getValue();
|
final String path = getValue(); //The first / has been stripped out.
|
||||||
return path == null? null: "/" + path;
|
return path == null? null: "/" + path;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,7 +63,6 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
||||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.net.DNSToSwitchMapping;
|
import org.apache.hadoop.net.DNSToSwitchMapping;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
@ -1461,18 +1460,6 @@ public class MiniDFSCluster {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @return a {@link WebHdfsFileSystem} object.
|
|
||||||
*/
|
|
||||||
public WebHdfsFileSystem getWebHdfsFileSystem() throws IOException {
|
|
||||||
final String str = WebHdfsFileSystem.SCHEME + "://" + conf.get("dfs.http.address");
|
|
||||||
try {
|
|
||||||
return (WebHdfsFileSystem)FileSystem.get(new URI(str), conf);
|
|
||||||
} catch (URISyntaxException e) {
|
|
||||||
throw new IOException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return a {@link HftpFileSystem} object as specified user.
|
* @return a {@link HftpFileSystem} object as specified user.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -17,80 +17,63 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.web;
|
package org.apache.hadoop.hdfs.web;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
import static org.apache.hadoop.fs.FileSystemTestHelper.exists;
|
|
||||||
import static org.apache.hadoop.fs.FileSystemTestHelper.getDefaultBlockSize;
|
|
||||||
import static org.apache.hadoop.fs.FileSystemTestHelper.getTestRootPath;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.impl.Log4JLogger;
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
||||||
import org.apache.hadoop.fs.FSMainOperationsBaseTest;
|
import org.apache.hadoop.fs.FSMainOperationsBaseTest;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
|
||||||
import org.apache.hadoop.hdfs.web.resources.ExceptionHandler;
|
import org.apache.hadoop.hdfs.web.resources.ExceptionHandler;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.junit.Assert;
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestFSMainOperationsWebHdfs extends FSMainOperationsBaseTest {
|
public class TestFSMainOperationsWebHdfs extends FSMainOperationsBaseTest {
|
||||||
{
|
{
|
||||||
((Log4JLogger)ExceptionHandler.LOG).getLogger().setLevel(Level.ALL);
|
((Log4JLogger)ExceptionHandler.LOG).getLogger().setLevel(Level.ALL);
|
||||||
|
((Log4JLogger)DatanodeWebHdfsMethods.LOG).getLogger().setLevel(Level.ALL);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final MiniDFSCluster cluster;
|
private static MiniDFSCluster cluster = null;
|
||||||
private static final Path defaultWorkingDirectory;
|
private static Path defaultWorkingDirectory;
|
||||||
|
|
||||||
static {
|
@BeforeClass
|
||||||
|
public static void setupCluster() {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
try {
|
try {
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
||||||
fSys = cluster.getWebHdfsFileSystem();
|
cluster.waitActive();
|
||||||
|
|
||||||
|
final String uri = WebHdfsFileSystem.SCHEME + "://"
|
||||||
|
+ conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||||
|
fSys = FileSystem.get(new URI(uri), conf);
|
||||||
defaultWorkingDirectory = fSys.getWorkingDirectory();
|
defaultWorkingDirectory = fSys.getWorkingDirectory();
|
||||||
} catch (IOException e) {
|
} catch (Exception e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void shutdownCluster() {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
cluster = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Path getDefaultWorkingDirectory() {
|
protected Path getDefaultWorkingDirectory() {
|
||||||
return defaultWorkingDirectory;
|
return defaultWorkingDirectory;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Override the following method without using position read. */
|
|
||||||
@Override
|
|
||||||
protected void writeReadAndDelete(int len) throws IOException {
|
|
||||||
Path path = getTestRootPath(fSys, "test/hadoop/file");
|
|
||||||
fSys.mkdirs(path.getParent());
|
|
||||||
|
|
||||||
FSDataOutputStream out =
|
|
||||||
fSys.create(path, false, 4096, (short) 1, getDefaultBlockSize() );
|
|
||||||
out.write(data, 0, len);
|
|
||||||
out.close();
|
|
||||||
|
|
||||||
Assert.assertTrue("Exists", exists(fSys, path));
|
|
||||||
Assert.assertEquals("Length", len, fSys.getFileStatus(path).getLen());
|
|
||||||
|
|
||||||
FSDataInputStream in = fSys.open(path);
|
|
||||||
for (int i = 0; i < len; i++) {
|
|
||||||
final int b = in.read();
|
|
||||||
Assert.assertEquals("Position " + i, data[i], b);
|
|
||||||
}
|
|
||||||
in.close();
|
|
||||||
Assert.assertTrue("Deleted", fSys.delete(path, false));
|
|
||||||
Assert.assertFalse("No longer exists", exists(fSys, path));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
//The following test failed since WebHdfsFileSystem did not support
|
||||||
//The following tests failed for HftpFileSystem,
|
//authentication.
|
||||||
//Disable it for WebHdfsFileSystem
|
//Disable it.
|
||||||
@Test
|
|
||||||
public void testListStatusThrowsExceptionForNonExistentFile() {}
|
|
||||||
@Test
|
@Test
|
||||||
public void testListStatusThrowsExceptionForUnreadableDir() {}
|
public void testListStatusThrowsExceptionForUnreadableDir() {}
|
||||||
@Test
|
|
||||||
public void testGlobStatusThrowsExceptionForNonExistentFile() {}
|
|
||||||
}
|
}
|
|
@ -19,23 +19,23 @@
|
||||||
package org.apache.hadoop.hdfs.web;
|
package org.apache.hadoop.hdfs.web;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
||||||
import org.apache.hadoop.fs.FileSystemContractBaseTest;
|
import org.apache.hadoop.fs.FileSystemContractBaseTest;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
|
||||||
|
|
||||||
public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
|
public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
|
||||||
|
private static final Configuration conf = new Configuration();
|
||||||
private static final MiniDFSCluster cluster;
|
private static final MiniDFSCluster cluster;
|
||||||
private String defaultWorkingDirectory;
|
private String defaultWorkingDirectory;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
Configuration conf = new Configuration();
|
|
||||||
try {
|
try {
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
||||||
|
cluster.waitActive();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
@ -43,44 +43,14 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void setUp() throws Exception {
|
protected void setUp() throws Exception {
|
||||||
fs = cluster.getWebHdfsFileSystem();
|
final String uri = WebHdfsFileSystem.SCHEME + "://"
|
||||||
defaultWorkingDirectory = "/user/"
|
+ conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||||
+ UserGroupInformation.getCurrentUser().getShortUserName();
|
fs = FileSystem.get(new URI(uri), conf);
|
||||||
|
defaultWorkingDirectory = fs.getWorkingDirectory().toUri().getPath();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected String getDefaultWorkingDirectory() {
|
protected String getDefaultWorkingDirectory() {
|
||||||
return defaultWorkingDirectory;
|
return defaultWorkingDirectory;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Override the following method without using position read. */
|
|
||||||
@Override
|
|
||||||
protected void writeReadAndDelete(int len) throws IOException {
|
|
||||||
Path path = path("/test/hadoop/file");
|
|
||||||
|
|
||||||
fs.mkdirs(path.getParent());
|
|
||||||
|
|
||||||
FSDataOutputStream out = fs.create(path, false,
|
|
||||||
fs.getConf().getInt("io.file.buffer.size", 4096),
|
|
||||||
(short) 1, getBlockSize());
|
|
||||||
out.write(data, 0, len);
|
|
||||||
out.close();
|
|
||||||
|
|
||||||
assertTrue("Exists", fs.exists(path));
|
|
||||||
assertEquals("Length", len, fs.getFileStatus(path).getLen());
|
|
||||||
|
|
||||||
FSDataInputStream in = fs.open(path);
|
|
||||||
for (int i = 0; i < len; i++) {
|
|
||||||
final int b = in.read();
|
|
||||||
assertEquals("Position " + i, data[i], b);
|
|
||||||
}
|
|
||||||
in.close();
|
|
||||||
|
|
||||||
assertTrue("Deleted", fs.delete(path, false));
|
|
||||||
assertFalse("No longer exists", fs.exists(path));
|
|
||||||
}
|
|
||||||
|
|
||||||
//The following test failed for HftpFileSystem,
|
|
||||||
//Disable it for WebHdfsFileSystem
|
|
||||||
public void testListStatusThrowsExceptionForNonExistentFile() {}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue