HDFS-8622. Implement GETCONTENTSUMMARY operation for WebImageViewer. Contributed by Jagadesh Kiran N.

This commit is contained in:
Akira Ajisaka 2015-08-13 11:31:56 +09:00
parent 6cc8e38db5
commit 40f815131e
5 changed files with 345 additions and 0 deletions

View File

@ -780,6 +780,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8887. Expose storage type and storage ID in BlockLocation. (wang)
HDFS-8622. Implement GETCONTENTSUMMARY operation for WebImageViewer.
(Jagadesh Kiran N via aajisaka)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -107,6 +107,9 @@ public void channelRead0(ChannelHandlerContext ctx, HttpRequest request)
case "LISTXATTRS":
content = image.listXAttrs(path);
break;
case "GETCONTENTSUMMARY":
content = image.getContentSummary(path);
break;
default:
throw new IllegalArgumentException("Invalid value for webhdfs parameter"
+ " \"op\"");

View File

@ -48,6 +48,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf;
import org.apache.hadoop.hdfs.server.namenode.FSImageUtil;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.hdfs.web.resources.XAttrEncodingParam;
@ -310,6 +311,95 @@ private List<Map<String, Object>> getFileStatusList(String path)
return list;
}
/**
* Return the JSON formatted ContentSummary of the specified path.
* @param path a path specifies a file or directory
* @return JSON formatted ContentSummary
* @throws IOException if failed to serialize ContentSummary to JSON.
*/
String getContentSummary(String path) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return "{\"ContentSummary\":\n"
+ mapper.writeValueAsString(getContentSummaryMap(path)) + "\n}\n";
}
private Map<String, Object> getContentSummaryMap(String path)
throws IOException {
long id = lookup(path);
INode inode = fromINodeId(id);
long spaceQuota = 0;
long nsQuota = 0;
long[] data = new long[4];
FsImageProto.INodeSection.INodeFile f = inode.getFile();
switch (inode.getType()) {
case FILE:
data[0] = 0;
data[1] = 1;
data[2] = getFileSize(f);
nsQuota = -1;
data[3] = data[2] * f.getReplication();
spaceQuota = -1;
return fillSummaryMap(spaceQuota, nsQuota, data);
case DIRECTORY:
fillDirSummary(id, data);
nsQuota = inode.getDirectory().getNsQuota();
spaceQuota = inode.getDirectory().getDsQuota();
return fillSummaryMap(spaceQuota, nsQuota, data);
case SYMLINK:
data[0] = 0;
data[1] = 1;
data[2] = 0;
nsQuota = -1;
data[3] = 0;
spaceQuota = -1;
return fillSummaryMap(spaceQuota, nsQuota, data);
default:
return null;
}
}
private Map<String, Object> fillSummaryMap(long spaceQuota,
long nsQuota, long[] data) {
Map<String, Object> map = Maps.newHashMap();
map.put("directoryCount", data[0]);
map.put("fileCount", data[1]);
map.put("length", data[2]);
map.put("quota", nsQuota);
map.put("spaceConsumed", data[3]);
map.put("spaceQuota", spaceQuota);
return map;
}
private void fillDirSummary(long id, long[] data) throws IOException {
data[0]++;
long[] children = dirmap.get(id);
if (children == null) {
return;
}
for (long cid : children) {
INode node = fromINodeId(cid);
switch (node.getType()) {
case DIRECTORY:
fillDirSummary(cid, data);
break;
case FILE:
FsImageProto.INodeSection.INodeFile f = node.getFile();
long curLength = getFileSize(f);
data[1]++;
data[2] += curLength;
data[3] += (curLength) * (f.getReplication());
break;
case SYMLINK:
data[1]++;
break;
default:
break;
}
}
}
/**
* Return the JSON formatted XAttrNames of the specified file.
*

View File

@ -100,6 +100,7 @@ The Web processor now supports the following operations:
* [GETACLSTATUS](./WebHDFS.html#Get_ACL_Status)
* [GETXATTRS](./WebHDFS.html#Get_an_XAttr)
* [LISTXATTRS](./WebHDFS.html#List_all_XAttrs)
* [CONTENTSUMMARY] (./WebHDFS.html#Get_Content_Summary_of_a_Directory)
### XML Processor

View File

@ -0,0 +1,248 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.tools.offlineImageViewer;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.net.NetUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Tests GETCONTENTSUMMARY operation for WebImageViewer
*/
public class TestOfflineImageViewerForContentSummary {
private static final Log LOG = LogFactory
.getLog(TestOfflineImageViewerForContentSummary.class);
private static File originalFsimage = null;
private static ContentSummary summaryFromDFS = null;
private static ContentSummary emptyDirSummaryFromDFS = null;
private static ContentSummary fileSummaryFromDFS = null;
private static ContentSummary symLinkSummaryFromDFS = null;
private static ContentSummary symLinkSummaryForDirContainsFromDFS=null;
/**
* Create a populated namespace for later testing. Save its contents to a
* data structure and store its fsimage location. We only want to generate
* the fsimage file once and use it for multiple tests.
*/
@BeforeClass
public static void createOriginalFSImage() throws IOException {
MiniDFSCluster cluster = null;
Configuration conf = new Configuration();
try {
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
DistributedFileSystem hdfs = cluster.getFileSystem();
Path parentDir = new Path("/parentDir");
Path childDir1 = new Path(parentDir, "childDir1");
Path childDir2 = new Path(parentDir, "childDir2");
Path dirForLinks = new Path("/dirForLinks");
hdfs.mkdirs(parentDir);
hdfs.mkdirs(childDir1);
hdfs.mkdirs(childDir2);
hdfs.mkdirs(dirForLinks);
hdfs.setQuota(parentDir, 10, 1024*1024*1024);
Path file1OnParentDir = new Path(parentDir, "file1");
try (FSDataOutputStream o = hdfs.create(file1OnParentDir)) {
o.write("123".getBytes());
}
try (FSDataOutputStream o = hdfs.create(new Path(parentDir, "file2"))) {
o.write("1234".getBytes());
}
try (FSDataOutputStream o = hdfs.create(new Path(childDir1, "file3"))) {
o.write("123".getBytes());
}
try (FSDataOutputStream o = hdfs.create(new Path(parentDir, "file4"))) {
o.write("123".getBytes());
}
Path link1 = new Path("/link1");
Path link2 = new Path("/dirForLinks/linkfordir1");
hdfs.createSymlink(new Path("/parentDir/file4"), link1, true);
summaryFromDFS = hdfs.getContentSummary(parentDir);
emptyDirSummaryFromDFS = hdfs.getContentSummary(childDir2);
fileSummaryFromDFS = hdfs.getContentSummary(file1OnParentDir);
symLinkSummaryFromDFS = hdfs.getContentSummary(link1);
hdfs.createSymlink(childDir1, link2, true);
symLinkSummaryForDirContainsFromDFS = hdfs.getContentSummary(new Path(
"/dirForLinks"));
// Write results to the fsimage file
hdfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER, false);
hdfs.saveNamespace();
// Determine the location of the fsimage file
originalFsimage = FSImageTestUtil.findLatestImageFile(FSImageTestUtil
.getFSImage(cluster.getNameNode()).getStorage().getStorageDir(0));
if (originalFsimage == null) {
throw new RuntimeException("Didn't generate or can't find fsimage");
}
LOG.debug("original FS image file is " + originalFsimage);
} finally {
if (cluster != null)
cluster.shutdown();
}
}
@AfterClass
public static void deleteOriginalFSImage() {
if (originalFsimage != null && originalFsimage.exists()) {
originalFsimage.delete();
}
}
@Test
public void testGetContentSummaryForEmptyDirectory() throws Exception {
try (WebImageViewer viewer = new WebImageViewer(
NetUtils.createSocketAddr("localhost:0"))) {
viewer.initServer(originalFsimage.getAbsolutePath());
int port = viewer.getPort();
URL url = new URL("http://localhost:" + port
+ "/webhdfs/v1/parentDir/childDir2?op=GETCONTENTSUMMARY");
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
connection.connect();
assertEquals(HttpURLConnection.HTTP_OK, connection.getResponseCode());
// create a WebHdfsFileSystem instance
URI uri = new URI("webhdfs://localhost:" + String.valueOf(port));
Configuration conf = new Configuration();
WebHdfsFileSystem webfs = (WebHdfsFileSystem) FileSystem.get(uri, conf);
ContentSummary summary = webfs.getContentSummary(new Path(
"/parentDir/childDir2"));
verifyContentSummary(emptyDirSummaryFromDFS, summary);
}
}
@Test
public void testGetContentSummaryForDirectory() throws Exception {
try (WebImageViewer viewer = new WebImageViewer(
NetUtils.createSocketAddr("localhost:0"))) {
viewer.initServer(originalFsimage.getAbsolutePath());
int port = viewer.getPort();
URL url = new URL("http://localhost:" + port
+ "/webhdfs/v1/parentDir/?op=GETCONTENTSUMMARY");
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
connection.connect();
assertEquals(HttpURLConnection.HTTP_OK, connection.getResponseCode());
// create a WebHdfsFileSystem instance
URI uri = new URI("webhdfs://localhost:" + String.valueOf(port));
Configuration conf = new Configuration();
WebHdfsFileSystem webfs = (WebHdfsFileSystem) FileSystem.get(uri, conf);
ContentSummary summary = webfs.getContentSummary(new Path("/parentDir/"));
verifyContentSummary(summaryFromDFS, summary);
}
}
@Test
public void testGetContentSummaryForFile() throws Exception {
try (WebImageViewer viewer = new WebImageViewer(
NetUtils.createSocketAddr("localhost:0"))) {
viewer.initServer(originalFsimage.getAbsolutePath());
int port = viewer.getPort();
URL url = new URL("http://localhost:" + port
+ "/webhdfs/v1/parentDir/file1?op=GETCONTENTSUMMARY");
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
connection.connect();
assertEquals(HttpURLConnection.HTTP_OK, connection.getResponseCode());
// create a WebHdfsFileSystem instance
URI uri = new URI("webhdfs://localhost:" + String.valueOf(port));
Configuration conf = new Configuration();
WebHdfsFileSystem webfs = (WebHdfsFileSystem) FileSystem.get(uri, conf);
ContentSummary summary = webfs.
getContentSummary(new Path("/parentDir/file1"));
verifyContentSummary(fileSummaryFromDFS, summary);
}
}
@Test
public void testGetContentSummaryForSymlink() throws Exception {
try (WebImageViewer viewer = new WebImageViewer(
NetUtils.createSocketAddr("localhost:0"))) {
viewer.initServer(originalFsimage.getAbsolutePath());
int port = viewer.getPort();
// create a WebHdfsFileSystem instance
URI uri = new URI("webhdfs://localhost:" + String.valueOf(port));
Configuration conf = new Configuration();
WebHdfsFileSystem webfs = (WebHdfsFileSystem) FileSystem.get(uri, conf);
ContentSummary summary = webfs.getContentSummary(new Path("/link1"));
verifyContentSummary(symLinkSummaryFromDFS, summary);
}
}
@Test
public void testGetContentSummaryForDirContainsSymlink() throws Exception {
try (WebImageViewer viewer = new WebImageViewer(
NetUtils.createSocketAddr("localhost:0"))) {
viewer.initServer(originalFsimage.getAbsolutePath());
int port = viewer.getPort();
// create a WebHdfsFileSystem instance
URI uri = new URI("webhdfs://localhost:" + String.valueOf(port));
Configuration conf = new Configuration();
WebHdfsFileSystem webfs = (WebHdfsFileSystem) FileSystem.get(uri, conf);
ContentSummary summary = webfs.getContentSummary(new Path(
"/dirForLinks/"));
verifyContentSummary(symLinkSummaryForDirContainsFromDFS, summary);
}
}
private void verifyContentSummary(ContentSummary expected,
ContentSummary actual) {
assertEquals(expected.getDirectoryCount(), actual.getDirectoryCount());
assertEquals(expected.getFileCount(), actual.getFileCount());
assertEquals(expected.getLength(), actual.getLength());
assertEquals(expected.getSpaceConsumed(), actual.getSpaceConsumed());
assertEquals(expected.getQuota(), actual.getQuota());
assertEquals(expected.getSpaceQuota(), actual.getSpaceQuota());
}
@Test
public void testGetContentSummaryResponseCode() throws Exception {
try (WebImageViewer viewer = new WebImageViewer(
NetUtils.createSocketAddr("localhost:0"))) {
viewer.initServer(originalFsimage.getAbsolutePath());
int port = viewer.getPort();
URL url = new URL("http://localhost:" + port
+ "/webhdfs/v1/dir123/?op=GETCONTENTSUMMARY");
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
connection.connect();
assertEquals(HttpURLConnection.HTTP_NOT_FOUND,
connection.getResponseCode());
}
}
}