HDFS-13856. RBF: RouterAdmin should support dfsrouteradmin -refreshRouterArgs command. Contributed by yanghuafeng.

This commit is contained in:
Inigo Goiri 2019-01-11 10:11:18 -08:00 committed by Brahma Reddy Battula
parent 3bb5752276
commit 5fcfc3c306
5 changed files with 357 additions and 1 deletions

View File

@ -23,12 +23,14 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Set; import java.util.Set;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.proto.RouterProtocolProtos.RouterAdminProtocolService; import org.apache.hadoop.hdfs.protocol.proto.RouterProtocolProtos.RouterAdminProtocolService;
@ -64,9 +66,15 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableE
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.GenericRefreshProtocol;
import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server; import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.ipc.RefreshRegistry;
import org.apache.hadoop.ipc.RefreshResponse;
import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
@ -81,7 +89,8 @@ import com.google.protobuf.BlockingService;
* router. It is created, started, and stopped by {@link Router}. * router. It is created, started, and stopped by {@link Router}.
*/ */
public class RouterAdminServer extends AbstractService public class RouterAdminServer extends AbstractService
implements MountTableManager, RouterStateManager, NameserviceManager { implements MountTableManager, RouterStateManager, NameserviceManager,
GenericRefreshProtocol {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(RouterAdminServer.class); LoggerFactory.getLogger(RouterAdminServer.class);
@ -160,6 +169,15 @@ public class RouterAdminServer extends AbstractService
router.setAdminServerAddress(this.adminAddress); router.setAdminServerAddress(this.adminAddress);
iStateStoreCache = iStateStoreCache =
router.getSubclusterResolver() instanceof StateStoreCache; router.getSubclusterResolver() instanceof StateStoreCache;
GenericRefreshProtocolServerSideTranslatorPB genericRefreshXlator =
new GenericRefreshProtocolServerSideTranslatorPB(this);
BlockingService genericRefreshService =
GenericRefreshProtocolProtos.GenericRefreshProtocolService.
newReflectiveBlockingService(genericRefreshXlator);
DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
genericRefreshService, adminServer);
} }
/** /**
@ -487,4 +505,10 @@ public class RouterAdminServer extends AbstractService
public static String getSuperGroup(){ public static String getSuperGroup(){
return superGroup; return superGroup;
} }
@Override // GenericRefreshProtocol
public Collection<RefreshResponse> refresh(String identifier, String[] args) {
// Let the registry handle as needed
return RefreshRegistry.defaultRegistry().dispatch(identifier, args);
}
} }

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.tools.federation;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -26,8 +28,10 @@ import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
@ -61,9 +65,14 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableE
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RefreshResponse;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolClientSideTranslatorPB;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
@ -147,6 +156,8 @@ public class RouterAdmin extends Configured implements Tool {
return "\t[-getDisabledNameservices]"; return "\t[-getDisabledNameservices]";
} else if (cmd.equals("-refresh")) { } else if (cmd.equals("-refresh")) {
return "\t[-refresh]"; return "\t[-refresh]";
} else if (cmd.equals("-refreshRouterArgs")) {
return "\t[-refreshRouterArgs <host:ipc_port> <key> [arg1..argn]]";
} }
return getUsage(null); return getUsage(null);
} }
@ -213,6 +224,10 @@ public class RouterAdmin extends Configured implements Tool {
if (argv.length < 3) { if (argv.length < 3) {
return false; return false;
} }
} else if ("-refreshRouterArgs".equals(cmd)) {
if (argv.length < 2) {
return false;
}
} }
return true; return true;
} }
@ -310,6 +325,8 @@ public class RouterAdmin extends Configured implements Tool {
getDisabledNameservices(); getDisabledNameservices();
} else if ("-refresh".equals(cmd)) { } else if ("-refresh".equals(cmd)) {
refresh(address); refresh(address);
} else if ("-refreshRouterArgs".equals(cmd)) {
exitCode = genericRefresh(argv, i);
} else { } else {
throw new IllegalArgumentException("Unknown Command: " + cmd); throw new IllegalArgumentException("Unknown Command: " + cmd);
} }
@ -923,6 +940,61 @@ public class RouterAdmin extends Configured implements Tool {
} }
} }
public int genericRefresh(String[] argv, int i) throws IOException {
String hostport = argv[i++];
String identifier = argv[i++];
String[] args = Arrays.copyOfRange(argv, i, argv.length);
// Get the current configuration
Configuration conf = getConf();
// for security authorization
// server principal for this call
// should be NN's one.
conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY,
conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, ""));
// Create the client
Class<?> xface = GenericRefreshProtocolPB.class;
InetSocketAddress address = NetUtils.createSocketAddr(hostport);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class);
GenericRefreshProtocolPB proxy = (GenericRefreshProtocolPB)RPC.getProxy(
xface, RPC.getProtocolVersion(xface), address, ugi, conf,
NetUtils.getDefaultSocketFactory(conf), 0);
Collection<RefreshResponse> responses = null;
try (GenericRefreshProtocolClientSideTranslatorPB xlator =
new GenericRefreshProtocolClientSideTranslatorPB(proxy)) {
// Refresh
responses = xlator.refresh(identifier, args);
int returnCode = 0;
// Print refresh responses
System.out.println("Refresh Responses:\n");
for (RefreshResponse response : responses) {
System.out.println(response.toString());
if (returnCode == 0 && response.getReturnCode() != 0) {
// This is the first non-zero return code, so we should return this
returnCode = response.getReturnCode();
} else if (returnCode != 0 && response.getReturnCode() != 0) {
// Then now we have multiple non-zero return codes,
// so we merge them into -1
returnCode = -1;
}
}
return returnCode;
} finally {
if (responses == null) {
System.out.println("Failed to get response.\n");
return -1;
}
}
}
/** /**
* Normalize a path for that filesystem. * Normalize a path for that filesystem.
* *

View File

@ -274,6 +274,12 @@ For example, one can disable `ns1`, list it and enable it again:
This is useful when decommissioning subclusters or when one subcluster is missbehaving (e.g., low performance or unavailability). This is useful when decommissioning subclusters or when one subcluster is missbehaving (e.g., low performance or unavailability).
### Router server generically refresh
To trigger a runtime-refresh of the resource specified by \<key\> on \<host:ipc\_port\>. For example, to enable white list checking, we just need to send a refresh command other than restart the router server.
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -refreshRouterArgs <host:ipc_port> <key> [arg1..argn]
Client configuration Client configuration
-------------------- --------------------

View File

@ -0,0 +1,252 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.tools.federation.RouterAdmin;
import org.apache.hadoop.ipc.RefreshHandler;
import org.apache.hadoop.ipc.RefreshRegistry;
import org.apache.hadoop.ipc.RefreshResponse;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Before all tests, a router is spun up.
* Before each test, mock refresh handlers are created and registered.
* After each test, the mock handlers are unregistered.
* After all tests, the router is spun down.
*/
public class TestRouterAdminGenericRefresh {
private static Router router;
private static RouterAdmin admin;
private static RefreshHandler firstHandler;
private static RefreshHandler secondHandler;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
// Build and start a router with admin + RPC
router = new Router();
Configuration config = new RouterConfigBuilder()
.admin()
.rpc()
.build();
router.init(config);
router.start();
admin = new RouterAdmin(config);
}
@AfterClass
public static void tearDownBeforeClass() throws IOException {
if (router != null) {
router.stop();
router.close();
}
}
@Before
public void setUp() throws Exception {
// Register Handlers, first one just sends an ok response
firstHandler = Mockito.mock(RefreshHandler.class);
Mockito.when(firstHandler.handleRefresh(Mockito.anyString(),
Mockito.any(String[].class))).thenReturn(
RefreshResponse.successResponse());
RefreshRegistry.defaultRegistry().register("firstHandler", firstHandler);
// Second handler has conditional response for testing args
secondHandler = Mockito.mock(RefreshHandler.class);
Mockito.when(secondHandler.handleRefresh(
"secondHandler", new String[]{"one", "two"})).thenReturn(
new RefreshResponse(3, "three"));
Mockito.when(secondHandler.handleRefresh(
"secondHandler", new String[]{"one"})).thenReturn(
new RefreshResponse(2, "two"));
RefreshRegistry.defaultRegistry().register("secondHandler", secondHandler);
}
@After
public void tearDown() throws Exception {
RefreshRegistry.defaultRegistry().unregisterAll("firstHandler");
RefreshRegistry.defaultRegistry().unregisterAll("secondHandler");
}
@Test
public void testInvalidCommand() throws Exception {
String[] args = new String[]{"-refreshRouterArgs", "nn"};
int exitCode = admin.run(args);
assertEquals("RouterAdmin should fail due to bad args", -1, exitCode);
}
@Test
public void testInvalidIdentifier() throws Exception {
String[] argv = new String[]{"-refreshRouterArgs", "localhost:" +
router.getAdminServerAddress().getPort(), "unregisteredIdentity"};
int exitCode = admin.run(argv);
assertEquals("RouterAdmin should fail due to no handler registered",
-1, exitCode);
}
@Test
public void testValidIdentifier() throws Exception {
String[] args = new String[]{"-refreshRouterArgs", "localhost:" +
router.getAdminServerAddress().getPort(), "firstHandler"};
int exitCode = admin.run(args);
assertEquals("RouterAdmin should succeed", 0, exitCode);
Mockito.verify(firstHandler).handleRefresh("firstHandler", new String[]{});
// Second handler was never called
Mockito.verify(secondHandler, Mockito.never())
.handleRefresh(Mockito.anyString(), Mockito.any(String[].class));
}
@Test
public void testVariableArgs() throws Exception {
String[] args = new String[]{"-refreshRouterArgs", "localhost:" +
router.getAdminServerAddress().getPort(), "secondHandler", "one"};
int exitCode = admin.run(args);
assertEquals("RouterAdmin should return 2", 2, exitCode);
exitCode = admin.run(new String[]{"-refreshRouterArgs", "localhost:" +
router.getAdminServerAddress().getPort(),
"secondHandler", "one", "two"});
assertEquals("RouterAdmin should now return 3", 3, exitCode);
Mockito.verify(secondHandler).handleRefresh(
"secondHandler", new String[]{"one"});
Mockito.verify(secondHandler).handleRefresh(
"secondHandler", new String[]{"one", "two"});
}
@Test
public void testUnregistration() throws Exception {
RefreshRegistry.defaultRegistry().unregisterAll("firstHandler");
// And now this should fail
String[] args = new String[]{"-refreshRouterArgs", "localhost:" +
router.getAdminServerAddress().getPort(), "firstHandler"};
int exitCode = admin.run(args);
assertEquals("RouterAdmin should return -1", -1, exitCode);
}
@Test
public void testUnregistrationReturnValue() {
RefreshHandler mockHandler = Mockito.mock(RefreshHandler.class);
RefreshRegistry.defaultRegistry().register("test", mockHandler);
boolean ret = RefreshRegistry.defaultRegistry().
unregister("test", mockHandler);
assertTrue(ret);
}
@Test
public void testMultipleRegistration() throws Exception {
RefreshRegistry.defaultRegistry().register("sharedId", firstHandler);
RefreshRegistry.defaultRegistry().register("sharedId", secondHandler);
// this should trigger both
String[] args = new String[]{"-refreshRouterArgs", "localhost:" +
router.getAdminServerAddress().getPort(), "sharedId", "one"};
int exitCode = admin.run(args);
// -1 because one of the responses is unregistered
assertEquals(-1, exitCode);
// verify we called both
Mockito.verify(firstHandler).handleRefresh(
"sharedId", new String[]{"one"});
Mockito.verify(secondHandler).handleRefresh(
"sharedId", new String[]{"one"});
RefreshRegistry.defaultRegistry().unregisterAll("sharedId");
}
@Test
public void testMultipleReturnCodeMerging() throws Exception {
// Two handlers which return two non-zero values
RefreshHandler handlerOne = Mockito.mock(RefreshHandler.class);
Mockito.when(handlerOne.handleRefresh(Mockito.anyString(),
Mockito.any(String[].class))).thenReturn(
new RefreshResponse(23, "Twenty Three"));
RefreshHandler handlerTwo = Mockito.mock(RefreshHandler.class);
Mockito.when(handlerTwo.handleRefresh(Mockito.anyString(),
Mockito.any(String[].class))).thenReturn(
new RefreshResponse(10, "Ten"));
// Then registered to the same ID
RefreshRegistry.defaultRegistry().register("shared", handlerOne);
RefreshRegistry.defaultRegistry().register("shared", handlerTwo);
// We refresh both
String[] args = new String[]{"-refreshRouterArgs", "localhost:" +
router.getAdminServerAddress().getPort(), "shared"};
int exitCode = admin.run(args);
// We get -1 because of our logic for melding non-zero return codes
assertEquals(-1, exitCode);
// Verify we called both
Mockito.verify(handlerOne).handleRefresh("shared", new String[]{});
Mockito.verify(handlerTwo).handleRefresh("shared", new String[]{});
RefreshRegistry.defaultRegistry().unregisterAll("shared");
}
@Test
public void testExceptionResultsInNormalError() throws Exception {
// In this test, we ensure that all handlers are called
// even if we throw an exception in one
RefreshHandler exceptionalHandler = Mockito.mock(RefreshHandler.class);
Mockito.when(exceptionalHandler.handleRefresh(Mockito.anyString(),
Mockito.any(String[].class))).thenThrow(
new RuntimeException("Exceptional Handler Throws Exception"));
RefreshHandler otherExceptionalHandler = Mockito.mock(RefreshHandler.class);
Mockito.when(otherExceptionalHandler.handleRefresh(Mockito.anyString(),
Mockito.any(String[].class))).thenThrow(
new RuntimeException("More Exceptions"));
RefreshRegistry.defaultRegistry().register("exceptional",
exceptionalHandler);
RefreshRegistry.defaultRegistry().register("exceptional",
otherExceptionalHandler);
String[] args = new String[]{"-refreshRouterArgs", "localhost:" +
router.getAdminServerAddress().getPort(), "exceptional"};
int exitCode = admin.run(args);
assertEquals(-1, exitCode); // Exceptions result in a -1
Mockito.verify(exceptionalHandler).handleRefresh(
"exceptional", new String[]{});
Mockito.verify(otherExceptionalHandler).handleRefresh(
"exceptional", new String[]{});
RefreshRegistry.defaultRegistry().unregisterAll("exceptional");
}
}

