svn merge -c 1177905 from trunk for HDFS-2348.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1189446 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a86d4addb4
commit
bb20f94c2f
|
@ -322,6 +322,9 @@ Release 0.23.0 - Unreleased
|
||||||
HDFS-2340. Support getFileBlockLocations and getDelegationToken in webhdfs.
|
HDFS-2340. Support getFileBlockLocations and getDelegationToken in webhdfs.
|
||||||
(szetszwo)
|
(szetszwo)
|
||||||
|
|
||||||
|
HDFS-2348. Support getContentSummary and getFileChecksum in webhdfs.
|
||||||
|
(szetszwo)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
HDFS-1875. MiniDFSCluster hard-codes dfs.datanode.address to localhost
|
HDFS-1875. MiniDFSCluster hard-codes dfs.datanode.address to localhost
|
||||||
|
|
|
@ -46,10 +46,12 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
|
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
|
import org.apache.hadoop.hdfs.web.JsonUtil;
|
||||||
import org.apache.hadoop.hdfs.web.ParamFilter;
|
import org.apache.hadoop.hdfs.web.ParamFilter;
|
||||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||||
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
|
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
|
||||||
|
@ -219,13 +221,13 @@ public class DatanodeWebHdfsMethods {
|
||||||
|
|
||||||
final String fullpath = path.getAbsolutePath();
|
final String fullpath = path.getAbsolutePath();
|
||||||
final DataNode datanode = (DataNode)context.getAttribute("datanode");
|
final DataNode datanode = (DataNode)context.getAttribute("datanode");
|
||||||
|
final Configuration conf = new Configuration(datanode.getConf());
|
||||||
|
final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
|
||||||
|
final DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
|
||||||
|
|
||||||
switch(op.getValue()) {
|
switch(op.getValue()) {
|
||||||
case OPEN:
|
case OPEN:
|
||||||
{
|
{
|
||||||
final Configuration conf = new Configuration(datanode.getConf());
|
|
||||||
final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
|
|
||||||
final DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
|
|
||||||
final int b = bufferSize.getValue(conf);
|
final int b = bufferSize.getValue(conf);
|
||||||
final DFSDataInputStream in = new DFSClient.DFSDataInputStream(
|
final DFSDataInputStream in = new DFSClient.DFSDataInputStream(
|
||||||
dfsclient.open(fullpath, b, true));
|
dfsclient.open(fullpath, b, true));
|
||||||
|
@ -244,6 +246,12 @@ public class DatanodeWebHdfsMethods {
|
||||||
};
|
};
|
||||||
return Response.ok(streaming).type(MediaType.APPLICATION_OCTET_STREAM).build();
|
return Response.ok(streaming).type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||||
}
|
}
|
||||||
|
case GETFILECHECKSUM:
|
||||||
|
{
|
||||||
|
final MD5MD5CRC32FileChecksum checksum = dfsclient.getFileChecksum(fullpath);
|
||||||
|
final String js = JsonUtil.toJsonString(checksum);
|
||||||
|
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
throw new UnsupportedOperationException(op + " is not supported");
|
throw new UnsupportedOperationException(op + " is not supported");
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,6 +46,7 @@ import javax.ws.rs.core.StreamingOutput;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
import org.apache.hadoop.fs.Options;
|
import org.apache.hadoop.fs.Options;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||||
|
@ -112,7 +113,9 @@ public class NamenodeWebHdfsMethods {
|
||||||
private static DatanodeInfo chooseDatanode(final NameNode namenode,
|
private static DatanodeInfo chooseDatanode(final NameNode namenode,
|
||||||
final String path, final HttpOpParam.Op op, final long openOffset
|
final String path, final HttpOpParam.Op op, final long openOffset
|
||||||
) throws IOException {
|
) throws IOException {
|
||||||
if (op == GetOpParam.Op.OPEN || op == PostOpParam.Op.APPEND) {
|
if (op == GetOpParam.Op.OPEN
|
||||||
|
|| op == GetOpParam.Op.GETFILECHECKSUM
|
||||||
|
|| op == PostOpParam.Op.APPEND) {
|
||||||
final NamenodeProtocols np = namenode.getRpcServer();
|
final NamenodeProtocols np = namenode.getRpcServer();
|
||||||
final HdfsFileStatus status = np.getFileInfo(path);
|
final HdfsFileStatus status = np.getFileInfo(path);
|
||||||
final long len = status.getLen();
|
final long len = status.getLen();
|
||||||
|
@ -431,6 +434,18 @@ public class NamenodeWebHdfsMethods {
|
||||||
final StreamingOutput streaming = getListingStream(np, fullpath);
|
final StreamingOutput streaming = getListingStream(np, fullpath);
|
||||||
return Response.ok(streaming).type(MediaType.APPLICATION_JSON).build();
|
return Response.ok(streaming).type(MediaType.APPLICATION_JSON).build();
|
||||||
}
|
}
|
||||||
|
case GETCONTENTSUMMARY:
|
||||||
|
{
|
||||||
|
final ContentSummary contentsummary = np.getContentSummary(fullpath);
|
||||||
|
final String js = JsonUtil.toJsonString(contentsummary);
|
||||||
|
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||||
|
}
|
||||||
|
case GETFILECHECKSUM:
|
||||||
|
{
|
||||||
|
final URI uri = redirectURI(namenode, ugi, delegation, fullpath,
|
||||||
|
op.getValue(), -1L);
|
||||||
|
return Response.temporaryRedirect(uri).build();
|
||||||
|
}
|
||||||
case GETDELEGATIONTOKEN:
|
case GETDELEGATIONTOKEN:
|
||||||
{
|
{
|
||||||
final Token<? extends TokenIdentifier> token = generateDelegationToken(
|
final Token<? extends TokenIdentifier> token = generateDelegationToken(
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.web;
|
package org.apache.hadoop.hdfs.web;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.DataInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -24,6 +26,8 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
|
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
@ -34,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.io.MD5Hash;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
@ -398,4 +403,73 @@ public class JsonUtil {
|
||||||
return new LocatedBlocks(fileLength, isUnderConstruction, locatedBlocks,
|
return new LocatedBlocks(fileLength, isUnderConstruction, locatedBlocks,
|
||||||
lastLocatedBlock, isLastBlockComplete);
|
lastLocatedBlock, isLastBlockComplete);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Convert a ContentSummary to a Json string. */
|
||||||
|
public static String toJsonString(final ContentSummary contentsummary
|
||||||
|
) throws IOException {
|
||||||
|
if (contentsummary == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
final Map<String, Object> m = jsonMap.get();
|
||||||
|
m.put("length", contentsummary.getLength());
|
||||||
|
m.put("fileCount", contentsummary.getFileCount());
|
||||||
|
m.put("directoryCount", contentsummary.getDirectoryCount());
|
||||||
|
m.put("quota", contentsummary.getQuota());
|
||||||
|
m.put("spaceConsumed", contentsummary.getSpaceConsumed());
|
||||||
|
m.put("spaceQuota", contentsummary.getSpaceQuota());
|
||||||
|
return JSON.toString(m);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Convert a Json map to a ContentSummary. */
|
||||||
|
public static ContentSummary toContentSummary(final Map<String, Object> m
|
||||||
|
) throws IOException {
|
||||||
|
if (m == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
final long length = (Long)m.get("length");
|
||||||
|
final long fileCount = (Long)m.get("fileCount");
|
||||||
|
final long directoryCount = (Long)m.get("directoryCount");
|
||||||
|
final long quota = (Long)m.get("quota");
|
||||||
|
final long spaceConsumed = (Long)m.get("spaceConsumed");
|
||||||
|
final long spaceQuota = (Long)m.get("spaceQuota");
|
||||||
|
|
||||||
|
return new ContentSummary(length, fileCount, directoryCount,
|
||||||
|
quota, spaceConsumed, spaceQuota);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Convert a MD5MD5CRC32FileChecksum to a Json string. */
|
||||||
|
public static String toJsonString(final MD5MD5CRC32FileChecksum checksum
|
||||||
|
) throws IOException {
|
||||||
|
if (checksum == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
final Map<String, Object> m = jsonMap.get();
|
||||||
|
final byte[] bytes = checksum.getBytes();
|
||||||
|
final DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
|
||||||
|
final int bytesPerCRC = in.readInt();
|
||||||
|
final long crcPerBlock = in.readLong();
|
||||||
|
final MD5Hash md5 = MD5Hash.read(in);
|
||||||
|
m.put("bytesPerCRC", bytesPerCRC);
|
||||||
|
m.put("crcPerBlock", crcPerBlock);
|
||||||
|
m.put("md5", "" + md5);
|
||||||
|
return JSON.toString(m);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Convert a Json map to a MD5MD5CRC32FileChecksum. */
|
||||||
|
public static MD5MD5CRC32FileChecksum toMD5MD5CRC32FileChecksum(
|
||||||
|
final Map<String, Object> m) throws IOException {
|
||||||
|
if (m == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
final int bytesPerCRC = (int)(long)(Long)m.get("bytesPerCRC");
|
||||||
|
final long crcPerBlock = (Long)m.get("crcPerBlock");
|
||||||
|
final String md5 = (String)m.get("md5");
|
||||||
|
|
||||||
|
return new MD5MD5CRC32FileChecksum(bytesPerCRC, crcPerBlock,
|
||||||
|
new MD5Hash(md5));
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -33,10 +33,12 @@ import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
import org.apache.hadoop.fs.BlockLocation;
|
||||||
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
||||||
import org.apache.hadoop.fs.Options;
|
import org.apache.hadoop.fs.Options;
|
||||||
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -449,4 +451,23 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
||||||
new LengthParam(length));
|
new LengthParam(length));
|
||||||
return DFSUtil.locatedBlocks2Locations(JsonUtil.toLocatedBlocks(m));
|
return DFSUtil.locatedBlocks2Locations(JsonUtil.toLocatedBlocks(m));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ContentSummary getContentSummary(final Path p) throws IOException {
|
||||||
|
statistics.incrementReadOps(1);
|
||||||
|
|
||||||
|
final HttpOpParam.Op op = GetOpParam.Op.GETCONTENTSUMMARY;
|
||||||
|
final Map<String, Object> m = run(op, p);
|
||||||
|
return JsonUtil.toContentSummary(m);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MD5MD5CRC32FileChecksum getFileChecksum(final Path p
|
||||||
|
) throws IOException {
|
||||||
|
statistics.incrementReadOps(1);
|
||||||
|
|
||||||
|
final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM;
|
||||||
|
final Map<String, Object> m = run(op, p);
|
||||||
|
return JsonUtil.toMD5MD5CRC32FileChecksum(m);
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -28,6 +28,8 @@ public class GetOpParam extends HttpOpParam<GetOpParam.Op> {
|
||||||
|
|
||||||
GETFILESTATUS(HttpURLConnection.HTTP_OK),
|
GETFILESTATUS(HttpURLConnection.HTTP_OK),
|
||||||
LISTSTATUS(HttpURLConnection.HTTP_OK),
|
LISTSTATUS(HttpURLConnection.HTTP_OK),
|
||||||
|
GETCONTENTSUMMARY(HttpURLConnection.HTTP_OK),
|
||||||
|
GETFILECHECKSUM(HttpURLConnection.HTTP_OK),
|
||||||
|
|
||||||
GETDELEGATIONTOKEN(HttpURLConnection.HTTP_OK),
|
GETDELEGATIONTOKEN(HttpURLConnection.HTTP_OK),
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
import org.apache.commons.logging.impl.Log4JLogger;
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
|
@ -37,6 +38,7 @@ import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -399,15 +401,40 @@ public class TestDistributedFileSystem {
|
||||||
RAN.setSeed(seed);
|
RAN.setSeed(seed);
|
||||||
|
|
||||||
final Configuration conf = getTestConfiguration();
|
final Configuration conf = getTestConfiguration();
|
||||||
|
conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
|
||||||
conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost");
|
conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost");
|
||||||
|
|
||||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
||||||
final FileSystem hdfs = cluster.getFileSystem();
|
final FileSystem hdfs = cluster.getFileSystem();
|
||||||
final String hftpuri = "hftp://" + conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
|
||||||
System.out.println("hftpuri=" + hftpuri);
|
|
||||||
final FileSystem hftp = new Path(hftpuri).getFileSystem(conf);
|
|
||||||
|
|
||||||
final String dir = "/filechecksum";
|
final String nnAddr = conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||||
|
final UserGroupInformation current = UserGroupInformation.getCurrentUser();
|
||||||
|
final UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
|
||||||
|
current.getShortUserName() + "x", new String[]{"user"});
|
||||||
|
|
||||||
|
//hftp
|
||||||
|
final String hftpuri = "hftp://" + nnAddr;
|
||||||
|
System.out.println("hftpuri=" + hftpuri);
|
||||||
|
final FileSystem hftp = ugi.doAs(
|
||||||
|
new PrivilegedExceptionAction<FileSystem>() {
|
||||||
|
@Override
|
||||||
|
public FileSystem run() throws Exception {
|
||||||
|
return new Path(hftpuri).getFileSystem(conf);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
//webhdfs
|
||||||
|
final String webhdfsuri = WebHdfsFileSystem.SCHEME + "://" + nnAddr;
|
||||||
|
System.out.println("webhdfsuri=" + webhdfsuri);
|
||||||
|
final FileSystem webhdfs = ugi.doAs(
|
||||||
|
new PrivilegedExceptionAction<FileSystem>() {
|
||||||
|
@Override
|
||||||
|
public FileSystem run() throws Exception {
|
||||||
|
return new Path(webhdfsuri).getFileSystem(conf);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
final Path dir = new Path("/filechecksum");
|
||||||
final int block_size = 1024;
|
final int block_size = 1024;
|
||||||
final int buffer_size = conf.getInt("io.file.buffer.size", 4096);
|
final int buffer_size = conf.getInt("io.file.buffer.size", 4096);
|
||||||
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);
|
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);
|
||||||
|
@ -431,7 +458,8 @@ public class TestDistributedFileSystem {
|
||||||
//compute checksum
|
//compute checksum
|
||||||
final FileChecksum hdfsfoocs = hdfs.getFileChecksum(foo);
|
final FileChecksum hdfsfoocs = hdfs.getFileChecksum(foo);
|
||||||
System.out.println("hdfsfoocs=" + hdfsfoocs);
|
System.out.println("hdfsfoocs=" + hdfsfoocs);
|
||||||
|
|
||||||
|
//hftp
|
||||||
final FileChecksum hftpfoocs = hftp.getFileChecksum(foo);
|
final FileChecksum hftpfoocs = hftp.getFileChecksum(foo);
|
||||||
System.out.println("hftpfoocs=" + hftpfoocs);
|
System.out.println("hftpfoocs=" + hftpfoocs);
|
||||||
|
|
||||||
|
@ -439,6 +467,14 @@ public class TestDistributedFileSystem {
|
||||||
final FileChecksum qfoocs = hftp.getFileChecksum(qualified);
|
final FileChecksum qfoocs = hftp.getFileChecksum(qualified);
|
||||||
System.out.println("qfoocs=" + qfoocs);
|
System.out.println("qfoocs=" + qfoocs);
|
||||||
|
|
||||||
|
//webhdfs
|
||||||
|
final FileChecksum webhdfsfoocs = webhdfs.getFileChecksum(foo);
|
||||||
|
System.out.println("webhdfsfoocs=" + webhdfsfoocs);
|
||||||
|
|
||||||
|
final Path webhdfsqualified = new Path(webhdfsuri + dir, "foo" + n);
|
||||||
|
final FileChecksum webhdfs_qfoocs = webhdfs.getFileChecksum(webhdfsqualified);
|
||||||
|
System.out.println("webhdfs_qfoocs=" + webhdfs_qfoocs);
|
||||||
|
|
||||||
//write another file
|
//write another file
|
||||||
final Path bar = new Path(dir, "bar" + n);
|
final Path bar = new Path(dir, "bar" + n);
|
||||||
{
|
{
|
||||||
|
@ -454,24 +490,40 @@ public class TestDistributedFileSystem {
|
||||||
assertEquals(hdfsfoocs.hashCode(), barhashcode);
|
assertEquals(hdfsfoocs.hashCode(), barhashcode);
|
||||||
assertEquals(hdfsfoocs, barcs);
|
assertEquals(hdfsfoocs, barcs);
|
||||||
|
|
||||||
|
//hftp
|
||||||
assertEquals(hftpfoocs.hashCode(), barhashcode);
|
assertEquals(hftpfoocs.hashCode(), barhashcode);
|
||||||
assertEquals(hftpfoocs, barcs);
|
assertEquals(hftpfoocs, barcs);
|
||||||
|
|
||||||
assertEquals(qfoocs.hashCode(), barhashcode);
|
assertEquals(qfoocs.hashCode(), barhashcode);
|
||||||
assertEquals(qfoocs, barcs);
|
assertEquals(qfoocs, barcs);
|
||||||
|
|
||||||
|
//webhdfs
|
||||||
|
assertEquals(webhdfsfoocs.hashCode(), barhashcode);
|
||||||
|
assertEquals(webhdfsfoocs, barcs);
|
||||||
|
|
||||||
|
assertEquals(webhdfs_qfoocs.hashCode(), barhashcode);
|
||||||
|
assertEquals(webhdfs_qfoocs, barcs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
hdfs.setPermission(dir, new FsPermission((short)0));
|
||||||
{ //test permission error on hftp
|
{ //test permission error on hftp
|
||||||
hdfs.setPermission(new Path(dir), new FsPermission((short)0));
|
|
||||||
try {
|
try {
|
||||||
final String username = UserGroupInformation.getCurrentUser().getShortUserName() + "1";
|
hftp.getFileChecksum(qualified);
|
||||||
final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, conf, 0, "somegroup");
|
|
||||||
hftp2.getFileChecksum(qualified);
|
|
||||||
fail();
|
fail();
|
||||||
} catch(IOException ioe) {
|
} catch(IOException ioe) {
|
||||||
FileSystem.LOG.info("GOOD: getting an exception", ioe);
|
FileSystem.LOG.info("GOOD: getting an exception", ioe);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{ //test permission error on webhdfs
|
||||||
|
try {
|
||||||
|
webhdfs.getFileChecksum(webhdfsqualified);
|
||||||
|
fail();
|
||||||
|
} catch(IOException ioe) {
|
||||||
|
FileSystem.LOG.info("GOOD: getting an exception", ioe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
hdfs.setPermission(dir, new FsPermission((short)0777));
|
||||||
}
|
}
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||||
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
||||||
|
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
||||||
|
@ -768,6 +769,11 @@ public class TestQuota {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void checkContentSummary(final ContentSummary expected,
|
||||||
|
final ContentSummary computed) {
|
||||||
|
assertEquals(expected.toString(), computed.toString());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Violate a space quota using files of size < 1 block. Test that block
|
* Violate a space quota using files of size < 1 block. Test that block
|
||||||
* allocation conservatively assumes that for quota checking the entire
|
* allocation conservatively assumes that for quota checking the entire
|
||||||
|
@ -779,12 +785,18 @@ public class TestQuota {
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
final int BLOCK_SIZE = 6 * 1024;
|
final int BLOCK_SIZE = 6 * 1024;
|
||||||
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||||
|
conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
|
||||||
MiniDFSCluster cluster =
|
MiniDFSCluster cluster =
|
||||||
new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
FileSystem fs = cluster.getFileSystem();
|
FileSystem fs = cluster.getFileSystem();
|
||||||
DFSAdmin admin = new DFSAdmin(conf);
|
DFSAdmin admin = new DFSAdmin(conf);
|
||||||
|
|
||||||
|
final String nnAddr = conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||||
|
final String webhdfsuri = WebHdfsFileSystem.SCHEME + "://" + nnAddr;
|
||||||
|
System.out.println("webhdfsuri=" + webhdfsuri);
|
||||||
|
final FileSystem webhdfs = new Path(webhdfsuri).getFileSystem(conf);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Path dir = new Path("/test");
|
Path dir = new Path("/test");
|
||||||
Path file1 = new Path("/test/test1");
|
Path file1 = new Path("/test/test1");
|
||||||
|
@ -804,6 +816,7 @@ public class TestQuota {
|
||||||
DFSTestUtil.createFile(fs, file1, FILE_SIZE, (short) 3, 1L);
|
DFSTestUtil.createFile(fs, file1, FILE_SIZE, (short) 3, 1L);
|
||||||
DFSTestUtil.waitReplication(fs, file1, (short) 3);
|
DFSTestUtil.waitReplication(fs, file1, (short) 3);
|
||||||
c = fs.getContentSummary(dir);
|
c = fs.getContentSummary(dir);
|
||||||
|
checkContentSummary(c, webhdfs.getContentSummary(dir));
|
||||||
assertEquals("Quota is half consumed", QUOTA_SIZE / 2,
|
assertEquals("Quota is half consumed", QUOTA_SIZE / 2,
|
||||||
c.getSpaceConsumed());
|
c.getSpaceConsumed());
|
||||||
|
|
||||||
|
@ -834,12 +847,18 @@ public class TestQuota {
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
final int BLOCK_SIZE = 6 * 1024;
|
final int BLOCK_SIZE = 6 * 1024;
|
||||||
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||||
|
conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
|
||||||
MiniDFSCluster cluster =
|
MiniDFSCluster cluster =
|
||||||
new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
FileSystem fs = cluster.getFileSystem();
|
FileSystem fs = cluster.getFileSystem();
|
||||||
DFSAdmin admin = new DFSAdmin(conf);
|
DFSAdmin admin = new DFSAdmin(conf);
|
||||||
|
|
||||||
|
final String nnAddr = conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||||
|
final String webhdfsuri = WebHdfsFileSystem.SCHEME + "://" + nnAddr;
|
||||||
|
System.out.println("webhdfsuri=" + webhdfsuri);
|
||||||
|
final FileSystem webhdfs = new Path(webhdfsuri).getFileSystem(conf);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Path dir = new Path("/test");
|
Path dir = new Path("/test");
|
||||||
boolean exceededQuota = false;
|
boolean exceededQuota = false;
|
||||||
|
@ -872,6 +891,7 @@ public class TestQuota {
|
||||||
|
|
||||||
// Should account for all 59 files (almost QUOTA_SIZE)
|
// Should account for all 59 files (almost QUOTA_SIZE)
|
||||||
c = fs.getContentSummary(dir);
|
c = fs.getContentSummary(dir);
|
||||||
|
checkContentSummary(c, webhdfs.getContentSummary(dir));
|
||||||
assertEquals("Invalid space consumed", 59 * FILE_SIZE * 3,
|
assertEquals("Invalid space consumed", 59 * FILE_SIZE * 3,
|
||||||
c.getSpaceConsumed());
|
c.getSpaceConsumed());
|
||||||
assertEquals("Invalid space consumed", QUOTA_SIZE - (59 * FILE_SIZE * 3),
|
assertEquals("Invalid space consumed", QUOTA_SIZE - (59 * FILE_SIZE * 3),
|
||||||
|
|
Loading…
Reference in New Issue