HDFS-8080. Separate JSON related routines used by WebHdfsFileSystem to a package local class. Contributed by Haohui Mai.
This commit is contained in:
parent
d505c8acd3
commit
ab04ff9efe
|
@ -388,6 +388,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
HDFS-8073. Split BlockPlacementPolicyDefault.chooseTarget(..) so it
|
HDFS-8073. Split BlockPlacementPolicyDefault.chooseTarget(..) so it
|
||||||
can be easily overrided. (Walter Su via vinayakumarb)
|
can be easily overrided. (Walter Su via vinayakumarb)
|
||||||
|
|
||||||
|
HDFS-8080. Separate JSON related routines used by WebHdfsFileSystem to a
|
||||||
|
package local class. (wheat9)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
|
|
@ -21,34 +21,22 @@ import org.apache.hadoop.fs.*;
|
||||||
import org.apache.hadoop.fs.permission.AclEntry;
|
import org.apache.hadoop.fs.permission.AclEntry;
|
||||||
import org.apache.hadoop.fs.permission.AclStatus;
|
import org.apache.hadoop.fs.permission.AclStatus;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
|
||||||
import org.apache.hadoop.hdfs.XAttrHelper;
|
import org.apache.hadoop.hdfs.XAttrHelper;
|
||||||
import org.apache.hadoop.hdfs.protocol.*;
|
import org.apache.hadoop.hdfs.protocol.*;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
|
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.INodeId;
|
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.codehaus.jackson.map.ObjectMapper;
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
import org.codehaus.jackson.map.ObjectReader;
|
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
|
||||||
import java.io.DataInputStream;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
|
||||||
/** JSON Utilities */
|
/** JSON Utilities */
|
||||||
public class JsonUtil {
|
public class JsonUtil {
|
||||||
private static final Object[] EMPTY_OBJECT_ARRAY = {};
|
private static final Object[] EMPTY_OBJECT_ARRAY = {};
|
||||||
private static final DatanodeInfo[] EMPTY_DATANODE_INFO_ARRAY = {};
|
|
||||||
|
|
||||||
/** Convert a token object to a Json string. */
|
/** Convert a token object to a Json string. */
|
||||||
public static String toJsonString(final Token<? extends TokenIdentifier> token
|
public static String toJsonString(final Token<? extends TokenIdentifier> token
|
||||||
|
@ -67,34 +55,6 @@ public class JsonUtil {
|
||||||
return m;
|
return m;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Convert a Json map to a Token. */
|
|
||||||
public static Token<? extends TokenIdentifier> toToken(
|
|
||||||
final Map<?, ?> m) throws IOException {
|
|
||||||
if (m == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
final Token<DelegationTokenIdentifier> token
|
|
||||||
= new Token<DelegationTokenIdentifier>();
|
|
||||||
token.decodeFromUrlString((String)m.get("urlString"));
|
|
||||||
return token;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Convert a Json map to a Token of DelegationTokenIdentifier. */
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public static Token<DelegationTokenIdentifier> toDelegationToken(
|
|
||||||
final Map<?, ?> json) throws IOException {
|
|
||||||
final Map<?, ?> m = (Map<?, ?>)json.get(Token.class.getSimpleName());
|
|
||||||
return (Token<DelegationTokenIdentifier>)toToken(m);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Convert a Json map to a Token of BlockTokenIdentifier. */
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private static Token<BlockTokenIdentifier> toBlockToken(
|
|
||||||
final Map<?, ?> m) throws IOException {
|
|
||||||
return (Token<BlockTokenIdentifier>)toToken(m);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Convert an exception object to a Json string. */
|
/** Convert an exception object to a Json string. */
|
||||||
public static String toJsonString(final Exception e) {
|
public static String toJsonString(final Exception e) {
|
||||||
final Map<String, Object> m = new TreeMap<String, Object>();
|
final Map<String, Object> m = new TreeMap<String, Object>();
|
||||||
|
@ -104,14 +64,6 @@ public class JsonUtil {
|
||||||
return toJsonString(RemoteException.class, m);
|
return toJsonString(RemoteException.class, m);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Convert a Json map to a RemoteException. */
|
|
||||||
public static RemoteException toRemoteException(final Map<?, ?> json) {
|
|
||||||
final Map<?, ?> m = (Map<?, ?>)json.get(RemoteException.class.getSimpleName());
|
|
||||||
final String message = (String)m.get("message");
|
|
||||||
final String javaClassName = (String)m.get("javaClassName");
|
|
||||||
return new RemoteException(javaClassName, message);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static String toJsonString(final Class<?> clazz, final Object value) {
|
private static String toJsonString(final Class<?> clazz, final Object value) {
|
||||||
return toJsonString(clazz.getSimpleName(), value);
|
return toJsonString(clazz.getSimpleName(), value);
|
||||||
}
|
}
|
||||||
|
@ -133,27 +85,6 @@ public class JsonUtil {
|
||||||
return String.format("%o", permission.toShort());
|
return String.format("%o", permission.toShort());
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Convert a string to a FsPermission object. */
|
|
||||||
private static FsPermission toFsPermission(final String s, Boolean aclBit,
|
|
||||||
Boolean encBit) {
|
|
||||||
FsPermission perm = new FsPermission(Short.parseShort(s, 8));
|
|
||||||
final boolean aBit = (aclBit != null) ? aclBit : false;
|
|
||||||
final boolean eBit = (encBit != null) ? encBit : false;
|
|
||||||
if (aBit || eBit) {
|
|
||||||
return new FsPermissionExtension(perm, aBit, eBit);
|
|
||||||
} else {
|
|
||||||
return perm;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static enum PathType {
|
|
||||||
FILE, DIRECTORY, SYMLINK;
|
|
||||||
|
|
||||||
static PathType valueOf(HdfsFileStatus status) {
|
|
||||||
return status.isDir()? DIRECTORY: status.isSymlink()? SYMLINK: FILE;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Convert a HdfsFileStatus object to a Json string. */
|
/** Convert a HdfsFileStatus object to a Json string. */
|
||||||
public static String toJsonString(final HdfsFileStatus status,
|
public static String toJsonString(final HdfsFileStatus status,
|
||||||
boolean includeType) {
|
boolean includeType) {
|
||||||
|
@ -162,7 +93,7 @@ public class JsonUtil {
|
||||||
}
|
}
|
||||||
final Map<String, Object> m = new TreeMap<String, Object>();
|
final Map<String, Object> m = new TreeMap<String, Object>();
|
||||||
m.put("pathSuffix", status.getLocalName());
|
m.put("pathSuffix", status.getLocalName());
|
||||||
m.put("type", PathType.valueOf(status));
|
m.put("type", WebHdfsConstants.PathType.valueOf(status));
|
||||||
if (status.isSymlink()) {
|
if (status.isSymlink()) {
|
||||||
m.put("symlink", status.getSymlink());
|
m.put("symlink", status.getSymlink());
|
||||||
}
|
}
|
||||||
|
@ -194,42 +125,6 @@ public class JsonUtil {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Convert a Json map to a HdfsFileStatus object. */
|
|
||||||
public static HdfsFileStatus toFileStatus(final Map<?, ?> json, boolean includesType) {
|
|
||||||
if (json == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
final Map<?, ?> m = includesType ?
|
|
||||||
(Map<?, ?>)json.get(FileStatus.class.getSimpleName()) : json;
|
|
||||||
final String localName = (String) m.get("pathSuffix");
|
|
||||||
final PathType type = PathType.valueOf((String) m.get("type"));
|
|
||||||
final byte[] symlink = type != PathType.SYMLINK? null
|
|
||||||
: DFSUtil.string2Bytes((String)m.get("symlink"));
|
|
||||||
|
|
||||||
final long len = ((Number) m.get("length")).longValue();
|
|
||||||
final String owner = (String) m.get("owner");
|
|
||||||
final String group = (String) m.get("group");
|
|
||||||
final FsPermission permission = toFsPermission((String) m.get("permission"),
|
|
||||||
(Boolean)m.get("aclBit"), (Boolean)m.get("encBit"));
|
|
||||||
final long aTime = ((Number) m.get("accessTime")).longValue();
|
|
||||||
final long mTime = ((Number) m.get("modificationTime")).longValue();
|
|
||||||
final long blockSize = ((Number) m.get("blockSize")).longValue();
|
|
||||||
final boolean isLazyPersist = m.containsKey("lazyPersist")
|
|
||||||
? (Boolean) m.get("lazyPersist") : false;
|
|
||||||
final short replication = ((Number) m.get("replication")).shortValue();
|
|
||||||
final long fileId = m.containsKey("fileId") ?
|
|
||||||
((Number) m.get("fileId")).longValue() : INodeId.GRANDFATHER_INODE_ID;
|
|
||||||
final int childrenNum = getInt(m, "childrenNum", -1);
|
|
||||||
final byte storagePolicy = m.containsKey("storagePolicy") ?
|
|
||||||
(byte) ((Number) m.get("storagePolicy")).longValue() :
|
|
||||||
BlockStoragePolicySuite.ID_UNSPECIFIED;
|
|
||||||
return new HdfsFileStatus(len, type == PathType.DIRECTORY, replication,
|
|
||||||
blockSize, mTime, aTime, permission, owner, group,
|
|
||||||
symlink, DFSUtil.string2Bytes(localName), fileId, childrenNum, null,
|
|
||||||
storagePolicy);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Convert an ExtendedBlock to a Json map. */
|
/** Convert an ExtendedBlock to a Json map. */
|
||||||
private static Map<String, Object> toJsonMap(final ExtendedBlock extendedblock) {
|
private static Map<String, Object> toJsonMap(final ExtendedBlock extendedblock) {
|
||||||
if (extendedblock == null) {
|
if (extendedblock == null) {
|
||||||
|
@ -244,20 +139,6 @@ public class JsonUtil {
|
||||||
return m;
|
return m;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Convert a Json map to an ExtendedBlock object. */
|
|
||||||
private static ExtendedBlock toExtendedBlock(final Map<?, ?> m) {
|
|
||||||
if (m == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
final String blockPoolId = (String)m.get("blockPoolId");
|
|
||||||
final long blockId = ((Number) m.get("blockId")).longValue();
|
|
||||||
final long numBytes = ((Number) m.get("numBytes")).longValue();
|
|
||||||
final long generationStamp =
|
|
||||||
((Number) m.get("generationStamp")).longValue();
|
|
||||||
return new ExtendedBlock(blockPoolId, blockId, numBytes, generationStamp);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Convert a DatanodeInfo to a Json map. */
|
/** Convert a DatanodeInfo to a Json map. */
|
||||||
static Map<String, Object> toJsonMap(final DatanodeInfo datanodeinfo) {
|
static Map<String, Object> toJsonMap(final DatanodeInfo datanodeinfo) {
|
||||||
if (datanodeinfo == null) {
|
if (datanodeinfo == null) {
|
||||||
|
@ -291,101 +172,6 @@ public class JsonUtil {
|
||||||
return m;
|
return m;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static int getInt(Map<?, ?> m, String key, final int defaultValue) {
|
|
||||||
Object value = m.get(key);
|
|
||||||
if (value == null) {
|
|
||||||
return defaultValue;
|
|
||||||
}
|
|
||||||
return ((Number) value).intValue();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static long getLong(Map<?, ?> m, String key, final long defaultValue) {
|
|
||||||
Object value = m.get(key);
|
|
||||||
if (value == null) {
|
|
||||||
return defaultValue;
|
|
||||||
}
|
|
||||||
return ((Number) value).longValue();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static String getString(Map<?, ?> m, String key,
|
|
||||||
final String defaultValue) {
|
|
||||||
Object value = m.get(key);
|
|
||||||
if (value == null) {
|
|
||||||
return defaultValue;
|
|
||||||
}
|
|
||||||
return (String) value;
|
|
||||||
}
|
|
||||||
|
|
||||||
static List<?> getList(Map<?, ?> m, String key) {
|
|
||||||
Object list = m.get(key);
|
|
||||||
if (list instanceof List<?>) {
|
|
||||||
return (List<?>) list;
|
|
||||||
} else {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Convert a Json map to an DatanodeInfo object. */
|
|
||||||
static DatanodeInfo toDatanodeInfo(final Map<?, ?> m)
|
|
||||||
throws IOException {
|
|
||||||
if (m == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ipAddr and xferPort are the critical fields for accessing data.
|
|
||||||
// If any one of the two is missing, an exception needs to be thrown.
|
|
||||||
|
|
||||||
// Handle the case of old servers (1.x, 0.23.x) sending 'name' instead
|
|
||||||
// of ipAddr and xferPort.
|
|
||||||
String ipAddr = getString(m, "ipAddr", null);
|
|
||||||
int xferPort = getInt(m, "xferPort", -1);
|
|
||||||
if (ipAddr == null) {
|
|
||||||
String name = getString(m, "name", null);
|
|
||||||
if (name != null) {
|
|
||||||
int colonIdx = name.indexOf(':');
|
|
||||||
if (colonIdx > 0) {
|
|
||||||
ipAddr = name.substring(0, colonIdx);
|
|
||||||
xferPort = Integer.parseInt(name.substring(colonIdx +1));
|
|
||||||
} else {
|
|
||||||
throw new IOException(
|
|
||||||
"Invalid value in server response: name=[" + name + "]");
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
throw new IOException(
|
|
||||||
"Missing both 'ipAddr' and 'name' in server response.");
|
|
||||||
}
|
|
||||||
// ipAddr is non-null & non-empty string at this point.
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check the validity of xferPort.
|
|
||||||
if (xferPort == -1) {
|
|
||||||
throw new IOException(
|
|
||||||
"Invalid or missing 'xferPort' in server response.");
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Fix storageID
|
|
||||||
return new DatanodeInfo(
|
|
||||||
ipAddr,
|
|
||||||
(String)m.get("hostName"),
|
|
||||||
(String)m.get("storageID"),
|
|
||||||
xferPort,
|
|
||||||
((Number) m.get("infoPort")).intValue(),
|
|
||||||
getInt(m, "infoSecurePort", 0),
|
|
||||||
((Number) m.get("ipcPort")).intValue(),
|
|
||||||
|
|
||||||
getLong(m, "capacity", 0l),
|
|
||||||
getLong(m, "dfsUsed", 0l),
|
|
||||||
getLong(m, "remaining", 0l),
|
|
||||||
getLong(m, "blockPoolUsed", 0l),
|
|
||||||
getLong(m, "cacheCapacity", 0l),
|
|
||||||
getLong(m, "cacheUsed", 0l),
|
|
||||||
getLong(m, "lastUpdate", 0l),
|
|
||||||
getLong(m, "lastUpdateMonotonic", 0l),
|
|
||||||
getInt(m, "xceiverCount", 0),
|
|
||||||
getString(m, "networkLocation", ""),
|
|
||||||
AdminStates.valueOf(getString(m, "adminState", "NORMAL")));
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Convert a DatanodeInfo[] to a Json array. */
|
/** Convert a DatanodeInfo[] to a Json array. */
|
||||||
private static Object[] toJsonArray(final DatanodeInfo[] array) {
|
private static Object[] toJsonArray(final DatanodeInfo[] array) {
|
||||||
if (array == null) {
|
if (array == null) {
|
||||||
|
@ -401,23 +187,6 @@ public class JsonUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Convert an Object[] to a DatanodeInfo[]. */
|
|
||||||
private static DatanodeInfo[] toDatanodeInfoArray(final List<?> objects)
|
|
||||||
throws IOException {
|
|
||||||
if (objects == null) {
|
|
||||||
return null;
|
|
||||||
} else if (objects.isEmpty()) {
|
|
||||||
return EMPTY_DATANODE_INFO_ARRAY;
|
|
||||||
} else {
|
|
||||||
final DatanodeInfo[] array = new DatanodeInfo[objects.size()];
|
|
||||||
int i = 0;
|
|
||||||
for (Object object : objects) {
|
|
||||||
array[i++] = toDatanodeInfo((Map<?, ?>) object);
|
|
||||||
}
|
|
||||||
return array;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Convert a LocatedBlock to a Json map. */
|
/** Convert a LocatedBlock to a Json map. */
|
||||||
private static Map<String, Object> toJsonMap(final LocatedBlock locatedblock
|
private static Map<String, Object> toJsonMap(final LocatedBlock locatedblock
|
||||||
) throws IOException {
|
) throws IOException {
|
||||||
|
@ -435,26 +204,6 @@ public class JsonUtil {
|
||||||
return m;
|
return m;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Convert a Json map to LocatedBlock. */
|
|
||||||
private static LocatedBlock toLocatedBlock(final Map<?, ?> m) throws IOException {
|
|
||||||
if (m == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
final ExtendedBlock b = toExtendedBlock((Map<?, ?>)m.get("block"));
|
|
||||||
final DatanodeInfo[] locations = toDatanodeInfoArray(
|
|
||||||
getList(m, "locations"));
|
|
||||||
final long startOffset = ((Number) m.get("startOffset")).longValue();
|
|
||||||
final boolean isCorrupt = (Boolean)m.get("isCorrupt");
|
|
||||||
final DatanodeInfo[] cachedLocations = toDatanodeInfoArray(
|
|
||||||
getList(m, "cachedLocations"));
|
|
||||||
|
|
||||||
final LocatedBlock locatedblock = new LocatedBlock(b, locations,
|
|
||||||
null, null, startOffset, isCorrupt, cachedLocations);
|
|
||||||
locatedblock.setBlockToken(toBlockToken((Map<?, ?>)m.get("blockToken")));
|
|
||||||
return locatedblock;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Convert a LocatedBlock[] to a Json array. */
|
/** Convert a LocatedBlock[] to a Json array. */
|
||||||
private static Object[] toJsonArray(final List<LocatedBlock> array
|
private static Object[] toJsonArray(final List<LocatedBlock> array
|
||||||
) throws IOException {
|
) throws IOException {
|
||||||
|
@ -471,22 +220,6 @@ public class JsonUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Convert an List of Object to a List of LocatedBlock. */
|
|
||||||
private static List<LocatedBlock> toLocatedBlockList(
|
|
||||||
final List<?> objects) throws IOException {
|
|
||||||
if (objects == null) {
|
|
||||||
return null;
|
|
||||||
} else if (objects.isEmpty()) {
|
|
||||||
return Collections.emptyList();
|
|
||||||
} else {
|
|
||||||
final List<LocatedBlock> list = new ArrayList<>(objects.size());
|
|
||||||
for (Object object : objects) {
|
|
||||||
list.add(toLocatedBlock((Map<?, ?>) object));
|
|
||||||
}
|
|
||||||
return list;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Convert LocatedBlocks to a Json string. */
|
/** Convert LocatedBlocks to a Json string. */
|
||||||
public static String toJsonString(final LocatedBlocks locatedblocks
|
public static String toJsonString(final LocatedBlocks locatedblocks
|
||||||
) throws IOException {
|
) throws IOException {
|
||||||
|
@ -504,25 +237,6 @@ public class JsonUtil {
|
||||||
return toJsonString(LocatedBlocks.class, m);
|
return toJsonString(LocatedBlocks.class, m);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Convert a Json map to LocatedBlock. */
|
|
||||||
public static LocatedBlocks toLocatedBlocks(final Map<?, ?> json
|
|
||||||
) throws IOException {
|
|
||||||
if (json == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
final Map<?, ?> m = (Map<?, ?>)json.get(LocatedBlocks.class.getSimpleName());
|
|
||||||
final long fileLength = ((Number) m.get("fileLength")).longValue();
|
|
||||||
final boolean isUnderConstruction = (Boolean)m.get("isUnderConstruction");
|
|
||||||
final List<LocatedBlock> locatedBlocks = toLocatedBlockList(
|
|
||||||
getList(m, "locatedBlocks"));
|
|
||||||
final LocatedBlock lastLocatedBlock = toLocatedBlock(
|
|
||||||
(Map<?, ?>)m.get("lastLocatedBlock"));
|
|
||||||
final boolean isLastBlockComplete = (Boolean)m.get("isLastBlockComplete");
|
|
||||||
return new LocatedBlocks(fileLength, isUnderConstruction, locatedBlocks,
|
|
||||||
lastLocatedBlock, isLastBlockComplete, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Convert a ContentSummary to a Json string. */
|
/** Convert a ContentSummary to a Json string. */
|
||||||
public static String toJsonString(final ContentSummary contentsummary) {
|
public static String toJsonString(final ContentSummary contentsummary) {
|
||||||
if (contentsummary == null) {
|
if (contentsummary == null) {
|
||||||
|
@ -539,25 +253,6 @@ public class JsonUtil {
|
||||||
return toJsonString(ContentSummary.class, m);
|
return toJsonString(ContentSummary.class, m);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Convert a Json map to a ContentSummary. */
|
|
||||||
public static ContentSummary toContentSummary(final Map<?, ?> json) {
|
|
||||||
if (json == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
final Map<?, ?> m = (Map<?, ?>)json.get(ContentSummary.class.getSimpleName());
|
|
||||||
final long length = ((Number) m.get("length")).longValue();
|
|
||||||
final long fileCount = ((Number) m.get("fileCount")).longValue();
|
|
||||||
final long directoryCount = ((Number) m.get("directoryCount")).longValue();
|
|
||||||
final long quota = ((Number) m.get("quota")).longValue();
|
|
||||||
final long spaceConsumed = ((Number) m.get("spaceConsumed")).longValue();
|
|
||||||
final long spaceQuota = ((Number) m.get("spaceQuota")).longValue();
|
|
||||||
|
|
||||||
return new ContentSummary.Builder().length(length).fileCount(fileCount).
|
|
||||||
directoryCount(directoryCount).quota(quota).spaceConsumed(spaceConsumed).
|
|
||||||
spaceQuota(spaceQuota).build();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Convert a MD5MD5CRC32FileChecksum to a Json string. */
|
/** Convert a MD5MD5CRC32FileChecksum to a Json string. */
|
||||||
public static String toJsonString(final MD5MD5CRC32FileChecksum checksum) {
|
public static String toJsonString(final MD5MD5CRC32FileChecksum checksum) {
|
||||||
if (checksum == null) {
|
if (checksum == null) {
|
||||||
|
@ -571,49 +266,6 @@ public class JsonUtil {
|
||||||
return toJsonString(FileChecksum.class, m);
|
return toJsonString(FileChecksum.class, m);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Convert a Json map to a MD5MD5CRC32FileChecksum. */
|
|
||||||
public static MD5MD5CRC32FileChecksum toMD5MD5CRC32FileChecksum(
|
|
||||||
final Map<?, ?> json) throws IOException {
|
|
||||||
if (json == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
final Map<?, ?> m = (Map<?, ?>)json.get(FileChecksum.class.getSimpleName());
|
|
||||||
final String algorithm = (String)m.get("algorithm");
|
|
||||||
final int length = ((Number) m.get("length")).intValue();
|
|
||||||
final byte[] bytes = StringUtils.hexStringToByte((String)m.get("bytes"));
|
|
||||||
|
|
||||||
final DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
|
|
||||||
final DataChecksum.Type crcType =
|
|
||||||
MD5MD5CRC32FileChecksum.getCrcTypeFromAlgorithmName(algorithm);
|
|
||||||
final MD5MD5CRC32FileChecksum checksum;
|
|
||||||
|
|
||||||
// Recreate what DFSClient would have returned.
|
|
||||||
switch(crcType) {
|
|
||||||
case CRC32:
|
|
||||||
checksum = new MD5MD5CRC32GzipFileChecksum();
|
|
||||||
break;
|
|
||||||
case CRC32C:
|
|
||||||
checksum = new MD5MD5CRC32CastagnoliFileChecksum();
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
throw new IOException("Unknown algorithm: " + algorithm);
|
|
||||||
}
|
|
||||||
checksum.readFields(in);
|
|
||||||
|
|
||||||
//check algorithm name
|
|
||||||
if (!checksum.getAlgorithmName().equals(algorithm)) {
|
|
||||||
throw new IOException("Algorithm not matched. Expected " + algorithm
|
|
||||||
+ ", Received " + checksum.getAlgorithmName());
|
|
||||||
}
|
|
||||||
//check length
|
|
||||||
if (length != checksum.getLength()) {
|
|
||||||
throw new IOException("Length not matched: length=" + length
|
|
||||||
+ ", checksum.getLength()=" + checksum.getLength());
|
|
||||||
}
|
|
||||||
|
|
||||||
return checksum;
|
|
||||||
}
|
|
||||||
/** Convert a AclStatus object to a Json string. */
|
/** Convert a AclStatus object to a Json string. */
|
||||||
public static String toJsonString(final AclStatus status) {
|
public static String toJsonString(final AclStatus status) {
|
||||||
if (status == null) {
|
if (status == null) {
|
||||||
|
@ -653,35 +305,6 @@ public class JsonUtil {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Convert a Json map to a AclStatus object. */
|
|
||||||
public static AclStatus toAclStatus(final Map<?, ?> json) {
|
|
||||||
if (json == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
final Map<?, ?> m = (Map<?, ?>) json.get(AclStatus.class.getSimpleName());
|
|
||||||
|
|
||||||
AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
|
|
||||||
aclStatusBuilder.owner((String) m.get("owner"));
|
|
||||||
aclStatusBuilder.group((String) m.get("group"));
|
|
||||||
aclStatusBuilder.stickyBit((Boolean) m.get("stickyBit"));
|
|
||||||
String permString = (String) m.get("permission");
|
|
||||||
if (permString != null) {
|
|
||||||
final FsPermission permission = toFsPermission(permString,
|
|
||||||
(Boolean) m.get("aclBit"), (Boolean) m.get("encBit"));
|
|
||||||
aclStatusBuilder.setPermission(permission);
|
|
||||||
}
|
|
||||||
final List<?> entries = (List<?>) m.get("entries");
|
|
||||||
|
|
||||||
List<AclEntry> aclEntryList = new ArrayList<AclEntry>();
|
|
||||||
for (Object entry : entries) {
|
|
||||||
AclEntry aclEntry = AclEntry.parseAclEntry((String) entry, true);
|
|
||||||
aclEntryList.add(aclEntry);
|
|
||||||
}
|
|
||||||
aclStatusBuilder.addEntries(aclEntryList);
|
|
||||||
return aclStatusBuilder.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Map<String, Object> toJsonMap(final XAttr xAttr,
|
private static Map<String, Object> toJsonMap(final XAttr xAttr,
|
||||||
final XAttrCodec encoding) throws IOException {
|
final XAttrCodec encoding) throws IOException {
|
||||||
if (xAttr == null) {
|
if (xAttr == null) {
|
||||||
|
@ -731,69 +354,4 @@ public class JsonUtil {
|
||||||
return mapper.writeValueAsString(finalMap);
|
return mapper.writeValueAsString(finalMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static byte[] getXAttr(final Map<?, ?> json, final String name)
|
|
||||||
throws IOException {
|
|
||||||
if (json == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
Map<String, byte[]> xAttrs = toXAttrs(json);
|
|
||||||
if (xAttrs != null) {
|
|
||||||
return xAttrs.get(name);
|
|
||||||
}
|
|
||||||
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static Map<String, byte[]> toXAttrs(final Map<?, ?> json)
|
|
||||||
throws IOException {
|
|
||||||
if (json == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
return toXAttrMap(getList(json, "XAttrs"));
|
|
||||||
}
|
|
||||||
|
|
||||||
public static List<String> toXAttrNames(final Map<?, ?> json)
|
|
||||||
throws IOException {
|
|
||||||
if (json == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
final String namesInJson = (String) json.get("XAttrNames");
|
|
||||||
ObjectReader reader = new ObjectMapper().reader(List.class);
|
|
||||||
final List<Object> xattrs = reader.readValue(namesInJson);
|
|
||||||
final List<String> names =
|
|
||||||
Lists.newArrayListWithCapacity(json.keySet().size());
|
|
||||||
|
|
||||||
for (Object xattr : xattrs) {
|
|
||||||
names.add((String) xattr);
|
|
||||||
}
|
|
||||||
return names;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Map<String, byte[]> toXAttrMap(final List<?> objects)
|
|
||||||
throws IOException {
|
|
||||||
if (objects == null) {
|
|
||||||
return null;
|
|
||||||
} else if (objects.isEmpty()) {
|
|
||||||
return Maps.newHashMap();
|
|
||||||
} else {
|
|
||||||
final Map<String, byte[]> xAttrs = Maps.newHashMap();
|
|
||||||
for (Object object : objects) {
|
|
||||||
Map<?, ?> m = (Map<?, ?>) object;
|
|
||||||
String name = (String) m.get("name");
|
|
||||||
String value = (String) m.get("value");
|
|
||||||
xAttrs.put(name, decodeXAttrValue(value));
|
|
||||||
}
|
|
||||||
return xAttrs;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static byte[] decodeXAttrValue(String value) throws IOException {
|
|
||||||
if (value != null) {
|
|
||||||
return XAttrCodec.decodeValue(value);
|
|
||||||
} else {
|
|
||||||
return new byte[0];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,485 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.web;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
|
import org.apache.hadoop.fs.FileChecksum;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
|
||||||
|
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
||||||
|
import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
|
||||||
|
import org.apache.hadoop.fs.XAttrCodec;
|
||||||
|
import org.apache.hadoop.fs.permission.AclEntry;
|
||||||
|
import org.apache.hadoop.fs.permission.AclStatus;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.INodeId;
|
||||||
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
import org.codehaus.jackson.map.ObjectReader;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.DataInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
class JsonUtilClient {
|
||||||
|
static final DatanodeInfo[] EMPTY_DATANODE_INFO_ARRAY = {};
|
||||||
|
|
||||||
|
/** Convert a Json map to a RemoteException. */
|
||||||
|
static RemoteException toRemoteException(final Map<?, ?> json) {
|
||||||
|
final Map<?, ?> m = (Map<?, ?>)json.get(RemoteException.class.getSimpleName());
|
||||||
|
final String message = (String)m.get("message");
|
||||||
|
final String javaClassName = (String)m.get("javaClassName");
|
||||||
|
return new RemoteException(javaClassName, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Convert a Json map to a Token. */
|
||||||
|
static Token<? extends TokenIdentifier> toToken(
|
||||||
|
final Map<?, ?> m) throws IOException {
|
||||||
|
if (m == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
final Token<DelegationTokenIdentifier> token
|
||||||
|
= new Token<>();
|
||||||
|
token.decodeFromUrlString((String)m.get("urlString"));
|
||||||
|
return token;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Convert a Json map to a Token of BlockTokenIdentifier. */
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
static Token<BlockTokenIdentifier> toBlockToken(
|
||||||
|
final Map<?, ?> m) throws IOException {
|
||||||
|
return (Token<BlockTokenIdentifier>)toToken(m);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Convert a string to a FsPermission object. */
|
||||||
|
static FsPermission toFsPermission(
|
||||||
|
final String s, Boolean aclBit, Boolean encBit) {
|
||||||
|
FsPermission perm = new FsPermission(Short.parseShort(s, 8));
|
||||||
|
final boolean aBit = (aclBit != null) ? aclBit : false;
|
||||||
|
final boolean eBit = (encBit != null) ? encBit : false;
|
||||||
|
if (aBit || eBit) {
|
||||||
|
return new FsPermissionExtension(perm, aBit, eBit);
|
||||||
|
} else {
|
||||||
|
return perm;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Convert a Json map to a HdfsFileStatus object. */
|
||||||
|
static HdfsFileStatus toFileStatus(final Map<?, ?> json, boolean includesType) {
|
||||||
|
if (json == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
final Map<?, ?> m = includesType ?
|
||||||
|
(Map<?, ?>)json.get(FileStatus.class.getSimpleName()) : json;
|
||||||
|
final String localName = (String) m.get("pathSuffix");
|
||||||
|
final WebHdfsConstants.PathType type = WebHdfsConstants.PathType.valueOf((String) m.get("type"));
|
||||||
|
final byte[] symlink = type != WebHdfsConstants.PathType.SYMLINK? null
|
||||||
|
: DFSUtil.string2Bytes((String) m.get("symlink"));
|
||||||
|
|
||||||
|
final long len = ((Number) m.get("length")).longValue();
|
||||||
|
final String owner = (String) m.get("owner");
|
||||||
|
final String group = (String) m.get("group");
|
||||||
|
final FsPermission permission = toFsPermission((String) m.get("permission"),
|
||||||
|
(Boolean) m.get("aclBit"),
|
||||||
|
(Boolean) m.get("encBit"));
|
||||||
|
final long aTime = ((Number) m.get("accessTime")).longValue();
|
||||||
|
final long mTime = ((Number) m.get("modificationTime")).longValue();
|
||||||
|
final long blockSize = ((Number) m.get("blockSize")).longValue();
|
||||||
|
final short replication = ((Number) m.get("replication")).shortValue();
|
||||||
|
final long fileId = m.containsKey("fileId") ?
|
||||||
|
((Number) m.get("fileId")).longValue() : INodeId.GRANDFATHER_INODE_ID;
|
||||||
|
final int childrenNum = getInt(m, "childrenNum", -1);
|
||||||
|
final byte storagePolicy = m.containsKey("storagePolicy") ?
|
||||||
|
(byte) ((Number) m.get("storagePolicy")).longValue() :
|
||||||
|
BlockStoragePolicySuite.ID_UNSPECIFIED;
|
||||||
|
return new HdfsFileStatus(len, type == WebHdfsConstants.PathType.DIRECTORY, replication,
|
||||||
|
blockSize, mTime, aTime, permission, owner, group,
|
||||||
|
symlink, DFSUtil.string2Bytes(localName), fileId, childrenNum, null,
|
||||||
|
storagePolicy);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Convert a Json map to an ExtendedBlock object. */
|
||||||
|
static ExtendedBlock toExtendedBlock(final Map<?, ?> m) {
|
||||||
|
if (m == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
final String blockPoolId = (String)m.get("blockPoolId");
|
||||||
|
final long blockId = ((Number) m.get("blockId")).longValue();
|
||||||
|
final long numBytes = ((Number) m.get("numBytes")).longValue();
|
||||||
|
final long generationStamp =
|
||||||
|
((Number) m.get("generationStamp")).longValue();
|
||||||
|
return new ExtendedBlock(blockPoolId, blockId, numBytes, generationStamp);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int getInt(Map<?, ?> m, String key, final int defaultValue) {
|
||||||
|
Object value = m.get(key);
|
||||||
|
if (value == null) {
|
||||||
|
return defaultValue;
|
||||||
|
}
|
||||||
|
return ((Number) value).intValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
static long getLong(Map<?, ?> m, String key, final long defaultValue) {
|
||||||
|
Object value = m.get(key);
|
||||||
|
if (value == null) {
|
||||||
|
return defaultValue;
|
||||||
|
}
|
||||||
|
return ((Number) value).longValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
static String getString(
|
||||||
|
Map<?, ?> m, String key, final String defaultValue) {
|
||||||
|
Object value = m.get(key);
|
||||||
|
if (value == null) {
|
||||||
|
return defaultValue;
|
||||||
|
}
|
||||||
|
return (String) value;
|
||||||
|
}
|
||||||
|
|
||||||
|
static List<?> getList(Map<?, ?> m, String key) {
|
||||||
|
Object list = m.get(key);
|
||||||
|
if (list instanceof List<?>) {
|
||||||
|
return (List<?>) list;
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Convert a Json map to an DatanodeInfo object. */
|
||||||
|
static DatanodeInfo toDatanodeInfo(final Map<?, ?> m)
|
||||||
|
throws IOException {
|
||||||
|
if (m == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ipAddr and xferPort are the critical fields for accessing data.
|
||||||
|
// If any one of the two is missing, an exception needs to be thrown.
|
||||||
|
|
||||||
|
// Handle the case of old servers (1.x, 0.23.x) sending 'name' instead
|
||||||
|
// of ipAddr and xferPort.
|
||||||
|
String ipAddr = getString(m, "ipAddr", null);
|
||||||
|
int xferPort = getInt(m, "xferPort", -1);
|
||||||
|
if (ipAddr == null) {
|
||||||
|
String name = getString(m, "name", null);
|
||||||
|
if (name != null) {
|
||||||
|
int colonIdx = name.indexOf(':');
|
||||||
|
if (colonIdx > 0) {
|
||||||
|
ipAddr = name.substring(0, colonIdx);
|
||||||
|
xferPort = Integer.parseInt(name.substring(colonIdx +1));
|
||||||
|
} else {
|
||||||
|
throw new IOException(
|
||||||
|
"Invalid value in server response: name=[" + name + "]");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new IOException(
|
||||||
|
"Missing both 'ipAddr' and 'name' in server response.");
|
||||||
|
}
|
||||||
|
// ipAddr is non-null & non-empty string at this point.
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the validity of xferPort.
|
||||||
|
if (xferPort == -1) {
|
||||||
|
throw new IOException(
|
||||||
|
"Invalid or missing 'xferPort' in server response.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Fix storageID
|
||||||
|
return new DatanodeInfo(
|
||||||
|
ipAddr,
|
||||||
|
(String)m.get("hostName"),
|
||||||
|
(String)m.get("storageID"),
|
||||||
|
xferPort,
|
||||||
|
((Number) m.get("infoPort")).intValue(),
|
||||||
|
getInt(m, "infoSecurePort", 0),
|
||||||
|
((Number) m.get("ipcPort")).intValue(),
|
||||||
|
|
||||||
|
getLong(m, "capacity", 0l),
|
||||||
|
getLong(m, "dfsUsed", 0l),
|
||||||
|
getLong(m, "remaining", 0l),
|
||||||
|
getLong(m, "blockPoolUsed", 0l),
|
||||||
|
getLong(m, "cacheCapacity", 0l),
|
||||||
|
getLong(m, "cacheUsed", 0l),
|
||||||
|
getLong(m, "lastUpdate", 0l),
|
||||||
|
getLong(m, "lastUpdateMonotonic", 0l),
|
||||||
|
getInt(m, "xceiverCount", 0),
|
||||||
|
getString(m, "networkLocation", ""),
|
||||||
|
DatanodeInfo.AdminStates.valueOf(getString(m, "adminState", "NORMAL")));
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Convert an Object[] to a DatanodeInfo[]. */
|
||||||
|
static DatanodeInfo[] toDatanodeInfoArray(final List<?> objects)
|
||||||
|
throws IOException {
|
||||||
|
if (objects == null) {
|
||||||
|
return null;
|
||||||
|
} else if (objects.isEmpty()) {
|
||||||
|
return EMPTY_DATANODE_INFO_ARRAY;
|
||||||
|
} else {
|
||||||
|
final DatanodeInfo[] array = new DatanodeInfo[objects.size()];
|
||||||
|
int i = 0;
|
||||||
|
for (Object object : objects) {
|
||||||
|
array[i++] = toDatanodeInfo((Map<?, ?>) object);
|
||||||
|
}
|
||||||
|
return array;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Convert a Json map to LocatedBlock. */
|
||||||
|
static LocatedBlock toLocatedBlock(final Map<?, ?> m) throws IOException {
|
||||||
|
if (m == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
final ExtendedBlock b = toExtendedBlock((Map<?, ?>)m.get("block"));
|
||||||
|
final DatanodeInfo[] locations = toDatanodeInfoArray(
|
||||||
|
getList(m, "locations"));
|
||||||
|
final long startOffset = ((Number) m.get("startOffset")).longValue();
|
||||||
|
final boolean isCorrupt = (Boolean)m.get("isCorrupt");
|
||||||
|
final DatanodeInfo[] cachedLocations = toDatanodeInfoArray(
|
||||||
|
getList(m, "cachedLocations"));
|
||||||
|
|
||||||
|
final LocatedBlock locatedblock = new LocatedBlock(b, locations,
|
||||||
|
null, null, startOffset, isCorrupt, cachedLocations);
|
||||||
|
locatedblock.setBlockToken(toBlockToken((Map<?, ?>)m.get("blockToken")));
|
||||||
|
return locatedblock;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Convert an List of Object to a List of LocatedBlock. */
|
||||||
|
static List<LocatedBlock> toLocatedBlockList(
|
||||||
|
final List<?> objects) throws IOException {
|
||||||
|
if (objects == null) {
|
||||||
|
return null;
|
||||||
|
} else if (objects.isEmpty()) {
|
||||||
|
return Collections.emptyList();
|
||||||
|
} else {
|
||||||
|
final List<LocatedBlock> list = new ArrayList<>(objects.size());
|
||||||
|
for (Object object : objects) {
|
||||||
|
list.add(toLocatedBlock((Map<?, ?>) object));
|
||||||
|
}
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Convert a Json map to a ContentSummary. */
|
||||||
|
static ContentSummary toContentSummary(final Map<?, ?> json) {
|
||||||
|
if (json == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
final Map<?, ?> m = (Map<?, ?>)json.get(ContentSummary.class.getSimpleName());
|
||||||
|
final long length = ((Number) m.get("length")).longValue();
|
||||||
|
final long fileCount = ((Number) m.get("fileCount")).longValue();
|
||||||
|
final long directoryCount = ((Number) m.get("directoryCount")).longValue();
|
||||||
|
final long quota = ((Number) m.get("quota")).longValue();
|
||||||
|
final long spaceConsumed = ((Number) m.get("spaceConsumed")).longValue();
|
||||||
|
final long spaceQuota = ((Number) m.get("spaceQuota")).longValue();
|
||||||
|
|
||||||
|
return new ContentSummary.Builder().length(length).fileCount(fileCount).
|
||||||
|
directoryCount(directoryCount).quota(quota).spaceConsumed(spaceConsumed).
|
||||||
|
spaceQuota(spaceQuota).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Convert a Json map to a MD5MD5CRC32FileChecksum. */
|
||||||
|
static MD5MD5CRC32FileChecksum toMD5MD5CRC32FileChecksum(
|
||||||
|
final Map<?, ?> json) throws IOException {
|
||||||
|
if (json == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
final Map<?, ?> m = (Map<?, ?>)json.get(FileChecksum.class.getSimpleName());
|
||||||
|
final String algorithm = (String)m.get("algorithm");
|
||||||
|
final int length = ((Number) m.get("length")).intValue();
|
||||||
|
final byte[] bytes = StringUtils.hexStringToByte((String) m.get("bytes"));
|
||||||
|
|
||||||
|
final DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
|
||||||
|
final DataChecksum.Type crcType =
|
||||||
|
MD5MD5CRC32FileChecksum.getCrcTypeFromAlgorithmName(algorithm);
|
||||||
|
final MD5MD5CRC32FileChecksum checksum;
|
||||||
|
|
||||||
|
// Recreate what DFSClient would have returned.
|
||||||
|
switch(crcType) {
|
||||||
|
case CRC32:
|
||||||
|
checksum = new MD5MD5CRC32GzipFileChecksum();
|
||||||
|
break;
|
||||||
|
case CRC32C:
|
||||||
|
checksum = new MD5MD5CRC32CastagnoliFileChecksum();
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new IOException("Unknown algorithm: " + algorithm);
|
||||||
|
}
|
||||||
|
checksum.readFields(in);
|
||||||
|
|
||||||
|
//check algorithm name
|
||||||
|
if (!checksum.getAlgorithmName().equals(algorithm)) {
|
||||||
|
throw new IOException("Algorithm not matched. Expected " + algorithm
|
||||||
|
+ ", Received " + checksum.getAlgorithmName());
|
||||||
|
}
|
||||||
|
//check length
|
||||||
|
if (length != checksum.getLength()) {
|
||||||
|
throw new IOException("Length not matched: length=" + length
|
||||||
|
+ ", checksum.getLength()=" + checksum.getLength());
|
||||||
|
}
|
||||||
|
|
||||||
|
return checksum;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Convert a Json map to a AclStatus object. */
|
||||||
|
static AclStatus toAclStatus(final Map<?, ?> json) {
|
||||||
|
if (json == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
final Map<?, ?> m = (Map<?, ?>) json.get(AclStatus.class.getSimpleName());
|
||||||
|
|
||||||
|
AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
|
||||||
|
aclStatusBuilder.owner((String) m.get("owner"));
|
||||||
|
aclStatusBuilder.group((String) m.get("group"));
|
||||||
|
aclStatusBuilder.stickyBit((Boolean) m.get("stickyBit"));
|
||||||
|
String permString = (String) m.get("permission");
|
||||||
|
if (permString != null) {
|
||||||
|
final FsPermission permission = toFsPermission(permString,
|
||||||
|
(Boolean) m.get("aclBit"), (Boolean) m.get("encBit"));
|
||||||
|
aclStatusBuilder.setPermission(permission);
|
||||||
|
}
|
||||||
|
final List<?> entries = (List<?>) m.get("entries");
|
||||||
|
|
||||||
|
List<AclEntry> aclEntryList = new ArrayList<>();
|
||||||
|
for (Object entry : entries) {
|
||||||
|
AclEntry aclEntry = AclEntry.parseAclEntry((String) entry, true);
|
||||||
|
aclEntryList.add(aclEntry);
|
||||||
|
}
|
||||||
|
aclStatusBuilder.addEntries(aclEntryList);
|
||||||
|
return aclStatusBuilder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
static byte[] getXAttr(final Map<?, ?> json, final String name)
|
||||||
|
throws IOException {
|
||||||
|
if (json == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, byte[]> xAttrs = toXAttrs(json);
|
||||||
|
if (xAttrs != null) {
|
||||||
|
return xAttrs.get(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
static Map<String, byte[]> toXAttrs(final Map<?, ?> json)
|
||||||
|
throws IOException {
|
||||||
|
if (json == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return toXAttrMap(getList(json, "XAttrs"));
|
||||||
|
}
|
||||||
|
|
||||||
|
static List<String> toXAttrNames(final Map<?, ?> json)
|
||||||
|
throws IOException {
|
||||||
|
if (json == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
final String namesInJson = (String) json.get("XAttrNames");
|
||||||
|
ObjectReader reader = new ObjectMapper().reader(List.class);
|
||||||
|
final List<Object> xattrs = reader.readValue(namesInJson);
|
||||||
|
final List<String> names =
|
||||||
|
Lists.newArrayListWithCapacity(json.keySet().size());
|
||||||
|
|
||||||
|
for (Object xattr : xattrs) {
|
||||||
|
names.add((String) xattr);
|
||||||
|
}
|
||||||
|
return names;
|
||||||
|
}
|
||||||
|
|
||||||
|
static Map<String, byte[]> toXAttrMap(final List<?> objects)
|
||||||
|
throws IOException {
|
||||||
|
if (objects == null) {
|
||||||
|
return null;
|
||||||
|
} else if (objects.isEmpty()) {
|
||||||
|
return Maps.newHashMap();
|
||||||
|
} else {
|
||||||
|
final Map<String, byte[]> xAttrs = Maps.newHashMap();
|
||||||
|
for (Object object : objects) {
|
||||||
|
Map<?, ?> m = (Map<?, ?>) object;
|
||||||
|
String name = (String) m.get("name");
|
||||||
|
String value = (String) m.get("value");
|
||||||
|
xAttrs.put(name, decodeXAttrValue(value));
|
||||||
|
}
|
||||||
|
return xAttrs;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static byte[] decodeXAttrValue(String value) throws IOException {
|
||||||
|
if (value != null) {
|
||||||
|
return XAttrCodec.decodeValue(value);
|
||||||
|
} else {
|
||||||
|
return new byte[0];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Convert a Json map to a Token of DelegationTokenIdentifier. */
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
static Token<DelegationTokenIdentifier> toDelegationToken(
|
||||||
|
final Map<?, ?> json) throws IOException {
|
||||||
|
final Map<?, ?> m = (Map<?, ?>)json.get(Token.class.getSimpleName());
|
||||||
|
return (Token<DelegationTokenIdentifier>) toToken(m);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Convert a Json map to LocatedBlock. */
|
||||||
|
static LocatedBlocks toLocatedBlocks(
|
||||||
|
final Map<?, ?> json) throws IOException {
|
||||||
|
if (json == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
final Map<?, ?> m = (Map<?, ?>)json.get(LocatedBlocks.class.getSimpleName());
|
||||||
|
final long fileLength = ((Number) m.get("fileLength")).longValue();
|
||||||
|
final boolean isUnderConstruction = (Boolean)m.get("isUnderConstruction");
|
||||||
|
final List<LocatedBlock> locatedBlocks = toLocatedBlockList(
|
||||||
|
getList(m, "locatedBlocks"));
|
||||||
|
final LocatedBlock lastLocatedBlock = toLocatedBlock(
|
||||||
|
(Map<?, ?>) m.get("lastLocatedBlock"));
|
||||||
|
final boolean isLastBlockComplete = (Boolean)m.get("isLastBlockComplete");
|
||||||
|
return new LocatedBlocks(fileLength, isUnderConstruction, locatedBlocks,
|
||||||
|
lastLocatedBlock, isLastBlockComplete, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,30 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.web;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
|
|
||||||
|
class WebHdfsConstants {
|
||||||
|
enum PathType {
|
||||||
|
FILE, DIRECTORY, SYMLINK;
|
||||||
|
|
||||||
|
static PathType valueOf(HdfsFileStatus status) {
|
||||||
|
return status.isDir()? DIRECTORY: status.isSymlink()? SYMLINK: FILE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -353,7 +353,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
return m;
|
return m;
|
||||||
}
|
}
|
||||||
|
|
||||||
IOException re = JsonUtil.toRemoteException(m);
|
IOException re = JsonUtilClient.toRemoteException(m);
|
||||||
// extract UGI-related exceptions and unwrap InvalidToken
|
// extract UGI-related exceptions and unwrap InvalidToken
|
||||||
// the NN mangles these exceptions but the DN does not and may need
|
// the NN mangles these exceptions but the DN does not and may need
|
||||||
// to re-fetch a token if either report the token is expired
|
// to re-fetch a token if either report the token is expired
|
||||||
|
@ -841,7 +841,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
HdfsFileStatus status = new FsPathResponseRunner<HdfsFileStatus>(op, f) {
|
HdfsFileStatus status = new FsPathResponseRunner<HdfsFileStatus>(op, f) {
|
||||||
@Override
|
@Override
|
||||||
HdfsFileStatus decodeResponse(Map<?,?> json) {
|
HdfsFileStatus decodeResponse(Map<?,?> json) {
|
||||||
return JsonUtil.toFileStatus(json, true);
|
return JsonUtilClient.toFileStatus(json, true);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
if (status == null) {
|
if (status == null) {
|
||||||
|
@ -870,7 +870,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
AclStatus status = new FsPathResponseRunner<AclStatus>(op, f) {
|
AclStatus status = new FsPathResponseRunner<AclStatus>(op, f) {
|
||||||
@Override
|
@Override
|
||||||
AclStatus decodeResponse(Map<?,?> json) {
|
AclStatus decodeResponse(Map<?,?> json) {
|
||||||
return JsonUtil.toAclStatus(json);
|
return JsonUtilClient.toAclStatus(json);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
if (status == null) {
|
if (status == null) {
|
||||||
|
@ -945,7 +945,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
new XAttrEncodingParam(XAttrCodec.HEX)) {
|
new XAttrEncodingParam(XAttrCodec.HEX)) {
|
||||||
@Override
|
@Override
|
||||||
byte[] decodeResponse(Map<?, ?> json) throws IOException {
|
byte[] decodeResponse(Map<?, ?> json) throws IOException {
|
||||||
return JsonUtil.getXAttr(json, name);
|
return JsonUtilClient.getXAttr(json, name);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
}
|
}
|
||||||
|
@ -957,7 +957,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
new XAttrEncodingParam(XAttrCodec.HEX)) {
|
new XAttrEncodingParam(XAttrCodec.HEX)) {
|
||||||
@Override
|
@Override
|
||||||
Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException {
|
Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException {
|
||||||
return JsonUtil.toXAttrs(json);
|
return JsonUtilClient.toXAttrs(json);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
}
|
}
|
||||||
|
@ -977,7 +977,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
return new FsPathResponseRunner<Map<String, byte[]>>(op, parameters, p) {
|
return new FsPathResponseRunner<Map<String, byte[]>>(op, parameters, p) {
|
||||||
@Override
|
@Override
|
||||||
Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException {
|
Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException {
|
||||||
return JsonUtil.toXAttrs(json);
|
return JsonUtilClient.toXAttrs(json);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
}
|
}
|
||||||
|
@ -988,7 +988,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
return new FsPathResponseRunner<List<String>>(op, p) {
|
return new FsPathResponseRunner<List<String>>(op, p) {
|
||||||
@Override
|
@Override
|
||||||
List<String> decodeResponse(Map<?, ?> json) throws IOException {
|
List<String> decodeResponse(Map<?, ?> json) throws IOException {
|
||||||
return JsonUtil.toXAttrNames(json);
|
return JsonUtilClient.toXAttrNames(json);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
}
|
}
|
||||||
|
@ -1291,15 +1291,15 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
@Override
|
@Override
|
||||||
FileStatus[] decodeResponse(Map<?,?> json) {
|
FileStatus[] decodeResponse(Map<?,?> json) {
|
||||||
final Map<?, ?> rootmap = (Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es");
|
final Map<?, ?> rootmap = (Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es");
|
||||||
final List<?> array = JsonUtil.getList(
|
final List<?> array = JsonUtilClient.getList(rootmap,
|
||||||
rootmap, FileStatus.class.getSimpleName());
|
FileStatus.class.getSimpleName());
|
||||||
|
|
||||||
//convert FileStatus
|
//convert FileStatus
|
||||||
final FileStatus[] statuses = new FileStatus[array.size()];
|
final FileStatus[] statuses = new FileStatus[array.size()];
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (Object object : array) {
|
for (Object object : array) {
|
||||||
final Map<?, ?> m = (Map<?, ?>) object;
|
final Map<?, ?> m = (Map<?, ?>) object;
|
||||||
statuses[i++] = makeQualified(JsonUtil.toFileStatus(m, false), f);
|
statuses[i++] = makeQualified(JsonUtilClient.toFileStatus(m, false), f);
|
||||||
}
|
}
|
||||||
return statuses;
|
return statuses;
|
||||||
}
|
}
|
||||||
|
@ -1316,7 +1316,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
@Override
|
@Override
|
||||||
Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json)
|
Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return JsonUtil.toDelegationToken(json);
|
return JsonUtilClient.toDelegationToken(json);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
if (token != null) {
|
if (token != null) {
|
||||||
|
@ -1384,7 +1384,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
@Override
|
@Override
|
||||||
BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
|
BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
|
||||||
return DFSUtil.locatedBlocks2Locations(
|
return DFSUtil.locatedBlocks2Locations(
|
||||||
JsonUtil.toLocatedBlocks(json));
|
JsonUtilClient.toLocatedBlocks(json));
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
}
|
}
|
||||||
|
@ -1403,7 +1403,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
return new FsPathResponseRunner<ContentSummary>(op, p) {
|
return new FsPathResponseRunner<ContentSummary>(op, p) {
|
||||||
@Override
|
@Override
|
||||||
ContentSummary decodeResponse(Map<?,?> json) {
|
ContentSummary decodeResponse(Map<?,?> json) {
|
||||||
return JsonUtil.toContentSummary(json);
|
return JsonUtilClient.toContentSummary(json);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
}
|
}
|
||||||
|
@ -1417,7 +1417,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
return new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(op, p) {
|
return new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(op, p) {
|
||||||
@Override
|
@Override
|
||||||
MD5MD5CRC32FileChecksum decodeResponse(Map<?,?> json) throws IOException {
|
MD5MD5CRC32FileChecksum decodeResponse(Map<?,?> json) throws IOException {
|
||||||
return JsonUtil.toMD5MD5CRC32FileChecksum(json);
|
return JsonUtilClient.toMD5MD5CRC32FileChecksum(json);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,10 +33,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||||
import org.apache.hadoop.hdfs.web.JsonUtil;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.ExceptionHandler;
|
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
|
||||||
import org.apache.hadoop.ipc.RetriableException;
|
import org.apache.hadoop.ipc.RetriableException;
|
||||||
import org.apache.hadoop.ipc.StandbyException;
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
@ -49,10 +46,7 @@ import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.internal.util.reflection.Whitebox;
|
import org.mockito.internal.util.reflection.Whitebox;
|
||||||
import org.mortbay.util.ajax.JSON;
|
|
||||||
|
|
||||||
import javax.servlet.http.HttpServletResponse;
|
|
||||||
import javax.ws.rs.core.Response;
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -61,10 +55,8 @@ import java.net.URI;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test case for client support of delegation tokens in an HA cluster.
|
* Test case for client support of delegation tokens in an HA cluster.
|
||||||
|
@ -374,90 +366,6 @@ public class TestDelegationTokensWithHA {
|
||||||
token.cancel(conf);
|
token.cancel(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Test if StandbyException can be thrown from StandbyNN, when it's requested for
|
|
||||||
* password. (HDFS-6475). With StandbyException, the client can failover to try
|
|
||||||
* activeNN.
|
|
||||||
*/
|
|
||||||
@Test(timeout = 300000)
|
|
||||||
public void testDelegationTokenStandbyNNAppearFirst() throws Exception {
|
|
||||||
// make nn0 the standby NN, and nn1 the active NN
|
|
||||||
cluster.transitionToStandby(0);
|
|
||||||
cluster.transitionToActive(1);
|
|
||||||
|
|
||||||
final DelegationTokenSecretManager stSecretManager =
|
|
||||||
NameNodeAdapter.getDtSecretManager(
|
|
||||||
nn1.getNamesystem());
|
|
||||||
|
|
||||||
// create token
|
|
||||||
final Token<DelegationTokenIdentifier> token =
|
|
||||||
getDelegationToken(fs, "JobTracker");
|
|
||||||
final DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
|
|
||||||
byte[] tokenId = token.getIdentifier();
|
|
||||||
identifier.readFields(new DataInputStream(
|
|
||||||
new ByteArrayInputStream(tokenId)));
|
|
||||||
|
|
||||||
assertTrue(null != stSecretManager.retrievePassword(identifier));
|
|
||||||
|
|
||||||
final UserGroupInformation ugi = UserGroupInformation
|
|
||||||
.createRemoteUser("JobTracker");
|
|
||||||
ugi.addToken(token);
|
|
||||||
|
|
||||||
ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
||||||
@Override
|
|
||||||
public Object run() {
|
|
||||||
try {
|
|
||||||
try {
|
|
||||||
byte[] tmppw = dtSecretManager.retrievePassword(identifier);
|
|
||||||
fail("InvalidToken with cause StandbyException is expected"
|
|
||||||
+ " since nn0 is standby");
|
|
||||||
return tmppw;
|
|
||||||
} catch (IOException e) {
|
|
||||||
// Mimic the UserProvider class logic (server side) by throwing
|
|
||||||
// SecurityException here
|
|
||||||
throw new SecurityException(
|
|
||||||
SecurityUtil.FAILED_TO_GET_UGI_MSG_HEADER + " " + e, e);
|
|
||||||
}
|
|
||||||
} catch (Exception oe) {
|
|
||||||
//
|
|
||||||
// The exception oe caught here is
|
|
||||||
// java.lang.SecurityException: Failed to obtain user group
|
|
||||||
// information: org.apache.hadoop.security.token.
|
|
||||||
// SecretManager$InvalidToken: StandbyException
|
|
||||||
//
|
|
||||||
HttpServletResponse response = mock(HttpServletResponse.class);
|
|
||||||
ExceptionHandler eh = new ExceptionHandler();
|
|
||||||
eh.initResponse(response);
|
|
||||||
|
|
||||||
// The Response (resp) below is what the server will send to client
|
|
||||||
//
|
|
||||||
// BEFORE HDFS-6475 fix, the resp.entity is
|
|
||||||
// {"RemoteException":{"exception":"SecurityException",
|
|
||||||
// "javaClassName":"java.lang.SecurityException",
|
|
||||||
// "message":"Failed to obtain user group information:
|
|
||||||
// org.apache.hadoop.security.token.SecretManager$InvalidToken:
|
|
||||||
// StandbyException"}}
|
|
||||||
// AFTER the fix, the resp.entity is
|
|
||||||
// {"RemoteException":{"exception":"StandbyException",
|
|
||||||
// "javaClassName":"org.apache.hadoop.ipc.StandbyException",
|
|
||||||
// "message":"Operation category READ is not supported in
|
|
||||||
// state standby"}}
|
|
||||||
//
|
|
||||||
Response resp = eh.toResponse(oe);
|
|
||||||
|
|
||||||
// Mimic the client side logic by parsing the response from server
|
|
||||||
//
|
|
||||||
Map<?, ?> m = (Map<?, ?>)JSON.parse(resp.getEntity().toString());
|
|
||||||
RemoteException re = JsonUtil.toRemoteException(m);
|
|
||||||
Exception unwrapped = ((RemoteException)re).unwrapRemoteException(
|
|
||||||
StandbyException.class);
|
|
||||||
assertTrue (unwrapped instanceof StandbyException);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private Token<DelegationTokenIdentifier> getDelegationToken(FileSystem fs,
|
private Token<DelegationTokenIdentifier> getDelegationToken(FileSystem fs,
|
||||||
String renewer) throws IOException {
|
String renewer) throws IOException {
|
||||||
|
|
|
@ -73,7 +73,7 @@ public class TestJsonUtil {
|
||||||
System.out.println("json = " + json.replace(",", ",\n "));
|
System.out.println("json = " + json.replace(",", ",\n "));
|
||||||
ObjectReader reader = new ObjectMapper().reader(Map.class);
|
ObjectReader reader = new ObjectMapper().reader(Map.class);
|
||||||
final HdfsFileStatus s2 =
|
final HdfsFileStatus s2 =
|
||||||
JsonUtil.toFileStatus((Map<?, ?>) reader.readValue(json), true);
|
JsonUtilClient.toFileStatus((Map<?, ?>) reader.readValue(json), true);
|
||||||
final FileStatus fs2 = toFileStatus(s2, parent);
|
final FileStatus fs2 = toFileStatus(s2, parent);
|
||||||
System.out.println("s2 = " + s2);
|
System.out.println("s2 = " + s2);
|
||||||
System.out.println("fs2 = " + fs2);
|
System.out.println("fs2 = " + fs2);
|
||||||
|
@ -102,7 +102,7 @@ public class TestJsonUtil {
|
||||||
response.put("cacheCapacity", 123l);
|
response.put("cacheCapacity", 123l);
|
||||||
response.put("cacheUsed", 321l);
|
response.put("cacheUsed", 321l);
|
||||||
|
|
||||||
JsonUtil.toDatanodeInfo(response);
|
JsonUtilClient.toDatanodeInfo(response);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -128,7 +128,7 @@ public class TestJsonUtil {
|
||||||
response.put("cacheCapacity", 123l);
|
response.put("cacheCapacity", 123l);
|
||||||
response.put("cacheUsed", 321l);
|
response.put("cacheUsed", 321l);
|
||||||
|
|
||||||
DatanodeInfo di = JsonUtil.toDatanodeInfo(response);
|
DatanodeInfo di = JsonUtilClient.toDatanodeInfo(response);
|
||||||
Assert.assertEquals(name, di.getXferAddr());
|
Assert.assertEquals(name, di.getXferAddr());
|
||||||
|
|
||||||
// The encoded result should contain name, ipAddr and xferPort.
|
// The encoded result should contain name, ipAddr and xferPort.
|
||||||
|
@ -175,7 +175,7 @@ public class TestJsonUtil {
|
||||||
aclStatusBuilder.stickyBit(false);
|
aclStatusBuilder.stickyBit(false);
|
||||||
|
|
||||||
Assert.assertEquals("Should be equal", aclStatusBuilder.build(),
|
Assert.assertEquals("Should be equal", aclStatusBuilder.build(),
|
||||||
JsonUtil.toAclStatus(json));
|
JsonUtilClient.toAclStatus(json));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -229,7 +229,7 @@ public class TestJsonUtil {
|
||||||
xAttrs.add(xAttr1);
|
xAttrs.add(xAttr1);
|
||||||
xAttrs.add(xAttr2);
|
xAttrs.add(xAttr2);
|
||||||
Map<String, byte[]> xAttrMap = XAttrHelper.buildXAttrMap(xAttrs);
|
Map<String, byte[]> xAttrMap = XAttrHelper.buildXAttrMap(xAttrs);
|
||||||
Map<String, byte[]> parsedXAttrMap = JsonUtil.toXAttrs(json);
|
Map<String, byte[]> parsedXAttrMap = JsonUtilClient.toXAttrs(json);
|
||||||
|
|
||||||
Assert.assertEquals(xAttrMap.size(), parsedXAttrMap.size());
|
Assert.assertEquals(xAttrMap.size(), parsedXAttrMap.size());
|
||||||
Iterator<Entry<String, byte[]>> iter = xAttrMap.entrySet().iterator();
|
Iterator<Entry<String, byte[]>> iter = xAttrMap.entrySet().iterator();
|
||||||
|
@ -249,13 +249,13 @@ public class TestJsonUtil {
|
||||||
Map<?, ?> json = reader.readValue(jsonString);
|
Map<?, ?> json = reader.readValue(jsonString);
|
||||||
|
|
||||||
// Get xattr: user.a2
|
// Get xattr: user.a2
|
||||||
byte[] value = JsonUtil.getXAttr(json, "user.a2");
|
byte[] value = JsonUtilClient.getXAttr(json, "user.a2");
|
||||||
Assert.assertArrayEquals(XAttrCodec.decodeValue("0x313131"), value);
|
Assert.assertArrayEquals(XAttrCodec.decodeValue("0x313131"), value);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkDecodeFailure(Map<String, Object> map) {
|
private void checkDecodeFailure(Map<String, Object> map) {
|
||||||
try {
|
try {
|
||||||
JsonUtil.toDatanodeInfo(map);
|
JsonUtilClient.toDatanodeInfo(map);
|
||||||
Assert.fail("Exception not thrown against bad input.");
|
Assert.fail("Exception not thrown against bad input.");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// expected
|
// expected
|
||||||
|
|
|
@ -19,9 +19,12 @@
|
||||||
package org.apache.hadoop.hdfs.web;
|
package org.apache.hadoop.hdfs.web;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.DataInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -38,14 +41,26 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.ExceptionHandler;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
|
import org.apache.hadoop.security.token.SecretManager;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
import org.mockito.internal.util.reflection.Whitebox;
|
import org.mockito.internal.util.reflection.Whitebox;
|
||||||
|
import org.mortbay.util.ajax.JSON;
|
||||||
|
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
import javax.ws.rs.core.Response;
|
||||||
|
|
||||||
public class TestWebHDFSForHA {
|
public class TestWebHDFSForHA {
|
||||||
private static final String LOGICAL_NAME = "minidfs";
|
private static final String LOGICAL_NAME = "minidfs";
|
||||||
|
@ -123,6 +138,75 @@ public class TestWebHDFSForHA {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClientFailoverWhenStandbyNNHasStaleCredentials()
|
||||||
|
throws IOException {
|
||||||
|
Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME);
|
||||||
|
conf.setBoolean(DFSConfigKeys
|
||||||
|
.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
|
||||||
|
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
WebHdfsFileSystem fs = null;
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).nnTopology(topo).numDataNodes(
|
||||||
|
0).build();
|
||||||
|
|
||||||
|
HATestUtil.setFailoverConfigurations(cluster, conf, LOGICAL_NAME);
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
fs = (WebHdfsFileSystem) FileSystem.get(WEBHDFS_URI, conf);
|
||||||
|
|
||||||
|
cluster.transitionToActive(0);
|
||||||
|
Token<?> token = fs.getDelegationToken(null);
|
||||||
|
final DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
|
||||||
|
identifier.readFields(
|
||||||
|
new DataInputStream(new ByteArrayInputStream(token.getIdentifier())));
|
||||||
|
cluster.transitionToStandby(0);
|
||||||
|
cluster.transitionToActive(1);
|
||||||
|
|
||||||
|
final DelegationTokenSecretManager secretManager = NameNodeAdapter.getDtSecretManager(
|
||||||
|
cluster.getNamesystem(0));
|
||||||
|
|
||||||
|
ExceptionHandler eh = new ExceptionHandler();
|
||||||
|
eh.initResponse(mock(HttpServletResponse.class));
|
||||||
|
Response resp = null;
|
||||||
|
try {
|
||||||
|
secretManager.retrievePassword(identifier);
|
||||||
|
} catch (IOException e) {
|
||||||
|
// Mimic the UserProvider class logic (server side) by throwing
|
||||||
|
// SecurityException here
|
||||||
|
Assert.assertTrue(e instanceof SecretManager.InvalidToken);
|
||||||
|
resp = eh.toResponse(new SecurityException(e));
|
||||||
|
}
|
||||||
|
// The Response (resp) below is what the server will send to client
|
||||||
|
//
|
||||||
|
// BEFORE HDFS-6475 fix, the resp.entity is
|
||||||
|
// {"RemoteException":{"exception":"SecurityException",
|
||||||
|
// "javaClassName":"java.lang.SecurityException",
|
||||||
|
// "message":"Failed to obtain user group information:
|
||||||
|
// org.apache.hadoop.security.token.SecretManager$InvalidToken:
|
||||||
|
// StandbyException"}}
|
||||||
|
// AFTER the fix, the resp.entity is
|
||||||
|
// {"RemoteException":{"exception":"StandbyException",
|
||||||
|
// "javaClassName":"org.apache.hadoop.ipc.StandbyException",
|
||||||
|
// "message":"Operation category READ is not supported in
|
||||||
|
// state standby"}}
|
||||||
|
//
|
||||||
|
|
||||||
|
// Mimic the client side logic by parsing the response from server
|
||||||
|
//
|
||||||
|
Map<?, ?> m = (Map<?, ?>) JSON.parse(resp.getEntity().toString());
|
||||||
|
RemoteException re = JsonUtilClient.toRemoteException(m);
|
||||||
|
Exception unwrapped = re.unwrapRemoteException(StandbyException.class);
|
||||||
|
Assert.assertTrue(unwrapped instanceof StandbyException);
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanup(null, fs);
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFailoverAfterOpen() throws IOException {
|
public void testFailoverAfterOpen() throws IOException {
|
||||||
Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME);
|
Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME);
|
||||||
|
|
|
@ -36,7 +36,6 @@ import java.net.URLConnection;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.commons.httpclient.HttpConnection;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
@ -281,7 +280,7 @@ public class TestWebHdfsTokens {
|
||||||
@Override
|
@Override
|
||||||
Token<DelegationTokenIdentifier> decodeResponse(Map<?, ?> json)
|
Token<DelegationTokenIdentifier> decodeResponse(Map<?, ?> json)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return JsonUtil.toDelegationToken(json);
|
return JsonUtilClient.toDelegationToken(json);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue