HDFS-13484. RBF: Disable Nameservices from the federation. Contributed by Inigo Goiri.
(cherry picked from commit 30fef0bf1e5c8c0ca073df99ad9b33cb0e4431a5)
This commit is contained in:
parent
3043a93d46
commit
999ea44b9f
|
@ -642,7 +642,7 @@ public class FederationMetrics implements FederationMBean {
|
|||
namenodeResolver.getNamenodesForNameserviceId(nsId);
|
||||
if (nns != null) {
|
||||
FederationNamenodeContext nn = nns.get(0);
|
||||
if (nn != null && nn instanceof MembershipState) {
|
||||
if (nn instanceof MembershipState) {
|
||||
resultList.add((MembershipState) nn);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -108,6 +108,14 @@ public interface ActiveNamenodeResolver {
|
|||
*/
|
||||
Set<FederationNamespaceInfo> getNamespaces() throws IOException;
|
||||
|
||||
/**
|
||||
* Get a list of all namespaces that are disabled.
|
||||
*
|
||||
* @return List of name spaces identifier in the federation
|
||||
* @throws IOException If the disabled list is not available.
|
||||
*/
|
||||
Set<String> getDisabledNamespaces() throws IOException;
|
||||
|
||||
/**
|
||||
* Assign a unique identifier for the parent router service.
|
||||
* Required to report the status to the namenode resolver.
|
||||
|
|
|
@ -27,7 +27,8 @@ public enum FederationNamenodeServiceState {
|
|||
ACTIVE, // HAServiceState.ACTIVE or operational.
|
||||
STANDBY, // HAServiceState.STANDBY.
|
||||
UNAVAILABLE, // When the namenode cannot be reached.
|
||||
EXPIRED; // When the last update is too old.
|
||||
EXPIRED, // When the last update is too old.
|
||||
DISABLED; // When the nameservice is disabled.
|
||||
|
||||
public static FederationNamenodeServiceState getState(HAServiceState state) {
|
||||
switch(state) {
|
||||
|
|
|
@ -29,10 +29,13 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.DisabledNameserviceStore;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.RecordStore;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
|
||||
|
@ -63,6 +66,8 @@ public class MembershipNamenodeResolver
|
|||
private final StateStoreService stateStore;
|
||||
/** Membership State Store interface. */
|
||||
private MembershipStore membershipInterface;
|
||||
/** Disabled Nameservice State Store interface. */
|
||||
private DisabledNameserviceStore disabledNameserviceInterface;
|
||||
|
||||
/** Parent router ID. */
|
||||
private String routerId;
|
||||
|
@ -88,22 +93,38 @@ public class MembershipNamenodeResolver
|
|||
|
||||
private synchronized MembershipStore getMembershipStore() throws IOException {
|
||||
if (this.membershipInterface == null) {
|
||||
this.membershipInterface = this.stateStore.getRegisteredRecordStore(
|
||||
MembershipStore.class);
|
||||
if (this.membershipInterface == null) {
|
||||
throw new IOException("State Store does not have an interface for " +
|
||||
MembershipStore.class.getSimpleName());
|
||||
}
|
||||
this.membershipInterface = getStoreInterface(MembershipStore.class);
|
||||
}
|
||||
return this.membershipInterface;
|
||||
}
|
||||
|
||||
private synchronized DisabledNameserviceStore getDisabledNameserviceStore()
|
||||
throws IOException {
|
||||
if (this.disabledNameserviceInterface == null) {
|
||||
this.disabledNameserviceInterface =
|
||||
getStoreInterface(DisabledNameserviceStore.class);
|
||||
}
|
||||
return this.disabledNameserviceInterface;
|
||||
}
|
||||
|
||||
private <T extends RecordStore<?>> T getStoreInterface(Class<T> clazz)
|
||||
throws IOException{
|
||||
T store = this.stateStore.getRegisteredRecordStore(clazz);
|
||||
if (store == null) {
|
||||
throw new IOException("State Store does not have an interface for " +
|
||||
clazz.getSimpleName());
|
||||
}
|
||||
return store;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean loadCache(boolean force) {
|
||||
// Our cache depends on the store, update it first
|
||||
try {
|
||||
MembershipStore membership = getMembershipStore();
|
||||
membership.loadCache(force);
|
||||
DisabledNameserviceStore disabled = getDisabledNameserviceStore();
|
||||
disabled.loadCache(force);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot update membership from the State Store", e);
|
||||
}
|
||||
|
@ -151,30 +172,48 @@ public class MembershipNamenodeResolver
|
|||
final String nsId) throws IOException {
|
||||
|
||||
List<? extends FederationNamenodeContext> ret = cacheNS.get(nsId);
|
||||
if (ret == null) {
|
||||
if (ret != null) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Not cached, generate the value
|
||||
final List<MembershipState> result;
|
||||
try {
|
||||
MembershipState partial = MembershipState.newInstance();
|
||||
partial.setNameserviceId(nsId);
|
||||
GetNamenodeRegistrationsRequest request =
|
||||
GetNamenodeRegistrationsRequest.newInstance(partial);
|
||||
|
||||
final List<MembershipState> result =
|
||||
getRecentRegistrationForQuery(request, true, false);
|
||||
result = getRecentRegistrationForQuery(request, true, false);
|
||||
} catch (StateStoreUnavailableException e) {
|
||||
LOG.error("Cannot get active NN for {}, State Store unavailable", nsId);
|
||||
return null;
|
||||
}
|
||||
if (result == null || result.isEmpty()) {
|
||||
LOG.error("Cannot locate eligible NNs for {}", nsId);
|
||||
return null;
|
||||
}
|
||||
|
||||
// Mark disabled name services
|
||||
try {
|
||||
Set<String> disabled =
|
||||
getDisabledNameserviceStore().getDisabledNameservices();
|
||||
if (disabled == null) {
|
||||
LOG.error("Cannot get disabled name services");
|
||||
} else {
|
||||
cacheNS.put(nsId, result);
|
||||
ret = result;
|
||||
for (MembershipState nn : result) {
|
||||
if (disabled.contains(nn.getNameserviceId())) {
|
||||
nn.setState(FederationNamenodeServiceState.DISABLED);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (StateStoreUnavailableException e) {
|
||||
LOG.error("Cannot get active NN for {}, State Store unavailable", nsId);
|
||||
LOG.error("Cannot get disabled name services, State Store unavailable");
|
||||
}
|
||||
}
|
||||
if (ret == null) {
|
||||
return null;
|
||||
}
|
||||
return Collections.unmodifiableList(ret);
|
||||
|
||||
// Cache the response
|
||||
ret = Collections.unmodifiableList(result);
|
||||
cacheNS.put(nsId, result);
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -261,7 +300,24 @@ public class MembershipNamenodeResolver
|
|||
GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
|
||||
GetNamespaceInfoResponse response =
|
||||
getMembershipStore().getNamespaceInfo(request);
|
||||
return response.getNamespaceInfo();
|
||||
Set<FederationNamespaceInfo> nss = response.getNamespaceInfo();
|
||||
|
||||
// Filter disabled namespaces
|
||||
Set<FederationNamespaceInfo> ret = new TreeSet<>();
|
||||
Set<String> disabled = getDisabledNamespaces();
|
||||
for (FederationNamespaceInfo ns : nss) {
|
||||
if (!disabled.contains(ns.getNameserviceId())) {
|
||||
ret.add(ns);
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getDisabledNamespaces() throws IOException {
|
||||
DisabledNameserviceStore store = getDisabledNameserviceStore();
|
||||
return store.getDisabledNameservices();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -29,6 +29,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.protocol.proto.RouterProtocolProtos.RouterAdminProtocolService;
|
||||
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB;
|
||||
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.DisabledNameserviceStore;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
|
||||
|
@ -282,28 +284,60 @@ public class RouterAdminServer extends AbstractService
|
|||
@Override
|
||||
public DisableNameserviceResponse disableNameservice(
|
||||
DisableNameserviceRequest request) throws IOException {
|
||||
// TODO check permissions
|
||||
|
||||
RouterPermissionChecker pc = getPermissionChecker();
|
||||
if (pc != null) {
|
||||
pc.checkSuperuserPrivilege();
|
||||
}
|
||||
|
||||
String nsId = request.getNameServiceId();
|
||||
// TODO check that the name service exists
|
||||
boolean success = getDisabledNameserviceStore().disableNameservice(nsId);
|
||||
boolean success = false;
|
||||
if (namespaceExists(nsId)) {
|
||||
success = getDisabledNameserviceStore().disableNameservice(nsId);
|
||||
} else {
|
||||
LOG.error("Cannot disable {}, it does not exists", nsId);
|
||||
}
|
||||
return DisableNameserviceResponse.newInstance(success);
|
||||
}
|
||||
|
||||
private boolean namespaceExists(final String nsId) throws IOException {
|
||||
boolean found = false;
|
||||
ActiveNamenodeResolver resolver = router.getNamenodeResolver();
|
||||
Set<FederationNamespaceInfo> nss = resolver.getNamespaces();
|
||||
for (FederationNamespaceInfo ns : nss) {
|
||||
if (nsId.equals(ns.getNameserviceId())) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return found;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EnableNameserviceResponse enableNameservice(
|
||||
EnableNameserviceRequest request) throws IOException {
|
||||
// TODO check permissions
|
||||
RouterPermissionChecker pc = getPermissionChecker();
|
||||
if (pc != null) {
|
||||
pc.checkSuperuserPrivilege();
|
||||
}
|
||||
|
||||
String nsId = request.getNameServiceId();
|
||||
// TODO check that the name service exists
|
||||
boolean success = getDisabledNameserviceStore().enableNameservice(nsId);
|
||||
DisabledNameserviceStore store = getDisabledNameserviceStore();
|
||||
Set<String> disabled = store.getDisabledNameservices();
|
||||
boolean success = false;
|
||||
if (disabled.contains(nsId)) {
|
||||
success = store.enableNameservice(nsId);
|
||||
} else {
|
||||
LOG.error("Cannot enable {}, it was not disabled", nsId);
|
||||
}
|
||||
return EnableNameserviceResponse.newInstance(success);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetDisabledNameservicesResponse getDisabledNameservices(
|
||||
GetDisabledNameservicesRequest request) throws IOException {
|
||||
// TODO check permissions
|
||||
Set<String> nsIds = getDisabledNameserviceStore().getDisabledNameservices();
|
||||
Set<String> nsIds =
|
||||
getDisabledNameserviceStore().getDisabledNameservices();
|
||||
return GetDisabledNameservicesResponse.newInstance(nsIds);
|
||||
}
|
||||
|
||||
|
|
|
@ -17,12 +17,17 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
|
@ -35,9 +40,23 @@ public class RouterPermissionChecker extends FSPermissionChecker {
|
|||
/** Mount table default permission. */
|
||||
public static final short MOUNT_TABLE_PERMISSION_DEFAULT = 00755;
|
||||
|
||||
public RouterPermissionChecker(String routerOwner, String supergroup,
|
||||
/** Name of the super user. */
|
||||
private final String superUser;
|
||||
/** Name of the super group. */
|
||||
private final String superGroup;
|
||||
|
||||
public RouterPermissionChecker(String user, String group,
|
||||
UserGroupInformation callerUgi) {
|
||||
super(routerOwner, supergroup, callerUgi, null);
|
||||
super(user, group, callerUgi, null);
|
||||
this.superUser = user;
|
||||
this.superGroup = group;
|
||||
}
|
||||
|
||||
public RouterPermissionChecker(String user, String group)
|
||||
throws IOException {
|
||||
super(user, group, UserGroupInformation.getCurrentUser(), null);
|
||||
this.superUser = user;
|
||||
this.superGroup = group;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -79,4 +98,40 @@ public class RouterPermissionChecker extends FSPermissionChecker {
|
|||
+ ": user " + getUser() + " does not have " + access.toString()
|
||||
+ " permissions.");
|
||||
}
|
||||
|
||||
/**
|
||||
* Check the superuser privileges of the current RPC caller. This method is
|
||||
* based on Datanode#checkSuperuserPrivilege().
|
||||
* @throws AccessControlException If the user is not authorized.
|
||||
*/
|
||||
@Override
|
||||
public void checkSuperuserPrivilege() throws AccessControlException {
|
||||
|
||||
// Try to get the ugi in the RPC call.
|
||||
UserGroupInformation ugi = null;
|
||||
try {
|
||||
ugi = NameNode.getRemoteUser();
|
||||
} catch (IOException e) {
|
||||
// Ignore as we catch it afterwards
|
||||
}
|
||||
if (ugi == null) {
|
||||
LOG.error("Cannot get the remote user name");
|
||||
throw new AccessControlException("Cannot get the remote user name");
|
||||
}
|
||||
|
||||
// Is this by the Router user itself?
|
||||
if (ugi.getUserName().equals(superUser)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Is the user a member of the super group?
|
||||
List<String> groups = Arrays.asList(ugi.getGroupNames());
|
||||
if (groups.contains(superGroup)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Not a superuser
|
||||
throw new AccessControlException(
|
||||
ugi.getUserName() + " is not a super user");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.io.FileNotFoundException;
|
|||
import java.io.IOException;
|
||||
import java.lang.reflect.Array;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
|
@ -2270,7 +2271,15 @@ public class RouterRpcServer extends AbstractService
|
|||
}
|
||||
}
|
||||
|
||||
return location.getDestinations();
|
||||
// Filter disabled subclusters
|
||||
Set<String> disabled = namenodeResolver.getDisabledNamespaces();
|
||||
List<RemoteLocation> locs = new ArrayList<>();
|
||||
for (RemoteLocation loc : location.getDestinations()) {
|
||||
if (!disabled.contains(loc.getNameserviceId())) {
|
||||
locs.add(loc);
|
||||
}
|
||||
}
|
||||
return locs;
|
||||
} catch (IOException ioe) {
|
||||
if (this.rpcMonitor != null) {
|
||||
this.rpcMonitor.routerFailureStateStore();
|
||||
|
|
|
@ -229,6 +229,17 @@ Ls command will show below information for each mount table entry:
|
|||
Source Destinations Owner Group Mode Quota/Usage
|
||||
/path ns0->/path root supergroup rwxr-xr-x [NsQuota: 50/0, SsQuota: 100 B/0 B]
|
||||
|
||||
### Disabling nameservices
|
||||
|
||||
To prevent accessing a nameservice (sublcuster), it can be disabled from the federation.
|
||||
For example, one can disable `ns1`, list it and enable it again:
|
||||
|
||||
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -nameservice disable ns1
|
||||
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -getDisabledNameservices
|
||||
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -nameservice enable ns1
|
||||
|
||||
This is useful when decommissioning subclusters or when one subcluster is missbehaving (e.g., low performance or unavailability).
|
||||
|
||||
Client configuration
|
||||
--------------------
|
||||
|
||||
|
|
|
@ -20,6 +20,9 @@ package org.apache.hadoop.hdfs.server.federation;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.FileNotFoundException;
|
||||
|
@ -49,9 +52,18 @@ import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
|||
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
|
||||
|
@ -60,6 +72,9 @@ import com.google.common.base.Supplier;
|
|||
*/
|
||||
public final class FederationTestUtils {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(FederationTestUtils.class);
|
||||
|
||||
public final static String[] NAMESERVICES = {"ns0", "ns1"};
|
||||
public final static String[] NAMENODES = {"nn0", "nn1", "nn2", "nn3"};
|
||||
public final static String[] ROUTERS =
|
||||
|
@ -274,4 +289,31 @@ public final class FederationTestUtils {
|
|||
throws IOException {
|
||||
return fs.delete(new Path(path), true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Simulate that a Namenode is slow by adding a sleep to the check operation
|
||||
* in the NN.
|
||||
* @param nn Namenode to simulate slow.
|
||||
* @param seconds Number of seconds to add to the Namenode.
|
||||
* @throws Exception If we cannot add the sleep time.
|
||||
*/
|
||||
public static void simulateSlowNamenode(final NameNode nn, final int seconds)
|
||||
throws Exception {
|
||||
FSNamesystem namesystem = nn.getNamesystem();
|
||||
HAContext haContext = namesystem.getHAContext();
|
||||
HAContext spyHAContext = spy(haContext);
|
||||
doAnswer(new Answer<Object>() {
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||
LOG.info("Simulating slow namenode {}", invocation.getMock());
|
||||
try {
|
||||
Thread.sleep(seconds * 1000);
|
||||
} catch(InterruptedException e) {
|
||||
LOG.error("Simulating a slow namenode aborted");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}).when(spyHAContext).checkOperation(any(OperationCategory.class));
|
||||
Whitebox.setInternalState(namesystem, "haContext", spyHAContext);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -185,6 +185,10 @@ public class MiniRouterDFSCluster {
|
|||
return this.fileContext;
|
||||
}
|
||||
|
||||
public URI getFileSystemURI() {
|
||||
return fileSystemUri;
|
||||
}
|
||||
|
||||
public String getHttpAddress() {
|
||||
InetSocketAddress httpAddress = router.getHttpServerAddress();
|
||||
return NetUtils.getHostPortString(httpAddress);
|
||||
|
@ -236,6 +240,10 @@ public class MiniRouterDFSCluster {
|
|||
return adminClient;
|
||||
}
|
||||
|
||||
public void resetAdminClient() {
|
||||
adminClient = null;
|
||||
}
|
||||
|
||||
public DFSClient getClient() throws IOException, URISyntaxException {
|
||||
if (client == null) {
|
||||
LOG.info("Connecting to router at {}", fileSystemUri);
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||
|
@ -262,6 +263,11 @@ public class MockResolver
|
|||
return this.namespaces;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getDisabledNamespaces() throws IOException {
|
||||
return new TreeSet<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public PathLocation getDestinationForPath(String path) throws IOException {
|
||||
List<RemoteLocation> remoteLocations = new LinkedList<>();
|
||||
|
|
|
@ -0,0 +1,236 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
|
||||
import static org.apache.hadoop.util.Time.monotonicNow;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
|
||||
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
||||
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.DisabledNameserviceStore;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.DisableNameserviceRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.codehaus.jettison.json.JSONObject;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test the behavior when disabling name services.
|
||||
*/
|
||||
public class TestDisableNameservices {
|
||||
|
||||
private static StateStoreDFSCluster cluster;
|
||||
private static RouterContext routerContext;
|
||||
private static RouterClient routerAdminClient;
|
||||
private static ClientProtocol routerProtocol;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
// Build and start a federated cluster
|
||||
cluster = new StateStoreDFSCluster(false, 2);
|
||||
Configuration routerConf = new RouterConfigBuilder()
|
||||
.stateStore()
|
||||
.metrics()
|
||||
.admin()
|
||||
.rpc()
|
||||
.build();
|
||||
// Reduce the number of RPC threads to saturate the Router easy
|
||||
routerConf.setInt(RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY, 8);
|
||||
routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 4);
|
||||
|
||||
// Set the DNs to belong to only one subcluster
|
||||
cluster.setIndependentDNs();
|
||||
|
||||
cluster.addRouterOverrides(routerConf);
|
||||
// override some settings for the client
|
||||
cluster.startCluster();
|
||||
cluster.startRouters();
|
||||
cluster.waitClusterUp();
|
||||
|
||||
routerContext = cluster.getRandomRouter();
|
||||
routerProtocol = routerContext.getClient().getNamenode();
|
||||
routerAdminClient = routerContext.getAdminClient();
|
||||
|
||||
setupNamespace();
|
||||
|
||||
// Simulate one of the subclusters to be slow
|
||||
MiniDFSCluster dfsCluster = cluster.getCluster();
|
||||
NameNode nn0 = dfsCluster.getNameNode(0);
|
||||
simulateSlowNamenode(nn0, 1);
|
||||
}
|
||||
|
||||
private static void setupNamespace() throws IOException {
|
||||
|
||||
// Setup a mount table to map to the two namespaces
|
||||
MountTableManager mountTable = routerAdminClient.getMountTableManager();
|
||||
Map<String, String> destinations = new TreeMap<>();
|
||||
destinations.put("ns0", "/");
|
||||
destinations.put("ns1", "/");
|
||||
MountTable newEntry = MountTable.newInstance("/", destinations);
|
||||
newEntry.setDestOrder(DestinationOrder.RANDOM);
|
||||
AddMountTableEntryRequest request =
|
||||
AddMountTableEntryRequest.newInstance(newEntry);
|
||||
mountTable.addMountTableEntry(request);
|
||||
|
||||
// Refresh the cache in the Router
|
||||
Router router = routerContext.getRouter();
|
||||
MountTableResolver mountTableResolver =
|
||||
(MountTableResolver) router.getSubclusterResolver();
|
||||
mountTableResolver.loadCache(true);
|
||||
|
||||
// Add a folder to each namespace
|
||||
NamenodeContext nn0 = cluster.getNamenode("ns0", null);
|
||||
nn0.getFileSystem().mkdirs(new Path("/dirns0"));
|
||||
NamenodeContext nn1 = cluster.getNamenode("ns1", null);
|
||||
nn1.getFileSystem().mkdirs(new Path("/dirns1"));
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
if (cluster != null) {
|
||||
cluster.stopRouter(routerContext);
|
||||
cluster.shutdown();
|
||||
cluster = null;
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanup() throws IOException {
|
||||
Router router = routerContext.getRouter();
|
||||
StateStoreService stateStore = router.getStateStore();
|
||||
DisabledNameserviceStore store =
|
||||
stateStore.getRegisteredRecordStore(DisabledNameserviceStore.class);
|
||||
store.loadCache(true);
|
||||
|
||||
Set<String> disabled = store.getDisabledNameservices();
|
||||
for (String nsId : disabled) {
|
||||
store.enableNameservice(nsId);
|
||||
}
|
||||
store.loadCache(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithoutDisabling() throws IOException {
|
||||
|
||||
// ns0 is slow and renewLease should take a long time
|
||||
long t0 = monotonicNow();
|
||||
routerProtocol.renewLease("client0");
|
||||
long t = monotonicNow() - t0;
|
||||
assertTrue("It took too little: " + t + "ms",
|
||||
t > TimeUnit.SECONDS.toMillis(1));
|
||||
|
||||
// Return the results from all subclusters even if slow
|
||||
FileSystem routerFs = routerContext.getFileSystem();
|
||||
FileStatus[] filesStatus = routerFs.listStatus(new Path("/"));
|
||||
assertEquals(2, filesStatus.length);
|
||||
assertEquals("dirns0", filesStatus[0].getPath().getName());
|
||||
assertEquals("dirns1", filesStatus[1].getPath().getName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDisabling() throws Exception {
|
||||
|
||||
disableNameservice("ns0");
|
||||
|
||||
// renewLease should be fast as we are skipping ns0
|
||||
long t0 = monotonicNow();
|
||||
routerProtocol.renewLease("client0");
|
||||
long t = monotonicNow() - t0;
|
||||
assertTrue("It took too long: " + t + "ms",
|
||||
t < TimeUnit.SECONDS.toMillis(1));
|
||||
|
||||
// We should not report anything from ns0
|
||||
FileSystem routerFs = routerContext.getFileSystem();
|
||||
FileStatus[] filesStatus = routerFs.listStatus(new Path("/"));
|
||||
assertEquals(1, filesStatus.length);
|
||||
assertEquals("dirns1", filesStatus[0].getPath().getName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetrics() throws Exception {
|
||||
disableNameservice("ns0");
|
||||
|
||||
int numActive = 0;
|
||||
int numDisabled = 0;
|
||||
Router router = routerContext.getRouter();
|
||||
FederationMetrics metrics = router.getMetrics();
|
||||
String jsonString = metrics.getNameservices();
|
||||
JSONObject jsonObject = new JSONObject(jsonString);
|
||||
Iterator<?> keys = jsonObject.keys();
|
||||
while (keys.hasNext()) {
|
||||
String key = (String) keys.next();
|
||||
JSONObject json = jsonObject.getJSONObject(key);
|
||||
String nsId = json.getString("nameserviceId");
|
||||
String state = json.getString("state");
|
||||
if (nsId.equals("ns0")) {
|
||||
assertEquals("DISABLED", state);
|
||||
numDisabled++;
|
||||
} else {
|
||||
assertEquals("ACTIVE", state);
|
||||
numActive++;
|
||||
}
|
||||
}
|
||||
assertEquals(1, numActive);
|
||||
assertEquals(1, numDisabled);
|
||||
}
|
||||
|
||||
private static void disableNameservice(final String nsId)
|
||||
throws IOException {
|
||||
NameserviceManager nsManager = routerAdminClient.getNameserviceManager();
|
||||
DisableNameserviceRequest req =
|
||||
DisableNameserviceRequest.newInstance(nsId);
|
||||
nsManager.disableNameservice(req);
|
||||
|
||||
Router router = routerContext.getRouter();
|
||||
StateStoreService stateStore = router.getStateStore();
|
||||
DisabledNameserviceStore store =
|
||||
stateStore.getRegisteredRecordStore(DisabledNameserviceStore.class);
|
||||
store.loadCache(true);
|
||||
MembershipNamenodeResolver resolver =
|
||||
(MembershipNamenodeResolver) router.getNamenodeResolver();
|
||||
resolver.loadCache(true);
|
||||
}
|
||||
|
||||
}
|
|
@ -17,21 +17,27 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createNamenodeReport;
|
||||
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords;
|
||||
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
||||
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
|
||||
|
@ -52,6 +58,7 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableE
|
|||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
|
@ -86,6 +93,14 @@ public class TestRouterAdmin {
|
|||
mockMountTable = cluster.generateMockMountTable();
|
||||
Router router = routerContext.getRouter();
|
||||
stateStore = router.getStateStore();
|
||||
|
||||
// Add two name services for testing disabling
|
||||
ActiveNamenodeResolver membership = router.getNamenodeResolver();
|
||||
membership.registerNamenode(
|
||||
createNamenodeReport("ns0", "nn1", HAServiceState.ACTIVE));
|
||||
membership.registerNamenode(
|
||||
createNamenodeReport("ns1", "nn1", HAServiceState.ACTIVE));
|
||||
stateStore.refreshCaches(true);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -97,6 +112,8 @@ public class TestRouterAdmin {
|
|||
public void testSetup() throws Exception {
|
||||
assertTrue(
|
||||
synchronizeRecords(stateStore, mockMountTable, MountTable.class));
|
||||
// Avoid running with random users
|
||||
routerContext.resetAdminClient();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -375,6 +392,37 @@ public class TestRouterAdmin {
|
|||
assertTrue(enableResp.getStatus());
|
||||
disabled = getDisabledNameservices(nsManager);
|
||||
assertTrue(disabled.isEmpty());
|
||||
|
||||
// Non existing name services should fail
|
||||
disableReq = DisableNameserviceRequest.newInstance("nsunknown");
|
||||
disableResp = nsManager.disableNameservice(disableReq);
|
||||
assertFalse(disableResp.getStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNameserviceManagerUnauthorized() throws Exception {
|
||||
|
||||
// Try to disable a name service with a random user
|
||||
final String username = "baduser";
|
||||
UserGroupInformation user =
|
||||
UserGroupInformation.createRemoteUser(username);
|
||||
user.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
RouterClient client = routerContext.getAdminClient();
|
||||
NameserviceManager nameservices = client.getNameserviceManager();
|
||||
DisableNameserviceRequest disableReq =
|
||||
DisableNameserviceRequest.newInstance("ns0");
|
||||
try {
|
||||
nameservices.disableNameservice(disableReq);
|
||||
fail("We should not be able to disable nameservices");
|
||||
} catch (IOException ioe) {
|
||||
assertExceptionContains(
|
||||
username + " is not a super user", ioe);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private Set<String> getDisabledNameservices(NameserviceManager nsManager)
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createNamenodeReport;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
@ -27,10 +28,12 @@ import java.net.InetSocketAddress;
|
|||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
||||
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
|
||||
|
@ -93,6 +96,14 @@ public class TestRouterAdminCLI {
|
|||
routerSocket);
|
||||
admin = new RouterAdmin(routerConf);
|
||||
client = routerContext.getAdminClient();
|
||||
|
||||
// Add two fake name services to testing disabling them
|
||||
ActiveNamenodeResolver membership = router.getNamenodeResolver();
|
||||
membership.registerNamenode(
|
||||
createNamenodeReport("ns0", "nn1", HAServiceState.ACTIVE));
|
||||
membership.registerNamenode(
|
||||
createNamenodeReport("ns1", "nn1", HAServiceState.ACTIVE));
|
||||
stateStore.refreshCaches(true);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
|
@ -431,6 +431,8 @@ Usage:
|
|||
[-setQuota <path> -nsQuota <nsQuota> -ssQuota <quota in bytes or quota size string>]
|
||||
[-clrQuota <path>]
|
||||
[-safemode enter | leave | get]
|
||||
[-nameservice disable | enable <nameservice>]
|
||||
[-getDisabledNameservices]
|
||||
|
||||
| COMMAND\_OPTION | Description |
|
||||
|:---- |:---- |
|
||||
|
@ -440,6 +442,8 @@ Usage:
|
|||
| `-setQuota` *path* `-nsQuota` *nsQuota* `-ssQuota` *ssQuota* | Set quota for specified path. See [HDFS Quotas Guide](./HdfsQuotaAdminGuide.html) for the quota detail. |
|
||||
| `-clrQuota` *path* | Clear quota of given mount point. See [HDFS Quotas Guide](./HdfsQuotaAdminGuide.html) for the quota detail. |
|
||||
| `-safemode` `enter` `leave` `get` | Manually set the Router entering or leaving safe mode. The option *get* will be used for verifying if the Router is in safe mode state. |
|
||||
| `-nameservice` `disable` `enable` *nameservice* | Disable/enable a name service from the federation. If disabled, requests will not go to that name service. |
|
||||
| `-getDisabledNameservices` | Get the name services that are disabled in the federation. |
|
||||
|
||||
The commands for managing Router-based federation. See [Mount table management](./HDFSRouterFederation.html#Mount_table_management) for more info.
|
||||
|
||||
|
|
Loading…
Reference in New Issue