View File

@ -438,6 +438,7 @@ Usage:
[-nameservice disable | enable <nameservice>] [-nameservice disable | enable <nameservice>]
[-getDisabledNameservices] [-getDisabledNameservices]
[-refresh] [-refresh]
[-refreshRouterArgs <host:ipc_port> <key> [arg1..argn]]
| COMMAND\_OPTION | Description | | COMMAND\_OPTION | Description |
|:---- |:---- | |:---- |:---- |
@ -451,6 +452,7 @@ Usage:
| `-nameservice` `disable` `enable` *nameservice* | Disable/enable a name service from the federation. If disabled, requests will not go to that name service. | | `-nameservice` `disable` `enable` *nameservice* | Disable/enable a name service from the federation. If disabled, requests will not go to that name service. |
| `-getDisabledNameservices` | Get the name services that are disabled in the federation. | | `-getDisabledNameservices` | Get the name services that are disabled in the federation. |
| `-refresh` | Update mount table cache of the connected router. | | `-refresh` | Update mount table cache of the connected router. |
| `refreshRouterArgs` \<host:ipc\_port\> \<key\> [arg1..argn] | To trigger a runtime-refresh of the resource specified by \<key\> on \<host:ipc\_port\>. For example, to enable white list checking, we just need to send a refresh command other than restart the router server. |
The commands for managing Router-based federation. See [Mount table management](../hadoop-hdfs-rbf/HDFSRouterFederation.html#Mount_table_management) for more info. The commands for managing Router-based federation. See [Mount table management](../hadoop-hdfs-rbf/HDFSRouterFederation.html#Mount_table_management) for more info.