HDFS-13475. RBF: Admin cannot enforce Router enter SafeMode. Contributed by Chao Sun.

(cherry picked from commit 359ea4e181)
This commit is contained in:
Inigo Goiri 2018-07-16 09:46:21 -07:00
parent 0e6efe06ea
commit 436fceebf9
6 changed files with 121 additions and 53 deletions

View File

@ -665,4 +665,11 @@ public class Router extends CompositeService {
Collection<NamenodeHeartbeatService> getNamenodeHearbeatServices() { Collection<NamenodeHeartbeatService> getNamenodeHearbeatServices() {
return this.namenodeHeartbeatServices; return this.namenodeHeartbeatServices;
} }
/**
* Get the Router safe mode service.
*
* @return The safe mode service held by this Router. Callers should be
*         prepared for a null result.
*         NOTE(review): presumably null when the safe mode service is not
*         enabled for this Router - confirm against the Router init code.
*/
RouterSafemodeService getSafemodeService() {
return this.safemodeService;
}
} }

View File

@ -24,6 +24,7 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Set; import java.util.Set;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -272,23 +273,37 @@ public class RouterAdminServer extends AbstractService
@Override @Override
public EnterSafeModeResponse enterSafeMode(EnterSafeModeRequest request) public EnterSafeModeResponse enterSafeMode(EnterSafeModeRequest request)
throws IOException { throws IOException {
this.router.updateRouterState(RouterServiceState.SAFEMODE); boolean success = false;
this.router.getRpcServer().setSafeMode(true); RouterSafemodeService safeModeService = this.router.getSafemodeService();
return EnterSafeModeResponse.newInstance(verifySafeMode(true)); if (safeModeService != null) {
this.router.updateRouterState(RouterServiceState.SAFEMODE);
safeModeService.setManualSafeMode(true);
success = verifySafeMode(true);
}
return EnterSafeModeResponse.newInstance(success);
} }
@Override @Override
public LeaveSafeModeResponse leaveSafeMode(LeaveSafeModeRequest request) public LeaveSafeModeResponse leaveSafeMode(LeaveSafeModeRequest request)
throws IOException { throws IOException {
this.router.updateRouterState(RouterServiceState.RUNNING); boolean success = false;
this.router.getRpcServer().setSafeMode(false); RouterSafemodeService safeModeService = this.router.getSafemodeService();
return LeaveSafeModeResponse.newInstance(verifySafeMode(false)); if (safeModeService != null) {
this.router.updateRouterState(RouterServiceState.RUNNING);
safeModeService.setManualSafeMode(false);
success = verifySafeMode(false);
}
return LeaveSafeModeResponse.newInstance(success);
} }
@Override @Override
public GetSafeModeResponse getSafeMode(GetSafeModeRequest request) public GetSafeModeResponse getSafeMode(GetSafeModeRequest request)
throws IOException { throws IOException {
boolean isInSafeMode = this.router.getRpcServer().isInSafeMode(); boolean isInSafeMode = false;
RouterSafemodeService safeModeService = this.router.getSafemodeService();
if (safeModeService != null) {
isInSafeMode = safeModeService.isInSafeMode();
}
return GetSafeModeResponse.newInstance(isInSafeMode); return GetSafeModeResponse.newInstance(isInSafeMode);
} }
@ -298,7 +313,8 @@ public class RouterAdminServer extends AbstractService
* @return * @return
*/ */
private boolean verifySafeMode(boolean isInSafeMode) { private boolean verifySafeMode(boolean isInSafeMode) {
boolean serverInSafeMode = this.router.getRpcServer().isInSafeMode(); Preconditions.checkNotNull(this.router.getSafemodeService());
boolean serverInSafeMode = this.router.getSafemodeService().isInSafeMode();
RouterServiceState currentState = this.router.getRouterState(); RouterServiceState currentState = this.router.getRouterState();
return (isInSafeMode && currentState == RouterServiceState.SAFEMODE return (isInSafeMode && currentState == RouterServiceState.SAFEMODE

View File

@ -182,9 +182,6 @@ public class RouterRpcServer extends AbstractService
/** Interface to map global name space to HDFS subcluster name spaces. */ /** Interface to map global name space to HDFS subcluster name spaces. */
private final FileSubclusterResolver subclusterResolver; private final FileSubclusterResolver subclusterResolver;
/** If we are in safe mode, fail requests as if a standby NN. */
private volatile boolean safeMode;
/** Category of the operation that a thread is executing. */ /** Category of the operation that a thread is executing. */
private final ThreadLocal<OperationCategory> opCategory = new ThreadLocal<>(); private final ThreadLocal<OperationCategory> opCategory = new ThreadLocal<>();
@ -440,7 +437,8 @@ public class RouterRpcServer extends AbstractService
return; return;
} }
if (safeMode) { RouterSafemodeService safemodeService = router.getSafemodeService();
if (safemodeService != null && safemodeService.isInSafeMode()) {
// Throw standby exception, router is not available // Throw standby exception, router is not available
if (rpcMonitor != null) { if (rpcMonitor != null) {
rpcMonitor.routerFailureSafemode(); rpcMonitor.routerFailureSafemode();
@ -450,26 +448,6 @@ public class RouterRpcServer extends AbstractService
} }
} }
/**
* In safe mode all RPC requests will fail and return a standby exception.
* The client will try another Router, similar to the client retry logic for
* HA.
*
* @param mode True if enabled, False if disabled.
*/
public void setSafeMode(boolean mode) {
this.safeMode = mode;
}
/**
* Check if the Router is in safe mode and cannot serve RPC calls.
*
* @return If the Router is in safe mode.
*/
public boolean isInSafeMode() {
return this.safeMode;
}
@Override // ClientProtocol @Override // ClientProtocol
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException { throws IOException {

View File

@ -42,6 +42,23 @@ public class RouterSafemodeService extends PeriodicService {
/** Router to manage safe mode. */ /** Router to manage safe mode. */
private final Router router; private final Router router;
/**
* Whether the Router is in safe mode. If we are in safe mode, fail requests
* as if a standby NN.
* Router can enter safe mode in two different ways:
* 1. upon start up: router enters this mode after service start, and will
* exit after a certain time threshold;
* 2. via admin command: router enters this mode via admin command:
* dfsrouteradmin -safemode enter
* and exits after admin command:
* dfsrouteradmin -safemode leave
*/
private volatile boolean safeMode;
/** Whether the Router safe mode is set manually (i.e., via Router admin). */
private volatile boolean isSafeModeSetManually;
/** Interval in ms to wait post startup before allowing RPC requests. */ /** Interval in ms to wait post startup before allowing RPC requests. */
private long startupInterval; private long startupInterval;
/** Interval in ms after which the State Store cache is too stale. */ /** Interval in ms after which the State Store cache is too stale. */
@ -63,14 +80,29 @@ public class RouterSafemodeService extends PeriodicService {
this.router = router; this.router = router;
} }
/**
* Return whether the current Router is in safe mode.
*
* @return The current value of the safe mode flag.
*/
boolean isInSafeMode() {
return this.safeMode;
}
/**
 * Enter or leave safe mode on behalf of the Router admin command. This sets
 * both the safe mode flag itself and the marker recording that safe mode was
 * set manually.
 *
 * The two fields are written in an order that keeps a concurrent reader from
 * ever observing "in safe mode" together with "not set manually" while a
 * manual enter is in progress. A reader that sees that combination treats it
 * as an automatic safe mode and may leave it.
 *
 * @param mode True to enter manual safe mode, false to leave it.
 */
void setManualSafeMode(boolean mode) {
  if (mode) {
    // Publish the manual marker first: once safeMode becomes visible as
    // true, the marker is already true as well.
    this.isSafeModeSetManually = true;
    this.safeMode = true;
  } else {
    // Clear the safe mode flag first so that the intermediate state is
    // "not in safe mode", which never triggers an automatic leave.
    this.safeMode = false;
    this.isSafeModeSetManually = false;
  }
}
/** /**
* Enter safe mode. * Enter safe mode.
*/ */
private void enter() { private void enter() {
LOG.info("Entering safe mode"); LOG.info("Entering safe mode");
enterSafeModeTime = now(); enterSafeModeTime = now();
RouterRpcServer rpcServer = router.getRpcServer(); safeMode = true;
rpcServer.setSafeMode(true);
router.updateRouterState(RouterServiceState.SAFEMODE); router.updateRouterState(RouterServiceState.SAFEMODE);
} }
@ -87,8 +119,7 @@ public class RouterSafemodeService extends PeriodicService {
} else { } else {
routerMetrics.setSafeModeTime(timeInSafemode); routerMetrics.setSafeModeTime(timeInSafemode);
} }
RouterRpcServer rpcServer = router.getRpcServer(); safeMode = false;
rpcServer.setSafeMode(false);
router.updateRouterState(RouterServiceState.RUNNING); router.updateRouterState(RouterServiceState.RUNNING);
} }
@ -131,17 +162,16 @@ public class RouterSafemodeService extends PeriodicService {
this.startupInterval - delta); this.startupInterval - delta);
return; return;
} }
RouterRpcServer rpcServer = router.getRpcServer();
StateStoreService stateStore = router.getStateStore(); StateStoreService stateStore = router.getStateStore();
long cacheUpdateTime = stateStore.getCacheUpdateTime(); long cacheUpdateTime = stateStore.getCacheUpdateTime();
boolean isCacheStale = (now - cacheUpdateTime) > this.staleInterval; boolean isCacheStale = (now - cacheUpdateTime) > this.staleInterval;
// Always update to indicate our cache was updated // Always update to indicate our cache was updated
if (isCacheStale) { if (isCacheStale) {
if (!rpcServer.isInSafeMode()) { if (!safeMode) {
enter(); enter();
} }
} else if (rpcServer.isInSafeMode()) { } else if (safeMode && !isSafeModeSetManually) {
// Cache recently updated, leave safe mode // Cache recently updated, leave safe mode
leave(); leave();
} }

