HDFS-8080. Separate JSON related routines used by WebHdfsFileSystem to a package local class. Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2015-04-07 21:23:52 -07:00
parent 3cf7ac181b
commit f9fbde3074
9 changed files with 625 additions and 559 deletions

View File

@ -73,6 +73,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8073. Split BlockPlacementPolicyDefault.chooseTarget(..) so it HDFS-8073. Split BlockPlacementPolicyDefault.chooseTarget(..) so it
can be easily overrided. (Walter Su via vinayakumarb) can be easily overrided. (Walter Su via vinayakumarb)
HDFS-8080. Separate JSON related routines used by WebHdfsFileSystem to a
package local class. (wheat9)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -21,34 +21,22 @@ import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.XAttrHelper; import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
/** JSON Utilities */ /** JSON Utilities */
public class JsonUtil { public class JsonUtil {
private static final Object[] EMPTY_OBJECT_ARRAY = {}; private static final Object[] EMPTY_OBJECT_ARRAY = {};
private static final DatanodeInfo[] EMPTY_DATANODE_INFO_ARRAY = {};
/** Convert a token object to a Json string. */ /** Convert a token object to a Json string. */
public static String toJsonString(final Token<? extends TokenIdentifier> token public static String toJsonString(final Token<? extends TokenIdentifier> token
@ -67,34 +55,6 @@ public class JsonUtil {
return m; return m;
} }
/** Convert a Json map to a Token. */
public static Token<? extends TokenIdentifier> toToken(
final Map<?, ?> m) throws IOException {
if (m == null) {
return null;
}
final Token<DelegationTokenIdentifier> token
= new Token<DelegationTokenIdentifier>();
token.decodeFromUrlString((String)m.get("urlString"));
return token;
}
/** Convert a Json map to a Token of DelegationTokenIdentifier. */
@SuppressWarnings("unchecked")
public static Token<DelegationTokenIdentifier> toDelegationToken(
final Map<?, ?> json) throws IOException {
final Map<?, ?> m = (Map<?, ?>)json.get(Token.class.getSimpleName());
return (Token<DelegationTokenIdentifier>)toToken(m);
}
/** Convert a Json map to a Token of BlockTokenIdentifier. */
@SuppressWarnings("unchecked")
private static Token<BlockTokenIdentifier> toBlockToken(
final Map<?, ?> m) throws IOException {
return (Token<BlockTokenIdentifier>)toToken(m);
}
/** Convert an exception object to a Json string. */ /** Convert an exception object to a Json string. */
public static String toJsonString(final Exception e) { public static String toJsonString(final Exception e) {
final Map<String, Object> m = new TreeMap<String, Object>(); final Map<String, Object> m = new TreeMap<String, Object>();
@ -104,14 +64,6 @@ public class JsonUtil {
return toJsonString(RemoteException.class, m); return toJsonString(RemoteException.class, m);
} }
/** Convert a Json map to a RemoteException. */
public static RemoteException toRemoteException(final Map<?, ?> json) {
final Map<?, ?> m = (Map<?, ?>)json.get(RemoteException.class.getSimpleName());
final String message = (String)m.get("message");
final String javaClassName = (String)m.get("javaClassName");
return new RemoteException(javaClassName, message);
}
private static String toJsonString(final Class<?> clazz, final Object value) { private static String toJsonString(final Class<?> clazz, final Object value) {
return toJsonString(clazz.getSimpleName(), value); return toJsonString(clazz.getSimpleName(), value);
} }
@ -133,27 +85,6 @@ public class JsonUtil {
return String.format("%o", permission.toShort()); return String.format("%o", permission.toShort());
} }
/** Convert a string to a FsPermission object. */
private static FsPermission toFsPermission(final String s, Boolean aclBit,
Boolean encBit) {
FsPermission perm = new FsPermission(Short.parseShort(s, 8));
final boolean aBit = (aclBit != null) ? aclBit : false;
final boolean eBit = (encBit != null) ? encBit : false;
if (aBit || eBit) {
return new FsPermissionExtension(perm, aBit, eBit);
} else {
return perm;
}
}
static enum PathType {
FILE, DIRECTORY, SYMLINK;
static PathType valueOf(HdfsFileStatus status) {
return status.isDir()? DIRECTORY: status.isSymlink()? SYMLINK: FILE;
}
}
/** Convert a HdfsFileStatus object to a Json string. */ /** Convert a HdfsFileStatus object to a Json string. */
public static String toJsonString(final HdfsFileStatus status, public static String toJsonString(final HdfsFileStatus status,
boolean includeType) { boolean includeType) {
@ -162,7 +93,7 @@ public class JsonUtil {
} }
final Map<String, Object> m = new TreeMap<String, Object>(); final Map<String, Object> m = new TreeMap<String, Object>();
m.put("pathSuffix", status.getLocalName()); m.put("pathSuffix", status.getLocalName());
m.put("type", PathType.valueOf(status)); m.put("type", WebHdfsConstants.PathType.valueOf(status));
if (status.isSymlink()) { if (status.isSymlink()) {
m.put("symlink", status.getSymlink()); m.put("symlink", status.getSymlink());
} }
@ -194,39 +125,6 @@ public class JsonUtil {
return null; return null;
} }
/** Convert a Json map to a HdfsFileStatus object. */
public static HdfsFileStatus toFileStatus(final Map<?, ?> json, boolean includesType) {
if (json == null) {
return null;
}
final Map<?, ?> m = includesType ?
(Map<?, ?>)json.get(FileStatus.class.getSimpleName()) : json;
final String localName = (String) m.get("pathSuffix");
final PathType type = PathType.valueOf((String) m.get("type"));
final byte[] symlink = type != PathType.SYMLINK? null
: DFSUtil.string2Bytes((String)m.get("symlink"));
final long len = ((Number) m.get("length")).longValue();
final String owner = (String) m.get("owner");
final String group = (String) m.get("group");
final FsPermission permission = toFsPermission((String) m.get("permission"),
(Boolean)m.get("aclBit"), (Boolean)m.get("encBit"));
final long aTime = ((Number) m.get("accessTime")).longValue();
final long mTime = ((Number) m.get("modificationTime")).longValue();
final long blockSize = ((Number) m.get("blockSize")).longValue();
final short replication = ((Number) m.get("replication")).shortValue();
final long fileId = m.containsKey("fileId") ?
((Number) m.get("fileId")).longValue() : INodeId.GRANDFATHER_INODE_ID;
final int childrenNum = getInt(m, "childrenNum", -1);
final byte storagePolicy = m.containsKey("storagePolicy") ?
(byte) ((Number) m.get("storagePolicy")).longValue() :
BlockStoragePolicySuite.ID_UNSPECIFIED;
return new HdfsFileStatus(len, type == PathType.DIRECTORY, replication,
blockSize, mTime, aTime, permission, owner, group, symlink,
DFSUtil.string2Bytes(localName), fileId, childrenNum, null, storagePolicy);
}
/** Convert an ExtendedBlock to a Json map. */ /** Convert an ExtendedBlock to a Json map. */
private static Map<String, Object> toJsonMap(final ExtendedBlock extendedblock) { private static Map<String, Object> toJsonMap(final ExtendedBlock extendedblock) {
if (extendedblock == null) { if (extendedblock == null) {
@ -241,20 +139,6 @@ public class JsonUtil {
return m; return m;
} }
/** Convert a Json map to an ExtendedBlock object. */
private static ExtendedBlock toExtendedBlock(final Map<?, ?> m) {
if (m == null) {
return null;
}
final String blockPoolId = (String)m.get("blockPoolId");
final long blockId = ((Number) m.get("blockId")).longValue();
final long numBytes = ((Number) m.get("numBytes")).longValue();
final long generationStamp =
((Number) m.get("generationStamp")).longValue();
return new ExtendedBlock(blockPoolId, blockId, numBytes, generationStamp);
}
/** Convert a DatanodeInfo to a Json map. */ /** Convert a DatanodeInfo to a Json map. */
static Map<String, Object> toJsonMap(final DatanodeInfo datanodeinfo) { static Map<String, Object> toJsonMap(final DatanodeInfo datanodeinfo) {
if (datanodeinfo == null) { if (datanodeinfo == null) {
@ -288,104 +172,6 @@ public class JsonUtil {
return m; return m;
} }
private static int getInt(Map<?, ?> m, String key, final int defaultValue) {
Object value = m.get(key);
if (value == null) {
return defaultValue;
}
return ((Number) value).intValue();
}
private static long getLong(Map<?, ?> m, String key, final long defaultValue) {
Object value = m.get(key);
if (value == null) {
return defaultValue;
}
return ((Number) value).longValue();
}
private static String getString(Map<?, ?> m, String key,
final String defaultValue) {
Object value = m.get(key);
if (value == null) {
return defaultValue;
}
return (String) value;
}
static List<?> getList(Map<?, ?> m, String key) {
Object list = m.get(key);
if (list instanceof List<?>) {
return (List<?>) list;
} else {
return null;
}
}
/** Convert a Json map to an DatanodeInfo object. */
static DatanodeInfo toDatanodeInfo(final Map<?, ?> m)
throws IOException {
if (m == null) {
return null;
}
// ipAddr and xferPort are the critical fields for accessing data.
// If any one of the two is missing, an exception needs to be thrown.
// Handle the case of old servers (1.x, 0.23.x) sending 'name' instead
// of ipAddr and xferPort.
Object tmpValue = m.get("ipAddr");
String ipAddr = (tmpValue == null) ? null : (String)tmpValue;
tmpValue = m.get("xferPort");
int xferPort = (tmpValue == null) ? -1 : (int)(long)(Long)tmpValue;
if (ipAddr == null) {
tmpValue = m.get("name");
if (tmpValue != null) {
String name = (String)tmpValue;
int colonIdx = name.indexOf(':');
if (colonIdx > 0) {
ipAddr = name.substring(0, colonIdx);
xferPort = Integer.parseInt(name.substring(colonIdx +1));
} else {
throw new IOException(
"Invalid value in server response: name=[" + name + "]");
}
} else {
throw new IOException(
"Missing both 'ipAddr' and 'name' in server response.");
}
// ipAddr is non-null & non-empty string at this point.
}
// Check the validity of xferPort.
if (xferPort == -1) {
throw new IOException(
"Invalid or missing 'xferPort' in server response.");
}
// TODO: Fix storageID
return new DatanodeInfo(
ipAddr,
(String)m.get("hostName"),
(String)m.get("storageID"),
xferPort,
((Number) m.get("infoPort")).intValue(),
getInt(m, "infoSecurePort", 0),
((Number) m.get("ipcPort")).intValue(),
getLong(m, "capacity", 0l),
getLong(m, "dfsUsed", 0l),
getLong(m, "remaining", 0l),
getLong(m, "blockPoolUsed", 0l),
getLong(m, "cacheCapacity", 0l),
getLong(m, "cacheUsed", 0l),
getLong(m, "lastUpdate", 0l),
getLong(m, "lastUpdateMonotonic", 0l),
getInt(m, "xceiverCount", 0),
getString(m, "networkLocation", ""),
AdminStates.valueOf(getString(m, "adminState", "NORMAL")));
}
/** Convert a DatanodeInfo[] to a Json array. */ /** Convert a DatanodeInfo[] to a Json array. */
private static Object[] toJsonArray(final DatanodeInfo[] array) { private static Object[] toJsonArray(final DatanodeInfo[] array) {
if (array == null) { if (array == null) {
@ -401,23 +187,6 @@ public class JsonUtil {
} }
} }
/** Convert an Object[] to a DatanodeInfo[]. */
private static DatanodeInfo[] toDatanodeInfoArray(final List<?> objects)
throws IOException {
if (objects == null) {
return null;
} else if (objects.isEmpty()) {
return EMPTY_DATANODE_INFO_ARRAY;
} else {
final DatanodeInfo[] array = new DatanodeInfo[objects.size()];
int i = 0;
for (Object object : objects) {
array[i++] = toDatanodeInfo((Map<?, ?>) object);
}
return array;
}
}
/** Convert a LocatedBlock to a Json map. */ /** Convert a LocatedBlock to a Json map. */
private static Map<String, Object> toJsonMap(final LocatedBlock locatedblock private static Map<String, Object> toJsonMap(final LocatedBlock locatedblock
) throws IOException { ) throws IOException {
@ -435,26 +204,6 @@ public class JsonUtil {
return m; return m;
} }
/** Convert a Json map to LocatedBlock. */
private static LocatedBlock toLocatedBlock(final Map<?, ?> m) throws IOException {
if (m == null) {
return null;
}
final ExtendedBlock b = toExtendedBlock((Map<?, ?>)m.get("block"));
final DatanodeInfo[] locations = toDatanodeInfoArray(
getList(m, "locations"));
final long startOffset = ((Number) m.get("startOffset")).longValue();
final boolean isCorrupt = (Boolean)m.get("isCorrupt");
final DatanodeInfo[] cachedLocations = toDatanodeInfoArray(
getList(m, "cachedLocations"));
final LocatedBlock locatedblock = new LocatedBlock(b, locations,
null, null, startOffset, isCorrupt, cachedLocations);
locatedblock.setBlockToken(toBlockToken((Map<?, ?>)m.get("blockToken")));
return locatedblock;
}
/** Convert a LocatedBlock[] to a Json array. */ /** Convert a LocatedBlock[] to a Json array. */
private static Object[] toJsonArray(final List<LocatedBlock> array private static Object[] toJsonArray(final List<LocatedBlock> array
) throws IOException { ) throws IOException {
@ -471,22 +220,6 @@ public class JsonUtil {
} }
} }
/** Convert an List of Object to a List of LocatedBlock. */
private static List<LocatedBlock> toLocatedBlockList(
final List<?> objects) throws IOException {
if (objects == null) {
return null;
} else if (objects.isEmpty()) {
return Collections.emptyList();
} else {
final List<LocatedBlock> list = new ArrayList<>(objects.size());
for (Object object : objects) {
list.add(toLocatedBlock((Map<?, ?>) object));
}
return list;
}
}
/** Convert LocatedBlocks to a Json string. */ /** Convert LocatedBlocks to a Json string. */
public static String toJsonString(final LocatedBlocks locatedblocks public static String toJsonString(final LocatedBlocks locatedblocks
) throws IOException { ) throws IOException {
@ -504,25 +237,6 @@ public class JsonUtil {
return toJsonString(LocatedBlocks.class, m); return toJsonString(LocatedBlocks.class, m);
} }
/** Convert a Json map to LocatedBlock. */
public static LocatedBlocks toLocatedBlocks(final Map<?, ?> json
) throws IOException {
if (json == null) {
return null;
}
final Map<?, ?> m = (Map<?, ?>)json.get(LocatedBlocks.class.getSimpleName());
final long fileLength = ((Number) m.get("fileLength")).longValue();
final boolean isUnderConstruction = (Boolean)m.get("isUnderConstruction");
final List<LocatedBlock> locatedBlocks = toLocatedBlockList(
getList(m, "locatedBlocks"));
final LocatedBlock lastLocatedBlock = toLocatedBlock(
(Map<?, ?>)m.get("lastLocatedBlock"));
final boolean isLastBlockComplete = (Boolean)m.get("isLastBlockComplete");
return new LocatedBlocks(fileLength, isUnderConstruction, locatedBlocks,
lastLocatedBlock, isLastBlockComplete, null);
}
/** Convert a ContentSummary to a Json string. */ /** Convert a ContentSummary to a Json string. */
public static String toJsonString(final ContentSummary contentsummary) { public static String toJsonString(final ContentSummary contentsummary) {
if (contentsummary == null) { if (contentsummary == null) {
@ -539,25 +253,6 @@ public class JsonUtil {
return toJsonString(ContentSummary.class, m); return toJsonString(ContentSummary.class, m);
} }
/** Convert a Json map to a ContentSummary. */
public static ContentSummary toContentSummary(final Map<?, ?> json) {
if (json == null) {
return null;
}
final Map<?, ?> m = (Map<?, ?>)json.get(ContentSummary.class.getSimpleName());
final long length = ((Number) m.get("length")).longValue();
final long fileCount = ((Number) m.get("fileCount")).longValue();
final long directoryCount = ((Number) m.get("directoryCount")).longValue();
final long quota = ((Number) m.get("quota")).longValue();
final long spaceConsumed = ((Number) m.get("spaceConsumed")).longValue();
final long spaceQuota = ((Number) m.get("spaceQuota")).longValue();
return new ContentSummary.Builder().length(length).fileCount(fileCount).
directoryCount(directoryCount).quota(quota).spaceConsumed(spaceConsumed).
spaceQuota(spaceQuota).build();
}
/** Convert a MD5MD5CRC32FileChecksum to a Json string. */ /** Convert a MD5MD5CRC32FileChecksum to a Json string. */
public static String toJsonString(final MD5MD5CRC32FileChecksum checksum) { public static String toJsonString(final MD5MD5CRC32FileChecksum checksum) {
if (checksum == null) { if (checksum == null) {
@ -571,49 +266,6 @@ public class JsonUtil {
return toJsonString(FileChecksum.class, m); return toJsonString(FileChecksum.class, m);
} }
/** Convert a Json map to a MD5MD5CRC32FileChecksum. */
public static MD5MD5CRC32FileChecksum toMD5MD5CRC32FileChecksum(
final Map<?, ?> json) throws IOException {
if (json == null) {
return null;
}
final Map<?, ?> m = (Map<?, ?>)json.get(FileChecksum.class.getSimpleName());
final String algorithm = (String)m.get("algorithm");
final int length = ((Number) m.get("length")).intValue();
final byte[] bytes = StringUtils.hexStringToByte((String)m.get("bytes"));
final DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
final DataChecksum.Type crcType =
MD5MD5CRC32FileChecksum.getCrcTypeFromAlgorithmName(algorithm);
final MD5MD5CRC32FileChecksum checksum;
// Recreate what DFSClient would have returned.
switch(crcType) {
case CRC32:
checksum = new MD5MD5CRC32GzipFileChecksum();
break;
case CRC32C:
checksum = new MD5MD5CRC32CastagnoliFileChecksum();
break;
default:
throw new IOException("Unknown algorithm: " + algorithm);
}
checksum.readFields(in);
//check algorithm name
if (!checksum.getAlgorithmName().equals(algorithm)) {
throw new IOException("Algorithm not matched. Expected " + algorithm
+ ", Received " + checksum.getAlgorithmName());
}
//check length
if (length != checksum.getLength()) {
throw new IOException("Length not matched: length=" + length
+ ", checksum.getLength()=" + checksum.getLength());
}
return checksum;
}
/** Convert a AclStatus object to a Json string. */ /** Convert a AclStatus object to a Json string. */
public static String toJsonString(final AclStatus status) { public static String toJsonString(final AclStatus status) {
if (status == null) { if (status == null) {
@ -653,35 +305,6 @@ public class JsonUtil {
return null; return null;
} }
/** Convert a Json map to a AclStatus object. */
public static AclStatus toAclStatus(final Map<?, ?> json) {
if (json == null) {
return null;
}
final Map<?, ?> m = (Map<?, ?>) json.get(AclStatus.class.getSimpleName());
AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
aclStatusBuilder.owner((String) m.get("owner"));
aclStatusBuilder.group((String) m.get("group"));
aclStatusBuilder.stickyBit((Boolean) m.get("stickyBit"));
String permString = (String) m.get("permission");
if (permString != null) {
final FsPermission permission = toFsPermission(permString,
(Boolean) m.get("aclBit"), (Boolean) m.get("encBit"));
aclStatusBuilder.setPermission(permission);
}
final List<?> entries = (List<?>) m.get("entries");
List<AclEntry> aclEntryList = new ArrayList<AclEntry>();
for (Object entry : entries) {
AclEntry aclEntry = AclEntry.parseAclEntry((String) entry, true);
aclEntryList.add(aclEntry);
}
aclStatusBuilder.addEntries(aclEntryList);
return aclStatusBuilder.build();
}
private static Map<String, Object> toJsonMap(final XAttr xAttr, private static Map<String, Object> toJsonMap(final XAttr xAttr,
final XAttrCodec encoding) throws IOException { final XAttrCodec encoding) throws IOException {
if (xAttr == null) { if (xAttr == null) {
@ -730,70 +353,4 @@ public class JsonUtil {
finalMap.put("XAttrNames", ret); finalMap.put("XAttrNames", ret);
return mapper.writeValueAsString(finalMap); return mapper.writeValueAsString(finalMap);
} }
public static byte[] getXAttr(final Map<?, ?> json, final String name)
throws IOException {
if (json == null) {
return null;
}
Map<String, byte[]> xAttrs = toXAttrs(json);
if (xAttrs != null) {
return xAttrs.get(name);
}
return null;
}
public static Map<String, byte[]> toXAttrs(final Map<?, ?> json)
throws IOException {
if (json == null) {
return null;
}
return toXAttrMap(getList(json, "XAttrs"));
}
public static List<String> toXAttrNames(final Map<?, ?> json)
throws IOException {
if (json == null) {
return null;
}
final String namesInJson = (String) json.get("XAttrNames");
ObjectReader reader = new ObjectMapper().reader(List.class);
final List<Object> xattrs = reader.readValue(namesInJson);
final List<String> names =
Lists.newArrayListWithCapacity(json.keySet().size());
for (Object xattr : xattrs) {
names.add((String) xattr);
}
return names;
}
private static Map<String, byte[]> toXAttrMap(final List<?> objects)
throws IOException {
if (objects == null) {
return null;
} else if (objects.isEmpty()) {
return Maps.newHashMap();
} else {
final Map<String, byte[]> xAttrs = Maps.newHashMap();
for (Object object : objects) {
Map<?, ?> m = (Map<?, ?>) object;
String name = (String) m.get("name");
String value = (String) m.get("value");
xAttrs.put(name, decodeXAttrValue(value));
}
return xAttrs;
}
}
private static byte[] decodeXAttrValue(String value) throws IOException {
if (value != null) {
return XAttrCodec.decodeValue(value);
} else {
return new byte[0];
}
}
} }

View File

@ -0,0 +1,485 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
import org.apache.hadoop.fs.XAttrCodec;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
class JsonUtilClient {
static final DatanodeInfo[] EMPTY_DATANODE_INFO_ARRAY = {};
/** Convert a Json map to a RemoteException. */
static RemoteException toRemoteException(final Map<?, ?> json) {
final Map<?, ?> m = (Map<?, ?>)json.get(RemoteException.class.getSimpleName());
final String message = (String)m.get("message");
final String javaClassName = (String)m.get("javaClassName");
return new RemoteException(javaClassName, message);
}
/** Convert a Json map to a Token. */
static Token<? extends TokenIdentifier> toToken(
final Map<?, ?> m) throws IOException {
if (m == null) {
return null;
}
final Token<DelegationTokenIdentifier> token
= new Token<>();
token.decodeFromUrlString((String)m.get("urlString"));
return token;
}
/** Convert a Json map to a Token of BlockTokenIdentifier. */
@SuppressWarnings("unchecked")
static Token<BlockTokenIdentifier> toBlockToken(
final Map<?, ?> m) throws IOException {
return (Token<BlockTokenIdentifier>)toToken(m);
}
/** Convert a string to a FsPermission object. */
static FsPermission toFsPermission(
final String s, Boolean aclBit, Boolean encBit) {
FsPermission perm = new FsPermission(Short.parseShort(s, 8));
final boolean aBit = (aclBit != null) ? aclBit : false;
final boolean eBit = (encBit != null) ? encBit : false;
if (aBit || eBit) {
return new FsPermissionExtension(perm, aBit, eBit);
} else {
return perm;
}
}
/** Convert a Json map to a HdfsFileStatus object. */
static HdfsFileStatus toFileStatus(final Map<?, ?> json, boolean includesType) {
if (json == null) {
return null;
}
final Map<?, ?> m = includesType ?
(Map<?, ?>)json.get(FileStatus.class.getSimpleName()) : json;
final String localName = (String) m.get("pathSuffix");
final WebHdfsConstants.PathType type = WebHdfsConstants.PathType.valueOf((String) m.get("type"));
final byte[] symlink = type != WebHdfsConstants.PathType.SYMLINK? null
: DFSUtil.string2Bytes((String) m.get("symlink"));
final long len = ((Number) m.get("length")).longValue();
final String owner = (String) m.get("owner");
final String group = (String) m.get("group");
final FsPermission permission = toFsPermission((String) m.get("permission"),
(Boolean) m.get("aclBit"),
(Boolean) m.get("encBit"));
final long aTime = ((Number) m.get("accessTime")).longValue();
final long mTime = ((Number) m.get("modificationTime")).longValue();
final long blockSize = ((Number) m.get("blockSize")).longValue();
final short replication = ((Number) m.get("replication")).shortValue();
final long fileId = m.containsKey("fileId") ?
((Number) m.get("fileId")).longValue() : INodeId.GRANDFATHER_INODE_ID;
final int childrenNum = getInt(m, "childrenNum", -1);
final byte storagePolicy = m.containsKey("storagePolicy") ?
(byte) ((Number) m.get("storagePolicy")).longValue() :
BlockStoragePolicySuite.ID_UNSPECIFIED;
return new HdfsFileStatus(len, type == WebHdfsConstants.PathType.DIRECTORY, replication,
blockSize, mTime, aTime, permission, owner, group,
symlink, DFSUtil.string2Bytes(localName), fileId, childrenNum, null,
storagePolicy);
}
/** Convert a Json map to an ExtendedBlock object. */
static ExtendedBlock toExtendedBlock(final Map<?, ?> m) {
if (m == null) {
return null;
}
final String blockPoolId = (String)m.get("blockPoolId");
final long blockId = ((Number) m.get("blockId")).longValue();
final long numBytes = ((Number) m.get("numBytes")).longValue();
final long generationStamp =
((Number) m.get("generationStamp")).longValue();
return new ExtendedBlock(blockPoolId, blockId, numBytes, generationStamp);
}
static int getInt(Map<?, ?> m, String key, final int defaultValue) {
Object value = m.get(key);
if (value == null) {
return defaultValue;
}
return ((Number) value).intValue();
}
static long getLong(Map<?, ?> m, String key, final long defaultValue) {
Object value = m.get(key);
if (value == null) {
return defaultValue;
}
return ((Number) value).longValue();
}
static String getString(
Map<?, ?> m, String key, final String defaultValue) {
Object value = m.get(key);
if (value == null) {
return defaultValue;
}
return (String) value;
}
static List<?> getList(Map<?, ?> m, String key) {
Object list = m.get(key);
if (list instanceof List<?>) {
return (List<?>) list;
} else {
return null;
}
}
/** Convert a Json map to an DatanodeInfo object. */
static DatanodeInfo toDatanodeInfo(final Map<?, ?> m)
throws IOException {
if (m == null) {
return null;
}
// ipAddr and xferPort are the critical fields for accessing data.
// If any one of the two is missing, an exception needs to be thrown.
// Handle the case of old servers (1.x, 0.23.x) sending 'name' instead
// of ipAddr and xferPort.
String ipAddr = getString(m, "ipAddr", null);
int xferPort = getInt(m, "xferPort", -1);
if (ipAddr == null) {
String name = getString(m, "name", null);
if (name != null) {
int colonIdx = name.indexOf(':');
if (colonIdx > 0) {
ipAddr = name.substring(0, colonIdx);
xferPort = Integer.parseInt(name.substring(colonIdx +1));
} else {
throw new IOException(
"Invalid value in server response: name=[" + name + "]");
}
} else {
throw new IOException(
"Missing both 'ipAddr' and 'name' in server response.");
}
// ipAddr is non-null & non-empty string at this point.
}
// Check the validity of xferPort.
if (xferPort == -1) {
throw new IOException(
"Invalid or missing 'xferPort' in server response.");
}
// TODO: Fix storageID
return new DatanodeInfo(
ipAddr,
(String)m.get("hostName"),
(String)m.get("storageID"),
xferPort,
((Number) m.get("infoPort")).intValue(),
getInt(m, "infoSecurePort", 0),
((Number) m.get("ipcPort")).intValue(),
getLong(m, "capacity", 0l),
getLong(m, "dfsUsed", 0l),
getLong(m, "remaining", 0l),
getLong(m, "blockPoolUsed", 0l),
getLong(m, "cacheCapacity", 0l),
getLong(m, "cacheUsed", 0l),
getLong(m, "lastUpdate", 0l),
getLong(m, "lastUpdateMonotonic", 0l),
getInt(m, "xceiverCount", 0),
getString(m, "networkLocation", ""),
DatanodeInfo.AdminStates.valueOf(getString(m, "adminState", "NORMAL")));
}
/** Convert an Object[] to a DatanodeInfo[]. */
static DatanodeInfo[] toDatanodeInfoArray(final List<?> objects)
throws IOException {
if (objects == null) {
return null;
} else if (objects.isEmpty()) {
return EMPTY_DATANODE_INFO_ARRAY;
} else {
final DatanodeInfo[] array = new DatanodeInfo[objects.size()];
int i = 0;
for (Object object : objects) {
array[i++] = toDatanodeInfo((Map<?, ?>) object);
}
return array;
}
}
/** Convert a Json map to LocatedBlock. */
static LocatedBlock toLocatedBlock(final Map<?, ?> m) throws IOException {
if (m == null) {
return null;
}
final ExtendedBlock b = toExtendedBlock((Map<?, ?>)m.get("block"));
final DatanodeInfo[] locations = toDatanodeInfoArray(
getList(m, "locations"));
final long startOffset = ((Number) m.get("startOffset")).longValue();
final boolean isCorrupt = (Boolean)m.get("isCorrupt");
final DatanodeInfo[] cachedLocations = toDatanodeInfoArray(
getList(m, "cachedLocations"));
final LocatedBlock locatedblock = new LocatedBlock(b, locations,
null, null, startOffset, isCorrupt, cachedLocations);
locatedblock.setBlockToken(toBlockToken((Map<?, ?>)m.get("blockToken")));
return locatedblock;
}
/** Convert an List of Object to a List of LocatedBlock. */
static List<LocatedBlock> toLocatedBlockList(
final List<?> objects) throws IOException {
if (objects == null) {
return null;
} else if (objects.isEmpty()) {
return Collections.emptyList();
} else {
final List<LocatedBlock> list = new ArrayList<>(objects.size());
for (Object object : objects) {
list.add(toLocatedBlock((Map<?, ?>) object));
}
return list;
}
}
/** Convert a Json map to a ContentSummary. */
static ContentSummary toContentSummary(final Map<?, ?> json) {
if (json == null) {
return null;
}
final Map<?, ?> m = (Map<?, ?>)json.get(ContentSummary.class.getSimpleName());
final long length = ((Number) m.get("length")).longValue();
final long fileCount = ((Number) m.get("fileCount")).longValue();
final long directoryCount = ((Number) m.get("directoryCount")).longValue();
final long quota = ((Number) m.get("quota")).longValue();
final long spaceConsumed = ((Number) m.get("spaceConsumed")).longValue();
final long spaceQuota = ((Number) m.get("spaceQuota")).longValue();
return new ContentSummary.Builder().length(length).fileCount(fileCount).
directoryCount(directoryCount).quota(quota).spaceConsumed(spaceConsumed).
spaceQuota(spaceQuota).build();
}
/** Convert a Json map to a MD5MD5CRC32FileChecksum. */
static MD5MD5CRC32FileChecksum toMD5MD5CRC32FileChecksum(
final Map<?, ?> json) throws IOException {
if (json == null) {
return null;
}
final Map<?, ?> m = (Map<?, ?>)json.get(FileChecksum.class.getSimpleName());
final String algorithm = (String)m.get("algorithm");
final int length = ((Number) m.get("length")).intValue();
final byte[] bytes = StringUtils.hexStringToByte((String) m.get("bytes"));
final DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
final DataChecksum.Type crcType =
MD5MD5CRC32FileChecksum.getCrcTypeFromAlgorithmName(algorithm);
final MD5MD5CRC32FileChecksum checksum;
// Recreate what DFSClient would have returned.
switch(crcType) {
case CRC32:
checksum = new MD5MD5CRC32GzipFileChecksum();
break;
case CRC32C:
checksum = new MD5MD5CRC32CastagnoliFileChecksum();
break;
default:
throw new IOException("Unknown algorithm: " + algorithm);
}
checksum.readFields(in);
//check algorithm name
if (!checksum.getAlgorithmName().equals(algorithm)) {
throw new IOException("Algorithm not matched. Expected " + algorithm
+ ", Received " + checksum.getAlgorithmName());
}
//check length
if (length != checksum.getLength()) {
throw new IOException("Length not matched: length=" + length
+ ", checksum.getLength()=" + checksum.getLength());
}
return checksum;
}
/** Convert a Json map to a AclStatus object. */
static AclStatus toAclStatus(final Map<?, ?> json) {
if (json == null) {
return null;
}
final Map<?, ?> m = (Map<?, ?>) json.get(AclStatus.class.getSimpleName());
AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
aclStatusBuilder.owner((String) m.get("owner"));
aclStatusBuilder.group((String) m.get("group"));
aclStatusBuilder.stickyBit((Boolean) m.get("stickyBit"));
String permString = (String) m.get("permission");
if (permString != null) {
final FsPermission permission = toFsPermission(permString,
(Boolean) m.get("aclBit"), (Boolean) m.get("encBit"));
aclStatusBuilder.setPermission(permission);
}
final List<?> entries = (List<?>) m.get("entries");
List<AclEntry> aclEntryList = new ArrayList<>();
for (Object entry : entries) {
AclEntry aclEntry = AclEntry.parseAclEntry((String) entry, true);
aclEntryList.add(aclEntry);
}
aclStatusBuilder.addEntries(aclEntryList);
return aclStatusBuilder.build();
}
static byte[] getXAttr(final Map<?, ?> json, final String name)
throws IOException {
if (json == null) {
return null;
}
Map<String, byte[]> xAttrs = toXAttrs(json);
if (xAttrs != null) {
return xAttrs.get(name);
}
return null;
}
static Map<String, byte[]> toXAttrs(final Map<?, ?> json)
throws IOException {
if (json == null) {
return null;
}
return toXAttrMap(getList(json, "XAttrs"));
}
static List<String> toXAttrNames(final Map<?, ?> json)
throws IOException {
if (json == null) {
return null;
}
final String namesInJson = (String) json.get("XAttrNames");
ObjectReader reader = new ObjectMapper().reader(List.class);
final List<Object> xattrs = reader.readValue(namesInJson);
final List<String> names =
Lists.newArrayListWithCapacity(json.keySet().size());
for (Object xattr : xattrs) {
names.add((String) xattr);
}
return names;
}
static Map<String, byte[]> toXAttrMap(final List<?> objects)
throws IOException {
if (objects == null) {
return null;
} else if (objects.isEmpty()) {
return Maps.newHashMap();
} else {
final Map<String, byte[]> xAttrs = Maps.newHashMap();
for (Object object : objects) {
Map<?, ?> m = (Map<?, ?>) object;
String name = (String) m.get("name");
String value = (String) m.get("value");
xAttrs.put(name, decodeXAttrValue(value));
}
return xAttrs;
}
}
static byte[] decodeXAttrValue(String value) throws IOException {
if (value != null) {
return XAttrCodec.decodeValue(value);
} else {
return new byte[0];
}
}
/** Convert a Json map to a Token of DelegationTokenIdentifier. */
@SuppressWarnings("unchecked")
static Token<DelegationTokenIdentifier> toDelegationToken(
final Map<?, ?> json) throws IOException {
final Map<?, ?> m = (Map<?, ?>)json.get(Token.class.getSimpleName());
return (Token<DelegationTokenIdentifier>) toToken(m);
}
/** Convert a Json map to LocatedBlock. */
static LocatedBlocks toLocatedBlocks(
final Map<?, ?> json) throws IOException {
if (json == null) {
return null;
}
final Map<?, ?> m = (Map<?, ?>)json.get(LocatedBlocks.class.getSimpleName());
final long fileLength = ((Number) m.get("fileLength")).longValue();
final boolean isUnderConstruction = (Boolean)m.get("isUnderConstruction");
final List<LocatedBlock> locatedBlocks = toLocatedBlockList(
getList(m, "locatedBlocks"));
final LocatedBlock lastLocatedBlock = toLocatedBlock(
(Map<?, ?>) m.get("lastLocatedBlock"));
final boolean isLastBlockComplete = (Boolean)m.get("isLastBlockComplete");
return new LocatedBlocks(fileLength, isUnderConstruction, locatedBlocks,
lastLocatedBlock, isLastBlockComplete, null);
}
}

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
class WebHdfsConstants {
enum PathType {
FILE, DIRECTORY, SYMLINK;
static PathType valueOf(HdfsFileStatus status) {
return status.isDir()? DIRECTORY: status.isSymlink()? SYMLINK: FILE;
}
}
}

View File

@ -361,7 +361,7 @@ public class WebHdfsFileSystem extends FileSystem
return m; return m;
} }
IOException re = JsonUtil.toRemoteException(m); IOException re = JsonUtilClient.toRemoteException(m);
// extract UGI-related exceptions and unwrap InvalidToken // extract UGI-related exceptions and unwrap InvalidToken
// the NN mangles these exceptions but the DN does not and may need // the NN mangles these exceptions but the DN does not and may need
// to re-fetch a token if either report the token is expired // to re-fetch a token if either report the token is expired
@ -849,7 +849,7 @@ public class WebHdfsFileSystem extends FileSystem
HdfsFileStatus status = new FsPathResponseRunner<HdfsFileStatus>(op, f) { HdfsFileStatus status = new FsPathResponseRunner<HdfsFileStatus>(op, f) {
@Override @Override
HdfsFileStatus decodeResponse(Map<?,?> json) { HdfsFileStatus decodeResponse(Map<?,?> json) {
return JsonUtil.toFileStatus(json, true); return JsonUtilClient.toFileStatus(json, true);
} }
}.run(); }.run();
if (status == null) { if (status == null) {
@ -878,7 +878,7 @@ public class WebHdfsFileSystem extends FileSystem
AclStatus status = new FsPathResponseRunner<AclStatus>(op, f) { AclStatus status = new FsPathResponseRunner<AclStatus>(op, f) {
@Override @Override
AclStatus decodeResponse(Map<?,?> json) { AclStatus decodeResponse(Map<?,?> json) {
return JsonUtil.toAclStatus(json); return JsonUtilClient.toAclStatus(json);
} }
}.run(); }.run();
if (status == null) { if (status == null) {
@ -953,7 +953,7 @@ public class WebHdfsFileSystem extends FileSystem
new XAttrEncodingParam(XAttrCodec.HEX)) { new XAttrEncodingParam(XAttrCodec.HEX)) {
@Override @Override
byte[] decodeResponse(Map<?, ?> json) throws IOException { byte[] decodeResponse(Map<?, ?> json) throws IOException {
return JsonUtil.getXAttr(json, name); return JsonUtilClient.getXAttr(json, name);
} }
}.run(); }.run();
} }
@ -965,7 +965,7 @@ public class WebHdfsFileSystem extends FileSystem
new XAttrEncodingParam(XAttrCodec.HEX)) { new XAttrEncodingParam(XAttrCodec.HEX)) {
@Override @Override
Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException { Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException {
return JsonUtil.toXAttrs(json); return JsonUtilClient.toXAttrs(json);
} }
}.run(); }.run();
} }
@ -985,7 +985,7 @@ public class WebHdfsFileSystem extends FileSystem
return new FsPathResponseRunner<Map<String, byte[]>>(op, parameters, p) { return new FsPathResponseRunner<Map<String, byte[]>>(op, parameters, p) {
@Override @Override
Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException { Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException {
return JsonUtil.toXAttrs(json); return JsonUtilClient.toXAttrs(json);
} }
}.run(); }.run();
} }
@ -996,7 +996,7 @@ public class WebHdfsFileSystem extends FileSystem
return new FsPathResponseRunner<List<String>>(op, p) { return new FsPathResponseRunner<List<String>>(op, p) {
@Override @Override
List<String> decodeResponse(Map<?, ?> json) throws IOException { List<String> decodeResponse(Map<?, ?> json) throws IOException {
return JsonUtil.toXAttrNames(json); return JsonUtilClient.toXAttrNames(json);
} }
}.run(); }.run();
} }
@ -1299,15 +1299,15 @@ public class WebHdfsFileSystem extends FileSystem
@Override @Override
FileStatus[] decodeResponse(Map<?,?> json) { FileStatus[] decodeResponse(Map<?,?> json) {
final Map<?, ?> rootmap = (Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es"); final Map<?, ?> rootmap = (Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es");
final List<?> array = JsonUtil.getList( final List<?> array = JsonUtilClient.getList(rootmap,
rootmap, FileStatus.class.getSimpleName()); FileStatus.class.getSimpleName());
//convert FileStatus //convert FileStatus
final FileStatus[] statuses = new FileStatus[array.size()]; final FileStatus[] statuses = new FileStatus[array.size()];
int i = 0; int i = 0;
for (Object object : array) { for (Object object : array) {
final Map<?, ?> m = (Map<?, ?>) object; final Map<?, ?> m = (Map<?, ?>) object;
statuses[i++] = makeQualified(JsonUtil.toFileStatus(m, false), f); statuses[i++] = makeQualified(JsonUtilClient.toFileStatus(m, false), f);
} }
return statuses; return statuses;
} }
@ -1324,7 +1324,7 @@ public class WebHdfsFileSystem extends FileSystem
@Override @Override
Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json) Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json)
throws IOException { throws IOException {
return JsonUtil.toDelegationToken(json); return JsonUtilClient.toDelegationToken(json);
} }
}.run(); }.run();
if (token != null) { if (token != null) {
@ -1392,7 +1392,7 @@ public class WebHdfsFileSystem extends FileSystem
@Override @Override
BlockLocation[] decodeResponse(Map<?,?> json) throws IOException { BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
return DFSUtil.locatedBlocks2Locations( return DFSUtil.locatedBlocks2Locations(
JsonUtil.toLocatedBlocks(json)); JsonUtilClient.toLocatedBlocks(json));
} }
}.run(); }.run();
} }
@ -1411,7 +1411,7 @@ public class WebHdfsFileSystem extends FileSystem
return new FsPathResponseRunner<ContentSummary>(op, p) { return new FsPathResponseRunner<ContentSummary>(op, p) {
@Override @Override
ContentSummary decodeResponse(Map<?,?> json) { ContentSummary decodeResponse(Map<?,?> json) {
return JsonUtil.toContentSummary(json); return JsonUtilClient.toContentSummary(json);
} }
}.run(); }.run();
} }
@ -1425,7 +1425,7 @@ public class WebHdfsFileSystem extends FileSystem
return new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(op, p) { return new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(op, p) {
@Override @Override
MD5MD5CRC32FileChecksum decodeResponse(Map<?,?> json) throws IOException { MD5MD5CRC32FileChecksum decodeResponse(Map<?,?> json) throws IOException {
return JsonUtil.toMD5MD5CRC32FileChecksum(json); return JsonUtilClient.toMD5MD5CRC32FileChecksum(json);
} }
}.run(); }.run();
} }

