HDFS-6430. HTTPFS - Implement XAttr support. (Yi Liu via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1605118 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2014-06-24 15:59:50 +00:00
parent 29c102cad0
commit 46162a213f
13 changed files with 1188 additions and 27 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.http.client;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -31,10 +32,13 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.XAttrCodec;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.lib.wsrs.EnumSetParam;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
@ -46,6 +50,12 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@ -96,6 +106,10 @@ public class HttpFSFileSystem extends FileSystem
public static final String GROUP_PARAM = "group";
public static final String MODIFICATION_TIME_PARAM = "modificationtime";
public static final String ACCESS_TIME_PARAM = "accesstime";
public static final String XATTR_NAME_PARAM = "xattr.name";
public static final String XATTR_VALUE_PARAM = "xattr.value";
public static final String XATTR_SET_FLAG_PARAM = "flag";
public static final String XATTR_ENCODING_PARAM = "encoding";
public static final Short DEFAULT_PERMISSION = 0755;
public static final String ACLSPEC_DEFAULT = "";
@ -142,6 +156,10 @@ public class HttpFSFileSystem extends FileSystem
public static final String MODIFICATION_TIME_JSON = "modificationTime";
public static final String BLOCK_SIZE_JSON = "blockSize";
public static final String REPLICATION_JSON = "replication";
public static final String XATTRS_JSON = "XAttrs";
public static final String XATTR_NAME_JSON = "name";
public static final String XATTR_VALUE_JSON = "value";
public static final String XATTRNAMES_JSON = "XAttrNames";
public static final String FILE_CHECKSUM_JSON = "FileChecksum";
public static final String CHECKSUM_ALGORITHM_JSON = "algorithm";
@ -184,7 +202,8 @@ public class HttpFSFileSystem extends FileSystem
SETPERMISSION(HTTP_PUT), SETREPLICATION(HTTP_PUT), SETTIMES(HTTP_PUT),
MODIFYACLENTRIES(HTTP_PUT), REMOVEACLENTRIES(HTTP_PUT),
REMOVEDEFAULTACL(HTTP_PUT), REMOVEACL(HTTP_PUT), SETACL(HTTP_PUT),
DELETE(HTTP_DELETE);
DELETE(HTTP_DELETE), SETXATTR(HTTP_PUT), GETXATTRS(HTTP_GET),
REMOVEXATTR(HTTP_PUT), LISTXATTRS(HTTP_GET);
private String httpMethod;
@ -247,6 +266,31 @@ public class HttpFSFileSystem extends FileSystem
private HttpURLConnection getConnection(final String method,
Map<String, String> params, Path path, boolean makeQualified)
throws IOException {
return getConnection(method, params, null, path, makeQualified);
}
/**
* Convenience method that creates a <code>HttpURLConnection</code> for the
* HttpFSServer file system operations.
* <p/>
* This methods performs and injects any needed authentication credentials
* via the {@link #getConnection(URL, String)} method
*
* @param method the HTTP method.
* @param params the query string parameters.
* @param multiValuedParams multi valued parameters of the query string
* @param path the file path
* @param makeQualified if the path should be 'makeQualified'
*
* @return HttpURLConnection a <code>HttpURLConnection</code> for the
* HttpFSServer server, authenticated and ready to use for the
* specified path and file system operation.
*
* @throws IOException thrown if an IO error occurrs.
*/
private HttpURLConnection getConnection(final String method,
Map<String, String> params, Map<String, List<String>> multiValuedParams,
Path path, boolean makeQualified) throws IOException {
if (!realUser.getShortUserName().equals(doAs)) {
params.put(DO_AS_PARAM, doAs);
}
@ -254,7 +298,7 @@ public class HttpFSFileSystem extends FileSystem
if (makeQualified) {
path = makeQualified(path);
}
final URL url = HttpFSUtils.createURL(path, params);
final URL url = HttpFSUtils.createURL(path, params, multiValuedParams);
return doAsRealUserIfNecessary(new Callable<HttpURLConnection>() {
@Override
public HttpURLConnection call() throws Exception {
@ -585,7 +629,6 @@ public class HttpFSFileSystem extends FileSystem
*
* @deprecated Use delete(Path, boolean) instead
*/
@SuppressWarnings({"deprecation"})
@Deprecated
@Override
public boolean delete(Path f) throws IOException {
@ -1050,4 +1093,112 @@ public class HttpFSFileSystem extends FileSystem
delegationToken = token;
}
@Override
public void setXAttr(Path f, String name, byte[] value,
EnumSet<XAttrSetFlag> flag) throws IOException {
Map<String, String> params = new HashMap<String, String>();
params.put(OP_PARAM, Operation.SETXATTR.toString());
params.put(XATTR_NAME_PARAM, name);
if (value != null) {
params.put(XATTR_VALUE_PARAM,
XAttrCodec.encodeValue(value, XAttrCodec.HEX));
}
params.put(XATTR_SET_FLAG_PARAM, EnumSetParam.toString(flag));
HttpURLConnection conn = getConnection(Operation.SETXATTR.getMethod(),
params, f, true);
HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
}
@Override
public byte[] getXAttr(Path f, String name) throws IOException {
Map<String, String> params = new HashMap<String, String>();
params.put(OP_PARAM, Operation.GETXATTRS.toString());
params.put(XATTR_NAME_PARAM, name);
HttpURLConnection conn = getConnection(Operation.GETXATTRS.getMethod(),
params, f, true);
HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
Map<String, byte[]> xAttrs = createXAttrMap(
(JSONArray) json.get(XATTRS_JSON));
return xAttrs != null ? xAttrs.get(name) : null;
}
/** Convert xAttrs json to xAttrs map */
private Map<String, byte[]> createXAttrMap(JSONArray jsonArray)
throws IOException {
Map<String, byte[]> xAttrs = Maps.newHashMap();
for (Object obj : jsonArray) {
JSONObject jsonObj = (JSONObject) obj;
final String name = (String)jsonObj.get(XATTR_NAME_JSON);
final byte[] value = XAttrCodec.decodeValue(
(String)jsonObj.get(XATTR_VALUE_JSON));
xAttrs.put(name, value);
}
return xAttrs;
}
/** Convert xAttr names json to names list */
private List<String> createXAttrNames(String xattrNamesStr) throws IOException {
JSONParser parser = new JSONParser();
JSONArray jsonArray;
try {
jsonArray = (JSONArray)parser.parse(xattrNamesStr);
List<String> names = Lists.newArrayListWithCapacity(jsonArray.size());
for (Object name : jsonArray) {
names.add((String) name);
}
return names;
} catch (ParseException e) {
throw new IOException("JSON parser error, " + e.getMessage(), e);
}
}
@Override
public Map<String, byte[]> getXAttrs(Path f) throws IOException {
Map<String, String> params = new HashMap<String, String>();
params.put(OP_PARAM, Operation.GETXATTRS.toString());
HttpURLConnection conn = getConnection(Operation.GETXATTRS.getMethod(),
params, f, true);
HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
return createXAttrMap((JSONArray) json.get(XATTRS_JSON));
}
@Override
public Map<String, byte[]> getXAttrs(Path f, List<String> names)
throws IOException {
Preconditions.checkArgument(names != null && !names.isEmpty(),
"XAttr names cannot be null or empty.");
Map<String, String> params = new HashMap<String, String>();
params.put(OP_PARAM, Operation.GETXATTRS.toString());
Map<String, List<String>> multiValuedParams = Maps.newHashMap();
multiValuedParams.put(XATTR_NAME_PARAM, names);
HttpURLConnection conn = getConnection(Operation.GETXATTRS.getMethod(),
params, multiValuedParams, f, true);
HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
return createXAttrMap((JSONArray) json.get(XATTRS_JSON));
}
@Override
public List<String> listXAttrs(Path f) throws IOException {
Map<String, String> params = new HashMap<String, String>();
params.put(OP_PARAM, Operation.LISTXATTRS.toString());
HttpURLConnection conn = getConnection(Operation.LISTXATTRS.getMethod(),
params, f, true);
HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
return createXAttrNames((String) json.get(XATTRNAMES_JSON));
}
@Override
public void removeXAttr(Path f, String name) throws IOException {
Map<String, String> params = new HashMap<String, String>();
params.put(OP_PARAM, Operation.REMOVEXATTR.toString());
params.put(XATTR_NAME_PARAM, name);
HttpURLConnection conn = getConnection(Operation.REMOVEXATTR.getMethod(),
params, f, true);
HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
}
}

View File

@ -31,6 +31,7 @@ import java.net.URI;
import java.net.URL;
import java.net.URLEncoder;
import java.text.MessageFormat;
import java.util.List;
import java.util.Map;
/**
@ -59,6 +60,24 @@ public class HttpFSUtils {
*/
static URL createURL(Path path, Map<String, String> params)
throws IOException {
return createURL(path, params, null);
}
/**
* Convenience method that creates an HTTP <code>URL</code> for the
* HttpFSServer file system operations.
* <p/>
*
* @param path the file path.
* @param params the query string parameters.
* @param multiValuedParams multi valued parameters of the query string
*
* @return URL a <code>URL</code> for the HttpFSServer server,
*
* @throws IOException thrown if an IO error occurs.
*/
static URL createURL(Path path, Map<String, String> params, Map<String,
List<String>> multiValuedParams) throws IOException {
URI uri = path.toUri();
String realScheme;
if (uri.getScheme().equalsIgnoreCase(HttpFSFileSystem.SCHEME)) {
@ -81,6 +100,18 @@ public class HttpFSUtils {
append(URLEncoder.encode(entry.getValue(), "UTF8"));
separator = "&";
}
if (multiValuedParams != null) {
for (Map.Entry<String, List<String>> multiValuedEntry :
multiValuedParams.entrySet()) {
String name = URLEncoder.encode(multiValuedEntry.getKey(), "UTF8");
List<String> values = multiValuedEntry.getValue();
for (String value : values) {
sb.append(separator).append(name).append("=").
append(URLEncoder.encode(value, "UTF8"));
separator = "&";
}
}
}
return new URL(sb.toString());
}
@ -96,7 +127,7 @@ public class HttpFSUtils {
* @throws IOException thrown if the current status code does not match the
* expected one.
*/
@SuppressWarnings({"unchecked", "deprecation"})
@SuppressWarnings({"unchecked"})
static void validateResponse(HttpURLConnection conn, int expected)
throws IOException {
int status = conn.getResponseCode();

View File

@ -25,6 +25,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.GlobFilter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.XAttrCodec;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
@ -38,9 +40,11 @@ import org.json.simple.JSONObject;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
/**
* FileSystem operation executors used by {@link HttpFSServer}.
@ -235,6 +239,50 @@ public class FSOperations {
return response;
}
/**
* Converts xAttrs to a JSON object.
*
* @param xAttrs file xAttrs.
* @param encoding format of xattr values.
*
* @return The JSON representation of the xAttrs.
* @throws IOException
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private static Map xAttrsToJSON(Map<String, byte[]> xAttrs,
XAttrCodec encoding) throws IOException {
Map jsonMap = new LinkedHashMap();
JSONArray jsonArray = new JSONArray();
if (xAttrs != null) {
for (Entry<String, byte[]> e : xAttrs.entrySet()) {
Map json = new LinkedHashMap();
json.put(HttpFSFileSystem.XATTR_NAME_JSON, e.getKey());
if (e.getValue() != null) {
json.put(HttpFSFileSystem.XATTR_VALUE_JSON,
XAttrCodec.encodeValue(e.getValue(), encoding));
}
jsonArray.add(json);
}
}
jsonMap.put(HttpFSFileSystem.XATTRS_JSON, jsonArray);
return jsonMap;
}
/**
* Converts xAttr names to a JSON object.
*
* @param names file xAttr names.
*
* @return The JSON representation of the xAttr names.
* @throws IOException
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private static Map xAttrNamesToJSON(List<String> names) throws IOException {
Map jsonMap = new LinkedHashMap();
jsonMap.put(HttpFSFileSystem.XATTRNAMES_JSON, JSONArray.toJSONString(names));
return jsonMap;
}
/**
* Converts a <code>ContentSummary</code> object into a JSON array
* object.
@ -1099,4 +1147,132 @@ public class FSOperations {
}
/**
* Executor that performs a setxattr FileSystemAccess files system operation.
*/
@InterfaceAudience.Private
public static class FSSetXAttr implements
FileSystemAccess.FileSystemExecutor<Void> {
private Path path;
private String name;
private byte[] value;
private EnumSet<XAttrSetFlag> flag;
public FSSetXAttr(String path, String name, String encodedValue,
EnumSet<XAttrSetFlag> flag) throws IOException {
this.path = new Path(path);
this.name = name;
this.value = XAttrCodec.decodeValue(encodedValue);
this.flag = flag;
}
@Override
public Void execute(FileSystem fs) throws IOException {
fs.setXAttr(path, name, value, flag);
return null;
}
}
/**
* Executor that performs a removexattr FileSystemAccess files system
* operation.
*/
@InterfaceAudience.Private
public static class FSRemoveXAttr implements
FileSystemAccess.FileSystemExecutor<Void> {
private Path path;
private String name;
public FSRemoveXAttr(String path, String name) {
this.path = new Path(path);
this.name = name;
}
@Override
public Void execute(FileSystem fs) throws IOException {
fs.removeXAttr(path, name);
return null;
}
}
/**
* Executor that performs listing xattrs FileSystemAccess files system
* operation.
*/
@SuppressWarnings("rawtypes")
@InterfaceAudience.Private
public static class FSListXAttrs implements
FileSystemAccess.FileSystemExecutor<Map> {
private Path path;
/**
* Creates listing xattrs executor.
*
* @param path the path to retrieve the xattrs.
*/
public FSListXAttrs(String path) {
this.path = new Path(path);
}
/**
* Executes the filesystem operation.
*
* @param fs filesystem instance to use.
*
* @return Map a map object (JSON friendly) with the xattr names.
*
* @throws IOException thrown if an IO error occured.
*/
@Override
public Map execute(FileSystem fs) throws IOException {
List<String> names = fs.listXAttrs(path);
return xAttrNamesToJSON(names);
}
}
/**
* Executor that performs getting xattrs FileSystemAccess files system
* operation.
*/
@SuppressWarnings("rawtypes")
@InterfaceAudience.Private
public static class FSGetXAttrs implements
FileSystemAccess.FileSystemExecutor<Map> {
private Path path;
private List<String> names;
private XAttrCodec encoding;
/**
* Creates getting xattrs executor.
*
* @param path the path to retrieve the xattrs.
*/
public FSGetXAttrs(String path, List<String> names, XAttrCodec encoding) {
this.path = new Path(path);
this.names = names;
this.encoding = encoding;
}
/**
* Executes the filesystem operation.
*
* @param fs filesystem instance to use.
*
* @return Map a map object (JSON friendly) with the xattrs.
*
* @throws IOException thrown if an IO error occured.
*/
@Override
public Map execute(FileSystem fs) throws IOException {
Map<String, byte[]> xattrs = null;
if (names != null && !names.isEmpty()) {
xattrs = fs.getXAttrs(path, names);
} else {
xattrs = fs.getXAttrs(path);
}
return xAttrsToJSON(xattrs, encoding);
}
}
}

View File

@ -18,10 +18,13 @@
package org.apache.hadoop.fs.http.server;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.XAttrCodec;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
import org.apache.hadoop.fs.http.client.HttpFSFileSystem.Operation;
import org.apache.hadoop.lib.wsrs.BooleanParam;
import org.apache.hadoop.lib.wsrs.EnumParam;
import org.apache.hadoop.lib.wsrs.EnumSetParam;
import org.apache.hadoop.lib.wsrs.LongParam;
import org.apache.hadoop.lib.wsrs.Param;
import org.apache.hadoop.lib.wsrs.ParametersProvider;
@ -92,6 +95,15 @@ public class HttpFSParametersProvider extends ParametersProvider {
new Class[]{DoAsParam.class, AclPermissionParam.class});
PARAMS_DEF.put(Operation.REMOVEDEFAULTACL,
new Class[]{DoAsParam.class});
PARAMS_DEF.put(Operation.SETXATTR,
new Class[]{DoAsParam.class, XAttrNameParam.class, XAttrValueParam.class,
XAttrSetFlagParam.class});
PARAMS_DEF.put(Operation.REMOVEXATTR,
new Class[]{DoAsParam.class, XAttrNameParam.class});
PARAMS_DEF.put(Operation.GETXATTRS,
new Class[]{DoAsParam.class, XAttrNameParam.class, XAttrEncodingParam.class});
PARAMS_DEF.put(Operation.LISTXATTRS,
new Class[]{DoAsParam.class});
}
public HttpFSParametersProvider() {
@ -461,4 +473,79 @@ public class HttpFSParametersProvider extends ParametersProvider {
super(NAME, null);
}
}
/**
* Class for xattr parameter.
*/
@InterfaceAudience.Private
public static class XAttrNameParam extends StringParam {
public static final String XATTR_NAME_REGX =
"^(user\\.|trusted\\.|system\\.|security\\.).+";
/**
* Parameter name.
*/
public static final String NAME = HttpFSFileSystem.XATTR_NAME_PARAM;
private static final Pattern pattern = Pattern.compile(XATTR_NAME_REGX);
/**
* Constructor.
*/
public XAttrNameParam() {
super(NAME, null, pattern);
}
}
/**
* Class for xattr parameter.
*/
@InterfaceAudience.Private
public static class XAttrValueParam extends StringParam {
/**
* Parameter name.
*/
public static final String NAME = HttpFSFileSystem.XATTR_VALUE_PARAM;
/**
* Constructor.
*/
public XAttrValueParam() {
super(NAME, null);
}
}
/**
* Class for xattr parameter.
*/
@InterfaceAudience.Private
public static class XAttrSetFlagParam extends EnumSetParam<XAttrSetFlag> {
/**
* Parameter name.
*/
public static final String NAME = HttpFSFileSystem.XATTR_SET_FLAG_PARAM;
/**
* Constructor.
*/
public XAttrSetFlagParam() {
super(NAME, XAttrSetFlag.class, null);
}
}
/**
* Class for xattr parameter.
*/
@InterfaceAudience.Private
public static class XAttrEncodingParam extends EnumParam<XAttrCodec> {
/**
* Parameter name.
*/
public static final String NAME = HttpFSFileSystem.XATTR_ENCODING_PARAM;
/**
* Constructor.
*/
public XAttrEncodingParam() {
super(NAME, XAttrCodec.class, null);
}
}
}

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.fs.http.server;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.XAttrCodec;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.AccessTimeParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.AclPermissionParam;
@ -40,6 +42,10 @@ import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.PermissionParam
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.RecursiveParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.ReplicationParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.SourcesParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.XAttrEncodingParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.XAttrNameParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.XAttrSetFlagParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.XAttrValueParam;
import org.apache.hadoop.lib.service.FileSystemAccess;
import org.apache.hadoop.lib.service.FileSystemAccessException;
import org.apache.hadoop.lib.service.Groups;
@ -75,6 +81,7 @@ import java.net.URI;
import java.security.AccessControlException;
import java.security.Principal;
import java.text.MessageFormat;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@ -322,6 +329,27 @@ public class HttpFSServer {
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case GETXATTRS: {
List<String> xattrNames = params.getValues(XAttrNameParam.NAME,
XAttrNameParam.class);
XAttrCodec encoding = params.get(XAttrEncodingParam.NAME,
XAttrEncodingParam.class);
FSOperations.FSGetXAttrs command = new FSOperations.FSGetXAttrs(path,
xattrNames, encoding);
@SuppressWarnings("rawtypes")
Map json = fsExecute(user, doAs, command);
AUDIT_LOG.info("XAttrs for [{}]", path);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case LISTXATTRS: {
FSOperations.FSListXAttrs command = new FSOperations.FSListXAttrs(path);
@SuppressWarnings("rawtypes")
Map json = fsExecute(user, doAs, command);
AUDIT_LOG.info("XAttr names for [{}]", path);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
default: {
throw new IOException(
MessageFormat.format("Invalid HTTP GET operation [{0}]",
@ -526,6 +554,30 @@ public class HttpFSServer {
}
break;
}
case SETXATTR: {
String xattrName = params.get(XAttrNameParam.NAME,
XAttrNameParam.class);
String xattrValue = params.get(XAttrValueParam.NAME,
XAttrValueParam.class);
EnumSet<XAttrSetFlag> flag = params.get(XAttrSetFlagParam.NAME,
XAttrSetFlagParam.class);
FSOperations.FSSetXAttr command = new FSOperations.FSSetXAttr(
path, xattrName, xattrValue, flag);
fsExecute(user, doAs, command);
AUDIT_LOG.info("[{}] to xAttr [{}]", path, xattrName);
response = Response.ok().build();
break;
}
case REMOVEXATTR: {
String xattrName = params.get(XAttrNameParam.NAME, XAttrNameParam.class);
FSOperations.FSRemoveXAttr command = new FSOperations.FSRemoveXAttr(
path, xattrName);
fsExecute(user, doAs, command);
AUDIT_LOG.info("[{}] removed xAttr [{}]", path, xattrName);
response = Response.ok().build();
break;
}
case MKDIRS: {
Short permission = params.get(PermissionParam.NAME,
PermissionParam.class);

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.lib.wsrs;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public abstract class EnumSetParam<E extends Enum<E>> extends Param<EnumSet<E>> {
Class<E> klass;
public EnumSetParam(String name, Class<E> e, EnumSet<E> defaultValue) {
super(name, defaultValue);
klass = e;
}
@Override
protected EnumSet<E> parse(String str) throws Exception {
final EnumSet<E> set = EnumSet.noneOf(klass);
if (!str.isEmpty()) {
for (String sub : str.split(",")) {
set.add(Enum.valueOf(klass, sub.trim().toUpperCase()));
}
}
return set;
}
@Override
protected String getDomain() {
return Arrays.asList(klass.getEnumConstants()).toString();
}
/** Convert an EnumSet to a string of comma separated values. */
public static <E extends Enum<E>> String toString(EnumSet<E> set) {
if (set == null || set.isEmpty()) {
return "";
} else {
final StringBuilder b = new StringBuilder();
final Iterator<E> i = set.iterator();
b.append(i.next());
while (i.hasNext()) {
b.append(',').append(i.next());
}
return b.toString();
}
}
@Override
public String toString() {
return getName() + "=" + toString(value);
}
}

View File

@ -19,6 +19,9 @@ package org.apache.hadoop.lib.wsrs;
import org.apache.hadoop.classification.InterfaceAudience;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
/**
@ -28,14 +31,14 @@ import java.util.Map;
*/
@InterfaceAudience.Private
public class Parameters {
private Map<String, Param<?>> params;
private Map<String, List<Param<?>>> params;
/**
* Constructor that receives the request parsed parameters.
*
* @param params the request parsed parameters.
*/
public Parameters(Map<String, Param<?>> params) {
public Parameters(Map<String, List<Param<?>>> params) {
this.params = params;
}
@ -44,11 +47,36 @@ public class Parameters {
*
* @param name parameter name.
* @param klass class of the parameter, used for value casting.
* @return the value of the parameter.
* @return the value of the parameter.
*/
@SuppressWarnings("unchecked")
public <V, T extends Param<V>> V get(String name, Class<T> klass) {
return ((T)params.get(name)).value();
List<Param<?>> multiParams = (List<Param<?>>)params.get(name);
if (multiParams != null && multiParams.size() > 0) {
return ((T) multiParams.get(0)).value(); // Return first value;
}
return null;
}
/**
* Returns the values of a request parsed parameter.
*
* @param name parameter name.
* @param klass class of the parameter, used for value casting.
* @return List<V> the values of the parameter.
*/
@SuppressWarnings("unchecked")
public <V, T extends Param<V>> List<V> getValues(String name, Class<T> klass) {
List<Param<?>> multiParams = (List<Param<?>>)params.get(name);
List<V> values = Lists.newArrayList();
if (multiParams != null) {
for (Param<?> param : multiParams) {
V value = ((T) param).value();
if (value != null) {
values.add(value);
}
}
}
return values;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.lib.wsrs;
import com.google.common.collect.Lists;
import com.sun.jersey.api.core.HttpContext;
import com.sun.jersey.core.spi.component.ComponentContext;
import com.sun.jersey.core.spi.component.ComponentScope;
@ -31,6 +32,7 @@ import javax.ws.rs.core.MultivaluedMap;
import java.lang.reflect.Type;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
@ -56,10 +58,11 @@ public class ParametersProvider
@Override
@SuppressWarnings("unchecked")
public Parameters getValue(HttpContext httpContext) {
Map<String, Param<?>> map = new HashMap<String, Param<?>>();
MultivaluedMap<String, String> queryString =
Map<String, List<Param<?>>> map = new HashMap<String, List<Param<?>>>();
Map<String, List<String>> queryString =
httpContext.getRequest().getQueryParameters();
String str = queryString.getFirst(driverParam);
String str = ((MultivaluedMap<String, String>) queryString).
getFirst(driverParam);
if (str == null) {
throw new IllegalArgumentException(
MessageFormat.format("Missing Operation parameter [{0}]",
@ -77,26 +80,40 @@ public class ParametersProvider
MessageFormat.format("Unsupported Operation [{0}]", op));
}
for (Class<Param<?>> paramClass : paramsDef.get(op)) {
Param<?> param;
try {
param = paramClass.newInstance();
} catch (Exception ex) {
throw new UnsupportedOperationException(
MessageFormat.format(
"Param class [{0}] does not have default constructor",
paramClass.getName()));
Param<?> param = newParam(paramClass);
List<Param<?>> paramList = Lists.newArrayList();
List<String> ps = queryString.get(param.getName());
if (ps != null) {
for (String p : ps) {
try {
param.parseParam(p);
}
catch (Exception ex) {
throw new IllegalArgumentException(ex.toString(), ex);
}
paramList.add(param);
param = newParam(paramClass);
}
} else {
paramList.add(param);
}
try {
param.parseParam(queryString.getFirst(param.getName()));
}
catch (Exception ex) {
throw new IllegalArgumentException(ex.toString(), ex);
}
map.put(param.getName(), param);
map.put(param.getName(), paramList);
}
return new Parameters(map);
}
private Param<?> newParam(Class<Param<?>> paramClass) {
try {
return paramClass.newInstance();
} catch (Exception ex) {
throw new UnsupportedOperationException(
MessageFormat.format(
"Param class [{0}] does not have default constructor",
paramClass.getName()));
}
}
@Override
public ComponentScope getScope() {
return ComponentScope.PerRequest;

View File

@ -47,6 +47,8 @@ import org.junit.runners.Parameterized;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.webapp.WebAppContext;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
@ -59,6 +61,8 @@ import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@RunWith(value = Parameterized.class)
public abstract class BaseTestHttpFSWith extends HFSTestCase {
@ -90,6 +94,7 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
Configuration conf = new Configuration(false);
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, fsDefaultName);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, true);
File hdfsSite = new File(new File(homeDir, "conf"), "hdfs-site.xml");
OutputStream os = new FileOutputStream(hdfsSite);
conf.writeXml(os);
@ -481,6 +486,198 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
Assert.assertEquals(httpContentSummary.getSpaceConsumed(), hdfsContentSummary.getSpaceConsumed());
Assert.assertEquals(httpContentSummary.getSpaceQuota(), hdfsContentSummary.getSpaceQuota());
}
/** Set xattr */
private void testSetXAttr() throws Exception {
if (!isLocalFS()) {
FileSystem fs = FileSystem.get(getProxiedFSConf());
fs.mkdirs(getProxiedFSTestDir());
Path path = new Path(getProxiedFSTestDir(), "foo.txt");
OutputStream os = fs.create(path);
os.write(1);
os.close();
fs.close();
final String name1 = "user.a1";
final byte[] value1 = new byte[]{0x31, 0x32, 0x33};
final String name2 = "user.a2";
final byte[] value2 = new byte[]{0x41, 0x42, 0x43};
final String name3 = "user.a3";
final byte[] value3 = null;
final String name4 = "trusted.a1";
final byte[] value4 = new byte[]{0x31, 0x32, 0x33};
final String name5 = "a1";
fs = getHttpFSFileSystem();
fs.setXAttr(path, name1, value1);
fs.setXAttr(path, name2, value2);
fs.setXAttr(path, name3, value3);
fs.setXAttr(path, name4, value4);
try {
fs.setXAttr(path, name5, value1);
Assert.fail("Set xAttr with incorrect name format should fail.");
} catch (IOException e) {
} catch (IllegalArgumentException e) {
}
fs.close();
fs = FileSystem.get(getProxiedFSConf());
Map<String, byte[]> xAttrs = fs.getXAttrs(path);
fs.close();
Assert.assertEquals(4, xAttrs.size());
Assert.assertArrayEquals(value1, xAttrs.get(name1));
Assert.assertArrayEquals(value2, xAttrs.get(name2));
Assert.assertArrayEquals(new byte[0], xAttrs.get(name3));
Assert.assertArrayEquals(value4, xAttrs.get(name4));
}
}
/** Get xattrs */
private void testGetXAttrs() throws Exception {
if (!isLocalFS()) {
FileSystem fs = FileSystem.get(getProxiedFSConf());
fs.mkdirs(getProxiedFSTestDir());
Path path = new Path(getProxiedFSTestDir(), "foo.txt");
OutputStream os = fs.create(path);
os.write(1);
os.close();
fs.close();
final String name1 = "user.a1";
final byte[] value1 = new byte[]{0x31, 0x32, 0x33};
final String name2 = "user.a2";
final byte[] value2 = new byte[]{0x41, 0x42, 0x43};
final String name3 = "user.a3";
final byte[] value3 = null;
final String name4 = "trusted.a1";
final byte[] value4 = new byte[]{0x31, 0x32, 0x33};
fs = FileSystem.get(getProxiedFSConf());
fs.setXAttr(path, name1, value1);
fs.setXAttr(path, name2, value2);
fs.setXAttr(path, name3, value3);
fs.setXAttr(path, name4, value4);
fs.close();
// Get xattrs with names parameter
fs = getHttpFSFileSystem();
List<String> names = Lists.newArrayList();
names.add(name1);
names.add(name2);
names.add(name3);
names.add(name4);
Map<String, byte[]> xAttrs = fs.getXAttrs(path, names);
fs.close();
Assert.assertEquals(4, xAttrs.size());
Assert.assertArrayEquals(value1, xAttrs.get(name1));
Assert.assertArrayEquals(value2, xAttrs.get(name2));
Assert.assertArrayEquals(new byte[0], xAttrs.get(name3));
Assert.assertArrayEquals(value4, xAttrs.get(name4));
// Get specific xattr
fs = getHttpFSFileSystem();
byte[] value = fs.getXAttr(path, name1);
Assert.assertArrayEquals(value1, value);
final String name5 = "a1";
try {
value = fs.getXAttr(path, name5);
Assert.fail("Get xAttr with incorrect name format should fail.");
} catch (IOException e) {
} catch (IllegalArgumentException e) {
}
fs.close();
// Get all xattrs
fs = getHttpFSFileSystem();
xAttrs = fs.getXAttrs(path);
fs.close();
Assert.assertEquals(4, xAttrs.size());
Assert.assertArrayEquals(value1, xAttrs.get(name1));
Assert.assertArrayEquals(value2, xAttrs.get(name2));
Assert.assertArrayEquals(new byte[0], xAttrs.get(name3));
Assert.assertArrayEquals(value4, xAttrs.get(name4));
}
}
/** Remove xattr */
private void testRemoveXAttr() throws Exception {
if (!isLocalFS()) {
FileSystem fs = FileSystem.get(getProxiedFSConf());
fs.mkdirs(getProxiedFSTestDir());
Path path = new Path(getProxiedFSTestDir(), "foo.txt");
OutputStream os = fs.create(path);
os.write(1);
os.close();
fs.close();
final String name1 = "user.a1";
final byte[] value1 = new byte[]{0x31, 0x32, 0x33};
final String name2 = "user.a2";
final byte[] value2 = new byte[]{0x41, 0x42, 0x43};
final String name3 = "user.a3";
final byte[] value3 = null;
final String name4 = "trusted.a1";
final byte[] value4 = new byte[]{0x31, 0x32, 0x33};
final String name5 = "a1";
fs = FileSystem.get(getProxiedFSConf());
fs.setXAttr(path, name1, value1);
fs.setXAttr(path, name2, value2);
fs.setXAttr(path, name3, value3);
fs.setXAttr(path, name4, value4);
fs.close();
fs = getHttpFSFileSystem();
fs.removeXAttr(path, name1);
fs.removeXAttr(path, name3);
fs.removeXAttr(path, name4);
try {
fs.removeXAttr(path, name5);
Assert.fail("Remove xAttr with incorrect name format should fail.");
} catch (IOException e) {
} catch (IllegalArgumentException e) {
}
fs = FileSystem.get(getProxiedFSConf());
Map<String, byte[]> xAttrs = fs.getXAttrs(path);
fs.close();
Assert.assertEquals(1, xAttrs.size());
Assert.assertArrayEquals(value2, xAttrs.get(name2));
}
}
/** List xattrs */
private void testListXAttrs() throws Exception {
if (!isLocalFS()) {
FileSystem fs = FileSystem.get(getProxiedFSConf());
fs.mkdirs(getProxiedFSTestDir());
Path path = new Path(getProxiedFSTestDir(), "foo.txt");
OutputStream os = fs.create(path);
os.write(1);
os.close();
fs.close();
final String name1 = "user.a1";
final byte[] value1 = new byte[]{0x31, 0x32, 0x33};
final String name2 = "user.a2";
final byte[] value2 = new byte[]{0x41, 0x42, 0x43};
final String name3 = "user.a3";
final byte[] value3 = null;
final String name4 = "trusted.a1";
final byte[] value4 = new byte[]{0x31, 0x32, 0x33};
fs = FileSystem.get(getProxiedFSConf());
fs.setXAttr(path, name1, value1);
fs.setXAttr(path, name2, value2);
fs.setXAttr(path, name3, value3);
fs.setXAttr(path, name4, value4);
fs.close();
fs = getHttpFSFileSystem();
List<String> names = fs.listXAttrs(path);
Assert.assertEquals(4, names.size());
Assert.assertTrue(names.contains(name1));
Assert.assertTrue(names.contains(name2));
Assert.assertTrue(names.contains(name3));
Assert.assertTrue(names.contains(name4));
}
}
/**
* Runs assertions testing that two AclStatus objects contain the same info
@ -587,7 +784,7 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
protected enum Operation {
GET, OPEN, CREATE, APPEND, CONCAT, RENAME, DELETE, LIST_STATUS, WORKING_DIRECTORY, MKDIRS,
SET_TIMES, SET_PERMISSION, SET_OWNER, SET_REPLICATION, CHECKSUM, CONTENT_SUMMARY,
FILEACLS, DIRACLS
FILEACLS, DIRACLS, SET_XATTR, GET_XATTRS, REMOVE_XATTR, LIST_XATTRS
}
private void operation(Operation op) throws Exception {
@ -645,6 +842,18 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
case DIRACLS:
testDirAcls();
break;
case SET_XATTR:
testSetXAttr();
break;
case REMOVE_XATTR:
testRemoveXAttr();
break;
case GET_XATTRS:
testGetXAttrs();
break;
case LIST_XATTRS:
testListXAttrs();
break;
}
}

View File

@ -36,11 +36,13 @@ import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.XAttrCodec;
import org.apache.hadoop.fs.http.client.HttpFSKerberosAuthenticator;
import org.apache.hadoop.lib.server.Service;
import org.apache.hadoop.lib.server.ServiceException;
@ -62,6 +64,8 @@ import org.junit.Test;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.webapp.WebAppContext;
import com.google.common.collect.Maps;
public class TestHttpFSServer extends HFSTestCase {
@Test
@ -132,6 +136,7 @@ public class TestHttpFSServer extends HFSTestCase {
Configuration conf = new Configuration(false);
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, fsDefaultName);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, true);
File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml");
OutputStream os = new FileOutputStream(hdfsSite);
conf.writeXml(os);
@ -356,6 +361,36 @@ public class TestHttpFSServer extends HFSTestCase {
}
return entries;
}
/**
* Parse xAttrs from JSON result of GETXATTRS call, return xAttrs Map.
* @param statusJson JSON from GETXATTRS
* @return Map<String, byte[]> xAttrs Map
* @throws Exception
*/
private Map<String, byte[]> getXAttrs(String statusJson) throws Exception {
Map<String, byte[]> xAttrs = Maps.newHashMap();
JSONParser parser = new JSONParser();
JSONObject jsonObject = (JSONObject) parser.parse(statusJson);
JSONArray jsonXAttrs = (JSONArray) jsonObject.get("XAttrs");
if (jsonXAttrs != null) {
for (Object a : jsonXAttrs) {
String name = (String) ((JSONObject)a).get("name");
String value = (String) ((JSONObject)a).get("value");
xAttrs.put(name, decodeXAttrValue(value));
}
}
return xAttrs;
}
/** Decode xattr value from string */
private byte[] decodeXAttrValue(String value) throws IOException {
if (value != null) {
return XAttrCodec.decodeValue(value);
} else {
return new byte[0];
}
}
/**
* Validate that files are created with 755 permissions when no
@ -388,6 +423,60 @@ public class TestHttpFSServer extends HFSTestCase {
statusJson = getStatus("/perm/p-321", "GETFILESTATUS");
Assert.assertTrue("321".equals(getPerms(statusJson)));
}
/**
* Validate XAttr get/set/remove calls.
*/
@Test
@TestDir
@TestJetty
@TestHdfs
public void testXAttrs() throws Exception {
final String name1 = "user.a1";
final byte[] value1 = new byte[]{0x31, 0x32, 0x33};
final String name2 = "user.a2";
final byte[] value2 = new byte[]{0x41, 0x42, 0x43};
final String dir = "/xattrTest";
final String path = dir + "/file";
createHttpFSServer(false);
FileSystem fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
fs.mkdirs(new Path(dir));
createWithHttp(path,null);
String statusJson = getStatus(path, "GETXATTRS");
Map<String, byte[]> xAttrs = getXAttrs(statusJson);
Assert.assertEquals(0, xAttrs.size());
// Set two xattrs
putCmd(path, "SETXATTR", setXAttrParam(name1, value1));
putCmd(path, "SETXATTR", setXAttrParam(name2, value2));
statusJson = getStatus(path, "GETXATTRS");
xAttrs = getXAttrs(statusJson);
Assert.assertEquals(2, xAttrs.size());
Assert.assertArrayEquals(value1, xAttrs.get(name1));
Assert.assertArrayEquals(value2, xAttrs.get(name2));
// Remove one xattr
putCmd(path, "REMOVEXATTR", "xattr.name=" + name1);
statusJson = getStatus(path, "GETXATTRS");
xAttrs = getXAttrs(statusJson);
Assert.assertEquals(1, xAttrs.size());
Assert.assertArrayEquals(value2, xAttrs.get(name2));
// Remove another xattr, then there is no xattr
putCmd(path, "REMOVEXATTR", "xattr.name=" + name2);
statusJson = getStatus(path, "GETXATTRS");
xAttrs = getXAttrs(statusJson);
Assert.assertEquals(0, xAttrs.size());
}
/** Params for setting an xAttr */
public static String setXAttrParam(String name, byte[] value) throws IOException {
return "xattr.name=" + name + "&xattr.value=" + XAttrCodec.encodeValue(
value, XAttrCodec.HEX) + "&encoding=hex&flag=create";
}
/**
* Validate the various ACL set/modify/remove calls. General strategy is

View File

@ -0,0 +1,248 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.http.server;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.test.HTestCase;
import org.apache.hadoop.test.HadoopUsersConfTestHelper;
import org.apache.hadoop.test.TestDir;
import org.apache.hadoop.test.TestDirHelper;
import org.apache.hadoop.test.TestHdfs;
import org.apache.hadoop.test.TestJetty;
import org.apache.hadoop.test.TestJettyHelper;
import org.junit.Assert;
import org.junit.Test;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.webapp.WebAppContext;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Writer;
import java.net.HttpURLConnection;
import java.net.URL;
import java.text.MessageFormat;
/**
* This test class ensures that everything works as expected when XAttr
* support is turned off HDFS. This is the default configuration. The other
* tests operate with XAttr support turned on.
*/
public class TestHttpFSServerNoXAttrs extends HTestCase {
private MiniDFSCluster miniDfs;
private Configuration nnConf;
/**
* Fire up our own hand-rolled MiniDFSCluster. We do this here instead
* of relying on TestHdfsHelper because we don't want to turn on XAttr
* support.
*
* @throws Exception
*/
private void startMiniDFS() throws Exception {
File testDirRoot = TestDirHelper.getTestDir();
if (System.getProperty("hadoop.log.dir") == null) {
System.setProperty("hadoop.log.dir",
new File(testDirRoot, "hadoop-log").getAbsolutePath());
}
if (System.getProperty("test.build.data") == null) {
System.setProperty("test.build.data",
new File(testDirRoot, "hadoop-data").getAbsolutePath());
}
Configuration conf = HadoopUsersConfTestHelper.getBaseConf();
HadoopUsersConfTestHelper.addUserConf(conf);
conf.set("fs.hdfs.impl.disable.cache", "true");
conf.set("dfs.block.access.token.enable", "false");
conf.set("dfs.permissions", "true");
conf.set("hadoop.security.authentication", "simple");
// Explicitly turn off XAttr support
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, false);
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
builder.numDataNodes(2);
miniDfs = builder.build();
nnConf = miniDfs.getConfiguration(0);
}
/**
* Create an HttpFS Server to talk to the MiniDFSCluster we created.
* @throws Exception
*/
private void createHttpFSServer() throws Exception {
File homeDir = TestDirHelper.getTestDir();
Assert.assertTrue(new File(homeDir, "conf").mkdir());
Assert.assertTrue(new File(homeDir, "log").mkdir());
Assert.assertTrue(new File(homeDir, "temp").mkdir());
HttpFSServerWebApp.setHomeDirForCurrentThread(homeDir.getAbsolutePath());
File secretFile = new File(new File(homeDir, "conf"), "secret");
Writer w = new FileWriter(secretFile);
w.write("secret");
w.close();
// HDFS configuration
File hadoopConfDir = new File(new File(homeDir, "conf"), "hadoop-conf");
if ( !hadoopConfDir.mkdirs() ) {
throw new IOException();
}
String fsDefaultName =
nnConf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
Configuration conf = new Configuration(false);
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, fsDefaultName);
// Explicitly turn off XAttr support
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, false);
File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml");
OutputStream os = new FileOutputStream(hdfsSite);
conf.writeXml(os);
os.close();
// HTTPFS configuration
conf = new Configuration(false);
conf.set("httpfs.hadoop.config.dir", hadoopConfDir.toString());
conf.set("httpfs.proxyuser." +
HadoopUsersConfTestHelper.getHadoopProxyUser() + ".groups",
HadoopUsersConfTestHelper.getHadoopProxyUserGroups());
conf.set("httpfs.proxyuser." +
HadoopUsersConfTestHelper.getHadoopProxyUser() + ".hosts",
HadoopUsersConfTestHelper.getHadoopProxyUserHosts());
conf.set("httpfs.authentication.signature.secret.file",
secretFile.getAbsolutePath());
File httpfsSite = new File(new File(homeDir, "conf"), "httpfs-site.xml");
os = new FileOutputStream(httpfsSite);
conf.writeXml(os);
os.close();
ClassLoader cl = Thread.currentThread().getContextClassLoader();
URL url = cl.getResource("webapp");
if ( url == null ) {
throw new IOException();
}
WebAppContext context = new WebAppContext(url.getPath(), "/webhdfs");
Server server = TestJettyHelper.getJettyServer();
server.addHandler(context);
server.start();
}
/**
* Talks to the http interface to get the json output of a *STATUS command
* on the given file.
*
* @param filename The file to query.
* @param command Either GETXATTRS, SETXATTR, or REMOVEXATTR
* @throws Exception
*/
private void getStatus(String filename, String command)
throws Exception {
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
// Remove leading / from filename
if ( filename.charAt(0) == '/' ) {
filename = filename.substring(1);
}
String pathOps = MessageFormat.format(
"/webhdfs/v1/{0}?user.name={1}&op={2}",
filename, user, command);
URL url = new URL(TestJettyHelper.getJettyURL(), pathOps);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.connect();
int resp = conn.getResponseCode();
BufferedReader reader;
Assert.assertEquals(HttpURLConnection.HTTP_INTERNAL_ERROR, resp);
reader = new BufferedReader(new InputStreamReader(conn.getErrorStream()));
String res = reader.readLine();
Assert.assertTrue(res.contains("RemoteException"));
Assert.assertTrue(res.contains("XAttr"));
Assert.assertTrue(res.contains("rejected"));
}
/**
* General-purpose http PUT command to the httpfs server.
* @param filename The file to operate upon
* @param command The command to perform (SETXATTR, etc)
* @param params Parameters
*/
private void putCmd(String filename, String command,
String params) throws Exception {
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
// Remove leading / from filename
if ( filename.charAt(0) == '/' ) {
filename = filename.substring(1);
}
String pathOps = MessageFormat.format(
"/webhdfs/v1/{0}?user.name={1}{2}{3}&op={4}",
filename, user, (params == null) ? "" : "&",
(params == null) ? "" : params, command);
URL url = new URL(TestJettyHelper.getJettyURL(), pathOps);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("PUT");
conn.connect();
int resp = conn.getResponseCode();
Assert.assertEquals(HttpURLConnection.HTTP_INTERNAL_ERROR, resp);
BufferedReader reader;
reader = new BufferedReader(new InputStreamReader(conn.getErrorStream()));
String err = reader.readLine();
Assert.assertTrue(err.contains("RemoteException"));
Assert.assertTrue(err.contains("XAttr"));
Assert.assertTrue(err.contains("rejected"));
}
/**
* Ensure that GETXATTRS, SETXATTR, REMOVEXATTR fail.
*/
@Test
@TestDir
@TestJetty
@TestHdfs
public void testWithXAttrs() throws Exception {
final String name1 = "user.a1";
final byte[] value1 = new byte[]{0x31, 0x32, 0x33};
final String dir = "/noXAttr";
final String path = dir + "/file";
startMiniDFS();
createHttpFSServer();
FileSystem fs = FileSystem.get(nnConf);
fs.mkdirs(new Path(dir));
OutputStream os = fs.create(new Path(path));
os.write(1);
os.close();
/* GETXATTRS, SETXATTR, REMOVEXATTR fail */
getStatus(path, "GETXATTRS");
putCmd(path, "SETXATTR", TestHttpFSServer.setXAttrParam(name1, value1));
putCmd(path, "REMOVEXATTR", "xattr.name=" + name1);
}
}

View File

@ -147,6 +147,7 @@ public class TestHdfsHelper extends TestDirHelper {
conf.set("dfs.permissions", "true");
conf.set("hadoop.security.authentication", "simple");
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, true);
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
builder.numDataNodes(2);
MiniDFSCluster miniHdfs = builder.build();

View File

@ -471,6 +471,8 @@ Release 2.5.0 - UNRELEASED
HDFS-6486. Add user doc for XAttrs via WebHDFS. (Yi Liu via umamahesh)
HDFS-6430. HTTPFS - Implement XAttr support. (Yi Liu via tucu)
OPTIMIZATIONS
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)