HDFS-13787. RBF: Add Snapshot related ClientProtocol APIs. Contributed by Inigo Goiri.

This commit is contained in:
Ayush Saxena 2019-05-30 19:58:19 +05:30 committed by Brahma Reddy Battula
parent d240eec136
commit b6fff8c81e
5 changed files with 372 additions and 21 deletions

View File

@ -136,6 +136,8 @@ public class RouterClientProtocol implements ClientProtocol {
private final RouterCacheAdmin routerCacheAdmin;
/** StoragePolicy calls. **/
private final RouterStoragePolicy storagePolicy;
/** Snapshot calls. */
private final RouterSnapshot snapshotProto;
/** Router security manager to handle token operations. */
private RouterSecurityManager securityManager = null;
@ -166,6 +168,7 @@ public class RouterClientProtocol implements ClientProtocol {
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
this.erasureCoding = new ErasureCoding(rpcServer);
this.storagePolicy = new RouterStoragePolicy(rpcServer);
this.snapshotProto = new RouterSnapshot(rpcServer);
this.routerCacheAdmin = new RouterCacheAdmin(rpcServer);
this.securityManager = rpcServer.getRouterSecurityManager();
}
@ -1221,42 +1224,42 @@ public class RouterClientProtocol implements ClientProtocol {
return rpcClient.invokeSequential(locations, method, String.class, null);
}
@Override // Client Protocol
@Override
public void allowSnapshot(String snapshotRoot) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
snapshotProto.allowSnapshot(snapshotRoot);
}
@Override // Client Protocol
@Override
public void disallowSnapshot(String snapshot) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
snapshotProto.disallowSnapshot(snapshot);
}
@Override
public void renameSnapshot(String snapshotRoot, String snapshotOldName,
String snapshotNewName) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
snapshotProto.renameSnapshot(
snapshotRoot, snapshotOldName, snapshotNewName);
}
@Override
public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
return null;
return snapshotProto.getSnapshottableDirListing();
}
@Override
public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
String earlierSnapshotName, String laterSnapshotName) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
return null;
return snapshotProto.getSnapshotDiffReport(
snapshotRoot, earlierSnapshotName, laterSnapshotName);
}
@Override
public SnapshotDiffReportListing getSnapshotDiffReportListing(
String snapshotRoot, String earlierSnapshotName, String laterSnapshotName,
byte[] startPath, int index) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
return null;
return snapshotProto.getSnapshotDiffReportListing(
snapshotRoot, earlierSnapshotName, laterSnapshotName, startPath, index);
}
@Override
@ -1558,14 +1561,13 @@ public class RouterClientProtocol implements ClientProtocol {
@Override
public String createSnapshot(String snapshotRoot, String snapshotName)
throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
return null;
return snapshotProto.createSnapshot(snapshotRoot, snapshotName);
}
@Override
public void deleteSnapshot(String snapshotRoot, String snapshotName)
throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
snapshotProto.deleteSnapshot(snapshotRoot, snapshotName);
}
@Override

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
@ -882,6 +883,14 @@ public class RouterRpcClient {
return newException;
}
if (ioe instanceof SnapshotException) {
String newMsg = processExceptionMsg(
ioe.getMessage(), loc.getDest(), loc.getSrc());
SnapshotException newException = new SnapshotException(newMsg);
newException.setStackTrace(ioe.getStackTrace());
return newException;
}
return ioe;
}

View File

