HDFS-8630. WebHDFS : Support get/set/unset StoragePolicy. Contributed by Surendra Singh Lilhore.

This commit is contained in:
Andrew Wang 2016-12-07 15:52:16 -08:00
parent 72fe546841
commit ea2895f4ed
16 changed files with 871 additions and 8 deletions

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@ -56,6 +57,8 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -588,4 +591,50 @@ class JsonUtilClient {
lastLocatedBlock, isLastBlockComplete, null, null);
}
public static Collection<BlockStoragePolicy> getStoragePolicies(
Map<?, ?> json) {
Map<?, ?> policiesJson = (Map<?, ?>) json.get("BlockStoragePolicies");
if (policiesJson != null) {
List<?> objs = (List<?>) policiesJson.get(BlockStoragePolicy.class
.getSimpleName());
if (objs != null) {
BlockStoragePolicy[] storagePolicies = new BlockStoragePolicy[objs
.size()];
for (int i = 0; i < objs.size(); i++) {
final Map<?, ?> m = (Map<?, ?>) objs.get(i);
BlockStoragePolicy blockStoragePolicy = toBlockStoragePolicy(m);
storagePolicies[i] = blockStoragePolicy;
}
return Arrays.asList(storagePolicies);
}
}
return new ArrayList<BlockStoragePolicy>(0);
}
public static BlockStoragePolicy toBlockStoragePolicy(Map<?, ?> m) {
byte id = ((Number) m.get("id")).byteValue();
String name = (String) m.get("name");
StorageType[] storageTypes = toStorageTypes((List<?>) m
.get("storageTypes"));
StorageType[] creationFallbacks = toStorageTypes((List<?>) m
.get("creationFallbacks"));
StorageType[] replicationFallbacks = toStorageTypes((List<?>) m
.get("replicationFallbacks"));
Boolean copyOnCreateFile = (Boolean) m.get("copyOnCreateFile");
return new BlockStoragePolicy(id, name, storageTypes, creationFallbacks,
replicationFallbacks, copyOnCreateFile.booleanValue());
}
private static StorageType[] toStorageTypes(List<?> list) {
if (list == null) {
return null;
} else {
StorageType[] storageTypes = new StorageType[list.size()];
for (int i = 0; i < list.size(); i++) {
storageTypes[i] = StorageType.parseStorageType((String) list.get(i));
}
return storageTypes;
}
}
}

View File

