HDFS-13484. RBF: Disable Nameservices from the federation. Contributed by Inigo Goiri.

This commit is contained in:
Yiqun Lin 2018-04-25 15:22:26 +08:00 committed by Owen O'Malley
parent ebedd69e25
commit 48269c370c
15 changed files with 571 additions and 42 deletions

View File

@ -642,7 +642,7 @@ public class FederationMetrics implements FederationMBean {
namenodeResolver.getNamenodesForNameserviceId(nsId);
if (nns != null) {
FederationNamenodeContext nn = nns.get(0);
if (nn != null && nn instanceof MembershipState) {
if (nn instanceof MembershipState) {
resultList.add((MembershipState) nn);
}
}

View File

@ -108,6 +108,14 @@ public interface ActiveNamenodeResolver {
*/
Set<FederationNamespaceInfo> getNamespaces() throws IOException;
/**
* Get a list of all namespaces that are disabled.
*
* @return List of name spaces identifier in the federation
* @throws IOException If the disabled list is not available.
*/
Set<String> getDisabledNamespaces() throws IOException;
/**
* Assign a unique identifier for the parent router service.
* Required to report the status to the namenode resolver.

View File

@ -27,7 +27,8 @@ public enum FederationNamenodeServiceState {
ACTIVE, // HAServiceState.ACTIVE or operational.
STANDBY, // HAServiceState.STANDBY.
UNAVAILABLE, // When the namenode cannot be reached.
EXPIRED; // When the last update is too old.
EXPIRED, // When the last update is too old.
DISABLED; // When the nameservice is disabled.
public static FederationNamenodeServiceState getState(HAServiceState state) {
switch(state) {

View File

@ -29,10 +29,13 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.store.DisabledNameserviceStore;
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
import org.apache.hadoop.hdfs.server.federation.store.RecordStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
@ -63,6 +66,8 @@ public class MembershipNamenodeResolver
private final StateStoreService stateStore;
/** Membership State Store interface. */
private MembershipStore membershipInterface;
/** Disabled Nameservice State Store interface. */
private DisabledNameserviceStore disabledNameserviceInterface;
/** Parent router ID. */
private String routerId;
@ -88,22 +93,38 @@ public class MembershipNamenodeResolver
private synchronized MembershipStore getMembershipStore() throws IOException {
if (this.membershipInterface == null) {
this.membershipInterface = this.stateStore.getRegisteredRecordStore(
MembershipStore.class);
if (this.membershipInterface == null) {
throw new IOException("State Store does not have an interface for " +
MembershipStore.class.getSimpleName());
}
this.membershipInterface = getStoreInterface(MembershipStore.class);
}
return this.membershipInterface;
}
private synchronized DisabledNameserviceStore getDisabledNameserviceStore()
throws IOException {
if (this.disabledNameserviceInterface == null) {
this.disabledNameserviceInterface =
getStoreInterface(DisabledNameserviceStore.class);
}
return this.disabledNameserviceInterface;
}
private <T extends RecordStore<?>> T getStoreInterface(Class<T> clazz)
throws IOException{
T store = this.stateStore.getRegisteredRecordStore(clazz);
if (store == null) {
throw new IOException("State Store does not have an interface for " +
clazz.getSimpleName());
}
return store;
}
@Override
public boolean loadCache(boolean force) {
// Our cache depends on the store, update it first
try {
MembershipStore membership = getMembershipStore();
membership.loadCache(force);
DisabledNameserviceStore disabled = getDisabledNameserviceStore();
disabled.loadCache(force);
} catch (IOException e) {
LOG.error("Cannot update membership from the State Store", e);
}
@ -151,30 +172,48 @@ public class MembershipNamenodeResolver
final String nsId) throws IOException {
List<? extends FederationNamenodeContext> ret = cacheNS.get(nsId);
if (ret == null) {
try {
MembershipState partial = MembershipState.newInstance();
partial.setNameserviceId(nsId);
GetNamenodeRegistrationsRequest request =
GetNamenodeRegistrationsRequest.newInstance(partial);
final List<MembershipState> result =
getRecentRegistrationForQuery(request, true, false);
if (result == null || result.isEmpty()) {
LOG.error("Cannot locate eligible NNs for {}", nsId);
return null;
} else {
cacheNS.put(nsId, result);
ret = result;
}
} catch (StateStoreUnavailableException e) {
LOG.error("Cannot get active NN for {}, State Store unavailable", nsId);
}
if (ret != null) {
return ret;
}
if (ret == null) {
// Not cached, generate the value
final List<MembershipState> result;
try {
MembershipState partial = MembershipState.newInstance();
partial.setNameserviceId(nsId);
GetNamenodeRegistrationsRequest request =
GetNamenodeRegistrationsRequest.newInstance(partial);
result = getRecentRegistrationForQuery(request, true, false);
} catch (StateStoreUnavailableException e) {
LOG.error("Cannot get active NN for {}, State Store unavailable", nsId);
return null;
}
return Collections.unmodifiableList(ret);
if (result == null || result.isEmpty()) {
LOG.error("Cannot locate eligible NNs for {}", nsId);
return null;
}
// Mark disabled name services
try {
Set<String> disabled =
getDisabledNameserviceStore().getDisabledNameservices();
if (disabled == null) {
LOG.error("Cannot get disabled name services");
} else {
for (MembershipState nn : result) {
if (disabled.contains(nn.getNameserviceId())) {
nn.setState(FederationNamenodeServiceState.DISABLED);
}
}
}
} catch (StateStoreUnavailableException e) {
LOG.error("Cannot get disabled name services, State Store unavailable");
}
// Cache the response
ret = Collections.unmodifiableList(result);
cacheNS.put(nsId, result);
return ret;
}
@Override
@ -261,7 +300,24 @@ public class MembershipNamenodeResolver
GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
GetNamespaceInfoResponse response =
getMembershipStore().getNamespaceInfo(request);
return response.getNamespaceInfo();
Set<FederationNamespaceInfo> nss = response.getNamespaceInfo();
// Filter disabled namespaces
Set<FederationNamespaceInfo> ret = new TreeSet<>();
Set<String> disabled = getDisabledNamespaces();
for (FederationNamespaceInfo ns : nss) {
if (!disabled.contains(ns.getNameserviceId())) {
ret.add(ns);
}
}
return ret;
}
@Override
public Set<String> getDisabledNamespaces() throws IOException {
DisabledNameserviceStore store = getDisabledNameserviceStore();
return store.getDisabledNameservices();
}
/**

View File

@ -29,6 +29,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.proto.RouterProtocolProtos.RouterAdminProtocolService;
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.store.DisabledNameserviceStore;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
@ -282,28 +284,60 @@ public class RouterAdminServer extends AbstractService
@Override
public DisableNameserviceResponse disableNameservice(
DisableNameserviceRequest request) throws IOException {
// TODO check permissions
RouterPermissionChecker pc = getPermissionChecker();
if (pc != null) {
pc.checkSuperuserPrivilege();
}
String nsId = request.getNameServiceId();
// TODO check that the name service exists
boolean success = getDisabledNameserviceStore().disableNameservice(nsId);
boolean success = false;
if (namespaceExists(nsId)) {
success = getDisabledNameserviceStore().disableNameservice(nsId);
} else {
LOG.error("Cannot disable {}, it does not exists", nsId);
}
return DisableNameserviceResponse.newInstance(success);
}
private boolean namespaceExists(final String nsId) throws IOException {
boolean found = false;
ActiveNamenodeResolver resolver = router.getNamenodeResolver();
Set<FederationNamespaceInfo> nss = resolver.getNamespaces();
for (FederationNamespaceInfo ns : nss) {
if (nsId.equals(ns.getNameserviceId())) {
found = true;
break;
}
}
return found;
}
@Override
public EnableNameserviceResponse enableNameservice(
EnableNameserviceRequest request) throws IOException {
// TODO check permissions
RouterPermissionChecker pc = getPermissionChecker();
if (pc != null) {
pc.checkSuperuserPrivilege();
}
String nsId = request.getNameServiceId();
// TODO check that the name service exists
boolean success = getDisabledNameserviceStore().enableNameservice(nsId);
DisabledNameserviceStore store = getDisabledNameserviceStore();
Set<String> disabled = store.getDisabledNameservices();
boolean success = false;
if (disabled.contains(nsId)) {
success = store.enableNameservice(nsId);
} else {
LOG.error("Cannot enable {}, it was not disabled", nsId);
}
return EnableNameserviceResponse.newInstance(success);
}
@Override
public GetDisabledNameservicesResponse getDisabledNameservices(
GetDisabledNameservicesRequest request) throws IOException {
// TODO check permissions
Set<String> nsIds = getDisabledNameserviceStore().getDisabledNameservices();
Set<String> nsIds =
getDisabledNameserviceStore().getDisabledNameservices();
return GetDisabledNameservicesResponse.newInstance(nsIds);
}

View File

@ -17,12 +17,17 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
@ -35,9 +40,23 @@ public class RouterPermissionChecker extends FSPermissionChecker {
/** Mount table default permission. */
public static final short MOUNT_TABLE_PERMISSION_DEFAULT = 00755;
public RouterPermissionChecker(String routerOwner, String supergroup,
/** Name of the super user. */
private final String superUser;
/** Name of the super group. */
private final String superGroup;
public RouterPermissionChecker(String user, String group,
UserGroupInformation callerUgi) {
super(routerOwner, supergroup, callerUgi, null);
super(user, group, callerUgi, null);
this.superUser = user;
this.superGroup = group;
}
public RouterPermissionChecker(String user, String group)
throws IOException {
super(user, group, UserGroupInformation.getCurrentUser(), null);
this.superUser = user;
this.superGroup = group;
}
/**
@ -79,4 +98,40 @@ public class RouterPermissionChecker extends FSPermissionChecker {
+ ": user " + getUser() + " does not have " + access.toString()
+ " permissions.");
}
/**
* Check the superuser privileges of the current RPC caller. This method is
* based on Datanode#checkSuperuserPrivilege().
* @throws AccessControlException If the user is not authorized.
*/
@Override
public void checkSuperuserPrivilege() throws AccessControlException {
// Try to get the ugi in the RPC call.
UserGroupInformation ugi = null;
try {
ugi = NameNode.getRemoteUser();
} catch (IOException e) {
// Ignore as we catch it afterwards
}
if (ugi == null) {
LOG.error("Cannot get the remote user name");
throw new AccessControlException("Cannot get the remote user name");
}
// Is this by the Router user itself?
if (ugi.getUserName().equals(superUser)) {
return;
}
// Is the user a member of the super group?
List<String> groups = Arrays.asList(ugi.getGroupNames());
if (groups.contains(superGroup)) {
return;
}
// Not a superuser
throw new AccessControlException(
ugi.getUserName() + " is not a super user");
}
}

View File

@ -30,6 +30,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Array;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
@ -2270,7 +2271,15 @@ public class RouterRpcServer extends AbstractService
}
}
return location.getDestinations();
// Filter disabled subclusters
Set<String> disabled = namenodeResolver.getDisabledNamespaces();
List<RemoteLocation> locs = new ArrayList<>();
for (RemoteLocation loc : location.getDestinations()) {
if (!disabled.contains(loc.getNameserviceId())) {
locs.add(loc);
}
}
return locs;
} catch (IOException ioe) {
if (this.rpcMonitor != null) {
this.rpcMonitor.routerFailureStateStore();

View File

@ -229,6 +229,17 @@ Ls command will show below information for each mount table entry:
Source Destinations Owner Group Mode Quota/Usage
/path ns0->/path root supergroup rwxr-xr-x [NsQuota: 50/0, SsQuota: 100 B/0 B]
### Disabling nameservices
To prevent accessing a nameservice (sublcuster), it can be disabled from the federation.
For example, one can disable `ns1`, list it and enable it again:
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -nameservice disable ns1
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -getDisabledNameservices
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -nameservice enable ns1
This is useful when decommissioning subclusters or when one subcluster is missbehaving (e.g., low performance or unavailability).
Client configuration
--------------------

View File

@ -20,6 +20,9 @@ package org.apache.hadoop.hdfs.server.federation;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
@ -49,9 +52,18 @@ import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.test.GenericTestUtils;
import org.mockito.internal.util.reflection.Whitebox;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Supplier;
@ -60,6 +72,9 @@ import com.google.common.base.Supplier;
*/
public final class FederationTestUtils {
private static final Logger LOG =
LoggerFactory.getLogger(FederationTestUtils.class);
public final static String[] NAMESERVICES = {"ns0", "ns1"};
public final static String[] NAMENODES = {"nn0", "nn1", "nn2", "nn3"};
public final static String[] ROUTERS =
@ -274,4 +289,31 @@ public final class FederationTestUtils {
throws IOException {
return fs.delete(new Path(path), true);
}
/**
* Simulate that a Namenode is slow by adding a sleep to the check operation
* in the NN.
* @param nn Namenode to simulate slow.
* @param seconds Number of seconds to add to the Namenode.
* @throws Exception If we cannot add the sleep time.
*/
public static void simulateSlowNamenode(final NameNode nn, final int seconds)
throws Exception {
FSNamesystem namesystem = nn.getNamesystem();
HAContext haContext = namesystem.getHAContext();
HAContext spyHAContext = spy(haContext);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
LOG.info("Simulating slow namenode {}", invocation.getMock());
try {
Thread.sleep(seconds * 1000);
} catch(InterruptedException e) {
LOG.error("Simulating a slow namenode aborted");
}
return null;
}
}).when(spyHAContext).checkOperation(any(OperationCategory.class));
Whitebox.setInternalState(namesystem, "haContext", spyHAContext);
}
}

View File

@ -185,6 +185,10 @@ public class MiniRouterDFSCluster {
return this.fileContext;
}
public URI getFileSystemURI() {
return fileSystemUri;
}
public String getHttpAddress() {
InetSocketAddress httpAddress = router.getHttpServerAddress();
return NetUtils.getHostPortString(httpAddress);
@ -236,6 +240,10 @@ public class MiniRouterDFSCluster {
return adminClient;
}
public void resetAdminClient() {
adminClient = null;
}
public DFSClient getClient() throws IOException, URISyntaxException {
if (client == null) {
LOG.info("Connecting to router at {}", fileSystemUri);

View File

@ -27,6 +27,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
@ -262,6 +263,11 @@ public class MockResolver
return this.namespaces;
}
@Override
public Set<String> getDisabledNamespaces() throws IOException {
return new TreeSet<>();
}
@Override
public PathLocation getDestinationForPath(String path) throws IOException {
List<RemoteLocation> remoteLocations = new LinkedList<>();

View File

@ -0,0 +1,236 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
import static org.apache.hadoop.util.Time.monotonicNow;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.store.DisabledNameserviceStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.DisableNameserviceRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.codehaus.jettison.json.JSONObject;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Test the behavior when disabling name services.
*/
public class TestDisableNameservices {
private static StateStoreDFSCluster cluster;
private static RouterContext routerContext;
private static RouterClient routerAdminClient;
private static ClientProtocol routerProtocol;
@BeforeClass
public static void setUp() throws Exception {
// Build and start a federated cluster
cluster = new StateStoreDFSCluster(false, 2);
Configuration routerConf = new RouterConfigBuilder()
.stateStore()
.metrics()
.admin()
.rpc()
.build();
// Reduce the number of RPC threads to saturate the Router easy
routerConf.setInt(RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY, 8);
routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 4);
// Set the DNs to belong to only one subcluster
cluster.setIndependentDNs();
cluster.addRouterOverrides(routerConf);
// override some settings for the client
cluster.startCluster();
cluster.startRouters();
cluster.waitClusterUp();
routerContext = cluster.getRandomRouter();
routerProtocol = routerContext.getClient().getNamenode();
routerAdminClient = routerContext.getAdminClient();
setupNamespace();
// Simulate one of the subclusters to be slow
MiniDFSCluster dfsCluster = cluster.getCluster();
NameNode nn0 = dfsCluster.getNameNode(0);
simulateSlowNamenode(nn0, 1);
}
private static void setupNamespace() throws IOException {
// Setup a mount table to map to the two namespaces
MountTableManager mountTable = routerAdminClient.getMountTableManager();
Map<String, String> destinations = new TreeMap<>();
destinations.put("ns0", "/");
destinations.put("ns1", "/");
MountTable newEntry = MountTable.newInstance("/", destinations);
newEntry.setDestOrder(DestinationOrder.RANDOM);
AddMountTableEntryRequest request =
AddMountTableEntryRequest.newInstance(newEntry);
mountTable.addMountTableEntry(request);
// Refresh the cache in the Router
Router router = routerContext.getRouter();
MountTableResolver mountTableResolver =
(MountTableResolver) router.getSubclusterResolver();
mountTableResolver.loadCache(true);
// Add a folder to each namespace
NamenodeContext nn0 = cluster.getNamenode("ns0", null);
nn0.getFileSystem().mkdirs(new Path("/dirns0"));
NamenodeContext nn1 = cluster.getNamenode("ns1", null);
nn1.getFileSystem().mkdirs(new Path("/dirns1"));
}
@AfterClass
public static void tearDown() {
if (cluster != null) {
cluster.stopRouter(routerContext);
cluster.shutdown();
cluster = null;
}
}
@After
public void cleanup() throws IOException {
Router router = routerContext.getRouter();
StateStoreService stateStore = router.getStateStore();
DisabledNameserviceStore store =
stateStore.getRegisteredRecordStore(DisabledNameserviceStore.class);
store.loadCache(true);
Set<String> disabled = store.getDisabledNameservices();
for (String nsId : disabled) {
store.enableNameservice(nsId);
}
store.loadCache(true);
}
@Test
public void testWithoutDisabling() throws IOException {
// ns0 is slow and renewLease should take a long time
long t0 = monotonicNow();
routerProtocol.renewLease("client0");
long t = monotonicNow() - t0;
assertTrue("It took too little: " + t + "ms",
t > TimeUnit.SECONDS.toMillis(1));
// Return the results from all subclusters even if slow
FileSystem routerFs = routerContext.getFileSystem();
FileStatus[] filesStatus = routerFs.listStatus(new Path("/"));
assertEquals(2, filesStatus.length);
assertEquals("dirns0", filesStatus[0].getPath().getName());
assertEquals("dirns1", filesStatus[1].getPath().getName());
}
@Test
public void testDisabling() throws Exception {
disableNameservice("ns0");
// renewLease should be fast as we are skipping ns0
long t0 = monotonicNow();
routerProtocol.renewLease("client0");
long t = monotonicNow() - t0;
assertTrue("It took too long: " + t + "ms",
t < TimeUnit.SECONDS.toMillis(1));
// We should not report anything from ns0
FileSystem routerFs = routerContext.getFileSystem();
FileStatus[] filesStatus = routerFs.listStatus(new Path("/"));
assertEquals(1, filesStatus.length);
assertEquals("dirns1", filesStatus[0].getPath().getName());
}
@Test
public void testMetrics() throws Exception {
disableNameservice("ns0");
int numActive = 0;
int numDisabled = 0;
Router router = routerContext.getRouter();
FederationMetrics metrics = router.getMetrics();
String jsonString = metrics.getNameservices();
JSONObject jsonObject = new JSONObject(jsonString);
Iterator<?> keys = jsonObject.keys();
while (keys.hasNext()) {
String key = (String) keys.next();
JSONObject json = jsonObject.getJSONObject(key);
String nsId = json.getString("nameserviceId");
String state = json.getString("state");
if (nsId.equals("ns0")) {
assertEquals("DISABLED", state);
numDisabled++;
} else {
assertEquals("ACTIVE", state);
numActive++;
}
}
assertEquals(1, numActive);
assertEquals(1, numDisabled);
}
private static void disableNameservice(final String nsId)
throws IOException {
NameserviceManager nsManager = routerAdminClient.getNameserviceManager();
DisableNameserviceRequest req =
DisableNameserviceRequest.newInstance(nsId);
nsManager.disableNameservice(req);
Router router = routerContext.getRouter();
StateStoreService stateStore = router.getStateStore();
DisabledNameserviceStore store =
stateStore.getRegisteredRecordStore(DisabledNameserviceStore.class);
store.loadCache(true);
MembershipNamenodeResolver resolver =
(MembershipNamenodeResolver) router.getNamenodeResolver();
resolver.loadCache(true);
}
}

View File

@ -17,21 +17,27 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createNamenodeReport;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
@ -52,6 +58,7 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableE
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.junit.AfterClass;
import org.junit.Before;
@ -86,6 +93,14 @@ public class TestRouterAdmin {
mockMountTable = cluster.generateMockMountTable();
Router router = routerContext.getRouter();
stateStore = router.getStateStore();
// Add two name services for testing disabling
ActiveNamenodeResolver membership = router.getNamenodeResolver();
membership.registerNamenode(
createNamenodeReport("ns0", "nn1", HAServiceState.ACTIVE));
membership.registerNamenode(
createNamenodeReport("ns1", "nn1", HAServiceState.ACTIVE));
stateStore.refreshCaches(true);
}
@AfterClass
@ -97,6 +112,8 @@ public class TestRouterAdmin {
public void testSetup() throws Exception {
assertTrue(
synchronizeRecords(stateStore, mockMountTable, MountTable.class));
// Avoid running with random users
routerContext.resetAdminClient();
}
@Test
@ -375,6 +392,37 @@ public class TestRouterAdmin {
assertTrue(enableResp.getStatus());
disabled = getDisabledNameservices(nsManager);
assertTrue(disabled.isEmpty());
// Non existing name services should fail
disableReq = DisableNameserviceRequest.newInstance("nsunknown");
disableResp = nsManager.disableNameservice(disableReq);
assertFalse(disableResp.getStatus());
}
@Test
public void testNameserviceManagerUnauthorized() throws Exception {
// Try to disable a name service with a random user
final String username = "baduser";
UserGroupInformation user =
UserGroupInformation.createRemoteUser(username);
user.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
RouterClient client = routerContext.getAdminClient();
NameserviceManager nameservices = client.getNameserviceManager();
DisableNameserviceRequest disableReq =
DisableNameserviceRequest.newInstance("ns0");
try {
nameservices.disableNameservice(disableReq);
fail("We should not be able to disable nameservices");
} catch (IOException ioe) {
assertExceptionContains(
username + " is not a super user", ioe);
}
return null;
}
});
}
private Set<String> getDisabledNameservices(NameserviceManager nsManager)

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createNamenodeReport;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@ -27,10 +28,12 @@ import java.net.InetSocketAddress;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
@ -93,6 +96,14 @@ public class TestRouterAdminCLI {
routerSocket);
admin = new RouterAdmin(routerConf);
client = routerContext.getAdminClient();
// Add two fake name services to testing disabling them
ActiveNamenodeResolver membership = router.getNamenodeResolver();
membership.registerNamenode(
createNamenodeReport("ns0", "nn1", HAServiceState.ACTIVE));
membership.registerNamenode(
createNamenodeReport("ns1", "nn1", HAServiceState.ACTIVE));
stateStore.refreshCaches(true);
}
@AfterClass

