HDFS-6876. Archival Storage: support set/get storage policy in DFSAdmin. Contributed by Jing Zhao.

This commit is contained in:
Jing Zhao 2014-09-04 20:14:46 -07:00
parent e08701ec71
commit 185200e709
6 changed files with 264 additions and 119 deletions

View File

@ -1487,10 +1487,12 @@ private DirectoryListing getSnapshotsListing(String src, byte[] startAfter)
* @param src The string representation of the path to the file
* @param resolveLink whether to throw UnresolvedLinkException
* @param isRawPath true if a /.reserved/raw pathname was passed by the user
* @param includeStoragePolicy whether to include storage policy
* @return object containing information regarding the file
* or null if file not found
*/
HdfsFileStatus getFileInfo(String src, boolean resolveLink, boolean isRawPath)
HdfsFileStatus getFileInfo(String src, boolean resolveLink,
boolean isRawPath, boolean includeStoragePolicy)
throws IOException {
String srcs = normalizePath(src);
readLock();
@ -1500,9 +1502,10 @@ HdfsFileStatus getFileInfo(String src, boolean resolveLink, boolean isRawPath)
}
final INodesInPath inodesInPath = getLastINodeInPath(srcs, resolveLink);
final INode i = inodesInPath.getINode(0);
byte policyId = includeStoragePolicy && i != null ?
i.getStoragePolicyID() : BlockStoragePolicy.ID_UNSPECIFIED;
return i == null ? null : createFileStatus(HdfsFileStatus.EMPTY_NAME, i,
BlockStoragePolicy.ID_UNSPECIFIED, inodesInPath.getPathSnapshotId(),
isRawPath);
policyId, inodesInPath.getPathSnapshotId(), isRawPath);
} finally {
readUnlock();
}

View File