@ -1007,12 +1007,12 @@ public class RouterRpcServer extends AbstractService
return clientProto.getLinkTarget(path);
}
@Override // Client Protocol
@Override // ClientProtocol
public void allowSnapshot(String snapshotRoot) throws IOException {
clientProto.allowSnapshot(snapshotRoot);
}
@Override // Client Protocol
@Override // ClientProtocol
public void disallowSnapshot(String snapshot) throws IOException {
clientProto.disallowSnapshot(snapshot);
}
@ -1023,7 +1023,7 @@ public class RouterRpcServer extends AbstractService
clientProto.renameSnapshot(snapshotRoot, snapshotOldName, snapshotNewName);
}
@Override // Client Protocol
@Override // ClientProtocol
public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
throws IOException {
return clientProto.getSnapshottableDirListing();
@ -1584,14 +1584,16 @@ public class RouterRpcServer extends AbstractService
* @param clazz Class of the values.
* @return Array with the outputs.
*/
protected static <T> T[] merge(
static <T> T[] merge(
Map<FederationNamespaceInfo, T[]> map, Class<T> clazz) {
// Put all results into a set to avoid repeats
Set<T> ret = new LinkedHashSet<>();
for (T[] values : map.values()) {
for (T val : values) {
ret.add(val);
if (values != null) {
for (T val : values) {
ret.add(val);
}
}
}
@ -1605,7 +1607,7 @@ public class RouterRpcServer extends AbstractService
* @param clazz Class of the values.
* @return Array with the values in set.
*/
private static <T> T[] toArray(Collection<T> set, Class<T> clazz) {
static <T> T[] toArray(Collection<T> set, Class<T> clazz) {
@SuppressWarnings("unchecked")
T[] combinedData = (T[]) Array.newInstance(clazz, set.size());
combinedData = set.toArray(combinedData);

View File

@ -0,0 +1,208 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
/**
* Module that implements all the RPC calls related to snapshots in
* {@link ClientProtocol} in the {@link RouterRpcServer}.
*/
public class RouterSnapshot {
/** RPC server to receive client calls. */
private final RouterRpcServer rpcServer;
/** RPC clients to connect to the Namenodes. */
private final RouterRpcClient rpcClient;
/** Find generic locations. */
private final ActiveNamenodeResolver namenodeResolver;
public RouterSnapshot(RouterRpcServer server) {
this.rpcServer = server;
this.rpcClient = this.rpcServer.getRPCClient();
this.namenodeResolver = rpcServer.getNamenodeResolver();
}
public void allowSnapshot(String snapshotRoot) throws IOException {
rpcServer.checkOperation(OperationCategory.WRITE);
final List<RemoteLocation> locations =
rpcServer.getLocationsForPath(snapshotRoot, true, false);
RemoteMethod method = new RemoteMethod("allowSnapshot",
new Class<?>[] {String.class}, new RemoteParam());
if (rpcServer.isInvokeConcurrent(snapshotRoot)) {
rpcClient.invokeConcurrent(locations, method);
} else {
rpcClient.invokeSequential(locations, method);
}
}
public void disallowSnapshot(String snapshotRoot) throws IOException {
rpcServer.checkOperation(OperationCategory.WRITE);
final List<RemoteLocation> locations =
rpcServer.getLocationsForPath(snapshotRoot, true, false);
RemoteMethod method = new RemoteMethod("disallowSnapshot",
new Class<?>[] {String.class}, new RemoteParam());
if (rpcServer.isInvokeConcurrent(snapshotRoot)) {
rpcClient.invokeConcurrent(locations, method);
} else {
rpcClient.invokeSequential(locations, method);
}
}
public String createSnapshot(String snapshotRoot, String snapshotName)
throws IOException {
rpcServer.checkOperation(OperationCategory.WRITE);
final List<RemoteLocation> locations =
rpcServer.getLocationsForPath(snapshotRoot, true, false);
RemoteMethod method = new RemoteMethod("createSnapshot",
new Class<?>[] {String.class, String.class}, new RemoteParam(),
snapshotName);
String result = null;
if (rpcServer.isInvokeConcurrent(snapshotRoot)) {
Map<RemoteLocation, String> results = rpcClient.invokeConcurrent(
locations, method, String.class);
Entry<RemoteLocation, String> firstelement =
results.entrySet().iterator().next();
RemoteLocation loc = firstelement.getKey();
result = firstelement.getValue();
result = result.replaceFirst(loc.getDest(), loc.getSrc());
} else {
result = rpcClient.invokeSequential(
locations, method, String.class, null);
RemoteLocation loc = locations.get(0);
result = result.replaceFirst(loc.getDest(), loc.getSrc());
}
return result;
}
public void deleteSnapshot(String snapshotRoot, String snapshotName)
throws IOException {
rpcServer.checkOperation(OperationCategory.WRITE);
final List<RemoteLocation> locations =
rpcServer.getLocationsForPath(snapshotRoot, true, false);
RemoteMethod method = new RemoteMethod("deleteSnapshot",
new Class<?>[] {String.class, String.class},
new RemoteParam(), snapshotName);
if (rpcServer.isInvokeConcurrent(snapshotRoot)) {
rpcClient.invokeConcurrent(locations, method);
} else {
rpcClient.invokeSequential(locations, method);
}
}
public void renameSnapshot(String snapshotRoot, String oldSnapshotName,
String newSnapshot) throws IOException {
rpcServer.checkOperation(OperationCategory.WRITE);
final List<RemoteLocation> locations =
rpcServer.getLocationsForPath(snapshotRoot, true, false);
RemoteMethod method = new RemoteMethod("renameSnapshot",
new Class<?>[] {String.class, String.class, String.class},
new RemoteParam(), oldSnapshotName, newSnapshot);
if (rpcServer.isInvokeConcurrent(snapshotRoot)) {
rpcClient.invokeConcurrent(locations, method);
} else {
rpcClient.invokeSequential(locations, method);
}
}
public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);
RemoteMethod method = new RemoteMethod("getSnapshottableDirListing");
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
Map<FederationNamespaceInfo, SnapshottableDirectoryStatus[]> ret =
rpcClient.invokeConcurrent(
nss, method, true, false, SnapshottableDirectoryStatus[].class);
return RouterRpcServer.merge(ret, SnapshottableDirectoryStatus.class);
}
public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
String earlierSnapshotName, String laterSnapshotName)
throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);
final List<RemoteLocation> locations =
rpcServer.getLocationsForPath(snapshotRoot, true, false);
RemoteMethod remoteMethod = new RemoteMethod("getSnapshotDiffReport",
new Class<?>[] {String.class, String.class, String.class},
new RemoteParam(), earlierSnapshotName, laterSnapshotName);
if (rpcServer.isInvokeConcurrent(snapshotRoot)) {
Map<RemoteLocation, SnapshotDiffReport> ret = rpcClient.invokeConcurrent(
locations, remoteMethod, true, false, SnapshotDiffReport.class);
return ret.values().iterator().next();
} else {
return rpcClient.invokeSequential(
locations, remoteMethod, SnapshotDiffReport.class, null);
}
}
public SnapshotDiffReportListing getSnapshotDiffReportListing(
String snapshotRoot, String earlierSnapshotName, String laterSnapshotName,
byte[] startPath, int index) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);
final List<RemoteLocation> locations =
rpcServer.getLocationsForPath(snapshotRoot, true, false);
Class<?>[] params = new Class<?>[] {
String.class, String.class, String.class,
byte[].class, int.class};
RemoteMethod remoteMethod = new RemoteMethod(
"getSnapshotDiffReportListing", params,
new RemoteParam(), earlierSnapshotName, laterSnapshotName,
startPath, index);
if (rpcServer.isInvokeConcurrent(snapshotRoot)) {
Map<RemoteLocation, SnapshotDiffReportListing> ret =
rpcClient.invokeConcurrent(locations, remoteMethod, false, false,
SnapshotDiffReportListing.class);
Collection<SnapshotDiffReportListing> listings = ret.values();
SnapshotDiffReportListing listing0 = listings.iterator().next();
return listing0;
} else {
return rpcClient.invokeSequential(
locations, remoteMethod, SnapshotDiffReportListing.class, null);
}
}
}

