HDFS-13475. RBF: Admin cannot enforce Router enter SafeMode. Contributed by Chao Sun.

(cherry picked from commit 359ea4e181)
This commit is contained in:
Inigo Goiri 2018-07-16 09:46:21 -07:00
parent 1ae35834a2
commit 3d47ef1d31
6 changed files with 121 additions and 53 deletions

View File

@ -631,4 +631,11 @@ public class Router extends CompositeService {
Collection<NamenodeHeartbeatService> getNamenodeHearbeatServices() {
return this.namenodeHeartbeatServices;
}
/**
 * Accessor for the service that tracks safe mode for this Router.
 *
 * @return The Router safe mode service held by this Router.
 */
RouterSafemodeService getSafemodeService() {
  return safemodeService;
}
}

View File

@ -24,6 +24,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Set;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.proto.RouterProtocolProtos.RouterAdminProtocolService;
@ -246,23 +247,37 @@ public class RouterAdminServer extends AbstractService
/**
* Handle an admin request to put the Router into safe mode.
* NOTE(review): this text is a diff rendering without +/- markers. The three
* statements right after the throws clause look like the replaced
* implementation (flag kept in the RPC server) and the remaining ones like
* the new implementation (flag kept in the safe mode service) - confirm
* against the real file before relying on this listing.
*
* @param request Request to enter safe mode; it is not read here.
* @return In the safe mode service variant: a response wrapping false when
* the Router has no safe mode service, otherwise the outcome of
* verifySafeMode(true).
* @throws IOException Declared by the signature; presumably raised by the
* delegated calls - TODO confirm.
*/
@Override
public EnterSafeModeResponse enterSafeMode(EnterSafeModeRequest request)
throws IOException {
this.router.updateRouterState(RouterServiceState.SAFEMODE);
this.router.getRpcServer().setSafeMode(true);
return EnterSafeModeResponse.newInstance(verifySafeMode(true));
// Stays false unless the safe mode service exists and verification passes
boolean success = false;
RouterSafemodeService safeModeService = this.router.getSafemodeService();
if (safeModeService != null) {
this.router.updateRouterState(RouterServiceState.SAFEMODE);
// Mark safe mode as set manually on the safe mode service
safeModeService.setManualSafeMode(true);
success = verifySafeMode(true);
}
return EnterSafeModeResponse.newInstance(success);
}
/**
* Handle an admin request to take the Router out of safe mode.
* NOTE(review): this text is a diff rendering without +/- markers. The three
* statements right after the throws clause look like the replaced
* implementation (flag kept in the RPC server) and the remaining ones like
* the new implementation (flag kept in the safe mode service) - confirm
* against the real file before relying on this listing.
*
* @param request Request to leave safe mode; it is not read here.
* @return In the safe mode service variant: a response wrapping false when
* the Router has no safe mode service, otherwise the outcome of
* verifySafeMode(false).
* @throws IOException Declared by the signature; presumably raised by the
* delegated calls - TODO confirm.
*/
@Override
public LeaveSafeModeResponse leaveSafeMode(LeaveSafeModeRequest request)
throws IOException {
this.router.updateRouterState(RouterServiceState.RUNNING);
this.router.getRpcServer().setSafeMode(false);
return LeaveSafeModeResponse.newInstance(verifySafeMode(false));
// Stays false unless the safe mode service exists and verification passes
boolean success = false;
RouterSafemodeService safeModeService = this.router.getSafemodeService();
if (safeModeService != null) {
this.router.updateRouterState(RouterServiceState.RUNNING);
// Clear the manually-set safe mode on the safe mode service
safeModeService.setManualSafeMode(false);
success = verifySafeMode(false);
}
return LeaveSafeModeResponse.newInstance(success);
}
/**
* Report to the admin client whether the Router is in safe mode.
* NOTE(review): this text is a diff rendering without +/- markers, which is
* why isInSafeMode appears to be declared twice: the first declaration (read
* from the RPC server) looks like the replaced line and the second one
* (default false, then read from the safe mode service) like its
* replacement - confirm against the real file.
*
* @param request Request for the safe mode status; it is not read here.
* @return In the safe mode service variant: a response wrapping false when
* the Router has no safe mode service, otherwise that service's
* isInSafeMode() value.
* @throws IOException Declared by the signature; presumably raised by the
* delegated calls - TODO confirm.
*/
@Override
public GetSafeModeResponse getSafeMode(GetSafeModeRequest request)
throws IOException {
boolean isInSafeMode = this.router.getRpcServer().isInSafeMode();
boolean isInSafeMode = false;
RouterSafemodeService safeModeService = this.router.getSafemodeService();
if (safeModeService != null) {
isInSafeMode = safeModeService.isInSafeMode();
}
return GetSafeModeResponse.newInstance(isInSafeMode);
}
@ -272,7 +287,8 @@ public class RouterAdminServer extends AbstractService
* @return
*/
private boolean verifySafeMode(boolean isInSafeMode) {
boolean serverInSafeMode = this.router.getRpcServer().isInSafeMode();
Preconditions.checkNotNull(this.router.getSafemodeService());
boolean serverInSafeMode = this.router.getSafemodeService().isInSafeMode();
RouterServiceState currentState = this.router.getRouterState();
return (isInSafeMode && currentState == RouterServiceState.SAFEMODE

View File

@ -191,9 +191,6 @@ public class RouterRpcServer extends AbstractService
/** Interface to map global name space to HDFS subcluster name spaces. */
private final FileSubclusterResolver subclusterResolver;
/** If we are in safe mode, fail requests as if a standby NN. */
private volatile boolean safeMode;
/** Category of the operation that a thread is executing. */
private final ThreadLocal<OperationCategory> opCategory = new ThreadLocal<>();
@ -451,7 +448,8 @@ public class RouterRpcServer extends AbstractService
return;
}
if (safeMode) {
RouterSafemodeService safemodeService = router.getSafemodeService();
if (safemodeService != null && safemodeService.isInSafeMode()) {
// Throw standby exception, router is not available
if (rpcMonitor != null) {
rpcMonitor.routerFailureSafemode();
@ -461,26 +459,6 @@ public class RouterRpcServer extends AbstractService
}
}
/**
 * Switch safe mode on or off. While it is on, every RPC request fails and
 * returns a standby exception, so the client tries another Router in the
 * same way the client retry logic works for HA.
 *
 * @param mode True if enabled, False if disabled.
 */
public void setSafeMode(boolean mode) {
  safeMode = mode;
}
/**
 * Tell whether the Router is currently in safe mode, i.e. whether it cannot
 * serve RPC calls.
 *
 * @return If the Router is in safe mode.
 */
public boolean isInSafeMode() {
  return safeMode;
}
@Override // ClientProtocol
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException {

View File

@ -42,6 +42,23 @@ public class RouterSafemodeService extends PeriodicService {
/** Router to manage safe mode. */
private final Router router;
/**
* Whether the Router is in safe mode.
* If we are in safe mode, fail requests as if a standby NN.
* Router can enter safe mode in two different ways:
* 1. upon start up: router enters this mode after service start, and will
* exit after certain time threshold;
* 2. via admin command: router enters this mode via admin command:
* dfsrouteradmin -safemode enter
* and exit after admin command:
* dfsrouteradmin -safemode leave
*/
private volatile boolean safeMode;
/** Whether the Router safe mode is set manually (i.e., via Router admin) */
private volatile boolean isSafeModeSetManually;
/** Interval in ms to wait post startup before allowing RPC requests. */
private long startupInterval;
/** Interval in ms after which the State Store cache is too stale. */
@ -63,14 +80,29 @@ public class RouterSafemodeService extends PeriodicService {
this.router = router;
}
/**
 * Report the safe mode flag kept by this service.
 *
 * @return True if the current Router is in safe mode.
 */
boolean isInSafeMode() {
  return safeMode;
}
/**
 * Turn manual safe mode on or off, as requested via the Router admin
 * command. This updates both the safe mode flag itself and the marker that
 * records that safe mode was set manually.
 *
 * The two volatile writes are ordered so that a concurrent reader evaluating
 * {@code safeMode && !isSafeModeSetManually} (as the periodic safe mode
 * check does) cannot see the flag raised by this call without also seeing
 * the marker: the marker is raised before the flag when entering, and
 * lowered after the flag when leaving. Raising the flag first would leave a
 * window in which the periodic check could leave safe mode right after the
 * admin entered it.
 *
 * @param mode True to enter safe mode manually, false to leave it.
 */
void setManualSafeMode(boolean mode) {
  if (mode) {
    // Publish the marker first so the flag is never visible without it
    this.isSafeModeSetManually = true;
    this.safeMode = true;
  } else {
    // Drop the flag first; the marker only matters while the flag is set
    this.safeMode = false;
    this.isSafeModeSetManually = false;
  }
}
/**
* Enter safe mode: log the transition, record the time safe mode started,
* raise the safe mode flag and move the Router to the SAFEMODE state.
* NOTE(review): this text is a diff rendering without +/- markers. The two
* statements that go through the RPC server look like the replaced way of
* raising the flag and the direct assignment to safeMode like its
* replacement - confirm against the real file.
*/
private void enter() {
LOG.info("Entering safe mode");
// Remember when safe mode was entered
enterSafeModeTime = now();
RouterRpcServer rpcServer = router.getRpcServer();
rpcServer.setSafeMode(true);
safeMode = true;
router.updateRouterState(RouterServiceState.SAFEMODE);
}
@ -87,8 +119,7 @@ public class RouterSafemodeService extends PeriodicService {
} else {
routerMetrics.setSafeModeTime(timeInSafemode);
}
RouterRpcServer rpcServer = router.getRpcServer();
rpcServer.setSafeMode(false);
safeMode = false;
router.updateRouterState(RouterServiceState.RUNNING);
}
@ -131,17 +162,16 @@ public class RouterSafemodeService extends PeriodicService {
this.startupInterval - delta);
return;
}
RouterRpcServer rpcServer = router.getRpcServer();
StateStoreService stateStore = router.getStateStore();
long cacheUpdateTime = stateStore.getCacheUpdateTime();
boolean isCacheStale = (now - cacheUpdateTime) > this.staleInterval;
// Always update to indicate our cache was updated
if (isCacheStale) {
if (!rpcServer.isInSafeMode()) {
if (!safeMode) {
enter();
}
} else if (rpcServer.isInSafeMode()) {
} else if (safeMode && !isSafeModeSetManually) {
// Cache recently updated, leave safe mode
leave();
}

View File

@ -79,6 +79,7 @@ public class TestRouterAdminCLI {
.stateStore()
.admin()
.rpc()
.safemode()
.build();
cluster.addRouterOverrides(conf);
@ -420,13 +421,13 @@ public class TestRouterAdminCLI {
public void testManageSafeMode() throws Exception {
// ensure the Router become RUNNING state
waitState(RouterServiceState.RUNNING);
assertFalse(routerContext.getRouter().getRpcServer().isInSafeMode());
assertFalse(routerContext.getRouter().getSafemodeService().isInSafeMode());
assertEquals(0, ToolRunner.run(admin,
new String[] {"-safemode", "enter"}));
// verify state
assertEquals(RouterServiceState.SAFEMODE,
routerContext.getRouter().getRouterState());
assertTrue(routerContext.getRouter().getRpcServer().isInSafeMode());
assertTrue(routerContext.getRouter().getSafemodeService().isInSafeMode());
System.setOut(new PrintStream(out));
assertEquals(0, ToolRunner.run(admin,
@ -438,7 +439,7 @@ public class TestRouterAdminCLI {
// verify state
assertEquals(RouterServiceState.RUNNING,
routerContext.getRouter().getRouterState());
assertFalse(routerContext.getRouter().getRpcServer().isInSafeMode());
assertFalse(routerContext.getRouter().getSafemodeService().isInSafeMode());
out.reset();
assertEquals(0, ToolRunner.run(admin,

View File

@ -28,14 +28,17 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.tools.federation.RouterAdmin;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -60,12 +63,12 @@ public class TestRouterSafemode {
// 2 sec startup standby
conf.setTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION,
TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS);
// 1 sec cache refresh
// 200 ms cache refresh
conf.setTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS);
// 2 sec post cache update before entering safemode (2 intervals)
200, TimeUnit.MILLISECONDS);
// 1 sec post cache update before entering safemode (2 intervals)
conf.setTimeDuration(DFS_ROUTER_SAFEMODE_EXPIRATION,
TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS);
TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS);
conf.set(RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0");
conf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0");
@ -77,6 +80,7 @@ public class TestRouterSafemode {
// RPC + State Store + Safe Mode only
conf = new RouterConfigBuilder(conf)
.rpc()
.admin()
.safemode()
.stateStore()
.metrics()
@ -118,7 +122,7 @@ public class TestRouterSafemode {
public void testRouterExitSafemode()
throws InterruptedException, IllegalStateException, IOException {
assertTrue(router.getRpcServer().isInSafeMode());
assertTrue(router.getSafemodeService().isInSafeMode());
verifyRouter(RouterServiceState.SAFEMODE);
// Wait for initial time in milliseconds
@ -129,7 +133,7 @@ public class TestRouterSafemode {
TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS);
Thread.sleep(interval);
assertFalse(router.getRpcServer().isInSafeMode());
assertFalse(router.getSafemodeService().isInSafeMode());
verifyRouter(RouterServiceState.RUNNING);
}
@ -138,7 +142,7 @@ public class TestRouterSafemode {
throws IllegalStateException, IOException, InterruptedException {
// Verify starting state
assertTrue(router.getRpcServer().isInSafeMode());
assertTrue(router.getSafemodeService().isInSafeMode());
verifyRouter(RouterServiceState.SAFEMODE);
// We should be in safe mode for DFS_ROUTER_SAFEMODE_EXTENSION time
@ -157,7 +161,7 @@ public class TestRouterSafemode {
Thread.sleep(interval1);
// Running
assertFalse(router.getRpcServer().isInSafeMode());
assertFalse(router.getSafemodeService().isInSafeMode());
verifyRouter(RouterServiceState.RUNNING);
// Disable cache
@ -167,12 +171,12 @@ public class TestRouterSafemode {
long interval2 =
conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXPIRATION,
TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) +
conf.getTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
2 * conf.getTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS);
Thread.sleep(interval2);
// Safemode
assertTrue(router.getRpcServer().isInSafeMode());
assertTrue(router.getSafemodeService().isInSafeMode());
verifyRouter(RouterServiceState.SAFEMODE);
}
@ -180,7 +184,7 @@ public class TestRouterSafemode {
public void testRouterRpcSafeMode()
throws IllegalStateException, IOException {
assertTrue(router.getRpcServer().isInSafeMode());
assertTrue(router.getSafemodeService().isInSafeMode());
verifyRouter(RouterServiceState.SAFEMODE);
// If the Router is in Safe Mode, we should get a SafeModeException
@ -194,6 +198,38 @@ public class TestRouterSafemode {
assertTrue("We should have thrown a safe mode exception", exception);
}
/**
 * Check that safe mode entered through the Router admin command survives an
 * update interval of the safe mode service and ends on the admin command.
 */
@Test
public void testRouterManualSafeMode() throws Exception {
  // Build an admin client that talks to this Router's admin server
  InetSocketAddress address = router.getAdminServerAddress();
  conf.setSocketAddr(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, address);
  RouterAdmin routerAdmin = new RouterAdmin(conf);
  String[] enterArgs = {"-safemode", "enter"};
  String[] leaveArgs = {"-safemode", "leave"};

  // The Router starts in safe mode
  assertTrue(router.getSafemodeService().isInSafeMode());
  verifyRouter(RouterServiceState.SAFEMODE);

  // Sleep past the start up extension (plus 300 ms) so the Router exits
  // start up safe mode
  long extensionMs = conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION,
      TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS);
  Thread.sleep(extensionMs + 300);
  verifyRouter(RouterServiceState.RUNNING);

  // Entering safe mode via the Router admin command should work
  int enterResult = ToolRunner.run(routerAdmin, enterArgs);
  assertEquals(0, enterResult);
  verifyRouter(RouterServiceState.SAFEMODE);

  // Wait for the update interval of the safe mode service; the Router
  // should still be in safe mode afterwards
  long cacheTtlMs = conf.getTimeDuration(
      DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, TimeUnit.SECONDS.toMillis(1),
      TimeUnit.MILLISECONDS);
  Thread.sleep(2 * cacheTtlMs);
  verifyRouter(RouterServiceState.SAFEMODE);

  // Leaving safe mode via the admin command brings the Router back
  int leaveResult = ToolRunner.run(routerAdmin, leaveArgs);
  assertEquals(0, leaveResult);
  verifyRouter(RouterServiceState.RUNNING);
}
private void verifyRouter(RouterServiceState status)
throws IllegalStateException, IOException {
assertEquals(status, router.getRouterState());