HDFS-7158. Reduce the memory usage of WebImageViewer. Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2014-10-01 10:53:38 -07:00
parent bd69f86668
commit 0eec4c187c
3 changed files with 212 additions and 152 deletions

View File

@ -273,6 +273,8 @@ Release 2.6.0 - UNRELEASED
HDFS-7153. Add storagePolicy to NN edit log during file creation.
(Arpit Agarwal)
HDFS-7158. Reduce the memory usage of WebImageViewer. (wheat9)
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)

View File

@ -24,6 +24,9 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.ipc.RemoteException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
@ -37,99 +40,92 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import javax.management.Query;
/**
* Implement the read-only WebHDFS API for fsimage.
*/
public class FSImageHandler extends SimpleChannelUpstreamHandler {
class FSImageHandler extends SimpleChannelUpstreamHandler {
public static final Log LOG = LogFactory.getLog(FSImageHandler.class);
private final FSImageLoader loader;
private final FSImageLoader image;
public FSImageHandler(FSImageLoader loader) throws IOException {
this.loader = loader;
FSImageHandler(FSImageLoader image) throws IOException {
this.image = image;
}
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) throws Exception {
String op = getOp(e);
ChannelFuture future = e.getFuture();
try {
String path = getPath(e);
handleOperation(op, path, e);
} catch (Exception ex) {
notFoundResponse(e);
LOG.warn(ex.getMessage());
future = handleOperation(e);
} finally {
e.getFuture().addListener(ChannelFutureListener.CLOSE);
future.addListener(ChannelFutureListener.CLOSE);
}
}
/** return the op parameter in upper case */
private String getOp(MessageEvent e) {
Map<String, List<String>> parameters = getDecoder(e).getParameters();
if (parameters.containsKey("op")) {
return parameters.get("op").get(0).toUpperCase();
} else {
// return "" to avoid NPE
return "";
}
}
private String getPath(MessageEvent e) throws FileNotFoundException {
String path = getDecoder(e).getPath();
// trim "/webhdfs/v1" to keep compatibility with WebHDFS API
if (path.startsWith("/webhdfs/v1/")) {
return path.replaceFirst("/webhdfs/v1", "");
} else {
throw new FileNotFoundException("Path: " + path + " should " +
"start with \"/webhdfs/v1/\"");
}
}
private QueryStringDecoder getDecoder(MessageEvent e) {
HttpRequest request = (HttpRequest) e.getMessage();
return new QueryStringDecoder(request.getUri());
}
private void handleOperation(String op, String path, MessageEvent e)
private ChannelFuture handleOperation(MessageEvent e)
throws IOException {
HttpRequest request = (HttpRequest) e.getMessage();
HttpResponse response = new DefaultHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.setHeader(HttpHeaders.Names.CONTENT_TYPE,
"application/json");
String content = null;
HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/json");
if (request.getMethod() == HttpMethod.GET){
if (op.equals("GETFILESTATUS")) {
content = loader.getFileStatus(path);
} else if (op.equals("LISTSTATUS")) {
content = loader.listStatus(path);
} else if (op.equals("GETACLSTATUS")) {
content = loader.getAclStatus(path);
} else {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
}
} else {
// only HTTP GET is allowed since fsimage is read-only.
if (request.getMethod() != HttpMethod.GET) {
response.setStatus(HttpResponseStatus.METHOD_NOT_ALLOWED);
return e.getChannel().write(response);
}
if (content != null) {
HttpHeaders.setContentLength(response, content.length());
QueryStringDecoder decoder = new QueryStringDecoder(request.getUri());
final String op = getOp(decoder);
String content;
String path = null;
try {
path = getPath(decoder);
if ("GETFILESTATUS".equals(op)) {
content = image.getFileStatus(path);
} else if ("LISTSTATUS".equals(op)) {
content = image.listStatus(path);
} else if ("GETACLSTATUS".equals(op)) {
content = image.getAclStatus(path);
} else {
throw new IllegalArgumentException("Invalid value for webhdfs parameter" + " \"op\"");
}
} catch (IllegalArgumentException ex) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
content = JsonUtil.toJsonString(ex);
} catch (FileNotFoundException ex) {
response.setStatus(HttpResponseStatus.NOT_FOUND);
content = JsonUtil.toJsonString(ex);
} catch (Exception ex) {
content = JsonUtil.toJsonString(ex);
}
HttpHeaders.setContentLength(response, content.length());
e.getChannel().write(response);
if (content != null) {
e.getChannel().write(content);
}
ChannelFuture future = e.getChannel().write(content);
LOG.info(response.getStatus().getCode() + " method="
+ request.getMethod().getName() + " op=" + op + " target=" + path);
return future;
}
private void notFoundResponse(MessageEvent e) {
HttpResponse response = new DefaultHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);
e.getChannel().write(response);
private static String getOp(QueryStringDecoder decoder) {
Map<String, List<String>> parameters = decoder.getParameters();
return parameters.containsKey("op")
? parameters.get("op").get(0).toUpperCase() : null;
}
private static String getPath(QueryStringDecoder decoder)
throws FileNotFoundException {
String path = decoder.getPath();
if (path.startsWith("/webhdfs/v1/")) {
return path.substring(11);
} else {
throw new FileNotFoundException("Path: " + path + " should " +
"start with \"/webhdfs/v1/\"");
}
}
}

View File

@ -18,16 +18,22 @@
package org.apache.hadoop.hdfs.tools.offlineImageViewer;
import java.io.BufferedInputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -55,14 +61,38 @@ import com.google.common.io.LimitInputStream;
class FSImageLoader {
public static final Log LOG = LogFactory.getLog(FSImageHandler.class);
private static String[] stringTable;
private static Map<Long, FsImageProto.INodeSection.INode> inodes =
Maps.newHashMap();
private static Map<Long, long[]> dirmap = Maps.newHashMap();
private static List<FsImageProto.INodeReferenceSection.INodeReference>
refList = Lists.newArrayList();
private final String[] stringTable;
// byte representation of inodes, sorted by id
private final byte[][] inodes;
private final Map<Long, long[]> dirmap;
private static final Comparator<byte[]> INODE_BYTES_COMPARATOR = new
Comparator<byte[]>() {
@Override
public int compare(byte[] o1, byte[] o2) {
try {
final FsImageProto.INodeSection.INode l = FsImageProto.INodeSection
.INode.parseFrom(o1);
final FsImageProto.INodeSection.INode r = FsImageProto.INodeSection
.INode.parseFrom(o2);
if (l.getId() < r.getId()) {
return -1;
} else if (l.getId() > r.getId()) {
return 1;
} else {
return 0;
}
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
};
private FSImageLoader() {}
private FSImageLoader(String[] stringTable, byte[][] inodes,
Map<Long, long[]> dirmap) {
this.stringTable = stringTable;
this.inodes = inodes;
this.dirmap = dirmap;
}
/**
* Load fsimage into the memory.
@ -79,7 +109,14 @@ class FSImageLoader {
FsImageProto.FileSummary summary = FSImageUtil.loadSummary(file);
FileInputStream fin = null;
try {
// Map to record INodeReference to the referred id
ImmutableList<Long> refIdList = null;
String[] stringTable = null;
byte[][] inodes = null;
Map<Long, long[]> dirmap = null;
fin = new FileInputStream(file.getFD());
ArrayList<FsImageProto.FileSummary.Section> sections =
@ -109,34 +146,37 @@ class FSImageLoader {
summary.getCodec(), new BufferedInputStream(new LimitInputStream(
fin, s.getLength())));
LOG.debug("Loading section " + s.getName() + " length: " + s.getLength
());
switch (FSImageFormatProtobuf.SectionName.fromString(s.getName())) {
case STRING_TABLE:
loadStringTable(is);
stringTable = loadStringTable(is);
break;
case INODE:
loadINodeSection(is);
inodes = loadINodeSection(is);
break;
case INODE_REFERENCE:
loadINodeReferenceSection(is);
refIdList = loadINodeReferenceSection(is);
break;
case INODE_DIR:
loadINodeDirectorySection(is);
dirmap = loadINodeDirectorySection(is, refIdList);
break;
default:
break;
}
}
return new FSImageLoader(stringTable, inodes, dirmap);
} finally {
IOUtils.cleanup(null, fin);
}
return new FSImageLoader();
}
private static void loadINodeDirectorySection(InputStream in)
private static Map<Long, long[]> loadINodeDirectorySection
(InputStream in, List<Long> refIdList)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Loading directory section");
}
LOG.info("Loading inode directory section");
Map<Long, long[]> dirs = Maps.newHashMap();
long counter = 0;
while (true) {
FsImageProto.INodeDirectorySection.DirEntry e =
FsImageProto.INodeDirectorySection.DirEntry.parseDelimitedFrom(in);
@ -144,31 +184,27 @@ class FSImageLoader {
if (e == null) {
break;
}
++counter;
long[] l = new long[e.getChildrenCount() + e.getRefChildrenCount()];
for (int i = 0; i < e.getChildrenCount(); ++i) {
l[i] = e.getChildren(i);
}
for (int i = e.getChildrenCount(); i < l.length; i++) {
int refId = e.getRefChildren(i - e.getChildrenCount());
l[i] = refList.get(refId).getReferredId();
}
dirmap.put(e.getParent(), l);
if (LOG.isDebugEnabled()) {
LOG.debug("Loaded directory (parent " + e.getParent()
+ ") with " + e.getChildrenCount() + " children and "
+ e.getRefChildrenCount() + " reference children");
l[i] = refIdList.get(refId);
}
dirs.put(e.getParent(), l);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Loaded " + dirmap.size() + " directories");
}
LOG.info("Loaded " + counter + " directories");
return dirs;
}
private static void loadINodeReferenceSection(InputStream in)
private static ImmutableList<Long> loadINodeReferenceSection(InputStream in)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Loading inode reference section");
}
LOG.info("Loading inode references");
ImmutableList.Builder<Long> builder = ImmutableList.builder();
long counter = 0;
while (true) {
FsImageProto.INodeReferenceSection.INodeReference e =
FsImageProto.INodeReferenceSection.INodeReference
@ -176,49 +212,44 @@ class FSImageLoader {
if (e == null) {
break;
}
refList.add(e);
if (LOG.isTraceEnabled()) {
LOG.trace("Loaded inode reference named '" + e.getName()
+ "' referring to id " + e.getReferredId() + "");
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Loaded " + refList.size() + " inode references");
++counter;
builder.add(e.getReferredId());
}
LOG.info("Loaded " + counter + " inode references");
return builder.build();
}
private static void loadINodeSection(InputStream in) throws IOException {
private static byte[][] loadINodeSection(InputStream in)
throws IOException {
FsImageProto.INodeSection s = FsImageProto.INodeSection
.parseDelimitedFrom(in);
if (LOG.isDebugEnabled()) {
LOG.debug("Found " + s.getNumInodes() + " inodes in inode section");
}
LOG.info("Loading " + s.getNumInodes() + " inodes.");
final byte[][] inodes = new byte[(int) s.getNumInodes()][];
for (int i = 0; i < s.getNumInodes(); ++i) {
FsImageProto.INodeSection.INode p = FsImageProto.INodeSection.INode
.parseDelimitedFrom(in);
inodes.put(p.getId(), p);
if (LOG.isTraceEnabled()) {
LOG.trace("Loaded inode id " + p.getId() + " type " + p.getType()
+ " name '" + p.getName().toStringUtf8() + "'");
}
int size = CodedInputStream.readRawVarint32(in.read(), in);
byte[] bytes = new byte[size];
IOUtils.readFully(in, bytes, 0, size);
inodes[i] = bytes;
}
LOG.debug("Sorting inodes");
Arrays.sort(inodes, INODE_BYTES_COMPARATOR);
LOG.debug("Finished sorting inodes");
return inodes;
}
private static void loadStringTable(InputStream in) throws IOException {
private static String[] loadStringTable(InputStream in) throws
IOException {
FsImageProto.StringTableSection s = FsImageProto.StringTableSection
.parseDelimitedFrom(in);
if (LOG.isDebugEnabled()) {
LOG.debug("Found " + s.getNumEntry() + " strings in string section");
}
stringTable = new String[s.getNumEntry() + 1];
LOG.info("Loading " + s.getNumEntry() + " strings");
String[] stringTable = new String[s.getNumEntry() + 1];
for (int i = 0; i < s.getNumEntry(); ++i) {
FsImageProto.StringTableSection.Entry e = FsImageProto
.StringTableSection.Entry.parseDelimitedFrom(in);
stringTable[e.getId()] = e.getStr();
if (LOG.isTraceEnabled()) {
LOG.trace("Loaded string " + e.getStr());
}
}
return stringTable;
}
/**
@ -229,7 +260,7 @@ class FSImageLoader {
*/
String getFileStatus(String path) throws IOException {
ObjectMapper mapper = new ObjectMapper();
FsImageProto.INodeSection.INode inode = inodes.get(getINodeId(path));
FsImageProto.INodeSection.INode inode = fromINodeId(lookup(path));
return "{\"FileStatus\":\n"
+ mapper.writeValueAsString(getFileStatus(inode, false)) + "\n}\n";
}
@ -256,10 +287,11 @@ class FSImageLoader {
return sb.toString();
}
private List<Map<String, Object>> getFileStatusList(String path) {
private List<Map<String, Object>> getFileStatusList(String path)
throws IOException {
List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
long id = getINodeId(path);
FsImageProto.INodeSection.INode inode = inodes.get(id);
long id = lookup(path);
FsImageProto.INodeSection.INode inode = fromINodeId(id);
if (inode.getType() == FsImageProto.INodeSection.INode.Type.DIRECTORY) {
if (!dirmap.containsKey(id)) {
// if the directory is empty, return empty list
@ -267,7 +299,7 @@ class FSImageLoader {
}
long[] children = dirmap.get(id);
for (long cid : children) {
list.add(getFileStatus(inodes.get(cid), true));
list.add(getFileStatus(fromINodeId(cid), true));
}
} else {
list.add(getFileStatus(inode, false));
@ -305,9 +337,9 @@ class FSImageLoader {
return sb.toString();
}
private List<AclEntry> getAclEntryList(String path) {
long id = getINodeId(path);
FsImageProto.INodeSection.INode inode = inodes.get(id);
private List<AclEntry> getAclEntryList(String path) throws IOException {
long id = lookup(path);
FsImageProto.INodeSection.INode inode = fromINodeId(id);
switch (inode.getType()) {
case FILE: {
FsImageProto.INodeSection.INodeFile f = inode.getFile();
@ -325,9 +357,9 @@ class FSImageLoader {
}
}
private PermissionStatus getPermissionStatus(String path) {
long id = getINodeId(path);
FsImageProto.INodeSection.INode inode = inodes.get(id);
private PermissionStatus getPermissionStatus(String path) throws IOException {
long id = lookup(path);
FsImageProto.INodeSection.INode inode = fromINodeId(id);
switch (inode.getType()) {
case FILE: {
FsImageProto.INodeSection.INodeFile f = inode.getFile();
@ -353,30 +385,41 @@ class FSImageLoader {
/**
* Return the INodeId of the specified path.
*/
private long getINodeId(String strPath) {
if (strPath.equals("/")) {
return INodeId.ROOT_INODE_ID;
}
String[] nameList = strPath.split("/");
Preconditions.checkArgument(nameList.length > 1,
"Illegal path: " + strPath);
private long lookup(String path) throws IOException {
Preconditions.checkArgument(path.startsWith("/"));
long id = INodeId.ROOT_INODE_ID;
for (int i = 1; i < nameList.length; i++) {
long[] children = dirmap.get(id);
Preconditions.checkNotNull(children, "File: " +
strPath + " is not found in the fsimage.");
String cName = nameList[i];
boolean findChildren = false;
for (int offset = 0, next; offset < path.length(); offset = next) {
next = path.indexOf('/', offset + 1);
if (next == -1) {
next = path.length();
}
if (offset + 1 > next) {
break;
}
final String component = path.substring(offset + 1, next);
if (component.isEmpty()) {
continue;
}
final long[] children = dirmap.get(id);
if (children == null) {
throw new FileNotFoundException(path);
}
boolean found = false;
for (long cid : children) {
if (cName.equals(inodes.get(cid).getName().toStringUtf8())) {
id = cid;
findChildren = true;
FsImageProto.INodeSection.INode child = fromINodeId(cid);
if (component.equals(child.getName().toStringUtf8())) {
found = true;
id = child.getId();
break;
}
}
Preconditions.checkArgument(findChildren, "File: " +
strPath + " is not found in the fsimage.");
if (!found) {
throw new FileNotFoundException(path);
}
}
return id;
}
@ -460,4 +503,23 @@ class FSImageLoader {
private String toString(FsPermission permission) {
return String.format("%o", permission.toShort());
}
private FsImageProto.INodeSection.INode fromINodeId(final long id)
throws IOException {
int l = 0, r = inodes.length;
while (l < r) {
int mid = l + (r - l) / 2;
FsImageProto.INodeSection.INode n = FsImageProto.INodeSection.INode
.parseFrom(inodes[mid]);
long nid = n.getId();
if (id > nid) {
l = mid + 1;
} else if (id < nid) {
r = mid;
} else {
return n;
}
}
return null;
}
}