View File

@ -33,10 +33,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.hdfs.web.resources.ExceptionHandler;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
@ -49,10 +46,7 @@ import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox; import org.mockito.internal.util.reflection.Whitebox;
import org.mortbay.util.ajax.JSON;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.IOException; import java.io.IOException;
@ -61,10 +55,8 @@ import java.net.URI;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
/** /**
* Test case for client support of delegation tokens in an HA cluster. * Test case for client support of delegation tokens in an HA cluster.
@ -374,90 +366,6 @@ public class TestDelegationTokensWithHA {
token.cancel(conf); token.cancel(conf);
} }
/**
* Test if StandbyException can be thrown from StandbyNN, when it's requested for
* password. (HDFS-6475). With StandbyException, the client can failover to try
* activeNN.
*/
@Test(timeout = 300000)
public void testDelegationTokenStandbyNNAppearFirst() throws Exception {
// make nn0 the standby NN, and nn1 the active NN
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
final DelegationTokenSecretManager stSecretManager =
NameNodeAdapter.getDtSecretManager(
nn1.getNamesystem());
// create token
final Token<DelegationTokenIdentifier> token =
getDelegationToken(fs, "JobTracker");
final DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
byte[] tokenId = token.getIdentifier();
identifier.readFields(new DataInputStream(
new ByteArrayInputStream(tokenId)));
assertTrue(null != stSecretManager.retrievePassword(identifier));
final UserGroupInformation ugi = UserGroupInformation
.createRemoteUser("JobTracker");
ugi.addToken(token);
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() {
try {
try {
byte[] tmppw = dtSecretManager.retrievePassword(identifier);
fail("InvalidToken with cause StandbyException is expected"
+ " since nn0 is standby");
return tmppw;
} catch (IOException e) {
// Mimic the UserProvider class logic (server side) by throwing
// SecurityException here
throw new SecurityException(
SecurityUtil.FAILED_TO_GET_UGI_MSG_HEADER + " " + e, e);
}
} catch (Exception oe) {
//
// The exception oe caught here is
// java.lang.SecurityException: Failed to obtain user group
// information: org.apache.hadoop.security.token.
// SecretManager$InvalidToken: StandbyException
//
HttpServletResponse response = mock(HttpServletResponse.class);
ExceptionHandler eh = new ExceptionHandler();
eh.initResponse(response);
// The Response (resp) below is what the server will send to client
//
// BEFORE HDFS-6475 fix, the resp.entity is
// {"RemoteException":{"exception":"SecurityException",
// "javaClassName":"java.lang.SecurityException",
// "message":"Failed to obtain user group information:
// org.apache.hadoop.security.token.SecretManager$InvalidToken:
// StandbyException"}}
// AFTER the fix, the resp.entity is
// {"RemoteException":{"exception":"StandbyException",
// "javaClassName":"org.apache.hadoop.ipc.StandbyException",
// "message":"Operation category READ is not supported in
// state standby"}}
//
Response resp = eh.toResponse(oe);
// Mimic the client side logic by parsing the response from server
//
Map<?, ?> m = (Map<?, ?>)JSON.parse(resp.getEntity().toString());
RemoteException re = JsonUtil.toRemoteException(m);
Exception unwrapped = ((RemoteException)re).unwrapRemoteException(
StandbyException.class);
assertTrue (unwrapped instanceof StandbyException);
return null;
}
}
});
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private Token<DelegationTokenIdentifier> getDelegationToken(FileSystem fs, private Token<DelegationTokenIdentifier> getDelegationToken(FileSystem fs,
String renewer) throws IOException { String renewer) throws IOException {

View File

@ -73,7 +73,7 @@ public class TestJsonUtil {
System.out.println("json = " + json.replace(",", ",\n ")); System.out.println("json = " + json.replace(",", ",\n "));
ObjectReader reader = new ObjectMapper().reader(Map.class); ObjectReader reader = new ObjectMapper().reader(Map.class);
final HdfsFileStatus s2 = final HdfsFileStatus s2 =
JsonUtil.toFileStatus((Map<?, ?>) reader.readValue(json), true); JsonUtilClient.toFileStatus((Map<?, ?>) reader.readValue(json), true);
final FileStatus fs2 = toFileStatus(s2, parent); final FileStatus fs2 = toFileStatus(s2, parent);
System.out.println("s2 = " + s2); System.out.println("s2 = " + s2);
System.out.println("fs2 = " + fs2); System.out.println("fs2 = " + fs2);
@ -102,7 +102,7 @@ public class TestJsonUtil {
response.put("cacheCapacity", 123l); response.put("cacheCapacity", 123l);
response.put("cacheUsed", 321l); response.put("cacheUsed", 321l);
JsonUtil.toDatanodeInfo(response); JsonUtilClient.toDatanodeInfo(response);
} }
@Test @Test
@ -128,7 +128,7 @@ public class TestJsonUtil {
response.put("cacheCapacity", 123l); response.put("cacheCapacity", 123l);
response.put("cacheUsed", 321l); response.put("cacheUsed", 321l);
DatanodeInfo di = JsonUtil.toDatanodeInfo(response); DatanodeInfo di = JsonUtilClient.toDatanodeInfo(response);
Assert.assertEquals(name, di.getXferAddr()); Assert.assertEquals(name, di.getXferAddr());
// The encoded result should contain name, ipAddr and xferPort. // The encoded result should contain name, ipAddr and xferPort.
@ -175,7 +175,7 @@ public class TestJsonUtil {
aclStatusBuilder.stickyBit(false); aclStatusBuilder.stickyBit(false);
Assert.assertEquals("Should be equal", aclStatusBuilder.build(), Assert.assertEquals("Should be equal", aclStatusBuilder.build(),
JsonUtil.toAclStatus(json)); JsonUtilClient.toAclStatus(json));
} }
@Test @Test
@ -229,7 +229,7 @@ public class TestJsonUtil {
xAttrs.add(xAttr1); xAttrs.add(xAttr1);
xAttrs.add(xAttr2); xAttrs.add(xAttr2);
Map<String, byte[]> xAttrMap = XAttrHelper.buildXAttrMap(xAttrs); Map<String, byte[]> xAttrMap = XAttrHelper.buildXAttrMap(xAttrs);
Map<String, byte[]> parsedXAttrMap = JsonUtil.toXAttrs(json); Map<String, byte[]> parsedXAttrMap = JsonUtilClient.toXAttrs(json);
Assert.assertEquals(xAttrMap.size(), parsedXAttrMap.size()); Assert.assertEquals(xAttrMap.size(), parsedXAttrMap.size());
Iterator<Entry<String, byte[]>> iter = xAttrMap.entrySet().iterator(); Iterator<Entry<String, byte[]>> iter = xAttrMap.entrySet().iterator();
@ -249,13 +249,13 @@ public class TestJsonUtil {
Map<?, ?> json = reader.readValue(jsonString); Map<?, ?> json = reader.readValue(jsonString);
// Get xattr: user.a2 // Get xattr: user.a2
byte[] value = JsonUtil.getXAttr(json, "user.a2"); byte[] value = JsonUtilClient.getXAttr(json, "user.a2");
Assert.assertArrayEquals(XAttrCodec.decodeValue("0x313131"), value); Assert.assertArrayEquals(XAttrCodec.decodeValue("0x313131"), value);
} }
private void checkDecodeFailure(Map<String, Object> map) { private void checkDecodeFailure(Map<String, Object> map) {
try { try {
JsonUtil.toDatanodeInfo(map); JsonUtilClient.toDatanodeInfo(map);
Assert.fail("Exception not thrown against bad input."); Assert.fail("Exception not thrown against bad input.");
} catch (Exception e) { } catch (Exception e) {
// expected // expected

View File

@ -19,9 +19,12 @@
package org.apache.hadoop.hdfs.web; package org.apache.hadoop.hdfs.web;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.HashMap; import java.util.HashMap;
@ -38,14 +41,26 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.web.resources.ExceptionHandler;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox; import org.mockito.internal.util.reflection.Whitebox;
import org.mortbay.util.ajax.JSON;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response;
public class TestWebHDFSForHA { public class TestWebHDFSForHA {
private static final String LOGICAL_NAME = "minidfs"; private static final String LOGICAL_NAME = "minidfs";
@ -123,6 +138,75 @@ public class TestWebHDFSForHA {
} }
} }
@Test
public void testClientFailoverWhenStandbyNNHasStaleCredentials()
throws IOException {
Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME);
conf.setBoolean(DFSConfigKeys
.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
MiniDFSCluster cluster = null;
WebHdfsFileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).nnTopology(topo).numDataNodes(
0).build();
HATestUtil.setFailoverConfigurations(cluster, conf, LOGICAL_NAME);
cluster.waitActive();
fs = (WebHdfsFileSystem) FileSystem.get(WEBHDFS_URI, conf);
cluster.transitionToActive(0);
Token<?> token = fs.getDelegationToken(null);
final DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
identifier.readFields(
new DataInputStream(new ByteArrayInputStream(token.getIdentifier())));
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
final DelegationTokenSecretManager secretManager = NameNodeAdapter.getDtSecretManager(
cluster.getNamesystem(0));
ExceptionHandler eh = new ExceptionHandler();
eh.initResponse(mock(HttpServletResponse.class));
Response resp = null;
try {
secretManager.retrievePassword(identifier);
} catch (IOException e) {
// Mimic the UserProvider class logic (server side) by throwing
// SecurityException here
Assert.assertTrue(e instanceof SecretManager.InvalidToken);
resp = eh.toResponse(new SecurityException(e));
}
// The Response (resp) below is what the server will send to client
//
// BEFORE HDFS-6475 fix, the resp.entity is
// {"RemoteException":{"exception":"SecurityException",
// "javaClassName":"java.lang.SecurityException",
// "message":"Failed to obtain user group information:
// org.apache.hadoop.security.token.SecretManager$InvalidToken:
// StandbyException"}}
// AFTER the fix, the resp.entity is
// {"RemoteException":{"exception":"StandbyException",
// "javaClassName":"org.apache.hadoop.ipc.StandbyException",
// "message":"Operation category READ is not supported in
// state standby"}}
//
// Mimic the client side logic by parsing the response from server
//
Map<?, ?> m = (Map<?, ?>) JSON.parse(resp.getEntity().toString());
RemoteException re = JsonUtilClient.toRemoteException(m);
Exception unwrapped = re.unwrapRemoteException(StandbyException.class);
Assert.assertTrue(unwrapped instanceof StandbyException);
} finally {
IOUtils.cleanup(null, fs);
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test @Test
public void testFailoverAfterOpen() throws IOException { public void testFailoverAfterOpen() throws IOException {
Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME); Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME);

View File

@ -36,7 +36,6 @@ import java.net.URLConnection;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.Map; import java.util.Map;
import org.apache.commons.httpclient.HttpConnection;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
@ -282,7 +281,7 @@ public class TestWebHdfsTokens {
@Override @Override
Token<DelegationTokenIdentifier> decodeResponse(Map<?, ?> json) Token<DelegationTokenIdentifier> decodeResponse(Map<?, ?> json)
throws IOException { throws IOException {
return JsonUtil.toDelegationToken(json); return JsonUtilClient.toDelegationToken(json);
} }
}.run(); }.run();