HDFS-13876. HttpFS: Implement ALLOWSNAPSHOT, DISALLOWSNAPSHOT. Contributed by Siyao Meng.
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
parent
62f817d32e
commit
8de5c923b4
|
@ -230,6 +230,7 @@ public class HttpFSFileSystem extends FileSystem
|
|||
REMOVEXATTR(HTTP_PUT), LISTXATTRS(HTTP_GET), LISTSTATUS_BATCH(HTTP_GET),
|
||||
GETALLSTORAGEPOLICY(HTTP_GET), GETSTORAGEPOLICY(HTTP_GET),
|
||||
SETSTORAGEPOLICY(HTTP_PUT), UNSETSTORAGEPOLICY(HTTP_POST),
|
||||
ALLOWSNAPSHOT(HTTP_PUT), DISALLOWSNAPSHOT(HTTP_PUT),
|
||||
CREATESNAPSHOT(HTTP_PUT), DELETESNAPSHOT(HTTP_DELETE),
|
||||
RENAMESNAPSHOT(HTTP_PUT);
|
||||
|
||||
|
@ -1412,6 +1413,22 @@ public class HttpFSFileSystem extends FileSystem
|
|||
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
||||
}
|
||||
|
||||
public void allowSnapshot(Path path) throws IOException {
|
||||
Map<String, String> params = new HashMap<String, String>();
|
||||
params.put(OP_PARAM, Operation.ALLOWSNAPSHOT.toString());
|
||||
HttpURLConnection conn = getConnection(
|
||||
Operation.ALLOWSNAPSHOT.getMethod(), params, path, true);
|
||||
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
||||
}
|
||||
|
||||
public void disallowSnapshot(Path path) throws IOException {
|
||||
Map<String, String> params = new HashMap<String, String>();
|
||||
params.put(OP_PARAM, Operation.DISALLOWSNAPSHOT.toString());
|
||||
HttpURLConnection conn = getConnection(
|
||||
Operation.DISALLOWSNAPSHOT.getMethod(), params, path, true);
|
||||
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final Path createSnapshot(Path path, String snapshotName)
|
||||
throws IOException {
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
|
|||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.AclStatus;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.lib.service.FileSystemAccess;
|
||||
|
@ -1475,6 +1476,78 @@ public class FSOperations {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executor that performs an allowSnapshot operation.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public static class FSAllowSnapshot implements
|
||||
FileSystemAccess.FileSystemExecutor<Void> {
|
||||
|
||||
private Path path;
|
||||
|
||||
/**
|
||||
* Creates a allowSnapshot executor.
|
||||
* @param path directory path to allow snapshot.
|
||||
*/
|
||||
public FSAllowSnapshot(String path) {
|
||||
this.path = new Path(path);
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the filesystem operation.
|
||||
* @param fs filesystem instance to use.
|
||||
* @throws IOException thrown if an IO error occurred.
|
||||
*/
|
||||
@Override
|
||||
public Void execute(FileSystem fs) throws IOException {
|
||||
if (fs instanceof DistributedFileSystem) {
|
||||
DistributedFileSystem dfs = (DistributedFileSystem) fs;
|
||||
dfs.allowSnapshot(path);
|
||||
} else {
|
||||
throw new UnsupportedOperationException("allowSnapshot is not "
|
||||
+ "supported for HttpFs on " + fs.getClass()
|
||||
+ ". Please check your fs.defaultFS configuration");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executor that performs an disallowSnapshot operation.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public static class FSDisallowSnapshot implements
|
||||
FileSystemAccess.FileSystemExecutor<Void> {
|
||||
|
||||
private Path path;
|
||||
|
||||
/**
|
||||
* Creates a disallowSnapshot executor.
|
||||
* @param path directory path to allow snapshot.
|
||||
*/
|
||||
public FSDisallowSnapshot(String path) {
|
||||
this.path = new Path(path);
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the filesystem operation.
|
||||
* @param fs filesystem instance to use.
|
||||
* @throws IOException thrown if an IO error occurred.
|
||||
*/
|
||||
@Override
|
||||
public Void execute(FileSystem fs) throws IOException {
|
||||
if (fs instanceof DistributedFileSystem) {
|
||||
DistributedFileSystem dfs = (DistributedFileSystem) fs;
|
||||
dfs.disallowSnapshot(path);
|
||||
} else {
|
||||
throw new UnsupportedOperationException("disallowSnapshot is not "
|
||||
+ "supported for HttpFs on " + fs.getClass()
|
||||
+ ". Please check your fs.defaultFS configuration");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executor that performs a createSnapshot FileSystemAccess operation.
|
||||
*/
|
||||
|
|
|
@ -101,6 +101,8 @@ public class HttpFSParametersProvider extends ParametersProvider {
|
|||
PARAMS_DEF.put(Operation.SETSTORAGEPOLICY,
|
||||
new Class[] {PolicyNameParam.class});
|
||||
PARAMS_DEF.put(Operation.UNSETSTORAGEPOLICY, new Class[] {});
|
||||
PARAMS_DEF.put(Operation.ALLOWSNAPSHOT, new Class[] {});
|
||||
PARAMS_DEF.put(Operation.DISALLOWSNAPSHOT, new Class[] {});
|
||||
PARAMS_DEF.put(Operation.CREATESNAPSHOT,
|
||||
new Class[] {SnapshotNameParam.class});
|
||||
PARAMS_DEF.put(Operation.DELETESNAPSHOT,
|
||||
|
|
|
@ -600,6 +600,22 @@ public class HttpFSServer {
|
|||
}
|
||||
break;
|
||||
}
|
||||
case ALLOWSNAPSHOT: {
|
||||
FSOperations.FSAllowSnapshot command =
|
||||
new FSOperations.FSAllowSnapshot(path);
|
||||
fsExecute(user, command);
|
||||
AUDIT_LOG.info("[{}] allowed snapshot", path);
|
||||
response = Response.ok().build();
|
||||
break;
|
||||
}
|
||||
case DISALLOWSNAPSHOT: {
|
||||
FSOperations.FSDisallowSnapshot command =
|
||||
new FSOperations.FSDisallowSnapshot(path);
|
||||
fsExecute(user, command);
|
||||
AUDIT_LOG.info("[{}] disallowed snapshot", path);
|
||||
response = Response.ok().build();
|
||||
break;
|
||||
}
|
||||
case CREATESNAPSHOT: {
|
||||
String snapshotName = params.get(SnapshotNameParam.NAME,
|
||||
SnapshotNameParam.class);
|
||||
|
|
|
@ -40,6 +40,8 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
|
|||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotException;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.HFSTestCase;
|
||||
import org.apache.hadoop.test.HadoopUsersConfTestHelper;
|
||||
|
@ -1071,6 +1073,7 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
|
|||
GET_XATTRS, REMOVE_XATTR, LIST_XATTRS, ENCRYPTION, LIST_STATUS_BATCH,
|
||||
GETTRASHROOT, STORAGEPOLICY, ERASURE_CODING,
|
||||
CREATE_SNAPSHOT, RENAME_SNAPSHOT, DELETE_SNAPSHOT,
|
||||
ALLOW_SNAPSHOT, DISALLOW_SNAPSHOT, DISALLOW_SNAPSHOT_EXCEPTION,
|
||||
FILE_STATUS_ATTR
|
||||
}
|
||||
|
||||
|
@ -1169,6 +1172,14 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
|
|||
case DELETE_SNAPSHOT:
|
||||
testDeleteSnapshot();
|
||||
break;
|
||||
case ALLOW_SNAPSHOT:
|
||||
testAllowSnapshot();
|
||||
break;
|
||||
case DISALLOW_SNAPSHOT:
|
||||
testDisallowSnapshot();
|
||||
break;
|
||||
case DISALLOW_SNAPSHOT_EXCEPTION:
|
||||
testDisallowSnapshotException();
|
||||
case FILE_STATUS_ATTR:
|
||||
testFileStatusAttr();
|
||||
break;
|
||||
|
@ -1251,17 +1262,24 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
|
|||
testCreateSnapshot("snap-with-name");
|
||||
}
|
||||
|
||||
private void createSnapshotTestsPreconditions(Path snapshottablePath)
|
||||
throws Exception {
|
||||
private void createSnapshotTestsPreconditions(Path snapshottablePath,
|
||||
Boolean allowSnapshot) throws Exception {
|
||||
//Needed to get a DistributedFileSystem instance, in order to
|
||||
//call allowSnapshot on the newly created directory
|
||||
DistributedFileSystem distributedFs = (DistributedFileSystem)
|
||||
FileSystem.get(snapshottablePath.toUri(), this.getProxiedFSConf());
|
||||
distributedFs.mkdirs(snapshottablePath);
|
||||
distributedFs.allowSnapshot(snapshottablePath);
|
||||
if (allowSnapshot) {
|
||||
distributedFs.allowSnapshot(snapshottablePath);
|
||||
}
|
||||
Path subdirPath = new Path("/tmp/tmp-snap-test/subdir");
|
||||
distributedFs.mkdirs(subdirPath);
|
||||
}
|
||||
|
||||
  /**
   * Creates the snapshot test directory tree with snapshots allowed,
   * delegating to the two-argument overload.
   *
   * @param snapshottablePath directory to create and mark snapshottable.
   * @throws Exception if the underlying DFS operations fail.
   */
  private void createSnapshotTestsPreconditions(Path snapshottablePath)
      throws Exception {
    // Allow snapshot by default for snapshot test
    createSnapshotTestsPreconditions(snapshottablePath, true);
  }
|
||||
|
||||
private void cleanSnapshotTests(Path snapshottablePath,
|
||||
|
@ -1310,4 +1328,112 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
|
|||
fs.delete(snapshottablePath, true);
|
||||
}
|
||||
}
|
||||
|
||||
  /**
   * Verifies that allowSnapshot, issued through either HttpFSFileSystem or
   * WebHdfsFileSystem, sets the snapshot-enabled bit on a directory that
   * starts with snapshots disallowed. Skipped for local filesystems.
   */
  private void testAllowSnapshot() throws Exception {
    if (!this.isLocalFS()) {
      // Create a directory with snapshot disallowed
      Path path = new Path("/tmp/tmp-snap-test");
      createSnapshotTestsPreconditions(path, false);
      // Get the FileSystem instance that's being tested
      FileSystem fs = this.getHttpFSFileSystem();
      // Check FileStatus: bit must start unset since preconditions did not
      // call allowSnapshot
      assertFalse("Snapshot should be disallowed by default",
          fs.getFileStatus(path).isSnapshotEnabled());
      // Allow snapshot through whichever client implementation is under test
      if (fs instanceof HttpFSFileSystem) {
        HttpFSFileSystem httpFS = (HttpFSFileSystem) fs;
        httpFS.allowSnapshot(path);
      } else if (fs instanceof WebHdfsFileSystem) {
        WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) fs;
        webHdfsFileSystem.allowSnapshot(path);
      } else {
        Assert.fail(fs.getClass().getSimpleName() +
            " doesn't support allowSnapshot");
      }
      // Check FileStatus: the bit must now be set
      assertTrue("allowSnapshot failed",
          fs.getFileStatus(path).isSnapshotEnabled());
      // Cleanup
      fs.delete(path, true);
    }
  }
|
||||
|
||||
  /**
   * Verifies that disallowSnapshot, issued through either HttpFSFileSystem
   * or WebHdfsFileSystem, clears the snapshot-enabled bit on a directory
   * that starts snapshottable. Skipped for local filesystems.
   */
  private void testDisallowSnapshot() throws Exception {
    if (!this.isLocalFS()) {
      // Create a directory with snapshot allowed
      Path path = new Path("/tmp/tmp-snap-test");
      createSnapshotTestsPreconditions(path);
      // Get the FileSystem instance that's being tested
      FileSystem fs = this.getHttpFSFileSystem();
      // Check FileStatus: preconditions enabled snapshots via DFS
      assertTrue("Snapshot should be allowed by DFS",
          fs.getFileStatus(path).isSnapshotEnabled());
      // Disallow snapshot through whichever client is under test
      if (fs instanceof HttpFSFileSystem) {
        HttpFSFileSystem httpFS = (HttpFSFileSystem) fs;
        httpFS.disallowSnapshot(path);
      } else if (fs instanceof WebHdfsFileSystem) {
        WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) fs;
        webHdfsFileSystem.disallowSnapshot(path);
      } else {
        Assert.fail(fs.getClass().getSimpleName() +
            " doesn't support disallowSnapshot");
      }
      // Check FileStatus: the bit must now be cleared
      assertFalse("disallowSnapshot failed",
          fs.getFileStatus(path).isSnapshotEnabled());
      // Cleanup
      fs.delete(path, true);
    }
  }
|
||||
|
||||
  /**
   * Verifies that disallowSnapshot throws SnapshotException when the
   * directory still holds snapshots, and that the snapshot-enabled bit
   * remains set after the failed attempt. Skipped for local filesystems.
   */
  private void testDisallowSnapshotException() throws Exception {
    if (!this.isLocalFS()) {
      // Create a directory with snapshot allowed
      Path path = new Path("/tmp/tmp-snap-test");
      createSnapshotTestsPreconditions(path);
      // Get the FileSystem instance that's being tested
      FileSystem fs = this.getHttpFSFileSystem();
      // Check FileStatus
      assertTrue("Snapshot should be allowed by DFS",
          fs.getFileStatus(path).isSnapshotEnabled());
      // Create some snapshots so that disallowSnapshot must fail
      fs.createSnapshot(path, "snap-01");
      fs.createSnapshot(path, "snap-02");
      // Disallow snapshot; disallowSuccess records whether the call
      // (wrongly) completed without throwing
      boolean disallowSuccess = false;
      if (fs instanceof HttpFSFileSystem) {
        HttpFSFileSystem httpFS = (HttpFSFileSystem) fs;
        try {
          httpFS.disallowSnapshot(path);
          disallowSuccess = true;
        } catch (SnapshotException e) {
          // Expect SnapshotException — swallowed deliberately
        }
      } else if (fs instanceof WebHdfsFileSystem) {
        WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) fs;
        try {
          webHdfsFileSystem.disallowSnapshot(path);
          disallowSuccess = true;
        } catch (SnapshotException e) {
          // Expect SnapshotException — swallowed deliberately
        }
      } else {
        Assert.fail(fs.getClass().getSimpleName() +
            " doesn't support disallowSnapshot");
      }
      if (disallowSuccess) {
        Assert.fail("disallowSnapshot doesn't throw SnapshotException when "
            + "disallowing snapshot on a directory with at least one snapshot");
      }
      // Check FileStatus, should still be enabled since
      // disallow snapshot should fail
      assertTrue("disallowSnapshot should not have succeeded",
          fs.getFileStatus(path).isSnapshotEnabled());
      // Cleanup — snapshots must be deleted before the directory
      fs.deleteSnapshot(path, "snap-02");
      fs.deleteSnapshot(path, "snap-01");
      fs.delete(path, true);
    }
  }
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1103,6 +1103,108 @@ public class TestHttpFSServer extends HFSTestCase {
|
|||
return conn;
|
||||
}
|
||||
|
||||
@Test
|
||||
@TestDir
|
||||
@TestJetty
|
||||
@TestHdfs
|
||||
public void testAllowSnapshot() throws Exception {
|
||||
createHttpFSServer(false, false);
|
||||
// Create a test directory
|
||||
String pathString = "/tmp/tmp-snap-allow-test";
|
||||
createDirWithHttp(pathString, "700", null);
|
||||
|
||||
Path path = new Path(pathString);
|
||||
DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(
|
||||
path.toUri(), TestHdfsHelper.getHdfsConf());
|
||||
// FileStatus should have snapshot enabled bit unset by default
|
||||
Assert.assertFalse(dfs.getFileStatus(path).isSnapshotEnabled());
|
||||
// Send a request with ALLOWSNAPSHOT API
|
||||
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
|
||||
URL url = new URL(TestJettyHelper.getJettyURL(), MessageFormat.format(
|
||||
"/webhdfs/v1{0}?user.name={1}&op=ALLOWSNAPSHOT",
|
||||
pathString, user));
|
||||
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
|
||||
conn.setRequestMethod("PUT");
|
||||
conn.connect();
|
||||
// Should return HTTP_OK
|
||||
Assert.assertEquals(conn.getResponseCode(), HttpURLConnection.HTTP_OK);
|
||||
// FileStatus should have snapshot enabled bit set
|
||||
Assert.assertTrue(dfs.getFileStatus(path).isSnapshotEnabled());
|
||||
// Clean up
|
||||
dfs.delete(path, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
@TestDir
|
||||
@TestJetty
|
||||
@TestHdfs
|
||||
public void testDisallowSnapshot() throws Exception {
|
||||
createHttpFSServer(false, false);
|
||||
// Create a test directory
|
||||
String pathString = "/tmp/tmp-snap-disallow-test";
|
||||
createDirWithHttp(pathString, "700", null);
|
||||
|
||||
Path path = new Path(pathString);
|
||||
DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(
|
||||
path.toUri(), TestHdfsHelper.getHdfsConf());
|
||||
// Allow snapshot
|
||||
dfs.allowSnapshot(path);
|
||||
// FileStatus should have snapshot enabled bit set so far
|
||||
Assert.assertTrue(dfs.getFileStatus(path).isSnapshotEnabled());
|
||||
// Send a request with DISALLOWSNAPSHOT API
|
||||
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
|
||||
URL url = new URL(TestJettyHelper.getJettyURL(), MessageFormat.format(
|
||||
"/webhdfs/v1{0}?user.name={1}&op=DISALLOWSNAPSHOT",
|
||||
pathString, user));
|
||||
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
|
||||
conn.setRequestMethod("PUT");
|
||||
conn.connect();
|
||||
// Should return HTTP_OK
|
||||
Assert.assertEquals(conn.getResponseCode(), HttpURLConnection.HTTP_OK);
|
||||
// FileStatus should not have snapshot enabled bit set
|
||||
Assert.assertFalse(dfs.getFileStatus(path).isSnapshotEnabled());
|
||||
// Clean up
|
||||
dfs.delete(path, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
@TestDir
|
||||
@TestJetty
|
||||
@TestHdfs
|
||||
public void testDisallowSnapshotException() throws Exception {
|
||||
createHttpFSServer(false, false);
|
||||
// Create a test directory
|
||||
String pathString = "/tmp/tmp-snap-disallow-exception-test";
|
||||
createDirWithHttp(pathString, "700", null);
|
||||
|
||||
Path path = new Path(pathString);
|
||||
DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(
|
||||
path.toUri(), TestHdfsHelper.getHdfsConf());
|
||||
// Allow snapshot
|
||||
dfs.allowSnapshot(path);
|
||||
// FileStatus should have snapshot enabled bit set so far
|
||||
Assert.assertTrue(dfs.getFileStatus(path).isSnapshotEnabled());
|
||||
// Create some snapshots
|
||||
dfs.createSnapshot(path, "snap-01");
|
||||
dfs.createSnapshot(path, "snap-02");
|
||||
// Send a request with DISALLOWSNAPSHOT API
|
||||
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
|
||||
URL url = new URL(TestJettyHelper.getJettyURL(), MessageFormat.format(
|
||||
"/webhdfs/v1{0}?user.name={1}&op=DISALLOWSNAPSHOT",
|
||||
pathString, user));
|
||||
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
|
||||
conn.setRequestMethod("PUT");
|
||||
conn.connect();
|
||||
// Should not return HTTP_OK
|
||||
Assert.assertNotEquals(conn.getResponseCode(), HttpURLConnection.HTTP_OK);
|
||||
// FileStatus should still have snapshot enabled bit set
|
||||
Assert.assertTrue(dfs.getFileStatus(path).isSnapshotEnabled());
|
||||
// Clean up
|
||||
dfs.deleteSnapshot(path, "snap-02");
|
||||
dfs.deleteSnapshot(path, "snap-01");
|
||||
dfs.delete(path, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
@TestDir
|
||||
@TestJetty
|
||||
|
|
Loading…
Reference in New Issue