View File

@ -83,6 +83,7 @@ public class TestRouterAdminCLI {
.stateStore() .stateStore()
.admin() .admin()
.rpc() .rpc()
.safemode()
.build(); .build();
cluster.addRouterOverrides(conf); cluster.addRouterOverrides(conf);
@ -502,13 +503,13 @@ public class TestRouterAdminCLI {
public void testManageSafeMode() throws Exception { public void testManageSafeMode() throws Exception {
// ensure the Router become RUNNING state // ensure the Router become RUNNING state
waitState(RouterServiceState.RUNNING); waitState(RouterServiceState.RUNNING);
assertFalse(routerContext.getRouter().getRpcServer().isInSafeMode()); assertFalse(routerContext.getRouter().getSafemodeService().isInSafeMode());
assertEquals(0, ToolRunner.run(admin, assertEquals(0, ToolRunner.run(admin,
new String[] {"-safemode", "enter"})); new String[] {"-safemode", "enter"}));
// verify state // verify state
assertEquals(RouterServiceState.SAFEMODE, assertEquals(RouterServiceState.SAFEMODE,
routerContext.getRouter().getRouterState()); routerContext.getRouter().getRouterState());
assertTrue(routerContext.getRouter().getRpcServer().isInSafeMode()); assertTrue(routerContext.getRouter().getSafemodeService().isInSafeMode());
System.setOut(new PrintStream(out)); System.setOut(new PrintStream(out));
assertEquals(0, ToolRunner.run(admin, assertEquals(0, ToolRunner.run(admin,
@ -520,7 +521,7 @@ public class TestRouterAdminCLI {
// verify state // verify state
assertEquals(RouterServiceState.RUNNING, assertEquals(RouterServiceState.RUNNING,
routerContext.getRouter().getRouterState()); routerContext.getRouter().getRouterState());
assertFalse(routerContext.getRouter().getRpcServer().isInSafeMode()); assertFalse(routerContext.getRouter().getSafemodeService().isInSafeMode());
out.reset(); out.reset();
assertEquals(0, ToolRunner.run(admin, assertEquals(0, ToolRunner.run(admin,

View File

@ -28,14 +28,17 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.tools.federation.RouterAdmin;
import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
@ -60,12 +63,12 @@ public class TestRouterSafemode {
// 2 sec startup standby // 2 sec startup standby
conf.setTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION, conf.setTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION,
TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS); TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS);
// 1 sec cache refresh // 200 ms cache refresh
conf.setTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, conf.setTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS); 200, TimeUnit.MILLISECONDS);
// 1 sec post cache update before entering safemode (5 intervals)
conf.setTimeDuration(DFS_ROUTER_SAFEMODE_EXPIRATION, conf.setTimeDuration(DFS_ROUTER_SAFEMODE_EXPIRATION,
TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS); TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS);
conf.set(RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0"); conf.set(RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0");
conf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0"); conf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0");
@ -77,6 +80,7 @@ public class TestRouterSafemode {
// RPC + State Store + Safe Mode only // RPC + State Store + Safe Mode only
conf = new RouterConfigBuilder(conf) conf = new RouterConfigBuilder(conf)
.rpc() .rpc()
.admin()
.safemode() .safemode()
.stateStore() .stateStore()
.metrics() .metrics()
@ -118,7 +122,7 @@ public class TestRouterSafemode {
public void testRouterExitSafemode() public void testRouterExitSafemode()
throws InterruptedException, IllegalStateException, IOException { throws InterruptedException, IllegalStateException, IOException {
assertTrue(router.getRpcServer().isInSafeMode()); assertTrue(router.getSafemodeService().isInSafeMode());
verifyRouter(RouterServiceState.SAFEMODE); verifyRouter(RouterServiceState.SAFEMODE);
// Wait for initial time in milliseconds // Wait for initial time in milliseconds
@ -129,7 +133,7 @@ public class TestRouterSafemode {
TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS); TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS);
Thread.sleep(interval); Thread.sleep(interval);
assertFalse(router.getRpcServer().isInSafeMode()); assertFalse(router.getSafemodeService().isInSafeMode());
verifyRouter(RouterServiceState.RUNNING); verifyRouter(RouterServiceState.RUNNING);
} }
@ -138,7 +142,7 @@ public class TestRouterSafemode {
throws IllegalStateException, IOException, InterruptedException { throws IllegalStateException, IOException, InterruptedException {
// Verify starting state // Verify starting state
assertTrue(router.getRpcServer().isInSafeMode()); assertTrue(router.getSafemodeService().isInSafeMode());
verifyRouter(RouterServiceState.SAFEMODE); verifyRouter(RouterServiceState.SAFEMODE);
// We should be in safe mode for DFS_ROUTER_SAFEMODE_EXTENSION time // We should be in safe mode for DFS_ROUTER_SAFEMODE_EXTENSION time
@ -157,7 +161,7 @@ public class TestRouterSafemode {
Thread.sleep(interval1); Thread.sleep(interval1);
// Running // Running
assertFalse(router.getRpcServer().isInSafeMode()); assertFalse(router.getSafemodeService().isInSafeMode());
verifyRouter(RouterServiceState.RUNNING); verifyRouter(RouterServiceState.RUNNING);
// Disable cache // Disable cache
@ -167,12 +171,12 @@ public class TestRouterSafemode {
long interval2 = long interval2 =
conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXPIRATION, conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXPIRATION,
TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) + TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) +
conf.getTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, 2 * conf.getTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS); TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS);
Thread.sleep(interval2); Thread.sleep(interval2);
// Safemode // Safemode
assertTrue(router.getRpcServer().isInSafeMode()); assertTrue(router.getSafemodeService().isInSafeMode());
verifyRouter(RouterServiceState.SAFEMODE); verifyRouter(RouterServiceState.SAFEMODE);
} }
@ -180,7 +184,7 @@ public class TestRouterSafemode {
public void testRouterRpcSafeMode() public void testRouterRpcSafeMode()
throws IllegalStateException, IOException { throws IllegalStateException, IOException {
assertTrue(router.getRpcServer().isInSafeMode()); assertTrue(router.getSafemodeService().isInSafeMode());
verifyRouter(RouterServiceState.SAFEMODE); verifyRouter(RouterServiceState.SAFEMODE);
// If the Router is in Safe Mode, we should get a SafeModeException // If the Router is in Safe Mode, we should get a SafeModeException
@ -194,6 +198,38 @@ public class TestRouterSafemode {
assertTrue("We should have thrown a safe mode exception", exception); assertTrue("We should have thrown a safe mode exception", exception);
} }
/**
* Verify that safe mode entered through the Router admin command is kept
* until the admin command to leave is issued.
*/
@Test
public void testRouterManualSafeMode() throws Exception {
// Point the admin client at the admin server address of the running Router
InetSocketAddress adminAddr = router.getAdminServerAddress();
conf.setSocketAddr(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, adminAddr);
RouterAdmin admin = new RouterAdmin(conf);
assertTrue(router.getSafemodeService().isInSafeMode());
verifyRouter(RouterServiceState.SAFEMODE);
// Wait until the Router exits startup safe mode
long interval = conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION,
TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) + 300;
Thread.sleep(interval);
verifyRouter(RouterServiceState.RUNNING);
// Now enter safe mode via Router admin command - it should work
assertEquals(0, ToolRunner.run(admin, new String[] {"-safemode", "enter"}));
verifyRouter(RouterServiceState.SAFEMODE);
// Wait for twice the cache refresh interval; the Router should still be
// in safe mode afterwards.
interval = 2 * conf.getTimeDuration(
DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, TimeUnit.SECONDS.toMillis(1),
TimeUnit.MILLISECONDS);
Thread.sleep(interval);
verifyRouter(RouterServiceState.SAFEMODE);
// Exit safe mode via admin command
assertEquals(0, ToolRunner.run(admin, new String[] {"-safemode", "leave"}));
verifyRouter(RouterServiceState.RUNNING);
}
private void verifyRouter(RouterServiceState status) private void verifyRouter(RouterServiceState status)
throws IllegalStateException, IOException { throws IllegalStateException, IOException {
assertEquals(status, router.getRouterState()); assertEquals(status, router.getRouterState());