diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java index 596e0f57b38..c2964caf436 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java @@ -631,4 +631,11 @@ public class Router extends CompositeService { Collection getNamenodeHearbeatServices() { return this.namenodeHeartbeatServices; } + + /** + * Get the Router safe mode service + */ + RouterSafemodeService getSafemodeService() { + return this.safemodeService; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java index 3da9a5a09b6..644e182ef18 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.Set; +import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.proto.RouterProtocolProtos.RouterAdminProtocolService; @@ -246,23 +247,37 @@ public class RouterAdminServer extends AbstractService @Override public EnterSafeModeResponse enterSafeMode(EnterSafeModeRequest request) throws IOException { - this.router.updateRouterState(RouterServiceState.SAFEMODE); - this.router.getRpcServer().setSafeMode(true); - return EnterSafeModeResponse.newInstance(verifySafeMode(true)); + boolean success = false; + RouterSafemodeService safeModeService = this.router.getSafemodeService(); + if (safeModeService != null) { + this.router.updateRouterState(RouterServiceState.SAFEMODE); + safeModeService.setManualSafeMode(true); + success = verifySafeMode(true); + } + return EnterSafeModeResponse.newInstance(success); } @Override public LeaveSafeModeResponse leaveSafeMode(LeaveSafeModeRequest request) throws IOException { - this.router.updateRouterState(RouterServiceState.RUNNING); - this.router.getRpcServer().setSafeMode(false); - return LeaveSafeModeResponse.newInstance(verifySafeMode(false)); + boolean success = false; + RouterSafemodeService safeModeService = this.router.getSafemodeService(); + if (safeModeService != null) { + this.router.updateRouterState(RouterServiceState.RUNNING); + safeModeService.setManualSafeMode(false); + success = verifySafeMode(false); + } + return LeaveSafeModeResponse.newInstance(success); } @Override public GetSafeModeResponse getSafeMode(GetSafeModeRequest request) throws IOException { - boolean isInSafeMode = this.router.getRpcServer().isInSafeMode(); + boolean isInSafeMode = false; + RouterSafemodeService safeModeService = this.router.getSafemodeService(); + if (safeModeService != null) { + isInSafeMode = safeModeService.isInSafeMode(); + } return GetSafeModeResponse.newInstance(isInSafeMode); } @@ -272,7 +287,8 @@ public class RouterAdminServer extends AbstractService * @return */ private boolean verifySafeMode(boolean isInSafeMode) { - boolean serverInSafeMode = this.router.getRpcServer().isInSafeMode(); + Preconditions.checkNotNull(this.router.getSafemodeService()); + boolean serverInSafeMode = this.router.getSafemodeService().isInSafeMode(); RouterServiceState currentState = this.router.getRouterState(); return (isInSafeMode && currentState == RouterServiceState.SAFEMODE diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 2f4b9fba9d3..9fb8b4d3c25 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -191,9 +191,6 @@ public class RouterRpcServer extends AbstractService /** Interface to map global name space to HDFS subcluster name spaces. */ private final FileSubclusterResolver subclusterResolver; - /** If we are in safe mode, fail requests as if a standby NN. */ - private volatile boolean safeMode; - /** Category of the operation that a thread is executing. */ private final ThreadLocal opCategory = new ThreadLocal<>(); @@ -451,7 +448,8 @@ public class RouterRpcServer extends AbstractService return; } - if (safeMode) { + RouterSafemodeService safemodeService = router.getSafemodeService(); + if (safemodeService != null && safemodeService.isInSafeMode()) { // Throw standby exception, router is not available if (rpcMonitor != null) { rpcMonitor.routerFailureSafemode(); @@ -461,26 +459,6 @@ public class RouterRpcServer extends AbstractService } } - /** - * In safe mode all RPC requests will fail and return a standby exception. - * The client will try another Router, similar to the client retry logic for - * HA. - * - * @param mode True if enabled, False if disabled. - */ - public void setSafeMode(boolean mode) { - this.safeMode = mode; - } - - /** - * Check if the Router is in safe mode and cannot serve RPC calls. - * - * @return If the Router is in safe mode. - */ - public boolean isInSafeMode() { - return this.safeMode; - } - @Override // ClientProtocol public Token getDelegationToken(Text renewer) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java index 5dfb356ad5c..877e1d4927f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java @@ -42,6 +42,23 @@ public class RouterSafemodeService extends PeriodicService { /** Router to manage safe mode. */ private final Router router; + /** + * If we are in safe mode, fail requests as if a standby NN. + * Router can enter safe mode in two different ways: + * 1. upon start up: router enters this mode after service start, and will + * exit after certain time threshold; + * 2. via admin command: router enters this mode via admin command: + * dfsrouteradmin -safemode enter + * and exit after admin command: + * dfsrouteradmin -safemode leave + */ + + /** Whether Router is in safe mode */ + private volatile boolean safeMode; + + /** Whether the Router safe mode is set manually (i.e., via Router admin) */ + private volatile boolean isSafeModeSetManually; + /** Interval in ms to wait post startup before allowing RPC requests. */ private long startupInterval; /** Interval in ms after which the State Store cache is too stale. */ @@ -63,14 +80,29 @@ public class RouterSafemodeService extends PeriodicService { this.router = router; } + /** + * Return whether the current Router is in safe mode. + */ + boolean isInSafeMode() { + return this.safeMode; + } + + /** + * Set the flag to indicate that the safe mode for this Router is set manually + * via the Router admin command. + */ + void setManualSafeMode(boolean mode) { + this.safeMode = mode; + this.isSafeModeSetManually = mode; + } + /** * Enter safe mode. */ private void enter() { LOG.info("Entering safe mode"); enterSafeModeTime = now(); - RouterRpcServer rpcServer = router.getRpcServer(); - rpcServer.setSafeMode(true); + safeMode = true; router.updateRouterState(RouterServiceState.SAFEMODE); } @@ -87,8 +119,7 @@ public class RouterSafemodeService extends PeriodicService { } else { routerMetrics.setSafeModeTime(timeInSafemode); } - RouterRpcServer rpcServer = router.getRpcServer(); - rpcServer.setSafeMode(false); + safeMode = false; router.updateRouterState(RouterServiceState.RUNNING); } @@ -131,17 +162,16 @@ public class RouterSafemodeService extends PeriodicService { this.startupInterval - delta); return; } - RouterRpcServer rpcServer = router.getRpcServer(); StateStoreService stateStore = router.getStateStore(); long cacheUpdateTime = stateStore.getCacheUpdateTime(); boolean isCacheStale = (now - cacheUpdateTime) > this.staleInterval; // Always update to indicate our cache was updated if (isCacheStale) { - if (!rpcServer.isInSafeMode()) { + if (!safeMode) { enter(); } - } else if (rpcServer.isInSafeMode()) { + } else if (safeMode && !isSafeModeSetManually) { // Cache recently updated, leave safe mode leave(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java index d0dd05b3823..f6ae0137679 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java @@ -79,6 +79,7 @@ public class TestRouterAdminCLI { .stateStore() .admin() .rpc() + .safemode() .build(); cluster.addRouterOverrides(conf); @@ -420,13 +421,13 @@ public class TestRouterAdminCLI { public void testManageSafeMode() throws Exception { // ensure the Router become RUNNING state waitState(RouterServiceState.RUNNING); - assertFalse(routerContext.getRouter().getRpcServer().isInSafeMode()); + assertFalse(routerContext.getRouter().getSafemodeService().isInSafeMode()); assertEquals(0, ToolRunner.run(admin, new String[] {"-safemode", "enter"})); // verify state assertEquals(RouterServiceState.SAFEMODE, routerContext.getRouter().getRouterState()); - assertTrue(routerContext.getRouter().getRpcServer().isInSafeMode()); + assertTrue(routerContext.getRouter().getSafemodeService().isInSafeMode()); System.setOut(new PrintStream(out)); assertEquals(0, ToolRunner.run(admin, @@ -438,7 +439,7 @@ public class TestRouterAdminCLI { // verify state assertEquals(RouterServiceState.RUNNING, routerContext.getRouter().getRouterState()); - assertFalse(routerContext.getRouter().getRpcServer().isInSafeMode()); + assertFalse(routerContext.getRouter().getSafemodeService().isInSafeMode()); out.reset(); assertEquals(0, ToolRunner.run(admin, diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java index f16ceb58f44..9c1aeb2b3f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java @@ -28,14 +28,17 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.net.InetSocketAddress; import java.net.URISyntaxException; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.tools.federation.RouterAdmin; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.ToolRunner; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -60,12 +63,12 @@ public class TestRouterSafemode { // 2 sec startup standby conf.setTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION, TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS); - // 1 sec cache refresh + // 200 ms cache refresh conf.setTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, - TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS); - // 2 sec post cache update before entering safemode (2 intervals) + 200, TimeUnit.MILLISECONDS); + // 1 sec post cache update before entering safemode (2 intervals) conf.setTimeDuration(DFS_ROUTER_SAFEMODE_EXPIRATION, - TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS); + TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS); conf.set(RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0"); conf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0"); @@ -77,6 +80,7 @@ public class TestRouterSafemode { // RPC + State Store + Safe Mode only conf = new RouterConfigBuilder(conf) .rpc() + .admin() .safemode() .stateStore() .metrics() @@ -118,7 +122,7 @@ public class TestRouterSafemode { public void testRouterExitSafemode() throws InterruptedException, IllegalStateException, IOException { - assertTrue(router.getRpcServer().isInSafeMode()); + assertTrue(router.getSafemodeService().isInSafeMode()); verifyRouter(RouterServiceState.SAFEMODE); // Wait for initial time in milliseconds @@ -129,7 +133,7 @@ public class TestRouterSafemode { TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS); Thread.sleep(interval); - assertFalse(router.getRpcServer().isInSafeMode()); + assertFalse(router.getSafemodeService().isInSafeMode()); verifyRouter(RouterServiceState.RUNNING); } @@ -138,7 +142,7 @@ public class TestRouterSafemode { throws IllegalStateException, IOException, InterruptedException { // Verify starting state - assertTrue(router.getRpcServer().isInSafeMode()); + assertTrue(router.getSafemodeService().isInSafeMode()); verifyRouter(RouterServiceState.SAFEMODE); // We should be in safe mode for DFS_ROUTER_SAFEMODE_EXTENSION time @@ -157,7 +161,7 @@ public class TestRouterSafemode { Thread.sleep(interval1); // Running - assertFalse(router.getRpcServer().isInSafeMode()); + assertFalse(router.getSafemodeService().isInSafeMode()); verifyRouter(RouterServiceState.RUNNING); // Disable cache @@ -167,12 +171,12 @@ public class TestRouterSafemode { long interval2 = conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXPIRATION, TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) + - conf.getTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, + 2 * conf.getTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS); Thread.sleep(interval2); // Safemode - assertTrue(router.getRpcServer().isInSafeMode()); + assertTrue(router.getSafemodeService().isInSafeMode()); verifyRouter(RouterServiceState.SAFEMODE); } @@ -180,7 +184,7 @@ public class TestRouterSafemode { public void testRouterRpcSafeMode() throws IllegalStateException, IOException { - assertTrue(router.getRpcServer().isInSafeMode()); + assertTrue(router.getSafemodeService().isInSafeMode()); verifyRouter(RouterServiceState.SAFEMODE); // If the Router is in Safe Mode, we should get a SafeModeException @@ -194,6 +198,38 @@ public class TestRouterSafemode { assertTrue("We should have thrown a safe mode exception", exception); } + @Test + public void testRouterManualSafeMode() throws Exception { + InetSocketAddress adminAddr = router.getAdminServerAddress(); + conf.setSocketAddr(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, adminAddr); + RouterAdmin admin = new RouterAdmin(conf); + + assertTrue(router.getSafemodeService().isInSafeMode()); + verifyRouter(RouterServiceState.SAFEMODE); + + // Wait until the Router exit start up safe mode + long interval = conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION, + TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) + 300; + Thread.sleep(interval); + verifyRouter(RouterServiceState.RUNNING); + + // Now enter safe mode via Router admin command - it should work + assertEquals(0, ToolRunner.run(admin, new String[] {"-safemode", "enter"})); + verifyRouter(RouterServiceState.SAFEMODE); + + // Wait for update interval of the safe mode service, it should still in + // safe mode. + interval = 2 * conf.getTimeDuration( + DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, TimeUnit.SECONDS.toMillis(1), + TimeUnit.MILLISECONDS); + Thread.sleep(interval); + verifyRouter(RouterServiceState.SAFEMODE); + + // Exit safe mode via admin command + assertEquals(0, ToolRunner.run(admin, new String[] {"-safemode", "leave"})); + verifyRouter(RouterServiceState.RUNNING); + } + private void verifyRouter(RouterServiceState status) throws IllegalStateException, IOException { assertEquals(status, router.getRouterState());