@ -334,7 +334,7 @@ public boolean isAuditEnabled() {
private HdfsFileStatus getAuditFileInfo(String path, boolean resolveSymlink)
throws IOException {
return (isAuditEnabled() && isExternalInvocation())
? dir.getFileInfo(path, resolveSymlink, false) : null;
? dir.getFileInfo(path, resolveSymlink, false, false) : null;
}
private void logAuditEvent(boolean succeeded, String cmd, String src)
@ -2546,7 +2546,7 @@ private HdfsFileStatus startFileInt(final String srcArg,
overwrite, createParent, replication, blockSize, suite, edek,
logRetryCache);
stat = dir.getFileInfo(src, false,
FSDirectory.isReservedRawName(srcArg));
FSDirectory.isReservedRawName(srcArg), false);
} catch (StandbyException se) {
skipSync = true;
throw se;
@ -3970,12 +3970,14 @@ HdfsFileStatus getFileInfo(final String srcArg, boolean resolveLink)
try {
checkOperation(OperationCategory.READ);
src = resolvePath(src, pathComponents);
boolean isSuperUser = true;
if (isPermissionEnabled) {
checkPermission(pc, src, false, null, null, null, null, false,
resolveLink);
isSuperUser = pc.isSuperUser();
}
stat = dir.getFileInfo(src, resolveLink,
FSDirectory.isReservedRawName(srcArg));
FSDirectory.isReservedRawName(srcArg), isSuperUser);
} catch (AccessControlException e) {
logAuditEvent(false, "getfileinfo", srcArg);
throw e;

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.tools;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
@ -43,6 +44,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.shell.Command;
import org.apache.hadoop.fs.shell.CommandFormat;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -58,23 +60,24 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.ipc.GenericRefreshProtocol;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.ipc.RefreshResponse;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolClientSideTranslatorPB;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolClientSideTranslatorPB;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
@ -555,6 +558,32 @@ private boolean waitExitSafeMode(ClientProtocol nn, boolean inSafeMode)
return inSafeMode;
}
public int setStoragePolicy(String[] argv) throws IOException {
DistributedFileSystem dfs = getDFS();
dfs.setStoragePolicy(new Path(argv[1]), argv[2]);
System.out.println("Set storage policy " + argv[2] + " on " + argv[1]);
return 0;
}
public int getStoragePolicy(String[] argv) throws IOException {
DistributedFileSystem dfs = getDFS();
HdfsFileStatus status = dfs.getClient().getFileInfo(argv[1]);
if (status == null) {
throw new FileNotFoundException("File/Directory does not exist: "
+ argv[1]);
}
byte storagePolicyId = status.getStoragePolicy();
BlockStoragePolicy.Suite suite = BlockStoragePolicy
.readBlockStorageSuite(getConf());
BlockStoragePolicy policy = suite.getPolicy(storagePolicyId);
if (policy != null) {
System.out.println("The storage policy of " + argv[1] + ":\n" + policy);
return 0;
} else {
throw new IOException("Cannot identify the storage policy for " + argv[1]);
}
}
/**
* Allow snapshot on a directory.
* Usage: java DFSAdmin -allowSnapshot snapshotDir
@ -806,6 +835,8 @@ private void printHelp(String cmd) {
"\t[-disallowSnapshot <snapshotDir>]\n" +
"\t[-shutdownDatanode <datanode_host:ipc_port> [upgrade]]\n" +
"\t[-getDatanodeInfo <datanode_host:ipc_port>\n" +
"\t[-setStoragePolicy path policyName\n" +
"\t[-getStoragePolicy path\n" +
"\t[-help [cmd]]\n";
String report ="-report [-live] [-dead] [-decommissioning]:\n" +
@ -924,6 +955,12 @@ private void printHelp(String cmd) {
+ "\tGet the information about the given datanode. This command can\n"
+ "\tbe used for checking if a datanode is alive.\n";
String setStoragePolicy = "-setStoragePolicy path policyName\n"
+ "\tSet the storage policy for a file/directory.\n";
String getStoragePolicy = "-getStoragePolicy path\n"
+ "\tGet the storage policy for a file/directory.\n";
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
"\t\tis specified.\n";
@ -981,6 +1018,10 @@ private void printHelp(String cmd) {
System.out.println(shutdownDatanode);
} else if ("getDatanodeInfo".equalsIgnoreCase(cmd)) {
System.out.println(getDatanodeInfo);
} else if ("setStoragePolicy".equalsIgnoreCase(cmd)) {
System.out.println(setStoragePolicy);
} else if ("getStoragePolicy".equalsIgnoreCase(cmd)) {
System.out.println(getStoragePolicy);
} else if ("help".equals(cmd)) {
System.out.println(help);
} else {
@ -1012,6 +1053,8 @@ private void printHelp(String cmd) {
System.out.println(disallowSnapshot);
System.out.println(shutdownDatanode);
System.out.println(getDatanodeInfo);
System.out.println(setStoragePolicy);
System.out.println(getStoragePolicy);
System.out.println(help);
System.out.println();
ToolRunner.printGenericCommandUsage(System.out);
@ -1371,6 +1414,12 @@ private static void printUsage(String cmd) {
} else if ("-safemode".equals(cmd)) {
System.err.println("Usage: java DFSAdmin"
+ " [-safemode enter | leave | get | wait]");
} else if ("-setStoragePolicy".equals(cmd)) {
System.err.println("Usage: java DFSAdmin"
+ " [-setStoragePolicy path policyName]");
} else if ("-getStoragePolicy".equals(cmd)) {
System.err.println("Usage: java DFSAdmin"
+ " [-getStoragePolicy path]");
} else if ("-allowSnapshot".equalsIgnoreCase(cmd)) {
System.err.println("Usage: java DFSAdmin"
+ " [-allowSnapshot <snapshotDir>]");
@ -1476,6 +1525,8 @@ private static void printUsage(String cmd) {
System.err.println(" [-fetchImage <local directory>]");
System.err.println(" [-shutdownDatanode <datanode_host:ipc_port> [upgrade]]");
System.err.println(" [-getDatanodeInfo <datanode_host:ipc_port>]");
System.err.println(" [-setStoragePolicy path policyName]");
System.err.println(" [-getStoragePolicy path]");
System.err.println(" [-help [cmd]]");
System.err.println();
ToolRunner.printGenericCommandUsage(System.err);
@ -1607,6 +1658,16 @@ public int run(String[] argv) throws Exception {
printUsage(cmd);
return exitCode;
}
} else if ("-setStoragePolicy".equals(cmd)) {
if (argv.length != 3) {
printUsage(cmd);
return exitCode;
}
} else if ("-getStoragePolicy".equals(cmd)) {
if (argv.length != 2) {
printUsage(cmd);
return exitCode;
}
}
// initialize DFSAdmin
@ -1678,6 +1739,10 @@ public int run(String[] argv) throws Exception {
exitCode = shutdownDatanode(argv, i);
} else if ("-getDatanodeInfo".equals(cmd)) {
exitCode = getDatanodeInfo(argv, i);
} else if ("-setStoragePolicy".equals(cmd)) {
exitCode = setStoragePolicy(argv);
} else if ("-getStoragePolicy".equals(cmd)) {
exitCode = getStoragePolicy(argv);
} else if ("-help".equals(cmd)) {
if (i < argv.length) {
printHelp(argv[i]);

View File

@ -64,6 +64,7 @@
.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.NetUtils;
@ -73,6 +74,8 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.VersionInfo;
import org.junit.Assume;
@ -1419,4 +1422,55 @@ public static DatanodeDescriptor getExpectedPrimaryNode(NameNode nn,
}
return expectedPrimary.getDatanodeDescriptor();
}
public static void toolRun(Tool tool, String cmd, int retcode, String contain)
throws Exception {
String [] cmds = StringUtils.split(cmd, ' ');
System.out.flush();
System.err.flush();
PrintStream origOut = System.out;
PrintStream origErr = System.err;
String output = null;
int ret = 0;
try {
ByteArrayOutputStream bs = new ByteArrayOutputStream(1024);
PrintStream out = new PrintStream(bs);
System.setOut(out);
System.setErr(out);
ret = tool.run(cmds);
System.out.flush();
System.err.flush();
out.close();
output = bs.toString();
} finally {
System.setOut(origOut);
System.setErr(origErr);
}
System.out.println("Output for command: " + cmd + " retcode: " + ret);
if (output != null) {
System.out.println(output);
}
assertEquals(retcode, ret);
if (contain != null) {
assertTrue("The real output is: " + output + ".\n It should contain: "
+ contain, output.contains(contain));
}
}
public static void FsShellRun(String cmd, int retcode, String contain,
Configuration conf) throws Exception {
FsShell shell = new FsShell(new Configuration(conf));
toolRun(shell, cmd, retcode, contain);
}
public static void DFSAdminRun(String cmd, int retcode, String contain,
Configuration conf) throws Exception {
DFSAdmin admin = new DFSAdmin(new Configuration(conf));
toolRun(admin, cmd, retcode, contain);
}
public static void FsShellRun(String cmd, Configuration conf)
throws Exception {
FsShellRun(cmd, 0, null, conf);
}
}

View File

@ -18,22 +18,11 @@
package org.apache.hadoop.hdfs;
import static org.junit.Assert.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -89,136 +78,87 @@ public void tearDown() throws IOException {
}
}
private void toolRun(Tool tool, String cmd, int retcode, String contain)
throws Exception {
String [] cmds = StringUtils.split(cmd, ' ');
System.out.flush();
System.err.flush();
PrintStream origOut = System.out;
PrintStream origErr = System.err;
String output = null;
int ret = 0;
try {
ByteArrayOutputStream bs = new ByteArrayOutputStream(1024);
PrintStream out = new PrintStream(bs);
System.setOut(out);
System.setErr(out);
ret = tool.run(cmds);
System.out.flush();
System.err.flush();
out.close();
output = bs.toString();
} finally {
System.setOut(origOut);
System.setErr(origErr);
}
System.out.println("Output for command: " + cmd + " retcode: " + ret);
if (output != null) {
System.out.println(output);
}
assertEquals(retcode, ret);
if (contain != null) {
assertTrue(output.contains(contain));
}
}
private void FsShellRun(String cmd, int retcode, String contain)
throws Exception {
FsShell shell = new FsShell(new Configuration(conf));
toolRun(shell, cmd, retcode, contain);
}
private void DFSAdminRun(String cmd, int retcode, String contain)
throws Exception {
DFSAdmin admin = new DFSAdmin(new Configuration(conf));
toolRun(admin, cmd, retcode, contain);
}
private void FsShellRun(String cmd) throws Exception {
FsShellRun(cmd, 0, null);
}
@Test
public void testAllowSnapshot() throws Exception {
// Idempotent test
DFSAdminRun("-allowSnapshot /sub1", 0, "Allowing snaphot on /sub1 succeeded");
DFSTestUtil.DFSAdminRun("-allowSnapshot /sub1", 0, "Allowing snaphot on /sub1 succeeded", conf);
// allow normal dir success
FsShellRun("-mkdir /sub2");
DFSAdminRun("-allowSnapshot /sub2", 0, "Allowing snaphot on /sub2 succeeded");
DFSTestUtil.FsShellRun("-mkdir /sub2", conf);
DFSTestUtil.DFSAdminRun("-allowSnapshot /sub2", 0, "Allowing snaphot on /sub2 succeeded", conf);
// allow non-exists dir failed
DFSAdminRun("-allowSnapshot /sub3", -1, null);
DFSTestUtil.DFSAdminRun("-allowSnapshot /sub3", -1, null, conf);
}
@Test
public void testCreateSnapshot() throws Exception {
// test createSnapshot
FsShellRun("-createSnapshot /sub1 sn0", 0, "Created snapshot /sub1/.snapshot/sn0");
FsShellRun("-createSnapshot /sub1 sn0", 1, "there is already a snapshot with the same name \"sn0\"");
FsShellRun("-rmr /sub1/sub1sub2");
FsShellRun("-mkdir /sub1/sub1sub3");
FsShellRun("-createSnapshot /sub1 sn1", 0, "Created snapshot /sub1/.snapshot/sn1");
DFSTestUtil.FsShellRun("-createSnapshot /sub1 sn0", 0, "Created snapshot /sub1/.snapshot/sn0", conf);
DFSTestUtil.FsShellRun("-createSnapshot /sub1 sn0", 1, "there is already a snapshot with the same name \"sn0\"", conf);
DFSTestUtil.FsShellRun("-rmr /sub1/sub1sub2", conf);
DFSTestUtil.FsShellRun("-mkdir /sub1/sub1sub3", conf);
DFSTestUtil.FsShellRun("-createSnapshot /sub1 sn1", 0, "Created snapshot /sub1/.snapshot/sn1", conf);
// check snapshot contents
FsShellRun("-ls /sub1", 0, "/sub1/sub1sub1");
FsShellRun("-ls /sub1", 0, "/sub1/sub1sub3");
FsShellRun("-ls /sub1/.snapshot", 0, "/sub1/.snapshot/sn0");
FsShellRun("-ls /sub1/.snapshot", 0, "/sub1/.snapshot/sn1");
FsShellRun("-ls /sub1/.snapshot/sn0", 0, "/sub1/.snapshot/sn0/sub1sub1");
FsShellRun("-ls /sub1/.snapshot/sn0", 0, "/sub1/.snapshot/sn0/sub1sub2");
FsShellRun("-ls /sub1/.snapshot/sn1", 0, "/sub1/.snapshot/sn1/sub1sub1");
FsShellRun("-ls /sub1/.snapshot/sn1", 0, "/sub1/.snapshot/sn1/sub1sub3");
DFSTestUtil.FsShellRun("-ls /sub1", 0, "/sub1/sub1sub1", conf);
DFSTestUtil.FsShellRun("-ls /sub1", 0, "/sub1/sub1sub3", conf);
DFSTestUtil.FsShellRun("-ls /sub1/.snapshot", 0, "/sub1/.snapshot/sn0", conf);
DFSTestUtil.FsShellRun("-ls /sub1/.snapshot", 0, "/sub1/.snapshot/sn1", conf);
DFSTestUtil.FsShellRun("-ls /sub1/.snapshot/sn0", 0, "/sub1/.snapshot/sn0/sub1sub1", conf);
DFSTestUtil.FsShellRun("-ls /sub1/.snapshot/sn0", 0, "/sub1/.snapshot/sn0/sub1sub2", conf);
DFSTestUtil.FsShellRun("-ls /sub1/.snapshot/sn1", 0, "/sub1/.snapshot/sn1/sub1sub1", conf);
DFSTestUtil.FsShellRun("-ls /sub1/.snapshot/sn1", 0, "/sub1/.snapshot/sn1/sub1sub3", conf);
}
@Test
public void testMkdirUsingReservedName() throws Exception {
// test can not create dir with reserved name: .snapshot
FsShellRun("-ls /");
FsShellRun("-mkdir /.snapshot", 1, "File exists");
FsShellRun("-mkdir /sub1/.snapshot", 1, "File exists");
DFSTestUtil.FsShellRun("-ls /", conf);
DFSTestUtil.FsShellRun("-mkdir /.snapshot", 1, "File exists", conf);
DFSTestUtil.FsShellRun("-mkdir /sub1/.snapshot", 1, "File exists", conf);
// mkdir -p ignore reserved name check if dir already exists
FsShellRun("-mkdir -p /sub1/.snapshot");
FsShellRun("-mkdir -p /sub1/sub1sub1/.snapshot", 1, "mkdir: \".snapshot\" is a reserved name.");
DFSTestUtil.FsShellRun("-mkdir -p /sub1/.snapshot", conf);
DFSTestUtil.FsShellRun("-mkdir -p /sub1/sub1sub1/.snapshot", 1, "mkdir: \".snapshot\" is a reserved name.", conf);
}
@Test
public void testRenameSnapshot() throws Exception {
FsShellRun("-createSnapshot /sub1 sn.orig");
FsShellRun("-renameSnapshot /sub1 sn.orig sn.rename");
FsShellRun("-ls /sub1/.snapshot", 0, "/sub1/.snapshot/sn.rename");
FsShellRun("-ls /sub1/.snapshot/sn.rename", 0, "/sub1/.snapshot/sn.rename/sub1sub1");
FsShellRun("-ls /sub1/.snapshot/sn.rename", 0, "/sub1/.snapshot/sn.rename/sub1sub2");
DFSTestUtil.FsShellRun("-createSnapshot /sub1 sn.orig", conf);
DFSTestUtil.FsShellRun("-renameSnapshot /sub1 sn.orig sn.rename", conf);
DFSTestUtil.FsShellRun("-ls /sub1/.snapshot", 0, "/sub1/.snapshot/sn.rename", conf);
DFSTestUtil.FsShellRun("-ls /sub1/.snapshot/sn.rename", 0, "/sub1/.snapshot/sn.rename/sub1sub1", conf);
DFSTestUtil.FsShellRun("-ls /sub1/.snapshot/sn.rename", 0, "/sub1/.snapshot/sn.rename/sub1sub2", conf);
//try renaming from a non-existing snapshot
FsShellRun("-renameSnapshot /sub1 sn.nonexist sn.rename", 1,
"renameSnapshot: The snapshot sn.nonexist does not exist for directory /sub1");
DFSTestUtil.FsShellRun("-renameSnapshot /sub1 sn.nonexist sn.rename", 1,
"renameSnapshot: The snapshot sn.nonexist does not exist for directory /sub1", conf);
//try renaming to existing snapshots
FsShellRun("-createSnapshot /sub1 sn.new");
FsShellRun("-renameSnapshot /sub1 sn.new sn.rename", 1,
"renameSnapshot: The snapshot sn.rename already exists for directory /sub1");
FsShellRun("-renameSnapshot /sub1 sn.rename sn.new", 1,
"renameSnapshot: The snapshot sn.new already exists for directory /sub1");
DFSTestUtil.FsShellRun("-createSnapshot /sub1 sn.new", conf);
DFSTestUtil.FsShellRun("-renameSnapshot /sub1 sn.new sn.rename", 1,
"renameSnapshot: The snapshot sn.rename already exists for directory /sub1", conf);
DFSTestUtil.FsShellRun("-renameSnapshot /sub1 sn.rename sn.new", 1,
"renameSnapshot: The snapshot sn.new already exists for directory /sub1", conf);
}
@Test
public void testDeleteSnapshot() throws Exception {
FsShellRun("-createSnapshot /sub1 sn1");
FsShellRun("-deleteSnapshot /sub1 sn1");
FsShellRun("-deleteSnapshot /sub1 sn1", 1,
"deleteSnapshot: Cannot delete snapshot sn1 from path /sub1: the snapshot does not exist.");
DFSTestUtil.FsShellRun("-createSnapshot /sub1 sn1", conf);
DFSTestUtil.FsShellRun("-deleteSnapshot /sub1 sn1", conf);
DFSTestUtil.FsShellRun("-deleteSnapshot /sub1 sn1", 1,
"deleteSnapshot: Cannot delete snapshot sn1 from path /sub1: the snapshot does not exist.", conf);
}
@Test
public void testDisallowSnapshot() throws Exception {
FsShellRun("-createSnapshot /sub1 sn1");
DFSTestUtil.FsShellRun("-createSnapshot /sub1 sn1", conf);
// cannot delete snapshotable dir
FsShellRun("-rmr /sub1", 1, "The directory /sub1 cannot be deleted since /sub1 is snapshottable and already has snapshots");
DFSAdminRun("-disallowSnapshot /sub1", -1,
"disallowSnapshot: The directory /sub1 has snapshot(s). Please redo the operation after removing all the snapshots.");
FsShellRun("-deleteSnapshot /sub1 sn1");
DFSAdminRun("-disallowSnapshot /sub1", 0, "Disallowing snaphot on /sub1 succeeded");
DFSTestUtil.FsShellRun("-rmr /sub1", 1, "The directory /sub1 cannot be deleted since /sub1 is snapshottable and already has snapshots", conf);
DFSTestUtil.DFSAdminRun("-disallowSnapshot /sub1", -1,
"disallowSnapshot: The directory /sub1 has snapshot(s). Please redo the operation after removing all the snapshots.", conf);
DFSTestUtil.FsShellRun("-deleteSnapshot /sub1 sn1", conf);
DFSTestUtil.DFSAdminRun("-disallowSnapshot /sub1", 0, "Disallowing snaphot on /sub1 succeeded", conf);
// Idempotent test
DFSAdminRun("-disallowSnapshot /sub1", 0, "Disallowing snaphot on /sub1 succeeded");
DFSTestUtil.DFSAdminRun("-disallowSnapshot /sub1", 0, "Disallowing snaphot on /sub1 succeeded", conf);
// now it can be deleted
FsShellRun("-rmr /sub1");
DFSTestUtil.FsShellRun("-rmr /sub1", conf);
}
}

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Test storage policy related DFSAdmin commands
*/
public class TestStoragePolicyCommands {
private static final short REPL = 1;
private static final int SIZE = 128;
private static Configuration conf;
private static MiniDFSCluster cluster;
private static DistributedFileSystem fs;
@Before
public void clusterSetUp() throws IOException {
conf = new HdfsConfiguration();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL).build();
cluster.waitActive();
fs = cluster.getFileSystem();
}
@After
public void clusterShutdown() throws IOException{
if(fs != null){
fs.close();
}
if(cluster != null){
cluster.shutdown();
}
}
@Test
public void testSetAndGetStoragePolicy() throws Exception {
final Path foo = new Path("/foo");
final Path bar = new Path(foo, "bar");
DFSTestUtil.createFile(fs, bar, SIZE, REPL, 0);
DFSTestUtil.DFSAdminRun("-setStoragePolicy /foo WARM", 0,
"Set storage policy WARM on " + foo.toString(), conf);
DFSTestUtil.DFSAdminRun("-setStoragePolicy /foo/bar COLD", 0,
"Set storage policy COLD on " + bar.toString(), conf);
DFSTestUtil.DFSAdminRun("-setStoragePolicy /fooz WARM", -1,
"File/Directory does not exist: /fooz", conf);
final BlockStoragePolicy.Suite suite = BlockStoragePolicy
.readBlockStorageSuite(conf);
final BlockStoragePolicy warm = suite.getPolicy("WARM");
final BlockStoragePolicy cold = suite.getPolicy("COLD");
DFSTestUtil.DFSAdminRun("-getStoragePolicy /foo", 0,
"The storage policy of " + foo.toString() + ":\n" + warm, conf);
DFSTestUtil.DFSAdminRun("-getStoragePolicy /foo/bar", 0,
"The storage policy of " + bar.toString() + ":\n" + cold, conf);
DFSTestUtil.DFSAdminRun("-getStoragePolicy /fooz", -1,
"File/Directory does not exist: /fooz", conf);
}
}