@ -39,6 +39,7 @@ import java.net.URI;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
@ -82,6 +83,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HAUtilClient;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@ -1715,6 +1717,50 @@ public class WebHdfsFileSystem extends FileSystem
: tokenServiceName.toString();
}
@Override
public void setStoragePolicy(Path p, String policyName) throws IOException {
if (policyName == null) {
throw new IOException("policyName == null");
}
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_STORAGE_POLICY);
final HttpOpParam.Op op = PutOpParam.Op.SETSTORAGEPOLICY;
new FsPathRunner(op, p, new StoragePolicyParam(policyName)).run();
}
@Override
public Collection<BlockStoragePolicy> getAllStoragePolicies()
throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETALLSTORAGEPOLICY;
return new FsPathResponseRunner<Collection<BlockStoragePolicy>>(op, null) {
@Override
Collection<BlockStoragePolicy> decodeResponse(Map<?, ?> json)
throws IOException {
return JsonUtilClient.getStoragePolicies(json);
}
}.run();
}
@Override
public BlockStoragePolicy getStoragePolicy(Path src) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETSTORAGEPOLICY;
return new FsPathResponseRunner<BlockStoragePolicy>(op, src) {
@Override
BlockStoragePolicy decodeResponse(Map<?, ?> json) throws IOException {
return JsonUtilClient.toBlockStoragePolicy((Map<?, ?>) json
.get(BlockStoragePolicy.class.getSimpleName()));
}
}.run();
}
@Override
public void unsetStoragePolicy(Path src) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.UNSET_STORAGE_POLICY);
final HttpOpParam.Op op = PostOpParam.Op.UNSETSTORAGEPOLICY;
new FsPathRunner(op, src).run();
}
@VisibleForTesting
InetSocketAddress[] getResolvedNNAddr() {
return nnAddrs;

View File

@ -40,6 +40,9 @@ public class GetOpParam extends HttpOpParam<GetOpParam.Op> {
GETTRASHROOT(false, HttpURLConnection.HTTP_OK),
LISTXATTRS(false, HttpURLConnection.HTTP_OK),
GETALLSTORAGEPOLICY(false, HttpURLConnection.HTTP_OK),
GETSTORAGEPOLICY(false, HttpURLConnection.HTTP_OK),
NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED),
CHECKACCESS(false, HttpURLConnection.HTTP_OK),

View File

@ -29,6 +29,8 @@ public class PostOpParam extends HttpOpParam<PostOpParam.Op> {
TRUNCATE(false, HttpURLConnection.HTTP_OK),
UNSETSTORAGEPOLICY(false, HttpURLConnection.HTTP_OK),
NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);
final boolean doOutputAndRedirect;

View File

@ -50,6 +50,7 @@ public class PutOpParam extends HttpOpParam<PutOpParam.Op> {
DISALLOWSNAPSHOT(false, HttpURLConnection.HTTP_OK),
CREATESNAPSHOT(false, HttpURLConnection.HTTP_OK),
RENAMESNAPSHOT(false, HttpURLConnection.HTTP_OK),
SETSTORAGEPOLICY(false, HttpURLConnection.HTTP_OK),
NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web.resources;
/** policy parameter. */
public class StoragePolicyParam extends StringParam {
/** Parameter name. */
public static final String NAME = "storagepolicy";
/** Default parameter value. */
public static final String DEFAULT = "";
private static final Domain DOMAIN = new Domain(NAME, null);
/**
* Constructor.
*
* @param str
* a string representation of the parameter value.
*/
public StoragePolicyParam(final String str) {
super(DOMAIN, str == null || str.equals(DEFAULT) ? null : str);
}
@Override
public String getName() {
return NAME;
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.fs.http.client;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
@ -34,12 +36,14 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.XAttrCodec;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
import org.apache.hadoop.lib.wsrs.EnumSetParam;
import org.apache.hadoop.security.UserGroupInformation;
@ -114,6 +118,7 @@ public class HttpFSFileSystem extends FileSystem
public static final String XATTR_ENCODING_PARAM = "encoding";
public static final String NEW_LENGTH_PARAM = "newlength";
public static final String START_AFTER_PARAM = "startAfter";
public static final String POLICY_NAME_PARAM = "storagepolicy";
public static final Short DEFAULT_PERMISSION = 0755;
public static final String ACLSPEC_DEFAULT = "";
@ -193,6 +198,9 @@ public class HttpFSFileSystem extends FileSystem
public static final String PARTIAL_LISTING_JSON = "partialListing";
public static final String REMAINING_ENTRIES_JSON = "remainingEntries";
public static final String STORAGE_POLICIES_JSON = "BlockStoragePolicies";
public static final String STORAGE_POLICY_JSON = "BlockStoragePolicy";
public static final int HTTP_TEMPORARY_REDIRECT = 307;
private static final String HTTP_GET = "GET";
@ -212,7 +220,9 @@ public class HttpFSFileSystem extends FileSystem
MODIFYACLENTRIES(HTTP_PUT), REMOVEACLENTRIES(HTTP_PUT),
REMOVEDEFAULTACL(HTTP_PUT), REMOVEACL(HTTP_PUT), SETACL(HTTP_PUT),
DELETE(HTTP_DELETE), SETXATTR(HTTP_PUT), GETXATTRS(HTTP_GET),
REMOVEXATTR(HTTP_PUT), LISTXATTRS(HTTP_GET), LISTSTATUS_BATCH(HTTP_GET);
REMOVEXATTR(HTTP_PUT), LISTXATTRS(HTTP_GET), LISTSTATUS_BATCH(HTTP_GET),
GETALLSTORAGEPOLICY(HTTP_GET), GETSTORAGEPOLICY(HTTP_GET),
SETSTORAGEPOLICY(HTTP_PUT), UNSETSTORAGEPOLICY(HTTP_POST);
private String httpMethod;
@ -1310,4 +1320,84 @@ public class HttpFSFileSystem extends FileSystem
params, f, true);
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
}
@Override
public Collection<BlockStoragePolicy> getAllStoragePolicies()
throws IOException {
Map<String, String> params = new HashMap<String, String>();
params.put(OP_PARAM, Operation.GETALLSTORAGEPOLICY.toString());
HttpURLConnection conn = getConnection(
Operation.GETALLSTORAGEPOLICY.getMethod(), params, new Path(getUri()
.toString(), "/"), false);
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
return createStoragePolicies((JSONObject) json.get(STORAGE_POLICIES_JSON));
}
private Collection<BlockStoragePolicy> createStoragePolicies(JSONObject map)
throws IOException {
JSONArray jsonArray = (JSONArray) map.get(STORAGE_POLICY_JSON);
BlockStoragePolicy[] policies = new BlockStoragePolicy[jsonArray.size()];
for (int i = 0; i < jsonArray.size(); i++) {
policies[i] = createStoragePolicy((JSONObject) jsonArray.get(i));
}
return Arrays.asList(policies);
}
@Override
public BlockStoragePolicy getStoragePolicy(Path src) throws IOException {
Map<String, String> params = new HashMap<String, String>();
params.put(OP_PARAM, Operation.GETSTORAGEPOLICY.toString());
HttpURLConnection conn = getConnection(
Operation.GETSTORAGEPOLICY.getMethod(), params, src, true);
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
return createStoragePolicy((JSONObject) json.get(STORAGE_POLICY_JSON));
}
private BlockStoragePolicy createStoragePolicy(JSONObject policyJson)
throws IOException {
byte id = ((Number) policyJson.get("id")).byteValue();
String name = (String) policyJson.get("name");
StorageType[] storageTypes = toStorageTypes((JSONArray) policyJson
.get("storageTypes"));
StorageType[] creationFallbacks = toStorageTypes((JSONArray) policyJson
.get("creationFallbacks"));
StorageType[] replicationFallbacks = toStorageTypes((JSONArray) policyJson
.get("replicationFallbacks"));
Boolean copyOnCreateFile = (Boolean) policyJson.get("copyOnCreateFile");
return new BlockStoragePolicy(id, name, storageTypes, creationFallbacks,
replicationFallbacks, copyOnCreateFile.booleanValue());
}
private StorageType[] toStorageTypes(JSONArray array) throws IOException {
if (array == null) {
return null;
} else {
List<StorageType> storageTypes = new ArrayList<StorageType>(array.size());
for (Object name : array) {
storageTypes.add(StorageType.parseStorageType((String) name));
}
return storageTypes.toArray(new StorageType[storageTypes.size()]);
}
}
@Override
public void setStoragePolicy(Path src, String policyName) throws IOException {
Map<String, String> params = new HashMap<String, String>();
params.put(OP_PARAM, Operation.SETSTORAGEPOLICY.toString());
params.put(POLICY_NAME_PARAM, policyName);
HttpURLConnection conn = getConnection(
Operation.SETSTORAGEPOLICY.getMethod(), params, src, true);
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
}
@Override
public void unsetStoragePolicy(Path src) throws IOException {
Map<String, String> params = new HashMap<String, String>();
params.put(OP_PARAM, Operation.UNSETSTORAGEPOLICY.toString());
HttpURLConnection conn = getConnection(
Operation.UNSETSTORAGEPOLICY.getMethod(), params, src, true);
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.http.server;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
@ -26,12 +27,14 @@ import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.GlobFilter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.XAttrCodec;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.lib.service.FileSystemAccess;
import org.apache.hadoop.util.StringUtils;
@ -42,6 +45,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
@ -260,6 +264,46 @@ public class FSOperations {
return json;
}
@SuppressWarnings({ "unchecked" })
private static JSONObject storagePolicyToJSON(BlockStoragePolicySpi policy) {
BlockStoragePolicy p = (BlockStoragePolicy) policy;
JSONObject policyJson = new JSONObject();
policyJson.put("id", p.getId());
policyJson.put("name", p.getName());
policyJson.put("storageTypes", toJsonArray(p.getStorageTypes()));
policyJson.put("creationFallbacks", toJsonArray(p.getCreationFallbacks()));
policyJson.put("replicationFallbacks",
toJsonArray(p.getReplicationFallbacks()));
policyJson.put("copyOnCreateFile", p.isCopyOnCreateFile());
return policyJson;
}
@SuppressWarnings("unchecked")
private static JSONArray toJsonArray(StorageType[] storageTypes) {
JSONArray jsonArray = new JSONArray();
for (StorageType type : storageTypes) {
jsonArray.add(type.toString());
}
return jsonArray;
}
@SuppressWarnings("unchecked")
private static JSONObject storagePoliciesToJSON(
Collection<? extends BlockStoragePolicySpi> storagePolicies) {
JSONObject json = new JSONObject();
JSONArray jsonArray = new JSONArray();
JSONObject policies = new JSONObject();
if (storagePolicies != null) {
for (BlockStoragePolicySpi policy : storagePolicies) {
JSONObject policyMap = storagePolicyToJSON(policy);
jsonArray.add(policyMap);
}
}
policies.put(HttpFSFileSystem.STORAGE_POLICY_JSON, jsonArray);
json.put(HttpFSFileSystem.STORAGE_POLICIES_JSON, policies);
return json;
}
/**
* Executor that performs an append FileSystemAccess files system operation.
*/
@ -1319,4 +1363,90 @@ public class FSOperations {
return xAttrsToJSON(xattrs, encoding);
}
}
/**
* Executor that performs a getAllStoragePolicies FileSystemAccess files
* system operation.
*/
@SuppressWarnings({ "unchecked" })
@InterfaceAudience.Private
public static class FSGetAllStoragePolicies implements
FileSystemAccess.FileSystemExecutor<JSONObject> {
@Override
public JSONObject execute(FileSystem fs) throws IOException {
Collection<? extends BlockStoragePolicySpi> storagePolicies = fs
.getAllStoragePolicies();
return storagePoliciesToJSON(storagePolicies);
}
}
/**
* Executor that performs a getStoragePolicy FileSystemAccess files system
* operation.
*/
@SuppressWarnings({ "unchecked" })
@InterfaceAudience.Private
public static class FSGetStoragePolicy implements
FileSystemAccess.FileSystemExecutor<JSONObject> {
private Path path;
public FSGetStoragePolicy(String path) {
this.path = new Path(path);
}
@Override
public JSONObject execute(FileSystem fs) throws IOException {
BlockStoragePolicySpi storagePolicy = fs.getStoragePolicy(path);
JSONObject json = new JSONObject();
json.put(HttpFSFileSystem.STORAGE_POLICY_JSON,
storagePolicyToJSON(storagePolicy));
return json;
}
}
/**
* Executor that performs a setStoragePolicy FileSystemAccess files system
* operation.
*/
@InterfaceAudience.Private
public static class FSSetStoragePolicy implements
FileSystemAccess.FileSystemExecutor<Void> {
private Path path;
private String policyName;
public FSSetStoragePolicy(String path, String policyName) {
this.path = new Path(path);
this.policyName = policyName;
}
@Override
public Void execute(FileSystem fs) throws IOException {
fs.setStoragePolicy(path, policyName);
return null;
}
}
/**
* Executor that performs a unsetStoragePolicy FileSystemAccess files system
* operation.
*/
@InterfaceAudience.Private
public static class FSUnsetStoragePolicy implements
FileSystemAccess.FileSystemExecutor<Void> {
private Path path;
public FSUnsetStoragePolicy(String path) {
this.path = new Path(path);
}
@Override
public Void execute(FileSystem fs) throws IOException {
fs.unsetStoragePolicy(path);
return null;
}
}
}

View File

@ -94,6 +94,11 @@ public class HttpFSParametersProvider extends ParametersProvider {
PARAMS_DEF.put(Operation.LISTXATTRS, new Class[]{});
PARAMS_DEF.put(Operation.LISTSTATUS_BATCH,
new Class[]{StartAfterParam.class});
PARAMS_DEF.put(Operation.GETALLSTORAGEPOLICY, new Class[] {});
PARAMS_DEF.put(Operation.GETSTORAGEPOLICY, new Class[] {});
PARAMS_DEF.put(Operation.SETSTORAGEPOLICY,
new Class[] {PolicyNameParam.class});
PARAMS_DEF.put(Operation.UNSETSTORAGEPOLICY, new Class[] {});
}
public HttpFSParametersProvider() {
@ -541,4 +546,22 @@ public class HttpFSParametersProvider extends ParametersProvider {
super(NAME, null);
}
}
/**
* Class for policyName parameter.
*/
@InterfaceAudience.Private
public static class PolicyNameParam extends StringParam {
/**
* Parameter name.
*/
public static final String NAME = HttpFSFileSystem.POLICY_NAME_PARAM;
/**
* Constructor.
*/
public PolicyNameParam() {
super(NAME, null);
}
}
}

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OperationParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OverwriteParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OwnerParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.PermissionParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.PolicyNameParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.RecursiveParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.ReplicationParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.SourcesParam;
@ -346,6 +347,22 @@ public class HttpFSServer {
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case GETALLSTORAGEPOLICY: {
FSOperations.FSGetAllStoragePolicies command =
new FSOperations.FSGetAllStoragePolicies();
JSONObject json = fsExecute(user, command);
AUDIT_LOG.info("[{}]", path);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case GETSTORAGEPOLICY: {
FSOperations.FSGetStoragePolicy command =
new FSOperations.FSGetStoragePolicy(path);
JSONObject json = fsExecute(user, command);
AUDIT_LOG.info("[{}]", path);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
default: {
throw new IOException(
MessageFormat.format("Invalid HTTP GET operation [{0}]", op.value()));
@ -473,6 +490,14 @@ public class HttpFSServer {
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case UNSETSTORAGEPOLICY: {
FSOperations.FSUnsetStoragePolicy command =
new FSOperations.FSUnsetStoragePolicy(path);
fsExecute(user, command);
AUDIT_LOG.info("Unset storage policy [{}]", path);
response = Response.ok().build();
break;
}
default: {
throw new IOException(
MessageFormat.format("Invalid HTTP POST operation [{0}]",
@ -690,6 +715,16 @@ public class HttpFSServer {
response = Response.ok().build();
break;
}
case SETSTORAGEPOLICY: {
String policyName = params.get(PolicyNameParam.NAME,
PolicyNameParam.class);
FSOperations.FSSetStoragePolicy command =
new FSOperations.FSSetStoragePolicy(path, policyName);
fsExecute(user, command);
AUDIT_LOG.info("[{}] to policy [{}]", path, policyName);
response = Response.ok().build();
break;
}
default: {
throw new IOException(
MessageFormat.format("Invalid HTTP PUT operation [{0}]",

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.fs.http.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileChecksum;
@ -35,6 +36,8 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.HFSTestCase;
import org.apache.hadoop.test.HadoopUsersConfTestHelper;
@ -941,12 +944,56 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
assertFalse(httpStatus.isEncrypted());
}
private void testStoragePolicy() throws Exception {
Assume.assumeFalse("Assume its not a local FS", isLocalFS());
FileSystem fs = FileSystem.get(getProxiedFSConf());
fs.mkdirs(getProxiedFSTestDir());
Path path = new Path(getProxiedFSTestDir(), "policy.txt");
FileSystem httpfs = getHttpFSFileSystem();
// test getAllStoragePolicies
BlockStoragePolicy[] dfsPolicies = (BlockStoragePolicy[]) fs
.getAllStoragePolicies().toArray();
BlockStoragePolicy[] httpPolicies = (BlockStoragePolicy[]) httpfs
.getAllStoragePolicies().toArray();
Assert.assertArrayEquals(
"Policy array returned from the DFS and HttpFS should be equals",
dfsPolicies, httpPolicies);
// test get/set/unset policies
DFSTestUtil.createFile(fs, path, 0, (short) 1, 0L);
// get defaultPolicy
BlockStoragePolicySpi defaultdfsPolicy = fs.getStoragePolicy(path);
// set policy through webhdfs
httpfs.setStoragePolicy(path, HdfsConstants.COLD_STORAGE_POLICY_NAME);
// get policy from dfs
BlockStoragePolicySpi dfsPolicy = fs.getStoragePolicy(path);
// get policy from webhdfs
BlockStoragePolicySpi httpFsPolicy = httpfs.getStoragePolicy(path);
Assert
.assertEquals(
"Storage policy returned from the get API should"
+ " be same as set policy",
HdfsConstants.COLD_STORAGE_POLICY_NAME.toString(),
httpFsPolicy.getName());
Assert.assertEquals(
"Storage policy returned from the DFS and HttpFS should be equals",
httpFsPolicy, dfsPolicy);
// unset policy
httpfs.unsetStoragePolicy(path);
Assert
.assertEquals(
"After unset storage policy, the get API shoudld"
+ " return the default policy",
defaultdfsPolicy, httpfs.getStoragePolicy(path));
fs.close();
}
protected enum Operation {
GET, OPEN, CREATE, APPEND, TRUNCATE, CONCAT, RENAME, DELETE, LIST_STATUS,
WORKING_DIRECTORY, MKDIRS, SET_TIMES, SET_PERMISSION, SET_OWNER,
SET_REPLICATION, CHECKSUM, CONTENT_SUMMARY, FILEACLS, DIRACLS, SET_XATTR,
GET_XATTRS, REMOVE_XATTR, LIST_XATTRS, ENCRYPTION, LIST_STATUS_BATCH,
GETTRASHROOT
GETTRASHROOT, STORAGEPOLICY
}
private void operation(Operation op) throws Exception {
@ -1029,6 +1076,9 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
case GETTRASHROOT:
testTrashRoot();
break;
case STORAGEPOLICY:
testStoragePolicy();
break;
}
}

View File

@ -66,6 +66,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@ -414,14 +415,16 @@ public class NamenodeWebHdfsMethods {
@QueryParam(CreateFlagParam.NAME) @DefaultValue(CreateFlagParam.DEFAULT)
final CreateFlagParam createFlagParam,
@QueryParam(NoRedirectParam.NAME) @DefaultValue(NoRedirectParam.DEFAULT)
final NoRedirectParam noredirect
final NoRedirectParam noredirect,
@QueryParam(StoragePolicyParam.NAME) @DefaultValue(StoragePolicyParam
.DEFAULT) final StoragePolicyParam policyName
) throws IOException, InterruptedException {
return put(ugi, delegation, username, doAsUser, ROOT, op, destination,
owner, group, permission, unmaskedPermission, overwrite, bufferSize,
replication, blockSize, modificationTime, accessTime, renameOptions,
createParent, delegationTokenArgument, aclPermission, xattrName,
xattrValue, xattrSetFlag, snapshotName, oldSnapshotName,
excludeDatanodes, createFlagParam, noredirect);
excludeDatanodes, createFlagParam, noredirect, policyName);
}
/** Validate all required params. */
@ -499,7 +502,9 @@ public class NamenodeWebHdfsMethods {
@QueryParam(CreateFlagParam.NAME) @DefaultValue(CreateFlagParam.DEFAULT)
final CreateFlagParam createFlagParam,
@QueryParam(NoRedirectParam.NAME) @DefaultValue(NoRedirectParam.DEFAULT)
final NoRedirectParam noredirect
final NoRedirectParam noredirect,
@QueryParam(StoragePolicyParam.NAME) @DefaultValue(StoragePolicyParam
.DEFAULT) final StoragePolicyParam policyName
) throws IOException, InterruptedException {
init(ugi, delegation, username, doAsUser, path, op, destination, owner,
@ -507,7 +512,7 @@ public class NamenodeWebHdfsMethods {
replication, blockSize, modificationTime, accessTime, renameOptions,
delegationTokenArgument, aclPermission, xattrName, xattrValue,
xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes,
createFlagParam, noredirect);
createFlagParam, noredirect, policyName);
return doAs(ugi, new PrivilegedExceptionAction<Response>() {
@Override
@ -519,7 +524,7 @@ public class NamenodeWebHdfsMethods {
renameOptions, createParent, delegationTokenArgument,
aclPermission, xattrName, xattrValue, xattrSetFlag,
snapshotName, oldSnapshotName, excludeDatanodes,
createFlagParam, noredirect);
createFlagParam, noredirect, policyName);
}
});
}
@ -553,7 +558,8 @@ public class NamenodeWebHdfsMethods {
final OldSnapshotNameParam oldSnapshotName,
final ExcludeDatanodesParam exclDatanodes,
final CreateFlagParam createFlagParam,
final NoRedirectParam noredirectParam
final NoRedirectParam noredirectParam,
final StoragePolicyParam policyName
) throws IOException, URISyntaxException {
final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
@ -706,6 +712,13 @@ public class NamenodeWebHdfsMethods {
np.disallowSnapshot(fullpath);
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case SETSTORAGEPOLICY: {
if (policyName.getValue() == null) {
throw new IllegalArgumentException("Storage policy name is empty.");
}
np.setStoragePolicy(fullpath, policyName.getValue());
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
default:
throw new UnsupportedOperationException(op + " is not supported");
}
@ -829,6 +842,10 @@ public class NamenodeWebHdfsMethods {
final String js = JsonUtil.toJsonString("boolean", b);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case UNSETSTORAGEPOLICY: {
np.unsetStoragePolicy(fullpath);
return Response.ok().build();
}
default:
throw new UnsupportedOperationException(op + " is not supported");
}
@ -1094,6 +1111,16 @@ public class NamenodeWebHdfsMethods {
final String js = JsonUtil.toJsonString(listing);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case GETALLSTORAGEPOLICY: {
BlockStoragePolicy[] storagePolicies = np.getStoragePolicies();
final String js = JsonUtil.toJsonString(storagePolicies);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case GETSTORAGEPOLICY: {
BlockStoragePolicy storagePolicy = np.getStoragePolicy(fullpath);
final String js = JsonUtil.toJsonString(storagePolicy);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
default:
throw new UnsupportedOperationException(op + " is not supported");
}

View File

@ -436,4 +436,31 @@ public class JsonUtil {
return MAPPER.writeValueAsString(obj);
}
public static String toJsonString(BlockStoragePolicy[] storagePolicies) {
final Map<String, Object> blockStoragePolicies = new TreeMap<>();
Object[] a = null;
if (storagePolicies != null && storagePolicies.length > 0) {
a = new Object[storagePolicies.length];
for (int i = 0; i < storagePolicies.length; i++) {
a[i] = toJsonMap(storagePolicies[i]);
}
}
blockStoragePolicies.put("BlockStoragePolicy", a);
return toJsonString("BlockStoragePolicies", blockStoragePolicies);
}
private static Object toJsonMap(BlockStoragePolicy blockStoragePolicy) {
final Map<String, Object> m = new TreeMap<String, Object>();
m.put("id", blockStoragePolicy.getId());
m.put("name", blockStoragePolicy.getName());
m.put("storageTypes", blockStoragePolicy.getStorageTypes());
m.put("creationFallbacks", blockStoragePolicy.getCreationFallbacks());
m.put("replicationFallbacks", blockStoragePolicy.getReplicationFallbacks());
m.put("copyOnCreateFile", blockStoragePolicy.isCopyOnCreateFile());
return m;
}
public static String toJsonString(BlockStoragePolicy storagePolicy) {
return toJsonString(BlockStoragePolicy.class, toJsonMap(storagePolicy));
}
}

View File

@ -54,6 +54,10 @@ WebHDFS REST API
* [Set ACL](#Set_ACL)
* [Get ACL Status](#Get_ACL_Status)
* [Check access](#Check_access)
* [Get all Storage Policies](#Get_all_Storage_Policies)
* [Set Storage Policy](#Set_Storage_Policy)
* [Unset Storage Policy](#Unset_Storage_Policy)
* [Get Storage Policy](#Get_Storage_Policy)
* [Extended Attributes(XAttrs) Operations](#Extended_AttributesXAttrs_Operations)
* [Set XAttr](#Set_XAttr)
* [Remove XAttr](#Remove_XAttr)
@ -90,6 +94,9 @@ WebHDFS REST API
* [RemoteException JSON Schema](#RemoteException_JSON_Schema)
* [Token JSON Schema](#Token_JSON_Schema)
* [Token Properties](#Token_Properties)
* [BlockStoragePolicy JSON Schema](#BlockStoragePolicy_JSON_Schema)
* [BlockStoragePolicy Properties](#BlockStoragePolicy_Properties)
* [BlockStoragePolicies JSON Schema](#BlockStoragePolicies_JSON_Schema)
* [HTTP Query Parameter Dictionary](#HTTP_Query_Parameter_Dictionary)
* [ACL Spec](#ACL_Spec)
* [XAttr Name](#XAttr_Name)
@ -124,6 +131,8 @@ WebHDFS REST API
* [Token Service](#Token_Service)
* [Username](#Username)
* [NoRedirect](#NoRedirect)
* [Storage Policy](#Storage_Policy)
* [Start After](#Start_After)
Document Conventions
--------------------
@ -156,6 +165,8 @@ The HTTP REST API supports the complete [FileSystem](../../api/org/apache/hadoop
* [`GETXATTRS`](#Get_all_XAttrs) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getXAttrs)
* [`LISTXATTRS`](#List_all_XAttrs) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).listXAttrs)
* [`CHECKACCESS`](#Check_access) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).access)
* [`GETALLSTORAGEPOLICY`](#Get_all_Storage_Policies) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getAllStoragePolicies)
* [`GETSTORAGEPOLICY`](#Get_Storage_Policy) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getStoragePolicy)
* HTTP PUT
* [`CREATE`](#Create_and_Write_to_a_File) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).create)
* [`MKDIRS`](#Make_a_Directory) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).mkdirs)
@ -171,10 +182,12 @@ The HTTP REST API supports the complete [FileSystem](../../api/org/apache/hadoop
* [`RENAMESNAPSHOT`](#Rename_Snapshot) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).renameSnapshot)
* [`SETXATTR`](#Set_XAttr) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).setXAttr)
* [`REMOVEXATTR`](#Remove_XAttr) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).removeXAttr)
* [`SETSTORAGEPOLICY`](#Set_Storage_Policy) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).setStoragePolicy)
* HTTP POST
* [`APPEND`](#Append_to_a_File) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).append)
* [`CONCAT`](#Concat_Files) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).concat)
* [`TRUNCATE`](#Truncate_a_File) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).truncate)
* [`UNSETSTORAGEPOLICY`](#Unset_Storage_Policy) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).unsetStoragePolicy)
* HTTP DELETE
* [`DELETE`](#Delete_a_FileDirectory) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).delete)
* [`DELETESNAPSHOT`](#Delete_Snapshot) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).deleteSnapshot)
@ -1015,6 +1028,129 @@ See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getAclSta
See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).access
Storage Policy Operations
-------------------------
### Get all Storage Policies
* Submit a HTTP GET request.
curl -i "http://<HOST>:<PORT>/webhdfs/v1?op=GETALLSTORAGEPOLICY"
The client receives a response with a [`BlockStoragePolicies` JSON object](#BlockStoragePolicies_JSON_Schema):
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
{
"BlockStoragePolicies": {
"BlockStoragePolicy": [
{
"copyOnCreateFile": false,
"creationFallbacks": [],
"id": 2,
"name": "COLD",
"replicationFallbacks": [],
"storageTypes": ["ARCHIVE"]
},
{
"copyOnCreateFile": false,
"creationFallbacks": ["DISK","ARCHIVE"],
"id": 5,
"name": "WARM",
"replicationFallbacks": ["DISK","ARCHIVE"],
"storageTypes": ["DISK","ARCHIVE"]
},
{
"copyOnCreateFile": false,
"creationFallbacks": [],
"id": 7,
"name": "HOT",
"replicationFallbacks": ["ARCHIVE"],
"storageTypes": ["DISK"]
},
{
"copyOnCreateFile": false,
"creationFallbacks": ["SSD","DISK"],
"id": 10,"name": "ONE_SSD",
"replicationFallbacks": ["SSD","DISK"],
"storageTypes": ["SSD","DISK"]
},
{
"copyOnCreateFile": false,
"creationFallbacks": ["DISK"],
"id": 12,
"name": "ALL_SSD",
"replicationFallbacks": ["DISK"],
"storageTypes": ["SSD"]
},
{
"copyOnCreateFile": true,
"creationFallbacks": ["DISK"],
"id": 15,
"name": "LAZY_PERSIST",
"replicationFallbacks": ["DISK"],
"storageTypes": ["RAM_DISK","DISK"]
}
]
}
}
See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getAllStoragePolicies
### Set Storage Policy
* Submit a HTTP PUT request.
curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETSTORAGEPOLICY
&storagepolicy=<policy>"
The client receives a response with zero content length:
HTTP/1.1 200 OK
Content-Length: 0
See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).setStoragePolicy
### Unset Storage Policy
* Submit a HTTP POT request.
curl -i -X POST "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=UNSETSTORAGEPOLICY"
The client receives a response with zero content length:
HTTP/1.1 200 OK
Content-Length: 0
See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).unsetStoragePolicy
### Get Storage Policy
* Submit a HTTP GET request.
curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETSTORAGEPOLICY"
The client receives a response with a [`BlockStoragePolicy` JSON object](#BlockStoragePolicy_JSON_Schema):
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
{
"BlockStoragePolicy": {
"copyOnCreateFile": false,
"creationFallbacks": [],
"id":7,
"name":"HOT",
"replicationFallbacks":["ARCHIVE"],
"storageTypes":["DISK"]
}
}
See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getStoragePolicy
Extended Attributes(XAttrs) Operations
--------------------------------------
@ -1871,6 +2007,107 @@ var tokenProperties =
```
See also: [`Token` Properties](#Token_Properties), the note in [Delegation](#Delegation).
### BlockStoragePolicy JSON Schema
```json
{
"name" : "BlockStoragePolicy",
"properties":
{
"BlockStoragePolicy": blockStoragePolicyProperties //See BlockStoragePolicy Properties
}
}
```
See also: [`BlockStoragePolicy` Properties](#BlockStoragePolicy_Properties), [`GETSTORAGEPOLICY`](#Get_Storage_Policy)
#### BlockStoragePolicy Properties
JavaScript syntax is used to define `blockStoragePolicyProperties` so that it can be referred in both `BlockStoragePolicy` and `BlockStoragePolicies` JSON schemas.
```javascript
var blockStoragePolicyProperties =
{
"type" : "object",
"properties":
{
"id":
{
"description": "Policy ID.",
"type" : "integer",
"required" : true
},
"name":
{
"description": "Policy name.",
"type" : "string",
"required" : true
},
"storageTypes":
{
"description": "An array of storage types for block placement.",
"type" : "array",
"required" : true
"items" :
{
"type": "string"
}
},
"replicationFallbacks":
{
"description": "An array of fallback storage types for replication.",
"type" : "array",
"required" : true
"items" :
{
"type": "string"
}
},
"creationFallbacks":
{
"description": "An array of fallback storage types for file creation.",
"type" : "array",
"required" : true
"items" :
{
"type": "string"
}
},
"copyOnCreateFile":
{
"description": "If set then the policy cannot be changed after file creation.",
"type" : "boolean",
"required" : true
}
}
};
```
### BlockStoragePolicies JSON Schema
A `BlockStoragePolicies` JSON object represents an array of `BlockStoragePolicy` JSON objects.
```json
{
"name" : "BlockStoragePolicies",
"properties":
{
"BlockStoragePolicies":
{
"type" : "object",
"properties":
{
"BlockStoragePolicy":
{
"description": "An array of BlockStoragePolicy",
"type" : "array",
"items" : blockStoragePolicyProperties //See BlockStoragePolicy Properties
}
}
}
}
}
```
HTTP Query Parameter Dictionary
-------------------------------
@ -2281,3 +2518,27 @@ See also: [Authentication](#Authentication)
| Syntax | true |
See also: [Create and Write to a File](#Create_and_Write_to_a_File)
### Storage Policy
| Name | `storagepolicy` |
|:---- |:---- |
| Description | The name of the storage policy. |
| Type | String |
| Default Value | \<empty\> |
| Valid Values | Any valid storage policy name; see [GETALLSTORAGEPOLICY](#Get_all_Storage_Policies). |
| Syntax | Any string. |
See also: [`SETSTORAGEPOLICY`](#Set_Storage_Policy)
### Start After
| Name | `startAfter` |
|:---- |:---- |
| Description | The last item returned in the liststatus batch. |
| Type | String |
| Default Value | \<empty\> |
| Valid Values | Any valid file/directory name. |
| Syntax | Any string. |
See also: [`LISTSTATUS_BATCH`](#Iteratively_List_a_Directory)

View File

@ -37,6 +37,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Random;
import org.apache.commons.io.IOUtils;
@ -44,6 +45,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataInputStream;
@ -58,10 +60,13 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.TestDFSClientRetries;
import org.apache.hadoop.hdfs.TestFileCreation;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
@ -1113,4 +1118,67 @@ public class TestWebHDFS {
}
}
}
@Test
public void testStoragePolicy() throws Exception {
MiniDFSCluster cluster = null;
final Configuration conf = WebHdfsTestUtil.createConf();
final Path path = new Path("/file");
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
final DistributedFileSystem dfs = cluster.getFileSystem();
final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(
conf, WebHdfsConstants.WEBHDFS_SCHEME);
// test getAllStoragePolicies
BlockStoragePolicy[] dfsPolicies = (BlockStoragePolicy[]) dfs
.getAllStoragePolicies().toArray();
BlockStoragePolicy[] webHdfsPolicies = (BlockStoragePolicy[]) webHdfs
.getAllStoragePolicies().toArray();
Assert.assertTrue(Arrays.equals(dfsPolicies, webHdfsPolicies));
// test get/set/unset policies
DFSTestUtil.createFile(dfs, path, 0, (short) 1, 0L);
// get defaultPolicy
BlockStoragePolicySpi defaultdfsPolicy = dfs.getStoragePolicy(path);
// set policy through webhdfs
webHdfs.setStoragePolicy(path, HdfsConstants.COLD_STORAGE_POLICY_NAME);
// get policy from dfs
BlockStoragePolicySpi dfsPolicy = dfs.getStoragePolicy(path);
// get policy from webhdfs
BlockStoragePolicySpi webHdfsPolicy = webHdfs.getStoragePolicy(path);
Assert.assertEquals(HdfsConstants.COLD_STORAGE_POLICY_NAME.toString(),
webHdfsPolicy.getName());
Assert.assertEquals(webHdfsPolicy, dfsPolicy);
// unset policy
webHdfs.unsetStoragePolicy(path);
Assert.assertEquals(defaultdfsPolicy, webHdfs.getStoragePolicy(path));
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testSetStoragePolicyWhenPolicyDisabled() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, false);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.build();
try {
cluster.waitActive();
final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(
conf, WebHdfsConstants.WEBHDFS_SCHEME);
webHdfs.setStoragePolicy(new Path("/"),
HdfsConstants.COLD_STORAGE_POLICY_NAME);
fail("Should throw exception, when storage policy disabled");
} catch (IOException e) {
Assert.assertTrue(e.getMessage().contains(
"Failed to set storage policy since"));
} finally {
cluster.shutdown();
}
}
}

View File

@ -461,4 +461,12 @@ public class TestParam {
StartAfterParam param = new StartAfterParam(s);
Assert.assertEquals(s, param.getValue());
}
@Test
public void testStoragePolicyParam() {
StoragePolicyParam p = new StoragePolicyParam(StoragePolicyParam.DEFAULT);
Assert.assertEquals(null, p.getValue());
p = new StoragePolicyParam("COLD");
Assert.assertEquals("COLD", p.getValue());
}
}