HDFS-15554. RBF: force router check file existence in destinations before adding/updating mount points (#2266). Contributed by Fengnan Li.
This commit is contained in:
parent
83c7c2b4c4
commit
3e8b1e7426
|
@ -277,6 +277,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
|
||||||
public static final String DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_KEY =
|
public static final String DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_KEY =
|
||||||
FEDERATION_ROUTER_PREFIX + "fs-limits.max-component-length";
|
FEDERATION_ROUTER_PREFIX + "fs-limits.max-component-length";
|
||||||
public static final int DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_DEFAULT = 0;
|
public static final int DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_DEFAULT = 0;
|
||||||
|
public static final String DFS_ROUTER_ADMIN_MOUNT_CHECK_ENABLE =
|
||||||
|
FEDERATION_ROUTER_PREFIX + "admin.mount.check.enable";
|
||||||
|
public static final boolean DFS_ROUTER_ADMIN_MOUNT_CHECK_ENABLE_DEFAULT =
|
||||||
|
false;
|
||||||
|
|
||||||
// HDFS Router-based federation web
|
// HDFS Router-based federation web
|
||||||
public static final String DFS_ROUTER_HTTP_ENABLE =
|
public static final String DFS_ROUTER_HTTP_ENABLE =
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -89,6 +90,7 @@ import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.authorize.ProxyUsers;
|
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -126,6 +128,7 @@ public class RouterAdminServer extends AbstractService
|
||||||
private static boolean isPermissionEnabled;
|
private static boolean isPermissionEnabled;
|
||||||
private boolean iStateStoreCache;
|
private boolean iStateStoreCache;
|
||||||
private final long maxComponentLength;
|
private final long maxComponentLength;
|
||||||
|
private boolean mountTableCheckDestination;
|
||||||
|
|
||||||
public RouterAdminServer(Configuration conf, Router router)
|
public RouterAdminServer(Configuration conf, Router router)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -184,6 +187,9 @@ public class RouterAdminServer extends AbstractService
|
||||||
this.maxComponentLength = (int) conf.getLongBytes(
|
this.maxComponentLength = (int) conf.getLongBytes(
|
||||||
RBFConfigKeys.DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_KEY,
|
RBFConfigKeys.DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_KEY,
|
||||||
RBFConfigKeys.DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_DEFAULT);
|
RBFConfigKeys.DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_DEFAULT);
|
||||||
|
this.mountTableCheckDestination = conf.getBoolean(
|
||||||
|
RBFConfigKeys.DFS_ROUTER_ADMIN_MOUNT_CHECK_ENABLE,
|
||||||
|
RBFConfigKeys.DFS_ROUTER_ADMIN_MOUNT_CHECK_ENABLE_DEFAULT);
|
||||||
|
|
||||||
GenericRefreshProtocolServerSideTranslatorPB genericRefreshXlator =
|
GenericRefreshProtocolServerSideTranslatorPB genericRefreshXlator =
|
||||||
new GenericRefreshProtocolServerSideTranslatorPB(this);
|
new GenericRefreshProtocolServerSideTranslatorPB(this);
|
||||||
|
@ -326,6 +332,13 @@ public class RouterAdminServer extends AbstractService
|
||||||
// Checks max component length limit.
|
// Checks max component length limit.
|
||||||
MountTable mountTable = request.getEntry();
|
MountTable mountTable = request.getEntry();
|
||||||
verifyMaxComponentLength(mountTable);
|
verifyMaxComponentLength(mountTable);
|
||||||
|
if (this.mountTableCheckDestination) {
|
||||||
|
List<String> nsIds = verifyFileInDestinations(mountTable);
|
||||||
|
if (!nsIds.isEmpty()) {
|
||||||
|
throw new IllegalArgumentException("File not found in downstream " +
|
||||||
|
"nameservices: " + StringUtils.join(",", nsIds));
|
||||||
|
}
|
||||||
|
}
|
||||||
return getMountTableStore().addMountTableEntry(request);
|
return getMountTableStore().addMountTableEntry(request);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -336,6 +349,13 @@ public class RouterAdminServer extends AbstractService
|
||||||
MountTable oldEntry = null;
|
MountTable oldEntry = null;
|
||||||
// Checks max component length limit.
|
// Checks max component length limit.
|
||||||
verifyMaxComponentLength(updateEntry);
|
verifyMaxComponentLength(updateEntry);
|
||||||
|
if (this.mountTableCheckDestination) {
|
||||||
|
List<String> nsIds = verifyFileInDestinations(updateEntry);
|
||||||
|
if (!nsIds.isEmpty()) {
|
||||||
|
throw new IllegalArgumentException("File not found in downstream " +
|
||||||
|
"nameservices: " + StringUtils.join(",", nsIds));
|
||||||
|
}
|
||||||
|
}
|
||||||
if (this.router.getSubclusterResolver() instanceof MountTableResolver) {
|
if (this.router.getSubclusterResolver() instanceof MountTableResolver) {
|
||||||
MountTableResolver mResolver =
|
MountTableResolver mResolver =
|
||||||
(MountTableResolver) this.router.getSubclusterResolver();
|
(MountTableResolver) this.router.getSubclusterResolver();
|
||||||
|
@ -542,10 +562,31 @@ public class RouterAdminServer extends AbstractService
|
||||||
@Override
|
@Override
|
||||||
public GetDestinationResponse getDestination(
|
public GetDestinationResponse getDestination(
|
||||||
GetDestinationRequest request) throws IOException {
|
GetDestinationRequest request) throws IOException {
|
||||||
|
RouterRpcServer rpcServer = this.router.getRpcServer();
|
||||||
|
List<RemoteLocation> locations =
|
||||||
|
rpcServer.getLocationsForPath(request.getSrcPath(), false);
|
||||||
|
List<String> nsIds = getDestinationNameServices(request, locations);
|
||||||
|
if (nsIds.isEmpty() && !locations.isEmpty()) {
|
||||||
|
String nsId = locations.get(0).getNameserviceId();
|
||||||
|
nsIds.add(nsId);
|
||||||
|
}
|
||||||
|
return GetDestinationResponse.newInstance(nsIds);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get destination nameservices where the file in request exists.
|
||||||
|
*
|
||||||
|
* @param request request with src info.
|
||||||
|
* @param locations remote locations to check against.
|
||||||
|
* @return list of nameservices where the dest file was found
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private List<String> getDestinationNameServices(
|
||||||
|
GetDestinationRequest request, List<RemoteLocation> locations)
|
||||||
|
throws IOException {
|
||||||
final String src = request.getSrcPath();
|
final String src = request.getSrcPath();
|
||||||
final List<String> nsIds = new ArrayList<>();
|
final List<String> nsIds = new ArrayList<>();
|
||||||
RouterRpcServer rpcServer = this.router.getRpcServer();
|
RouterRpcServer rpcServer = this.router.getRpcServer();
|
||||||
List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, false);
|
|
||||||
RouterRpcClient rpcClient = rpcServer.getRPCClient();
|
RouterRpcClient rpcClient = rpcServer.getRPCClient();
|
||||||
RemoteMethod method = new RemoteMethod("getFileInfo",
|
RemoteMethod method = new RemoteMethod("getFileInfo",
|
||||||
new Class<?>[] {String.class}, new RemoteParam());
|
new Class<?>[] {String.class}, new RemoteParam());
|
||||||
|
@ -562,11 +603,35 @@ public class RouterAdminServer extends AbstractService
|
||||||
LOG.error("Cannot get location for {}: {}",
|
LOG.error("Cannot get location for {}: {}",
|
||||||
src, ioe.getMessage());
|
src, ioe.getMessage());
|
||||||
}
|
}
|
||||||
if (nsIds.isEmpty() && !locations.isEmpty()) {
|
return nsIds;
|
||||||
String nsId = locations.get(0).getNameserviceId();
|
}
|
||||||
nsIds.add(nsId);
|
|
||||||
|
/**
|
||||||
|
* Verify the file exists in destination nameservices to avoid dangling
|
||||||
|
* mount points.
|
||||||
|
*
|
||||||
|
* @param entry the new mount points added, could be from add or update.
|
||||||
|
* @return destination nameservices where the file doesn't exist.
|
||||||
|
* @throws IOException unable to verify the file in destinations
|
||||||
|
*/
|
||||||
|
public List<String> verifyFileInDestinations(MountTable entry)
|
||||||
|
throws IOException {
|
||||||
|
GetDestinationRequest request =
|
||||||
|
GetDestinationRequest.newInstance(entry.getSourcePath());
|
||||||
|
List<RemoteLocation> locations = entry.getDestinations();
|
||||||
|
List<String> nsId =
|
||||||
|
getDestinationNameServices(request, locations);
|
||||||
|
|
||||||
|
// get nameservices where no target file exists
|
||||||
|
Set<String> destNs = new HashSet<>(nsId);
|
||||||
|
List<String> nsWithoutFile = new ArrayList<>();
|
||||||
|
for (RemoteLocation location : locations) {
|
||||||
|
String ns = location.getNameserviceId();
|
||||||
|
if (!destNs.contains(ns)) {
|
||||||
|
nsWithoutFile.add(ns);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return GetDestinationResponse.newInstance(nsIds);
|
return nsWithoutFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -215,6 +215,16 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.federation.router.admin.mount.check.enable</name>
|
||||||
|
<value>false</value>
|
||||||
|
<description>
|
||||||
|
If true, add/update mount table will include a destination check to make
|
||||||
|
sure the file exists in downstream namenodes, and changes to mount table
|
||||||
|
will fail if the file doesn't exist in any of the destination namenode.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.federation.router.http-address</name>
|
<name>dfs.federation.router.http-address</name>
|
||||||
<value>0.0.0.0:50071</value>
|
<value>0.0.0.0:50071</value>
|
||||||
|
|
|
@ -201,7 +201,8 @@ And to stop it:
|
||||||
|
|
||||||
### Mount table management
|
### Mount table management
|
||||||
|
|
||||||
The mount table entries are pretty much the same as in [ViewFs](../hadoop-hdfs/ViewFs.html).
|
The mount table entries are pretty much the same as in [ViewFs](../hadoop-hdfs/ViewFs.html). Please make sure the downstream namespace path
|
||||||
|
exists before creating mount table entry pointing to it.
|
||||||
A good practice for simplifying the management is to name the federated namespace with the same names as the destination namespaces.
|
A good practice for simplifying the management is to name the federated namespace with the same names as the destination namespaces.
|
||||||
For example, if we to mount `/data/app1` in the federated namespace, it is recommended to have that same name as in the destination namespace.
|
For example, if we to mount `/data/app1` in the federated namespace, it is recommended to have that same name as in the destination namespace.
|
||||||
|
|
||||||
|
|
|
@ -27,11 +27,14 @@ import static org.junit.Assert.assertTrue;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
||||||
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||||
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
||||||
|
@ -57,7 +60,6 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableE
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.test.Whitebox;
|
|
||||||
import org.apache.hadoop.test.LambdaTestUtils;
|
import org.apache.hadoop.test.LambdaTestUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
@ -65,6 +67,9 @@ import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.internal.util.reflection.FieldSetter;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The administrator interface of the {@link Router} implemented by
|
* The administrator interface of the {@link Router} implemented by
|
||||||
|
@ -78,6 +83,7 @@ public class TestRouterAdmin {
|
||||||
"Hadoop:service=Router,name=FederationRPC";
|
"Hadoop:service=Router,name=FederationRPC";
|
||||||
private static List<MountTable> mockMountTable;
|
private static List<MountTable> mockMountTable;
|
||||||
private static StateStoreService stateStore;
|
private static StateStoreService stateStore;
|
||||||
|
private static RouterRpcClient mockRpcClient;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void globalSetUp() throws Exception {
|
public static void globalSetUp() throws Exception {
|
||||||
|
@ -88,6 +94,7 @@ public class TestRouterAdmin {
|
||||||
.admin()
|
.admin()
|
||||||
.rpc()
|
.rpc()
|
||||||
.build();
|
.build();
|
||||||
|
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_ADMIN_MOUNT_CHECK_ENABLE, true);
|
||||||
cluster.addRouterOverrides(conf);
|
cluster.addRouterOverrides(conf);
|
||||||
cluster.startRouters();
|
cluster.startRouters();
|
||||||
routerContext = cluster.getRandomRouter();
|
routerContext = cluster.getRandomRouter();
|
||||||
|
@ -103,11 +110,51 @@ public class TestRouterAdmin {
|
||||||
createNamenodeReport("ns1", "nn1", HAServiceState.ACTIVE));
|
createNamenodeReport("ns1", "nn1", HAServiceState.ACTIVE));
|
||||||
stateStore.refreshCaches(true);
|
stateStore.refreshCaches(true);
|
||||||
|
|
||||||
|
setUpMocks();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Group all mocks together.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
* @throws NoSuchFieldException
|
||||||
|
*/
|
||||||
|
private static void setUpMocks() throws IOException, NoSuchFieldException {
|
||||||
RouterRpcServer spyRpcServer =
|
RouterRpcServer spyRpcServer =
|
||||||
Mockito.spy(routerContext.getRouter().createRpcServer());
|
Mockito.spy(routerContext.getRouter().createRpcServer());
|
||||||
Whitebox
|
FieldSetter.setField(routerContext.getRouter(),
|
||||||
.setInternalState(routerContext.getRouter(), "rpcServer", spyRpcServer);
|
Router.class.getDeclaredField("rpcServer"), spyRpcServer);
|
||||||
Mockito.doReturn(null).when(spyRpcServer).getFileInfo(Mockito.anyString());
|
Mockito.doReturn(null).when(spyRpcServer).getFileInfo(Mockito.anyString());
|
||||||
|
|
||||||
|
// mock rpc client for destination check when editing mount tables.
|
||||||
|
mockRpcClient = Mockito.spy(spyRpcServer.getRPCClient());
|
||||||
|
FieldSetter.setField(spyRpcServer,
|
||||||
|
RouterRpcServer.class.getDeclaredField("rpcClient"),
|
||||||
|
mockRpcClient);
|
||||||
|
RemoteLocation remoteLocation0 =
|
||||||
|
new RemoteLocation("ns0", "/testdir", null);
|
||||||
|
RemoteLocation remoteLocation1 =
|
||||||
|
new RemoteLocation("ns1", "/", null);
|
||||||
|
final Map<RemoteLocation, HdfsFileStatus> mockResponse0 = new HashMap<>();
|
||||||
|
final Map<RemoteLocation, HdfsFileStatus> mockResponse1 = new HashMap<>();
|
||||||
|
mockResponse0.put(remoteLocation0,
|
||||||
|
new HdfsFileStatus.Builder().build());
|
||||||
|
Mockito.doReturn(mockResponse0).when(mockRpcClient).invokeConcurrent(
|
||||||
|
Mockito.eq(Lists.newArrayList(remoteLocation0)),
|
||||||
|
Mockito.any(RemoteMethod.class),
|
||||||
|
Mockito.eq(false),
|
||||||
|
Mockito.eq(false),
|
||||||
|
Mockito.eq(HdfsFileStatus.class)
|
||||||
|
);
|
||||||
|
mockResponse1.put(remoteLocation1,
|
||||||
|
new HdfsFileStatus.Builder().build());
|
||||||
|
Mockito.doReturn(mockResponse1).when(mockRpcClient).invokeConcurrent(
|
||||||
|
Mockito.eq(Lists.newArrayList(remoteLocation1)),
|
||||||
|
Mockito.any(RemoteMethod.class),
|
||||||
|
Mockito.eq(false),
|
||||||
|
Mockito.eq(false),
|
||||||
|
Mockito.eq(HdfsFileStatus.class)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
|
@ -332,6 +379,26 @@ public class TestRouterAdmin {
|
||||||
assertEquals(entry.getSourcePath(), "/ns0");
|
assertEquals(entry.getSourcePath(), "/ns0");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testVerifyFileInDestinations() throws IOException {
|
||||||
|
// this entry has been created in the mock setup
|
||||||
|
MountTable newEntry = MountTable.newInstance(
|
||||||
|
"/testpath", Collections.singletonMap("ns0", "/testdir"),
|
||||||
|
Time.now(), Time.now());
|
||||||
|
RouterAdminServer adminServer =
|
||||||
|
this.routerContext.getRouter().getAdminServer();
|
||||||
|
List<String> result = adminServer.verifyFileInDestinations(newEntry);
|
||||||
|
assertEquals(0, result.size());
|
||||||
|
|
||||||
|
// this entry was not created in the mock
|
||||||
|
newEntry = MountTable.newInstance(
|
||||||
|
"/testpath", Collections.singletonMap("ns0", "/testdir1"),
|
||||||
|
Time.now(), Time.now());
|
||||||
|
result = adminServer.verifyFileInDestinations(newEntry);
|
||||||
|
assertEquals(1, result.size());
|
||||||
|
assertEquals("ns0", result.get(0));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets an existing mount table record in the state store.
|
* Gets an existing mount table record in the state store.
|
||||||
*
|
*
|
||||||
|
|
Loading…
Reference in New Issue