View File

@ -431,6 +431,8 @@ Usage:
[-setQuota <path> -nsQuota <nsQuota> -ssQuota <quota in bytes or quota size string>]
[-clrQuota <path>]
[-safemode enter | leave | get]
[-nameservice disable | enable <nameservice>]
[-getDisabledNameservices]
| COMMAND\_OPTION | Description |
|:---- |:---- |
@ -440,6 +442,8 @@ Usage:
| `-setQuota` *path* `-nsQuota` *nsQuota* `-ssQuota` *ssQuota* | Set quota for specified path. See [HDFS Quotas Guide](./HdfsQuotaAdminGuide.html) for the quota detail. |
| `-clrQuota` *path* | Clear quota of given mount point. See [HDFS Quotas Guide](./HdfsQuotaAdminGuide.html) for the quota detail. |
| `-safemode` `enter` `leave` `get` | Manually set the Router entering or leaving safe mode. The option *get* will be used for verifying if the Router is in safe mode state. |
| `-nameservice` `disable` `enable` *nameservice* | Disable/enable a name service from the federation. If disabled, requests will not go to that name service. |
| `-getDisabledNameservices` | Get the name services that are disabled in the federation. |
The commands for managing Router-based federation. See [Mount table management](./HDFSRouterFederation.html#Mount_table_management) for more info.