diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index 7b06ca428bd..efd4a3aff64 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -277,6 +277,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { public static final String DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_KEY = FEDERATION_ROUTER_PREFIX + "fs-limits.max-component-length"; public static final int DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_DEFAULT = 0; + public static final String DFS_ROUTER_ADMIN_MOUNT_CHECK_ENABLE = + FEDERATION_ROUTER_PREFIX + "admin.mount.check.enable"; + public static final boolean DFS_ROUTER_ADMIN_MOUNT_CHECK_ENABLE_DEFAULT = + false; // HDFS Router-based federation web public static final String DFS_ROUTER_HTTP_ENABLE = diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java index 50094b02d47..c7525f65635 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -89,6 +90,7 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,6 +128,7 @@ public class RouterAdminServer extends AbstractService private static boolean isPermissionEnabled; private boolean iStateStoreCache; private final long maxComponentLength; + private boolean mountTableCheckDestination; public RouterAdminServer(Configuration conf, Router router) throws IOException { @@ -184,6 +187,9 @@ public class RouterAdminServer extends AbstractService this.maxComponentLength = (int) conf.getLongBytes( RBFConfigKeys.DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_KEY, RBFConfigKeys.DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_DEFAULT); + this.mountTableCheckDestination = conf.getBoolean( + RBFConfigKeys.DFS_ROUTER_ADMIN_MOUNT_CHECK_ENABLE, + RBFConfigKeys.DFS_ROUTER_ADMIN_MOUNT_CHECK_ENABLE_DEFAULT); GenericRefreshProtocolServerSideTranslatorPB genericRefreshXlator = new GenericRefreshProtocolServerSideTranslatorPB(this); @@ -326,6 +332,13 @@ public class RouterAdminServer extends AbstractService // Checks max component length limit. MountTable mountTable = request.getEntry(); verifyMaxComponentLength(mountTable); + if (this.mountTableCheckDestination) { + List nsIds = verifyFileInDestinations(mountTable); + if (!nsIds.isEmpty()) { + throw new IllegalArgumentException("File not found in downstream " + + "nameservices: " + StringUtils.join(",", nsIds)); + } + } return getMountTableStore().addMountTableEntry(request); } @@ -336,6 +349,13 @@ public class RouterAdminServer extends AbstractService MountTable oldEntry = null; // Checks max component length limit. verifyMaxComponentLength(updateEntry); + if (this.mountTableCheckDestination) { + List nsIds = verifyFileInDestinations(updateEntry); + if (!nsIds.isEmpty()) { + throw new IllegalArgumentException("File not found in downstream " + + "nameservices: " + StringUtils.join(",", nsIds)); + } + } if (this.router.getSubclusterResolver() instanceof MountTableResolver) { MountTableResolver mResolver = (MountTableResolver) this.router.getSubclusterResolver(); @@ -542,10 +562,31 @@ public class RouterAdminServer extends AbstractService @Override public GetDestinationResponse getDestination( GetDestinationRequest request) throws IOException { + RouterRpcServer rpcServer = this.router.getRpcServer(); + List locations = + rpcServer.getLocationsForPath(request.getSrcPath(), false); + List nsIds = getDestinationNameServices(request, locations); + if (nsIds.isEmpty() && !locations.isEmpty()) { + String nsId = locations.get(0).getNameserviceId(); + nsIds.add(nsId); + } + return GetDestinationResponse.newInstance(nsIds); + } + + /** + * Get destination nameservices where the file in request exists. + * + * @param request request with src info. + * @param locations remote locations to check against. + * @return list of nameservices where the dest file was found + * @throws IOException + */ + private List getDestinationNameServices( + GetDestinationRequest request, List locations) + throws IOException { final String src = request.getSrcPath(); final List nsIds = new ArrayList<>(); RouterRpcServer rpcServer = this.router.getRpcServer(); - List locations = rpcServer.getLocationsForPath(src, false); RouterRpcClient rpcClient = rpcServer.getRPCClient(); RemoteMethod method = new RemoteMethod("getFileInfo", new Class[] {String.class}, new RemoteParam()); @@ -562,11 +603,35 @@ public class RouterAdminServer extends AbstractService LOG.error("Cannot get location for {}: {}", src, ioe.getMessage()); } - if (nsIds.isEmpty() && !locations.isEmpty()) { - String nsId = locations.get(0).getNameserviceId(); - nsIds.add(nsId); + return nsIds; + } + + /** + * Verify the file exists in destination nameservices to avoid dangling + * mount points. + * + * @param entry the new mount points added, could be from add or update. + * @return destination nameservices where the file doesn't exist. + * @throws IOException unable to verify the file in destinations + */ + public List verifyFileInDestinations(MountTable entry) + throws IOException { + GetDestinationRequest request = + GetDestinationRequest.newInstance(entry.getSourcePath()); + List locations = entry.getDestinations(); + List nsId = + getDestinationNameServices(request, locations); + + // get nameservices where no target file exists + Set destNs = new HashSet<>(nsId); + List nsWithoutFile = new ArrayList<>(); + for (RemoteLocation location : locations) { + String ns = location.getNameserviceId(); + if (!destNs.contains(ns)) { + nsWithoutFile.add(ns); + } } - return GetDestinationResponse.newInstance(nsIds); + return nsWithoutFile; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index 4bd2ac36b13..aab90e46065 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -215,6 +215,16 @@ + + dfs.federation.router.admin.mount.check.enable + false + + If true, add/update mount table will include a destination check to make + sure the file exists in downstream namenodes, and changes to mount table + will fail if the file doesn't exist in any of the destination namenode. + + + dfs.federation.router.http-address 0.0.0.0:50071 diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md index de45645db09..66f039a0c88 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md @@ -201,7 +201,8 @@ And to stop it: ### Mount table management -The mount table entries are pretty much the same as in [ViewFs](../hadoop-hdfs/ViewFs.html). +The mount table entries are pretty much the same as in [ViewFs](../hadoop-hdfs/ViewFs.html). Please make sure the downstream namespace path +exists before creating mount table entry pointing to it. A good practice for simplifying the management is to name the federated namespace with the same names as the destination namespaces. For example, if we to mount `/data/app1` in the federated namespace, it is recommended to have that same name as in the destination namespace. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java index 8a57224b455..0cb229f1576 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java @@ -27,11 +27,14 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; @@ -57,7 +60,6 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableE import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.test.Whitebox; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.util.Time; import org.junit.AfterClass; @@ -65,6 +67,9 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.internal.util.reflection.FieldSetter; + +import com.google.common.collect.Lists; /** * The administrator interface of the {@link Router} implemented by @@ -78,6 +83,7 @@ public class TestRouterAdmin { "Hadoop:service=Router,name=FederationRPC"; private static List mockMountTable; private static StateStoreService stateStore; + private static RouterRpcClient mockRpcClient; @BeforeClass public static void globalSetUp() throws Exception { @@ -88,6 +94,7 @@ public class TestRouterAdmin { .admin() .rpc() .build(); + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_ADMIN_MOUNT_CHECK_ENABLE, true); cluster.addRouterOverrides(conf); cluster.startRouters(); routerContext = cluster.getRandomRouter(); @@ -103,11 +110,51 @@ public class TestRouterAdmin { createNamenodeReport("ns1", "nn1", HAServiceState.ACTIVE)); stateStore.refreshCaches(true); + setUpMocks(); + } + + /** + * Group all mocks together. + * + * @throws IOException + * @throws NoSuchFieldException + */ + private static void setUpMocks() throws IOException, NoSuchFieldException { RouterRpcServer spyRpcServer = Mockito.spy(routerContext.getRouter().createRpcServer()); - Whitebox - .setInternalState(routerContext.getRouter(), "rpcServer", spyRpcServer); + FieldSetter.setField(routerContext.getRouter(), + Router.class.getDeclaredField("rpcServer"), spyRpcServer); Mockito.doReturn(null).when(spyRpcServer).getFileInfo(Mockito.anyString()); + + // mock rpc client for destination check when editing mount tables. + mockRpcClient = Mockito.spy(spyRpcServer.getRPCClient()); + FieldSetter.setField(spyRpcServer, + RouterRpcServer.class.getDeclaredField("rpcClient"), + mockRpcClient); + RemoteLocation remoteLocation0 = + new RemoteLocation("ns0", "/testdir", null); + RemoteLocation remoteLocation1 = + new RemoteLocation("ns1", "/", null); + final Map mockResponse0 = new HashMap<>(); + final Map mockResponse1 = new HashMap<>(); + mockResponse0.put(remoteLocation0, + new HdfsFileStatus.Builder().build()); + Mockito.doReturn(mockResponse0).when(mockRpcClient).invokeConcurrent( + Mockito.eq(Lists.newArrayList(remoteLocation0)), + Mockito.any(RemoteMethod.class), + Mockito.eq(false), + Mockito.eq(false), + Mockito.eq(HdfsFileStatus.class) + ); + mockResponse1.put(remoteLocation1, + new HdfsFileStatus.Builder().build()); + Mockito.doReturn(mockResponse1).when(mockRpcClient).invokeConcurrent( + Mockito.eq(Lists.newArrayList(remoteLocation1)), + Mockito.any(RemoteMethod.class), + Mockito.eq(false), + Mockito.eq(false), + Mockito.eq(HdfsFileStatus.class) + ); } @AfterClass @@ -332,6 +379,26 @@ public class TestRouterAdmin { assertEquals(entry.getSourcePath(), "/ns0"); } + @Test + public void testVerifyFileInDestinations() throws IOException { + // this entry has been created in the mock setup + MountTable newEntry = MountTable.newInstance( + "/testpath", Collections.singletonMap("ns0", "/testdir"), + Time.now(), Time.now()); + RouterAdminServer adminServer = + this.routerContext.getRouter().getAdminServer(); + List result = adminServer.verifyFileInDestinations(newEntry); + assertEquals(0, result.size()); + + // this entry was not created in the mock + newEntry = MountTable.newInstance( + "/testpath", Collections.singletonMap("ns0", "/testdir1"), + Time.now(), Time.now()); + result = adminServer.verifyFileInDestinations(newEntry); + assertEquals(1, result.size()); + assertEquals("ns0", result.get(0)); + } + /** * Gets an existing mount table record in the state store. *