View File

@ -86,6 +86,10 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
@ -94,6 +98,11 @@ import org.apache.hadoop.hdfs.server.federation.MockResolver;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
@ -105,6 +114,7 @@ import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.codehaus.jettison.json.JSONObject;
import org.junit.AfterClass;
import org.junit.Before;
@ -743,6 +753,126 @@ public class TestRouterRpc {
new Object[] {badPath, (long) 0, "testclient"});
}
@Test
public void testAllowDisallowSnapshots() throws Exception {
// Create a directory via the router at the root level
String dirPath = "/testdir";
String filePath1 = "/sample";
FsPermission permission = new FsPermission("705");
routerProtocol.mkdirs(dirPath, permission, false);
createFile(routerFS, filePath1, 32);
// Check that initially doesn't allow snapshots
NamenodeContext nnContext = cluster.getNamenodes().get(0);
NameNode nn = nnContext.getNamenode();
FSNamesystem fsn = NameNodeAdapter.getNamesystem(nn);
FSDirectory fsdir = fsn.getFSDirectory();
INodeDirectory dirNode = fsdir.getINode4Write(dirPath).asDirectory();
assertFalse(dirNode.isSnapshottable());
// Allow snapshots and verify the folder allows them
routerProtocol.allowSnapshot("/testdir");
dirNode = fsdir.getINode4Write(dirPath).asDirectory();
assertTrue(dirNode.isSnapshottable());
// Disallow snapshot on dir and verify does not allow snapshots anymore
routerProtocol.disallowSnapshot("/testdir");
dirNode = fsdir.getINode4Write(dirPath).asDirectory();
assertFalse(dirNode.isSnapshottable());
// Cleanup
routerProtocol.delete(dirPath, true);
}
@Test
public void testManageSnapshot() throws Exception {
final String mountPoint = "/mntsnapshot";
final String snapshotFolder = mountPoint + "/folder";
LOG.info("Setup a mount point for snapshots: {}", mountPoint);
Router r = router.getRouter();
MockResolver resolver = (MockResolver) r.getSubclusterResolver();
String ns0 = cluster.getNameservices().get(0);
resolver.addLocation(mountPoint, ns0, "/");
FsPermission permission = new FsPermission("777");
routerProtocol.mkdirs(mountPoint, permission, false);
routerProtocol.mkdirs(snapshotFolder, permission, false);
for (int i = 1; i <= 9; i++) {
String folderPath = snapshotFolder + "/subfolder" + i;
routerProtocol.mkdirs(folderPath, permission, false);
}
LOG.info("Create the snapshot: {}", snapshotFolder);
routerProtocol.allowSnapshot(snapshotFolder);
String snapshotName = routerProtocol.createSnapshot(
snapshotFolder, "snap");
assertEquals(snapshotFolder + "/.snapshot/snap", snapshotName);
assertTrue(verifyFileExists(routerFS, snapshotFolder + "/.snapshot/snap"));
LOG.info("Rename the snapshot and check it changed");
routerProtocol.renameSnapshot(snapshotFolder, "snap", "newsnap");
assertFalse(
verifyFileExists(routerFS, snapshotFolder + "/.snapshot/snap"));
assertTrue(
verifyFileExists(routerFS, snapshotFolder + "/.snapshot/newsnap"));
LambdaTestUtils.intercept(SnapshotException.class,
"Cannot delete snapshot snap from path " + snapshotFolder + ":",
() -> routerFS.deleteSnapshot(new Path(snapshotFolder), "snap"));
LOG.info("Delete the snapshot and check it is not there");
routerProtocol.deleteSnapshot(snapshotFolder, "newsnap");
assertFalse(
verifyFileExists(routerFS, snapshotFolder + "/.snapshot/newsnap"));
// Cleanup
routerProtocol.delete(mountPoint, true);
}
@Test
public void testGetSnapshotListing() throws IOException {
// Create a directory via the router and allow snapshots
final String snapshotPath = "/testGetSnapshotListing";
final String childDir = snapshotPath + "/subdir";
FsPermission permission = new FsPermission("705");
routerProtocol.mkdirs(snapshotPath, permission, false);
routerProtocol.allowSnapshot(snapshotPath);
// Create two snapshots
final String snapshot1 = "snap1";
final String snapshot2 = "snap2";
routerProtocol.createSnapshot(snapshotPath, snapshot1);
routerProtocol.mkdirs(childDir, permission, false);
routerProtocol.createSnapshot(snapshotPath, snapshot2);
// Check for listing through the Router
SnapshottableDirectoryStatus[] dirList =
routerProtocol.getSnapshottableDirListing();
assertEquals(1, dirList.length);
SnapshottableDirectoryStatus snapshotDir0 = dirList[0];
assertEquals(snapshotPath, snapshotDir0.getFullPath().toString());
// Check for difference report in two snapshot
SnapshotDiffReport diffReport = routerProtocol.getSnapshotDiffReport(
snapshotPath, snapshot1, snapshot2);
assertEquals(2, diffReport.getDiffList().size());
// Check for difference in two snapshot
byte[] startPath = {};
SnapshotDiffReportListing diffReportListing =
routerProtocol.getSnapshotDiffReportListing(
snapshotPath, snapshot1, snapshot2, startPath, -1);
assertEquals(1, diffReportListing.getModifyList().size());
assertEquals(1, diffReportListing.getCreateList().size());
// Cleanup
routerProtocol.deleteSnapshot(snapshotPath, snapshot1);
routerProtocol.deleteSnapshot(snapshotPath, snapshot2);
routerProtocol.disallowSnapshot(snapshotPath);
}
@Test
public void testProxyGetBlockLocations() throws Exception {