From dbb9dded33b3cff3b630e98300d30515a9d1eec4 Mon Sep 17 00:00:00 2001 From: Yiqun Lin Date: Tue, 30 Jan 2018 12:12:08 +0800 Subject: [PATCH] HDFS-13044. RBF: Add a safe mode for the Router. Contributed by Inigo Goiri. --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 13 ++ .../hdfs/server/federation/router/Router.java | 16 +- .../federation/router/RouterRpcServer.java | 43 +++- .../router/RouterSafeModeException.java | 53 +++++ .../router/RouterSafemodeService.java | 150 ++++++++++++++ .../store/StateStoreCacheUpdateService.java | 7 +- .../src/main/resources/hdfs-default.xml | 36 +++- .../src/site/markdown/HDFSRouterFederation.md | 4 + .../federation/RouterConfigBuilder.java | 13 ++ .../federation/router/TestRouterSafemode.java | 192 ++++++++++++++++++ 10 files changed, 515 insertions(+), 12 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 84215f3f27b..4589aaa3208 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1291,6 +1291,19 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final long FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT = TimeUnit.MINUTES.toMillis(5); + // HDFS Router safe mode + public static final String DFS_ROUTER_SAFEMODE_ENABLE = + FEDERATION_ROUTER_PREFIX + "safemode.enable"; + public static final 
boolean DFS_ROUTER_SAFEMODE_ENABLE_DEFAULT = true; + public static final String DFS_ROUTER_SAFEMODE_EXTENSION = + FEDERATION_ROUTER_PREFIX + "safemode.extension"; + public static final long DFS_ROUTER_SAFEMODE_EXTENSION_DEFAULT = + TimeUnit.SECONDS.toMillis(30); + public static final String DFS_ROUTER_SAFEMODE_EXPIRATION = + FEDERATION_ROUTER_PREFIX + "safemode.expiration"; + public static final long DFS_ROUTER_SAFEMODE_EXPIRATION_DEFAULT = + 3 * DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT; + // HDFS Router-based federation mount table entries /** Maximum number of cache entries to have. */ public static final String FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java index 1e72c93adb7..79f43bb2daa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java @@ -118,6 +118,8 @@ public class Router extends CompositeService { private RouterStore routerStateManager; /** Heartbeat our run status to the router state manager. */ private RouterHeartbeatService routerHeartbeatService; + /** Enter/exit safemode. */ + private RouterSafemodeService safemodeService; /** The start time of the namesystem. 
*/ private final long startTime = Time.now(); @@ -232,13 +234,25 @@ public class Router extends CompositeService { addService(this.quotaUpdateService); } + // Safemode service to refuse RPC calls when the router is out of sync + if (conf.getBoolean( + DFSConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE, + DFSConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE_DEFAULT)) { + // Create safemode monitoring service + this.safemodeService = new RouterSafemodeService(this); + addService(this.safemodeService); + } + super.serviceInit(conf); } @Override protected void serviceStart() throws Exception { - updateRouterState(RouterServiceState.RUNNING); + if (this.safemodeService == null) { + // Router is running now + updateRouterState(RouterServiceState.RUNNING); + } if (this.pauseMonitor != null) { this.pauseMonitor.start(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 9afd441ccd2..57125ca0a96 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -179,6 +179,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { /** Interface to map global name space to HDFS subcluster name spaces. */ private final FileSubclusterResolver subclusterResolver; + /** If we are in safe mode, fail requests as if a standby NN. */ + private volatile boolean safeMode; /** Category of the operation that a thread is executing. */ private final ThreadLocal opCategory = new ThreadLocal<>(); @@ -370,12 +372,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { * @param op Category of the operation to check. * @param supported If the operation is supported or not. 
If not, it will * throw an UnsupportedOperationException. - * @throws StandbyException If the Router is in safe mode and cannot serve - * client requests. + * @throws SafeModeException If the Router is in safe mode and cannot serve + * client requests. * @throws UnsupportedOperationException If the operation is not supported. */ protected void checkOperation(OperationCategory op, boolean supported) - throws StandbyException, UnsupportedOperationException { + throws RouterSafeModeException, UnsupportedOperationException { checkOperation(op); if (!supported) { @@ -393,10 +395,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { * UNCHECKED. This function should be called by all ClientProtocol functions. * * @param op Category of the operation to check. - * @throws StandbyException If the Router is in safe mode and cannot serve - * client requests. + * @throws SafeModeException If the Router is in safe mode and cannot serve + * client requests. */ - protected void checkOperation(OperationCategory op) throws StandbyException { + protected void checkOperation(OperationCategory op) + throws RouterSafeModeException { // Log the function we are currently calling. if (rpcMonitor != null) { rpcMonitor.startOp(); @@ -415,7 +418,33 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { return; } - // TODO check Router safe mode and return Standby exception + if (safeMode) { + // Throw standby exception, router is not available + if (rpcMonitor != null) { + rpcMonitor.routerFailureSafemode(); + } + throw new RouterSafeModeException(router.getRouterId(), op); + } + } + + /** + * In safe mode all RPC requests will fail and return a standby exception. + * The client will try another Router, similar to the client retry logic for + * HA. + * + * @param mode True if enabled, False if disabled. 
+ */ + public void setSafeMode(boolean mode) { + this.safeMode = mode; + } + + /** + * Check if the Router is in safe mode and cannot serve RPC calls. + * + * @return If the Router is in safe mode. + */ + public boolean isInSafeMode() { + return this.safeMode; } @Override // ClientProtocol diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java new file mode 100644 index 00000000000..7a78b5b7330 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; +import org.apache.hadoop.ipc.StandbyException; + +/** + * Exception that the Router throws when it is in safe mode. This extends + * {@link StandbyException} for the client to try another Router when it gets + * this exception. 
+ */ +public class RouterSafeModeException extends StandbyException { + + private static final long serialVersionUID = 453568188334993493L; + + /** Identifier of the Router that generated this exception. */ + private final String routerId; + + /** + * Build a new Router safe mode exception. + * @param router Identifier of the Router. + * @param op Category of the operation (READ/WRITE). + */ + public RouterSafeModeException(String router, OperationCategory op) { + super("Router " + router + " is in safe mode and cannot handle " + op + + " requests."); + this.routerId = router; + } + + /** + * Get the id of the Router that generated this exception. + * @return Id of the Router that generated this exception. + */ + public String getRouterId() { + return this.routerId; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java new file mode 100644 index 00000000000..56aab0aca1b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.util.Time.now; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service to periodically check if the {@link org.apache.hadoop.hdfs.server. + * federation.store.StateStoreService StateStoreService} cached information in + * the {@link Router} is up to date. This is for performance and removes the + * {@link org.apache.hadoop.hdfs.server.federation.store.StateStoreService + * StateStoreService} from the critical path in common operations. + */ +public class RouterSafemodeService extends PeriodicService { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterSafemodeService.class); + + /** Router to manage safe mode. */ + private final Router router; + + /** Interval in ms to wait post startup before allowing RPC requests. */ + private long startupInterval; + /** Interval in ms after which the State Store cache is too stale. */ + private long staleInterval; + /** Start time in ms of this service. */ + private long startupTime; + + /** The time the Router enters safe mode in milliseconds. */ + private long enterSafeModeTime = now(); + + + /** + * Create a new Cache update service. + * + * @param router Router containing the cache. + */ + public RouterSafemodeService(Router router) { + super(RouterSafemodeService.class.getSimpleName()); + this.router = router; + } + + /** + * Enter safe mode. 
+ */ + private void enter() { + LOG.info("Entering safe mode"); + enterSafeModeTime = now(); + RouterRpcServer rpcServer = router.getRpcServer(); + rpcServer.setSafeMode(true); + router.updateRouterState(RouterServiceState.SAFEMODE); + } + + /** + * Leave safe mode. + */ + private void leave() { + // Cache recently updated, leave safemode + long timeInSafemode = now() - enterSafeModeTime; + LOG.info("Leaving safe mode after {} milliseconds", timeInSafemode); + RouterMetrics routerMetrics = router.getRouterMetrics(); + if (routerMetrics == null) { + LOG.error("The Router metrics are not enabled"); + } else { + routerMetrics.setSafeModeTime(timeInSafemode); + } + RouterRpcServer rpcServer = router.getRpcServer(); + rpcServer.setSafeMode(false); + router.updateRouterState(RouterServiceState.RUNNING); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + + // Use same interval as cache update service + this.setIntervalMs(conf.getTimeDuration( + DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, + DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT, + TimeUnit.MILLISECONDS)); + + this.startupInterval = conf.getTimeDuration( + DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXTENSION, + DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXTENSION_DEFAULT, + TimeUnit.MILLISECONDS); + LOG.info("Leave startup safe mode after {} ms", this.startupInterval); + + this.staleInterval = conf.getTimeDuration( + DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXPIRATION, + DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXPIRATION_DEFAULT, + TimeUnit.MILLISECONDS); + LOG.info("Enter safe mode after {} ms without reaching the State Store", + this.staleInterval); + + this.startupTime = Time.now(); + + // Initializing the RPC server in safe mode, it will disable it later + enter(); + + super.serviceInit(conf); + } + + @Override + public void periodicInvoke() { + long now = Time.now(); + long delta = now - startupTime; + if (delta < startupInterval) { + LOG.info("Delaying safemode exit for {} milliseconds...", 
+ this.startupInterval - delta); + return; + } + RouterRpcServer rpcServer = router.getRpcServer(); + StateStoreService stateStore = router.getStateStore(); + long cacheUpdateTime = stateStore.getCacheUpdateTime(); + boolean isCacheStale = (now - cacheUpdateTime) > this.staleInterval; + + // Always update to indicate our cache was updated + if (isCacheStale) { + if (!rpcServer.isInSafeMode()) { + enter(); + } + } else if (rpcServer.isInSafeMode()) { + // Cache recently updated, leave safe mode + leave(); + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java index bb8cfb027e0..9bcbc1eea00 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.server.federation.store; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.server.federation.router.PeriodicService; @@ -52,9 +54,10 @@ public class StateStoreCacheUpdateService extends PeriodicService { @Override protected void serviceInit(Configuration conf) throws Exception { - this.setIntervalMs(conf.getLong( + this.setIntervalMs(conf.getTimeDuration( DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, - DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT)); + DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT, + TimeUnit.MILLISECONDS)); super.serviceInit(conf); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index d24310ee545..74467666058 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -5080,9 +5080,12 @@ dfs.federation.router.cache.ttl - 60000 + 1m - How often to refresh the State Store caches in milliseconds. + How often to refresh the State Store caches in milliseconds. This setting + supports multiple time unit suffixes as described in + dfs.heartbeat.interval. If no suffix is specified then milliseconds is + assumed. @@ -5130,6 +5133,35 @@ + + dfs.federation.router.safemode.enable + true + + + + + + dfs.federation.router.safemode.extension + 30s + + Time after startup that the Router is in safe mode. This setting + supports multiple time unit suffixes as described in + dfs.heartbeat.interval. If no suffix is specified then milliseconds is + assumed. + + + + + dfs.federation.router.safemode.expiration + 3m + + Time without being able to reach the State Store to enter safe mode. This + setting supports multiple time unit suffixes as described in + dfs.heartbeat.interval. If no suffix is specified then milliseconds is + assumed. + + + dfs.federation.router.monitor.namenode diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSRouterFederation.md index 75798a19fed..6b2112323da 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSRouterFederation.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSRouterFederation.md @@ -81,6 +81,10 @@ The Routers are stateless and metadata operations are atomic at the NameNodes. If a Router becomes unavailable, any Router can take over for it. The clients configure their DFS HA client (e.g., ConfiguredFailoverProvider or RequestHedgingProxyProvider) with all the Routers in the federation as endpoints. 
+* **Unavailable State Store:** +If a Router cannot contact the State Store, it will enter a Safe Mode state which disallows it from serving requests. +Clients will treat a Router in Safe Mode as if it were a Standby NameNode and try another Router. + * **NameNode heartbeat HA:** For high availability and flexibility, multiple Routers can monitor the same NameNode and heartbeat the information to the State Store. This increases clients' resiliency to stale information, should a Router fail. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java index 3d8b35ccbdb..3659bf99506 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java @@ -35,6 +35,7 @@ public class RouterConfigBuilder { private boolean enableStateStore = false; private boolean enableMetrics = false; private boolean enableQuota = false; + private boolean enableSafemode = false; public RouterConfigBuilder(Configuration configuration) { this.conf = configuration; @@ -52,6 +53,7 @@ public class RouterConfigBuilder { this.enableLocalHeartbeat = true; this.enableStateStore = true; this.enableMetrics = true; + this.enableSafemode = true; return this; } @@ -95,6 +97,11 @@ public class RouterConfigBuilder { return this; } + public RouterConfigBuilder safemode(boolean enable) { + this.enableSafemode = enable; + return this; + } + public RouterConfigBuilder rpc() { return this.rpc(true); } @@ -123,6 +130,10 @@ public class RouterConfigBuilder { return this.quota(true); } + public RouterConfigBuilder safemode() { + return this.safemode(true); + } + public Configuration build() { conf.setBoolean(DFSConfigKeys.DFS_ROUTER_STORE_ENABLE, 
this.enableStateStore); @@ -139,6 +150,8 @@ public class RouterConfigBuilder { this.enableMetrics); conf.setBoolean(DFSConfigKeys.DFS_ROUTER_QUOTA_ENABLE, this.enableQuota); + conf.setBoolean(DFSConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE, + this.enableSafemode); return conf; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java new file mode 100644 index 00000000000..9299f77e5fc --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXPIRATION; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXTENSION; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.deleteStateStore; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.service.Service.STATE; +import org.apache.hadoop.util.Time; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test the safe mode for the {@link Router} controlled by + * {@link RouterSafemodeService}. 
+ */ +public class TestRouterSafemode { + + private Router router; + private static Configuration conf; + + @BeforeClass + public static void create() throws IOException { + // Wipe state store + deleteStateStore(); + // Configuration that supports the state store + conf = getStateStoreConfiguration(); + // 2 sec startup standby + conf.setTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION, + TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS); + // 1 sec cache refresh + conf.setTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, + TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS); + // 2 sec post cache update before entering safemode (2 intervals) + conf.setTimeDuration(DFS_ROUTER_SAFEMODE_EXPIRATION, + TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS); + // RPC + State Store + Safe Mode only + conf = new RouterConfigBuilder(conf) + .rpc() + .safemode() + .stateStore() + .metrics() + .build(); + } + + @AfterClass + public static void destroy() { + } + + @Before + public void setup() throws IOException, URISyntaxException { + router = new Router(); + router.init(conf); + router.start(); + } + + @After + public void cleanup() throws IOException { + if (router != null) { + router.stop(); + router = null; + } + } + + @Test + public void testSafemodeService() throws IOException { + RouterSafemodeService server = new RouterSafemodeService(router); + server.init(conf); + assertEquals(STATE.INITED, server.getServiceState()); + server.start(); + assertEquals(STATE.STARTED, server.getServiceState()); + server.stop(); + assertEquals(STATE.STOPPED, server.getServiceState()); + server.close(); + } + + @Test + public void testRouterExitSafemode() + throws InterruptedException, IllegalStateException, IOException { + + assertTrue(router.getRpcServer().isInSafeMode()); + verifyRouter(RouterServiceState.SAFEMODE); + + // Wait for initial time in milliseconds + long interval = + conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION, + TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) + + 
conf.getTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, + TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS); + Thread.sleep(interval); + + assertFalse(router.getRpcServer().isInSafeMode()); + verifyRouter(RouterServiceState.RUNNING); + } + + @Test + public void testRouterEnterSafemode() + throws IllegalStateException, IOException, InterruptedException { + + // Verify starting state + assertTrue(router.getRpcServer().isInSafeMode()); + verifyRouter(RouterServiceState.SAFEMODE); + + // We should be in safe mode for DFS_ROUTER_SAFEMODE_EXTENSION time + long interval0 = conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION, + TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) - 1000; + long t0 = Time.now(); + while (Time.now() - t0 < interval0) { + verifyRouter(RouterServiceState.SAFEMODE); + Thread.sleep(100); + } + + // We wait some time for the state to propagate + long interval1 = 1000 + 2 * conf.getTimeDuration( + DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, TimeUnit.SECONDS.toMillis(1), + TimeUnit.MILLISECONDS); + Thread.sleep(interval1); + + // Running + assertFalse(router.getRpcServer().isInSafeMode()); + verifyRouter(RouterServiceState.RUNNING); + + // Disable cache + router.getStateStore().stopCacheUpdateService(); + + // Wait until the State Store cache is stale in milliseconds + long interval2 = + conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXPIRATION, + TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) + + conf.getTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, + TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS); + Thread.sleep(interval2); + + // Safemode + assertTrue(router.getRpcServer().isInSafeMode()); + verifyRouter(RouterServiceState.SAFEMODE); + } + + @Test + public void testRouterRpcSafeMode() + throws IllegalStateException, IOException { + + assertTrue(router.getRpcServer().isInSafeMode()); + verifyRouter(RouterServiceState.SAFEMODE); + + // If the Router is in Safe Mode, we should get a SafeModeException + boolean exception = false; + try { + 
router.getRpcServer().delete("/testfile.txt", true); + fail("We should have thrown a safe mode exception"); + } catch (RouterSafeModeException sme) { + exception = true; + } + assertTrue("We should have thrown a safe mode exception", exception); + } + + private void verifyRouter(RouterServiceState status) + throws IllegalStateException, IOException { + assertEquals(status, router.getRouterState()); + } +}