HDFS-8630. WebHDFS : Support get/set/unset StoragePolicy. Contributed by Surendra Singh Lilhore.

Author: Andrew Wang
Date: 2016-12-09 14:56:49 -08:00
Commit: e8b0ef0618 (parent 40367c8da3)
16 changed files with 861 additions and 8 deletions

JsonUtilClient.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.web;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.ContentSummary.Builder;
import org.apache.hadoop.fs.FileChecksum;
@@ -32,6 +33,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
@@ -53,6 +55,8 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -545,4 +549,49 @@ class JsonUtilClient {
lastLocatedBlock, isLastBlockComplete, null);
}
public static Collection<BlockStoragePolicy> getStoragePolicies(
Map<?, ?> json) {
Map<?, ?> policiesJson = (Map<?, ?>) json.get("BlockStoragePolicies");
if (policiesJson != null) {
List<?> objs = (List<?>) policiesJson.get(BlockStoragePolicy.class
.getSimpleName());
if (objs != null) {
BlockStoragePolicy[] storagePolicies = new BlockStoragePolicy[objs
.size()];
for (int i = 0; i < objs.size(); i++) {
final Map<?, ?> m = (Map<?, ?>) objs.get(i);
BlockStoragePolicy blockStoragePolicy = toBlockStoragePolicy(m);
storagePolicies[i] = blockStoragePolicy;
}
return Arrays.asList(storagePolicies);
}
}
return new ArrayList<BlockStoragePolicy>(0);
}
public static BlockStoragePolicy toBlockStoragePolicy(Map<?, ?> m) {
byte id = ((Number) m.get("id")).byteValue();
String name = (String) m.get("name");
StorageType[] storageTypes = toStorageTypes((List<?>) m
.get("storageTypes"));
StorageType[] creationFallbacks = toStorageTypes((List<?>) m
.get("creationFallbacks"));
StorageType[] replicationFallbacks = toStorageTypes((List<?>) m
.get("replicationFallbacks"));
Boolean copyOnCreateFile = (Boolean) m.get("copyOnCreateFile");
return new BlockStoragePolicy(id, name, storageTypes, creationFallbacks,
replicationFallbacks, copyOnCreateFile.booleanValue());
}
private static StorageType[] toStorageTypes(List<?> list) {
if (list == null) {
return null;
} else {
StorageType[] storageTypes = new StorageType[list.size()];
for (int i = 0; i < list.size(); i++) {
storageTypes[i] = StorageType.parseStorageType((String) list.get(i));
}
return storageTypes;
}
}
}
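
For reference, a minimal sketch of the payload `getStoragePolicies` consumes; the JSON shape is copied from the `GETALLSTORAGEPOLICY` example in WebHDFS.md further down. The wrapper class, the Jackson `ObjectMapper` usage, and placing the class in `org.apache.hadoop.hdfs.web` (needed because `JsonUtilClient` is package-private) are assumptions of this sketch, not part of the patch.

```java
package org.apache.hadoop.hdfs.web; // JsonUtilClient is package-private

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collection;
import java.util.Map;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;

public class StoragePolicyParseSketch {
  public static void main(String[] args) throws Exception {
    // Shape matches the GETALLSTORAGEPOLICY response documented in WebHDFS.md.
    String json = "{\"BlockStoragePolicies\":{\"BlockStoragePolicy\":["
        + "{\"id\":2,\"name\":\"COLD\",\"storageTypes\":[\"ARCHIVE\"],"
        + "\"creationFallbacks\":[],\"replicationFallbacks\":[],"
        + "\"copyOnCreateFile\":false}]}}";
    Map<?, ?> map = new ObjectMapper().readValue(json, Map.class);
    Collection<BlockStoragePolicy> policies =
        JsonUtilClient.getStoragePolicies(map);
    System.out.println(policies.size()); // 1: the COLD policy on ARCHIVE
  }
}
```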

WebHdfsFileSystem.java

@@ -39,6 +39,7 @@ import java.net.URI;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
@@ -79,6 +80,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HAUtilClient;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -1669,6 +1671,50 @@ public class WebHdfsFileSystem extends FileSystem
: tokenServiceName.toString();
}
@Override
public void setStoragePolicy(Path p, String policyName) throws IOException {
if (policyName == null) {
throw new IOException("policyName == null");
}
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_STORAGE_POLICY);
final HttpOpParam.Op op = PutOpParam.Op.SETSTORAGEPOLICY;
new FsPathRunner(op, p, new StoragePolicyParam(policyName)).run();
}
@Override
public Collection<BlockStoragePolicy> getAllStoragePolicies()
throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETALLSTORAGEPOLICY;
return new FsPathResponseRunner<Collection<BlockStoragePolicy>>(op, null) {
@Override
Collection<BlockStoragePolicy> decodeResponse(Map<?, ?> json)
throws IOException {
return JsonUtilClient.getStoragePolicies(json);
}
}.run();
}
@Override
public BlockStoragePolicy getStoragePolicy(Path src) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETSTORAGEPOLICY;
return new FsPathResponseRunner<BlockStoragePolicy>(op, src) {
@Override
BlockStoragePolicy decodeResponse(Map<?, ?> json) throws IOException {
return JsonUtilClient.toBlockStoragePolicy((Map<?, ?>) json
.get(BlockStoragePolicy.class.getSimpleName()));
}
}.run();
}
@Override
public void unsetStoragePolicy(Path src) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.UNSET_STORAGE_POLICY);
final HttpOpParam.Op op = PostOpParam.Op.UNSETSTORAGEPOLICY;
new FsPathRunner(op, src).run();
}
@VisibleForTesting
InetSocketAddress[] getResolvedNNAddr() {
return nnAddrs;
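
Taken together, these overrides map the `FileSystem` storage-policy API onto the new REST operations. A minimal client-side usage sketch follows; the host, port, and path are illustrative, not taken from the patch.

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WebHdfsPolicySketch {
  public static void main(String[] args) throws Exception {
    // The webhdfs:// scheme resolves to WebHdfsFileSystem.
    FileSystem fs = FileSystem.get(
        URI.create("webhdfs://namenode:9870"), new Configuration());
    Path p = new Path("/data/archive.txt");

    fs.setStoragePolicy(p, "COLD");   // PUT  ?op=SETSTORAGEPOLICY&storagepolicy=COLD
    BlockStoragePolicySpi policy = fs.getStoragePolicy(p); // GET ?op=GETSTORAGEPOLICY
    System.out.println(policy.getName());                  // COLD
    fs.unsetStoragePolicy(p);         // POST ?op=UNSETSTORAGEPOLICY
  }
}
```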

GetOpParam.java

@@ -39,6 +39,9 @@ public class GetOpParam extends HttpOpParam<GetOpParam.Op> {
GETXATTRS(false, HttpURLConnection.HTTP_OK),
LISTXATTRS(false, HttpURLConnection.HTTP_OK),
GETALLSTORAGEPOLICY(false, HttpURLConnection.HTTP_OK),
GETSTORAGEPOLICY(false, HttpURLConnection.HTTP_OK),
NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED),
CHECKACCESS(false, HttpURLConnection.HTTP_OK);

PostOpParam.java

@@ -29,6 +29,8 @@ public class PostOpParam extends HttpOpParam<PostOpParam.Op> {
TRUNCATE(false, HttpURLConnection.HTTP_OK),
UNSETSTORAGEPOLICY(false, HttpURLConnection.HTTP_OK),
NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);
final boolean doOutputAndRedirect;

PutOpParam.java

@@ -50,6 +50,7 @@ public class PutOpParam extends HttpOpParam<PutOpParam.Op> {
DISALLOWSNAPSHOT(false, HttpURLConnection.HTTP_OK),
CREATESNAPSHOT(false, HttpURLConnection.HTTP_OK),
RENAMESNAPSHOT(false, HttpURLConnection.HTTP_OK),
SETSTORAGEPOLICY(false, HttpURLConnection.HTTP_OK),
NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);

StoragePolicyParam.java (new file)

@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web.resources;
/** policy parameter. */
public class StoragePolicyParam extends StringParam {
/** Parameter name. */
public static final String NAME = "storagepolicy";
/** Default parameter value. */
public static final String DEFAULT = "";
private static final Domain DOMAIN = new Domain(NAME, null);
/**
* Constructor.
*
* @param str
* a string representation of the parameter value.
*/
public StoragePolicyParam(final String str) {
super(DOMAIN, str == null || str.equals(DEFAULT) ? null : str);
}
@Override
public String getName() {
return NAME;
}
}
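
The constructor normalizes input: a null or empty string becomes a null value, so the `storagepolicy` query parameter is simply omitted from the request URL. A quick behavioral sketch (run with `-ea`; it mirrors the `TestParam` case at the end of this change):

```java
import org.apache.hadoop.hdfs.web.resources.StoragePolicyParam;

public class StoragePolicyParamSketch {
  public static void main(String[] args) {
    // The empty default collapses to null, so the parameter is omitted.
    assert new StoragePolicyParam(StoragePolicyParam.DEFAULT).getValue() == null;
    assert new StoragePolicyParam(null).getValue() == null;
    assert "COLD".equals(new StoragePolicyParam("COLD").getValue());
  }
}
```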

HttpFSFileSystem.java

@@ -18,8 +18,11 @@
package org.apache.hadoop.fs.http.client;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
@@ -32,6 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.XAttrCodec;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
@@ -39,6 +43,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
import org.apache.hadoop.lib.wsrs.EnumSetParam;
import org.apache.hadoop.security.UserGroupInformation;
@@ -112,6 +117,7 @@ public class HttpFSFileSystem extends FileSystem
public static final String XATTR_SET_FLAG_PARAM = "flag";
public static final String XATTR_ENCODING_PARAM = "encoding";
public static final String NEW_LENGTH_PARAM = "newlength";
public static final String POLICY_NAME_PARAM = "storagepolicy";
public static final Short DEFAULT_PERMISSION = 0755;
public static final String ACLSPEC_DEFAULT = "";
@@ -185,6 +191,9 @@ public class HttpFSFileSystem extends FileSystem
public static final String ENC_BIT_JSON = "encBit";
public static final String STORAGE_POLICIES_JSON = "BlockStoragePolicies";
public static final String STORAGE_POLICY_JSON = "BlockStoragePolicy";
public static final int HTTP_TEMPORARY_REDIRECT = 307;
private static final String HTTP_GET = "GET";
@@ -204,7 +213,9 @@ public class HttpFSFileSystem extends FileSystem
MODIFYACLENTRIES(HTTP_PUT), REMOVEACLENTRIES(HTTP_PUT),
REMOVEDEFAULTACL(HTTP_PUT), REMOVEACL(HTTP_PUT), SETACL(HTTP_PUT),
DELETE(HTTP_DELETE), SETXATTR(HTTP_PUT), GETXATTRS(HTTP_GET),
REMOVEXATTR(HTTP_PUT), LISTXATTRS(HTTP_GET),
GETALLSTORAGEPOLICY(HTTP_GET), GETSTORAGEPOLICY(HTTP_GET),
SETSTORAGEPOLICY(HTTP_PUT), UNSETSTORAGEPOLICY(HTTP_POST);
private String httpMethod;
@@ -1243,4 +1254,83 @@ public class HttpFSFileSystem extends FileSystem
params, f, true);
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
}
@Override
public Collection<BlockStoragePolicy> getAllStoragePolicies()
throws IOException {
Map<String, String> params = new HashMap<String, String>();
params.put(OP_PARAM, Operation.GETALLSTORAGEPOLICY.toString());
HttpURLConnection conn = getConnection(
Operation.GETALLSTORAGEPOLICY.getMethod(), params, new Path(getUri()
.toString(), "/"), false);
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
return createStoragePolicies((JSONObject) json.get(STORAGE_POLICIES_JSON));
}
private Collection<BlockStoragePolicy> createStoragePolicies(JSONObject map)
throws IOException {
JSONArray jsonArray = (JSONArray) map.get(STORAGE_POLICY_JSON);
BlockStoragePolicy[] policies = new BlockStoragePolicy[jsonArray.size()];
for (int i = 0; i < jsonArray.size(); i++) {
policies[i] = createStoragePolicy((JSONObject) jsonArray.get(i));
}
return Arrays.asList(policies);
}
@Override
public BlockStoragePolicy getStoragePolicy(Path src) throws IOException {
Map<String, String> params = new HashMap<String, String>();
params.put(OP_PARAM, Operation.GETSTORAGEPOLICY.toString());
HttpURLConnection conn = getConnection(
Operation.GETSTORAGEPOLICY.getMethod(), params, src, true);
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
return createStoragePolicy((JSONObject) json.get(STORAGE_POLICY_JSON));
}
private BlockStoragePolicy createStoragePolicy(JSONObject policyJson)
throws IOException {
byte id = ((Number) policyJson.get("id")).byteValue();
String name = (String) policyJson.get("name");
StorageType[] storageTypes = toStorageTypes((JSONArray) policyJson
.get("storageTypes"));
StorageType[] creationFallbacks = toStorageTypes((JSONArray) policyJson
.get("creationFallbacks"));
StorageType[] replicationFallbacks = toStorageTypes((JSONArray) policyJson
.get("replicationFallbacks"));
Boolean copyOnCreateFile = (Boolean) policyJson.get("copyOnCreateFile");
return new BlockStoragePolicy(id, name, storageTypes, creationFallbacks,
replicationFallbacks, copyOnCreateFile.booleanValue());
}
private StorageType[] toStorageTypes(JSONArray array) throws IOException {
if (array == null) {
return null;
} else {
List<StorageType> storageTypes = new ArrayList<StorageType>(array.size());
for (Object name : array) {
storageTypes.add(StorageType.parseStorageType((String) name));
}
return storageTypes.toArray(new StorageType[storageTypes.size()]);
}
}
@Override
public void setStoragePolicy(Path src, String policyName) throws IOException {
Map<String, String> params = new HashMap<String, String>();
params.put(OP_PARAM, Operation.SETSTORAGEPOLICY.toString());
params.put(POLICY_NAME_PARAM, policyName);
HttpURLConnection conn = getConnection(
Operation.SETSTORAGEPOLICY.getMethod(), params, src, true);
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
}
@Override
public void unsetStoragePolicy(Path src) throws IOException {
Map<String, String> params = new HashMap<String, String>();
params.put(OP_PARAM, Operation.UNSETSTORAGEPOLICY.toString());
HttpURLConnection conn = getConnection(
Operation.UNSETSTORAGEPOLICY.getMethod(), params, src, true);
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
}
}
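
At the `FileSystem` level the HttpFS client behaves like the WebHDFS one; only the endpoint differs. A hedged wiring sketch: the gateway host, the conventional HttpFS port 14000, and routing the scheme through the generic `fs.<scheme>.impl` override are assumptions here, not part of the patch.

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.http.client.HttpFSFileSystem;

public class HttpFsPolicySketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Route webhdfs:// to the HttpFS client instead of WebHdfsFileSystem.
    conf.setClass("fs.webhdfs.impl", HttpFSFileSystem.class, FileSystem.class);
    FileSystem fs = FileSystem.get(
        URI.create("webhdfs://httpfs-gateway:14000"), conf);
    fs.setStoragePolicy(new Path("/data"), "ONE_SSD");
  }
}
```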

FSOperations.java

@@ -18,6 +18,7 @@
package org.apache.hadoop.fs.http.server;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
@@ -25,12 +26,14 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.GlobFilter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.XAttrCodec;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.lib.service.FileSystemAccess;
import org.apache.hadoop.util.StringUtils;
@@ -40,6 +43,7 @@ import org.json.simple.JSONObject;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -237,6 +241,46 @@ public class FSOperations {
return json;
}
@SuppressWarnings({ "unchecked" })
private static JSONObject storagePolicyToJSON(BlockStoragePolicySpi policy) {
BlockStoragePolicy p = (BlockStoragePolicy) policy;
JSONObject policyJson = new JSONObject();
policyJson.put("id", p.getId());
policyJson.put("name", p.getName());
policyJson.put("storageTypes", toJsonArray(p.getStorageTypes()));
policyJson.put("creationFallbacks", toJsonArray(p.getCreationFallbacks()));
policyJson.put("replicationFallbacks",
toJsonArray(p.getReplicationFallbacks()));
policyJson.put("copyOnCreateFile", p.isCopyOnCreateFile());
return policyJson;
}
@SuppressWarnings("unchecked")
private static JSONArray toJsonArray(StorageType[] storageTypes) {
JSONArray jsonArray = new JSONArray();
for (StorageType type : storageTypes) {
jsonArray.add(type.toString());
}
return jsonArray;
}
@SuppressWarnings("unchecked")
private static JSONObject storagePoliciesToJSON(
Collection<? extends BlockStoragePolicySpi> storagePolicies) {
JSONObject json = new JSONObject();
JSONArray jsonArray = new JSONArray();
JSONObject policies = new JSONObject();
if (storagePolicies != null) {
for (BlockStoragePolicySpi policy : storagePolicies) {
JSONObject policyMap = storagePolicyToJSON(policy);
jsonArray.add(policyMap);
}
}
policies.put(HttpFSFileSystem.STORAGE_POLICY_JSON, jsonArray);
json.put(HttpFSFileSystem.STORAGE_POLICIES_JSON, policies);
return json;
}
/**
* Executor that performs an append FileSystemAccess files system operation.
*/
@@ -1234,4 +1278,91 @@
return xAttrsToJSON(xattrs, encoding);
}
}
/**
* Executor that performs a getAllStoragePolicies FileSystemAccess files
* system operation.
*/
@SuppressWarnings({ "unchecked" })
@InterfaceAudience.Private
public static class FSGetAllStoragePolicies implements
FileSystemAccess.FileSystemExecutor<JSONObject> {
@Override
public JSONObject execute(FileSystem fs) throws IOException {
Collection<? extends BlockStoragePolicySpi> storagePolicies = fs
.getAllStoragePolicies();
return storagePoliciesToJSON(storagePolicies);
}
}
/**
* Executor that performs a getStoragePolicy FileSystemAccess files system
* operation.
*/
@SuppressWarnings({ "unchecked" })
@InterfaceAudience.Private
public static class FSGetStoragePolicy implements
FileSystemAccess.FileSystemExecutor<JSONObject> {
private Path path;
public FSGetStoragePolicy(String path) {
this.path = new Path(path);
}
@Override
public JSONObject execute(FileSystem fs) throws IOException {
BlockStoragePolicySpi storagePolicy = fs.getStoragePolicy(path);
JSONObject json = new JSONObject();
json.put(HttpFSFileSystem.STORAGE_POLICY_JSON,
storagePolicyToJSON(storagePolicy));
return json;
}
}
/**
* Executor that performs a setStoragePolicy FileSystemAccess files system
* operation.
*/
@InterfaceAudience.Private
public static class FSSetStoragePolicy implements
FileSystemAccess.FileSystemExecutor<Void> {
private Path path;
private String policyName;
public FSSetStoragePolicy(String path, String policyName) {
this.path = new Path(path);
this.policyName = policyName;
}
@Override
public Void execute(FileSystem fs) throws IOException {
fs.setStoragePolicy(path, policyName);
return null;
}
}
/**
* Executor that performs an unsetStoragePolicy FileSystemAccess files system
* operation.
*/
@InterfaceAudience.Private
public static class FSUnsetStoragePolicy implements
FileSystemAccess.FileSystemExecutor<Void> {
private Path path;
public FSUnsetStoragePolicy(String path) {
this.path = new Path(path);
}
@Override
public Void execute(FileSystem fs) throws IOException {
fs.unsetStoragePolicy(path);
return null;
}
}
}

HttpFSParametersProvider.java

@@ -91,6 +91,11 @@ public class HttpFSParametersProvider extends ParametersProvider {
PARAMS_DEF.put(Operation.GETXATTRS,
new Class[]{XAttrNameParam.class, XAttrEncodingParam.class});
PARAMS_DEF.put(Operation.LISTXATTRS, new Class[]{});
PARAMS_DEF.put(Operation.GETALLSTORAGEPOLICY, new Class[] {});
PARAMS_DEF.put(Operation.GETSTORAGEPOLICY, new Class[] {});
PARAMS_DEF.put(Operation.SETSTORAGEPOLICY,
new Class[] {PolicyNameParam.class});
PARAMS_DEF.put(Operation.UNSETSTORAGEPOLICY, new Class[] {});
}
public HttpFSParametersProvider() {
@@ -520,4 +525,22 @@ public class HttpFSParametersProvider extends ParametersProvider {
super(NAME, XAttrCodec.class, null);
}
}
/**
* Class for policyName parameter.
*/
@InterfaceAudience.Private
public static class PolicyNameParam extends StringParam {
/**
* Parameter name.
*/
public static final String NAME = HttpFSFileSystem.POLICY_NAME_PARAM;
/**
* Constructor.
*/
public PolicyNameParam() {
super(NAME, null);
}
}
}

HttpFSServer.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OperationParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OverwriteParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OwnerParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.PermissionParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.PolicyNameParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.RecursiveParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.ReplicationParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.SourcesParam;
@@ -75,6 +76,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
@@ -320,6 +322,22 @@ public class HttpFSServer {
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case GETALLSTORAGEPOLICY: {
FSOperations.FSGetAllStoragePolicies command =
new FSOperations.FSGetAllStoragePolicies();
JSONObject json = fsExecute(user, command);
AUDIT_LOG.info("[{}]", path);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case GETSTORAGEPOLICY: {
FSOperations.FSGetStoragePolicy command =
new FSOperations.FSGetStoragePolicy(path);
JSONObject json = fsExecute(user, command);
AUDIT_LOG.info("[{}]", path);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
default: {
throw new IOException(
MessageFormat.format("Invalid HTTP GET operation [{0}]", op.value()));
@@ -447,6 +465,14 @@
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case UNSETSTORAGEPOLICY: {
FSOperations.FSUnsetStoragePolicy command =
new FSOperations.FSUnsetStoragePolicy(path);
fsExecute(user, command);
AUDIT_LOG.info("Unset storage policy [{}]", path);
response = Response.ok().build();
break;
}
default: {
throw new IOException(
MessageFormat.format("Invalid HTTP POST operation [{0}]",
@@ -664,6 +690,16 @@
response = Response.ok().build();
break;
}
case SETSTORAGEPOLICY: {
String policyName = params.get(PolicyNameParam.NAME,
PolicyNameParam.class);
FSOperations.FSSetStoragePolicy command =
new FSOperations.FSSetStoragePolicy(path, policyName);
fsExecute(user, command);
AUDIT_LOG.info("[{}] to policy [{}]", path, policyName);
response = Response.ok().build();
break;
}
default: {
throw new IOException(
MessageFormat.format("Invalid HTTP PUT operation [{0}]",

BaseTestHttpFSWith.java

@@ -19,6 +19,7 @@
package org.apache.hadoop.fs.http.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileChecksum;
@@ -34,6 +35,8 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.HFSTestCase;
import org.apache.hadoop.test.HadoopUsersConfTestHelper;
@@ -44,6 +47,7 @@ import org.apache.hadoop.test.TestHdfsHelper;
import org.apache.hadoop.test.TestJetty;
import org.apache.hadoop.test.TestJettyHelper;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -859,11 +863,55 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
assertFalse(httpStatus.isEncrypted());
}
private void testStoragePolicy() throws Exception {
Assume.assumeFalse("Assume it's not a local FS", isLocalFS());
FileSystem fs = FileSystem.get(getProxiedFSConf());
fs.mkdirs(getProxiedFSTestDir());
Path path = new Path(getProxiedFSTestDir(), "policy.txt");
FileSystem httpfs = getHttpFSFileSystem();
// test getAllStoragePolicies
BlockStoragePolicy[] dfsPolicies = (BlockStoragePolicy[]) fs
.getAllStoragePolicies().toArray();
BlockStoragePolicy[] httpPolicies = (BlockStoragePolicy[]) httpfs
.getAllStoragePolicies().toArray();
Assert.assertArrayEquals(
"Policy array returned from the DFS and HttpFS should be equals",
dfsPolicies, httpPolicies);
// test get/set/unset policies
DFSTestUtil.createFile(fs, path, 0, (short) 1, 0L);
// get defaultPolicy
BlockStoragePolicySpi defaultdfsPolicy = fs.getStoragePolicy(path);
// set policy through webhdfs
httpfs.setStoragePolicy(path, HdfsConstants.COLD_STORAGE_POLICY_NAME);
// get policy from dfs
BlockStoragePolicySpi dfsPolicy = fs.getStoragePolicy(path);
// get policy from webhdfs
BlockStoragePolicySpi httpFsPolicy = httpfs.getStoragePolicy(path);
Assert
.assertEquals(
"Storage policy returned from the get API should"
+ " be same as set policy",
HdfsConstants.COLD_STORAGE_POLICY_NAME.toString(),
httpFsPolicy.getName());
Assert.assertEquals(
"Storage policy returned from the DFS and HttpFS should be equals",
httpFsPolicy, dfsPolicy);
// unset policy
httpfs.unsetStoragePolicy(path);
Assert
.assertEquals(
"After unset storage policy, the get API shoudld"
+ " return the default policy",
defaultdfsPolicy, httpfs.getStoragePolicy(path));
fs.close();
}
protected enum Operation {
GET, OPEN, CREATE, APPEND, TRUNCATE, CONCAT, RENAME, DELETE, LIST_STATUS,
WORKING_DIRECTORY, MKDIRS, SET_TIMES, SET_PERMISSION, SET_OWNER,
SET_REPLICATION, CHECKSUM, CONTENT_SUMMARY, FILEACLS, DIRACLS, SET_XATTR,
GET_XATTRS, REMOVE_XATTR, LIST_XATTRS, ENCRYPTION, STORAGEPOLICY
}
private void operation(Operation op) throws Exception {
@@ -940,6 +988,9 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
case ENCRYPTION:
testEncryption();
break;
case STORAGEPOLICY:
testStoragePolicy();
break;
}
}

NamenodeWebHdfsMethods.java

@@ -64,6 +64,7 @@ import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -405,13 +406,16 @@
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
final ExcludeDatanodesParam excludeDatanodes,
@QueryParam(CreateFlagParam.NAME) @DefaultValue(CreateFlagParam.DEFAULT)
final CreateFlagParam createFlagParam,
@QueryParam(StoragePolicyParam.NAME) @DefaultValue(StoragePolicyParam
.DEFAULT) final StoragePolicyParam policyName
) throws IOException, InterruptedException {
return put(ugi, delegation, username, doAsUser, ROOT, op, destination,
owner, group, permission, overwrite, bufferSize, replication,
blockSize, modificationTime, accessTime, renameOptions, createParent,
delegationTokenArgument, aclPermission, xattrName, xattrValue,
xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes,
createFlagParam, policyName);
}
/** Handle HTTP PUT request. */
@@ -471,14 +475,16 @@
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
final ExcludeDatanodesParam excludeDatanodes,
@QueryParam(CreateFlagParam.NAME) @DefaultValue(CreateFlagParam.DEFAULT)
final CreateFlagParam createFlagParam,
@QueryParam(StoragePolicyParam.NAME) @DefaultValue(StoragePolicyParam
.DEFAULT) final StoragePolicyParam policyName
) throws IOException, InterruptedException {
init(ugi, delegation, username, doAsUser, path, op, destination, owner,
group, permission, overwrite, bufferSize, replication, blockSize,
modificationTime, accessTime, renameOptions, delegationTokenArgument,
aclPermission, xattrName, xattrValue, xattrSetFlag, snapshotName,
oldSnapshotName, excludeDatanodes, createFlagParam, policyName);
return doAs(ugi, new PrivilegedExceptionAction<Response>() {
@Override
@@ -489,7 +495,7 @@
modificationTime, accessTime, renameOptions, createParent,
delegationTokenArgument, aclPermission, xattrName, xattrValue,
xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes,
createFlagParam, policyName);
}
});
}
@@ -521,7 +527,8 @@
final SnapshotNameParam snapshotName,
final OldSnapshotNameParam oldSnapshotName,
final ExcludeDatanodesParam exclDatanodes,
final CreateFlagParam createFlagParam,
final StoragePolicyParam policyName
) throws IOException, URISyntaxException {
final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
@@ -652,6 +659,13 @@
np.disallowSnapshot(fullpath);
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case SETSTORAGEPOLICY: {
if (policyName.getValue() == null) {
throw new IllegalArgumentException("Storage policy name is empty.");
}
np.setStoragePolicy(fullpath, policyName.getValue());
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
default:
throw new UnsupportedOperationException(op + " is not supported");
}
@@ -764,6 +778,10 @@
final String js = JsonUtil.toJsonString("boolean", b);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case UNSETSTORAGEPOLICY: {
np.unsetStoragePolicy(fullpath);
return Response.ok().build();
}
default:
throw new UnsupportedOperationException(op + " is not supported");
}
@@ -988,6 +1006,16 @@
np.checkAccess(fullpath, FsAction.getFsAction(fsAction.getValue()));
return Response.ok().build();
}
case GETALLSTORAGEPOLICY: {
BlockStoragePolicy[] storagePolicies = np.getStoragePolicies();
final String js = JsonUtil.toJsonString(storagePolicies);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case GETSTORAGEPOLICY: {
BlockStoragePolicy storagePolicy = np.getStoragePolicy(fullpath);
final String js = JsonUtil.toJsonString(storagePolicy);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
default:
throw new UnsupportedOperationException(op + " is not supported");
}

JsonUtil.java

@@ -393,4 +393,31 @@ public class JsonUtil {
return MAPPER.writeValueAsString(obj);
}
public static String toJsonString(BlockStoragePolicy[] storagePolicies) {
final Map<String, Object> blockStoragePolicies = new TreeMap<>();
Object[] a = null;
if (storagePolicies != null && storagePolicies.length > 0) {
a = new Object[storagePolicies.length];
for (int i = 0; i < storagePolicies.length; i++) {
a[i] = toJsonMap(storagePolicies[i]);
}
}
blockStoragePolicies.put("BlockStoragePolicy", a);
return toJsonString("BlockStoragePolicies", blockStoragePolicies);
}
private static Object toJsonMap(BlockStoragePolicy blockStoragePolicy) {
final Map<String, Object> m = new TreeMap<String, Object>();
m.put("id", blockStoragePolicy.getId());
m.put("name", blockStoragePolicy.getName());
m.put("storageTypes", blockStoragePolicy.getStorageTypes());
m.put("creationFallbacks", blockStoragePolicy.getCreationFallbacks());
m.put("replicationFallbacks", blockStoragePolicy.getReplicationFallbacks());
m.put("copyOnCreateFile", blockStoragePolicy.isCopyOnCreateFile());
return m;
}
public static String toJsonString(BlockStoragePolicy storagePolicy) {
return toJsonString(BlockStoragePolicy.class, toJsonMap(storagePolicy));
}
}
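
On the wire this produces the same shape shown in the WebHDFS.md examples below. A hedged sketch; the five-argument `BlockStoragePolicy` constructor (which leaves `copyOnCreateFile` false) and the HOT values are illustrative.

```java
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.web.JsonUtil;

public class PolicyJsonSketch {
  public static void main(String[] args) throws Exception {
    // Values mirror the built-in HOT policy from the WebHDFS.md example.
    BlockStoragePolicy hot = new BlockStoragePolicy((byte) 7, "HOT",
        new StorageType[] {StorageType.DISK},     // storageTypes
        new StorageType[] {},                     // creationFallbacks
        new StorageType[] {StorageType.ARCHIVE}); // replicationFallbacks
    System.out.println(JsonUtil.toJsonString(hot));
    // {"BlockStoragePolicy":{"copyOnCreateFile":false,"creationFallbacks":[],
    //  "id":7,"name":"HOT","replicationFallbacks":["ARCHIVE"],"storageTypes":["DISK"]}}
  }
}
```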

WebHDFS.md

@@ -52,6 +52,10 @@ WebHDFS REST API
* [Set ACL](#Set_ACL)
* [Get ACL Status](#Get_ACL_Status)
* [Check access](#Check_access)
* [Get all Storage Policies](#Get_all_Storage_Policies)
* [Set Storage Policy](#Set_Storage_Policy)
* [Unset Storage Policy](#Unset_Storage_Policy)
* [Get Storage Policy](#Get_Storage_Policy)
* [Extended Attributes(XAttrs) Operations](#Extended_AttributesXAttrs_Operations)
* [Set XAttr](#Set_XAttr)
* [Remove XAttr](#Remove_XAttr)
@@ -88,6 +92,9 @@ WebHDFS REST API
* [RemoteException JSON Schema](#RemoteException_JSON_Schema)
* [Token JSON Schema](#Token_JSON_Schema)
* [Token Properties](#Token_Properties)
* [BlockStoragePolicy JSON Schema](#BlockStoragePolicy_JSON_Schema)
* [BlockStoragePolicy Properties](#BlockStoragePolicy_Properties)
* [BlockStoragePolicies JSON Schema](#BlockStoragePolicies_JSON_Schema)
* [HTTP Query Parameter Dictionary](#HTTP_Query_Parameter_Dictionary)
* [ACL Spec](#ACL_Spec)
* [XAttr Name](#XAttr_Name)
@@ -121,6 +128,7 @@ WebHDFS REST API
* [Token Kind](#Token_Kind)
* [Token Service](#Token_Service)
* [Username](#Username)
* [Storage Policy](#Storage_Policy)
Document Conventions
--------------------
@@ -151,6 +159,8 @@ The HTTP REST API supports the complete [FileSystem](../../api/org/apache/hadoop
* [`GETXATTRS`](#Get_all_XAttrs) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getXAttrs)
* [`LISTXATTRS`](#List_all_XAttrs) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).listXAttrs)
* [`CHECKACCESS`](#Check_access) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).access)
* [`GETALLSTORAGEPOLICY`](#Get_all_Storage_Policies) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getAllStoragePolicies)
* [`GETSTORAGEPOLICY`](#Get_Storage_Policy) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getStoragePolicy)
* HTTP PUT
* [`CREATE`](#Create_and_Write_to_a_File) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).create)
* [`MKDIRS`](#Make_a_Directory) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).mkdirs)
@@ -166,10 +176,12 @@ The HTTP REST API supports the complete [FileSystem](../../api/org/apache/hadoop
* [`RENAMESNAPSHOT`](#Rename_Snapshot) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).renameSnapshot)
* [`SETXATTR`](#Set_XAttr) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).setXAttr)
* [`REMOVEXATTR`](#Remove_XAttr) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).removeXAttr)
* [`SETSTORAGEPOLICY`](#Set_Storage_Policy) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).setStoragePolicy)
* HTTP POST
* [`APPEND`](#Append_to_a_File) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).append)
* [`CONCAT`](#Concat_Files) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).concat)
* [`TRUNCATE`](#Truncate_a_File) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).truncate)
* [`UNSETSTORAGEPOLICY`](#Unset_Storage_Policy) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).unsetStoragePolicy)
* HTTP DELETE
* [`DELETE`](#Delete_a_FileDirectory) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).delete)
* [`DELETESNAPSHOT`](#Delete_Snapshot) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).deleteSnapshot)
@@ -830,6 +842,129 @@ See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getAclSta
See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).access
Storage Policy Operations
-------------------------
### Get all Storage Policies
* Submit a HTTP GET request.
curl -i "http://<HOST>:<PORT>/webhdfs/v1?op=GETALLSTORAGEPOLICY"
The client receives a response with a [`BlockStoragePolicies` JSON object](#BlockStoragePolicies_JSON_Schema):
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
{
"BlockStoragePolicies": {
"BlockStoragePolicy": [
{
"copyOnCreateFile": false,
"creationFallbacks": [],
"id": 2,
"name": "COLD",
"replicationFallbacks": [],
"storageTypes": ["ARCHIVE"]
},
{
"copyOnCreateFile": false,
"creationFallbacks": ["DISK","ARCHIVE"],
"id": 5,
"name": "WARM",
"replicationFallbacks": ["DISK","ARCHIVE"],
"storageTypes": ["DISK","ARCHIVE"]
},
{
"copyOnCreateFile": false,
"creationFallbacks": [],
"id": 7,
"name": "HOT",
"replicationFallbacks": ["ARCHIVE"],
"storageTypes": ["DISK"]
},
{
"copyOnCreateFile": false,
"creationFallbacks": ["SSD","DISK"],
"id": 10,"name": "ONE_SSD",
"replicationFallbacks": ["SSD","DISK"],
"storageTypes": ["SSD","DISK"]
},
{
"copyOnCreateFile": false,
"creationFallbacks": ["DISK"],
"id": 12,
"name": "ALL_SSD",
"replicationFallbacks": ["DISK"],
"storageTypes": ["SSD"]
},
{
"copyOnCreateFile": true,
"creationFallbacks": ["DISK"],
"id": 15,
"name": "LAZY_PERSIST",
"replicationFallbacks": ["DISK"],
"storageTypes": ["RAM_DISK","DISK"]
}
]
}
}
See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getAllStoragePolicies
### Set Storage Policy
* Submit a HTTP PUT request.
curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETSTORAGEPOLICY
&storagepolicy=<policy>"
The client receives a response with zero content length:
HTTP/1.1 200 OK
Content-Length: 0
See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).setStoragePolicy
### Unset Storage Policy
* Submit a HTTP POST request.
curl -i -X POST "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=UNSETSTORAGEPOLICY"
The client receives a response with zero content length:
HTTP/1.1 200 OK
Content-Length: 0
See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).unsetStoragePolicy
### Get Storage Policy
* Submit a HTTP GET request.
curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETSTORAGEPOLICY"
The client receives a response with a [`BlockStoragePolicy` JSON object](#BlockStoragePolicy_JSON_Schema):
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
{
"BlockStoragePolicy": {
"copyOnCreateFile": false,
"creationFallbacks": [],
"id":7,
"name":"HOT",
"replicationFallbacks":["ARCHIVE"],
"storageTypes":["DISK"]
}
}
See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getStoragePolicy
Extended Attributes(XAttrs) Operations
--------------------------------------
@@ -1651,6 +1786,108 @@ var tokenProperties =
See also: [`Token` Properties](#Token_Properties), the note in [Delegation](#Delegation).
### BlockStoragePolicy JSON Schema
```json
{
"name" : "BlockStoragePolicy",
"properties":
{
"BlockStoragePolicy": blockStoragePolicyProperties //See BlockStoragePolicy Properties
}
}
```
See also: [`BlockStoragePolicy` Properties](#BlockStoragePolicy_Properties), [`GETSTORAGEPOLICY`](#Get_Storage_Policy)
#### BlockStoragePolicy Properties
JavaScript syntax is used to define `blockStoragePolicyProperties` so that it can be referred in both `BlockStoragePolicy` and `BlockStoragePolicies` JSON schemas.
```javascript
var blockStoragePolicyProperties =
{
"type" : "object",
"properties":
{
"id":
{
"description": "Policy ID.",
"type" : "integer",
"required" : true
},
"name":
{
"description": "Policy name.",
"type" : "string",
"required" : true
},
"storageTypes":
{
"description": "An array of storage types for block placement.",
"type" : "array",
"required" : true
"items" :
{
"type": "string"
}
},
"replicationFallbacks":
{
"description": "An array of fallback storage types for replication.",
"type" : "array",
"required" : true
"items" :
{
"type": "string"
}
},
"creationFallbacks":
{
"description": "An array of fallback storage types for file creation.",
"type" : "array",
"required" : true
"items" :
{
"type": "string"
}
},
"copyOnCreateFile":
{
"description": "If set then the policy cannot be changed after file creation.",
"type" : "boolean",
"required" : true
}
}
};
```
### BlockStoragePolicies JSON Schema
A `BlockStoragePolicies` JSON object represents an array of `BlockStoragePolicy` JSON objects.
```json
{
"name" : "BlockStoragePolicies",
"properties":
{
"BlockStoragePolicies":
{
"type" : "object",
"properties":
{
"BlockStoragePolicy":
{
"description": "An array of BlockStoragePolicy",
"type" : "array",
"items" : blockStoragePolicyProperties //See BlockStoragePolicy Properties
}
}
}
}
}
```
HTTP Query Parameter Dictionary
-------------------------------
@@ -2048,3 +2285,15 @@ See also: [`GETDELEGATIONTOKEN`](#Get_Delegation_Token)
| Syntax | Any string. |
See also: [Authentication](#Authentication)
### Storage Policy
| Name | `storagepolicy` |
|:---- |:---- |
| Description | The name of the storage policy. |
| Type | String |
| Default Value | \<empty\> |
| Valid Values | Any valid storage policy name; see [GETALLSTORAGEPOLICY](#Get_all_Storage_Policies). |
| Syntax | Any string. |
See also: [`SETSTORAGEPOLICY`](#Set_Storage_Policy)

TestWebHDFS.java

@@ -36,6 +36,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Random;
import org.apache.commons.io.IOUtils;
@@ -43,6 +44,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -61,6 +63,8 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.TestDFSClientRetries;
import org.apache.hadoop.hdfs.TestFileCreation;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
@@ -962,4 +966,66 @@ public class TestWebHDFS {
webIn.close();
in.close();
}
@Test
public void testStoragePolicy() throws Exception {
MiniDFSCluster cluster = null;
final Configuration conf = WebHdfsTestUtil.createConf();
final Path path = new Path("/file");
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
final DistributedFileSystem dfs = cluster.getFileSystem();
final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(
conf, WebHdfsConstants.WEBHDFS_SCHEME);
// test getAllStoragePolicies
BlockStoragePolicy[] dfsPolicies = (BlockStoragePolicy[]) dfs
.getAllStoragePolicies().toArray();
BlockStoragePolicy[] webHdfsPolicies = (BlockStoragePolicy[]) webHdfs
.getAllStoragePolicies().toArray();
Assert.assertTrue(Arrays.equals(dfsPolicies, webHdfsPolicies));
// test get/set/unset policies
DFSTestUtil.createFile(dfs, path, 0, (short) 1, 0L);
// get defaultPolicy
BlockStoragePolicySpi defaultdfsPolicy = dfs.getStoragePolicy(path);
// set policy through webhdfs
webHdfs.setStoragePolicy(path, HdfsConstants.COLD_STORAGE_POLICY_NAME);
// get policy from dfs
BlockStoragePolicySpi dfsPolicy = dfs.getStoragePolicy(path);
// get policy from webhdfs
BlockStoragePolicySpi webHdfsPolicy = webHdfs.getStoragePolicy(path);
Assert.assertEquals(HdfsConstants.COLD_STORAGE_POLICY_NAME.toString(),
webHdfsPolicy.getName());
Assert.assertEquals(webHdfsPolicy, dfsPolicy);
// unset policy
webHdfs.unsetStoragePolicy(path);
Assert.assertEquals(defaultdfsPolicy, webHdfs.getStoragePolicy(path));
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testSetStoragePolicyWhenPolicyDisabled() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, false);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.build();
try {
cluster.waitActive();
final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(
conf, WebHdfsConstants.WEBHDFS_SCHEME);
webHdfs.setStoragePolicy(new Path("/"),
HdfsConstants.COLD_STORAGE_POLICY_NAME);
fail("Should throw exception, when storage policy disabled");
} catch (IOException e) {
Assert.assertTrue(e.getMessage().contains(
"Failed to set storage policy since"));
} finally {
cluster.shutdown();
}
}
}

TestParam.java

@@ -453,4 +453,12 @@ public class TestParam {
LOG.info("EXPECTED: " + e);
}
}
@Test
public void testStoragePolicyParam() {
StoragePolicyParam p = new StoragePolicyParam(StoragePolicyParam.DEFAULT);
Assert.assertEquals(null, p.getValue());
p = new StoragePolicyParam("COLD");
Assert.assertEquals("COLD", p.getValue());
}
}