HDFS-13480. RBF: Separate namenodeHeartbeat and routerHeartbeat to different config key. Contributed by Ayush Saxena.
This commit is contained in:
parent
f544121239
commit
0c21e81e02
|
@ -91,6 +91,8 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
|
||||||
public static final String DFS_ROUTER_HEARTBEAT_ENABLE =
|
public static final String DFS_ROUTER_HEARTBEAT_ENABLE =
|
||||||
FEDERATION_ROUTER_PREFIX + "heartbeat.enable";
|
FEDERATION_ROUTER_PREFIX + "heartbeat.enable";
|
||||||
public static final boolean DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT = true;
|
public static final boolean DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT = true;
|
||||||
|
public static final String DFS_ROUTER_NAMENODE_HEARTBEAT_ENABLE =
|
||||||
|
FEDERATION_ROUTER_PREFIX + "namenode.heartbeat.enable";
|
||||||
public static final String DFS_ROUTER_HEARTBEAT_INTERVAL_MS =
|
public static final String DFS_ROUTER_HEARTBEAT_INTERVAL_MS =
|
||||||
FEDERATION_ROUTER_PREFIX + "heartbeat.interval";
|
FEDERATION_ROUTER_PREFIX + "heartbeat.interval";
|
||||||
public static final long DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT =
|
public static final long DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT =
|
||||||
|
|
|
@ -205,9 +205,13 @@ public class Router extends CompositeService implements
|
||||||
addService(this.httpServer);
|
addService(this.httpServer);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (conf.getBoolean(
|
boolean isRouterHeartbeatEnabled = conf.getBoolean(
|
||||||
RBFConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
|
RBFConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
|
||||||
RBFConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT)) {
|
RBFConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT);
|
||||||
|
boolean isNamenodeHeartbeatEnable = conf.getBoolean(
|
||||||
|
RBFConfigKeys.DFS_ROUTER_NAMENODE_HEARTBEAT_ENABLE,
|
||||||
|
isRouterHeartbeatEnabled);
|
||||||
|
if (isNamenodeHeartbeatEnable) {
|
||||||
|
|
||||||
// Create status updater for each monitored Namenode
|
// Create status updater for each monitored Namenode
|
||||||
this.namenodeHeartbeatServices = createNamenodeHeartbeatServices();
|
this.namenodeHeartbeatServices = createNamenodeHeartbeatServices();
|
||||||
|
@ -219,7 +223,8 @@ public class Router extends CompositeService implements
|
||||||
if (this.namenodeHeartbeatServices.isEmpty()) {
|
if (this.namenodeHeartbeatServices.isEmpty()) {
|
||||||
LOG.error("Heartbeat is enabled but there are no namenodes to monitor");
|
LOG.error("Heartbeat is enabled but there are no namenodes to monitor");
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
if (isRouterHeartbeatEnabled) {
|
||||||
// Periodically update the router state
|
// Periodically update the router state
|
||||||
this.routerHeartbeatService = new RouterHeartbeatService(this);
|
this.routerHeartbeatService = new RouterHeartbeatService(this);
|
||||||
addService(this.routerHeartbeatService);
|
addService(this.routerHeartbeatService);
|
||||||
|
@ -750,6 +755,14 @@ public class Router extends CompositeService implements
|
||||||
return this.namenodeHeartbeatServices;
|
return this.namenodeHeartbeatServices;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get this router heartbeat service.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
RouterHeartbeatService getRouterHeartbeatService() {
|
||||||
|
return this.routerHeartbeatService;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the Router safe mode service.
|
* Get the Router safe mode service.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -371,6 +371,16 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.federation.router.namenode.heartbeat.enable</name>
|
||||||
|
<value>true</value>
|
||||||
|
<description>
|
||||||
|
If true, get namenode heartbeats and send into the State Store.
|
||||||
|
If not explicitly specified takes the same value as for
|
||||||
|
dfs.federation.router.heartbeat.enable.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.federation.router.store.router.expiration</name>
|
<name>dfs.federation.router.store.router.expiration</name>
|
||||||
<value>5m</value>
|
<value>5m</value>
|
||||||
|
|
|
@ -45,6 +45,7 @@ This approach has the same architecture as [YARN federation](../../hadoop-yarn/h
|
||||||
|
|
||||||
### Example flow
|
### Example flow
|
||||||
The simplest configuration deploys a Router on each NameNode machine.
|
The simplest configuration deploys a Router on each NameNode machine.
|
||||||
|
The Router monitors the local NameNode and its state and heartbeats to the State Store.
|
||||||
The Router monitors the local NameNode and heartbeats the state to the State Store.
|
The Router monitors the local NameNode and heartbeats the state to the State Store.
|
||||||
When a regular DFS client contacts any of the Routers to access a file in the federated filesystem, the Router checks the Mount Table in the State Store (i.e., the local cache) to find out which subcluster contains the file.
|
When a regular DFS client contacts any of the Routers to access a file in the federated filesystem, the Router checks the Mount Table in the State Store (i.e., the local cache) to find out which subcluster contains the file.
|
||||||
Then it checks the Membership table in the State Store (i.e., the local cache) for the NameNode responsible for the subcluster.
|
Then it checks the Membership table in the State Store (i.e., the local cache) for the NameNode responsible for the subcluster.
|
||||||
|
@ -69,6 +70,9 @@ To make sure that changes have been propagated to all Routers, each Router heart
|
||||||
The communications between the Routers and the State Store are cached (with timed expiration for freshness).
|
The communications between the Routers and the State Store are cached (with timed expiration for freshness).
|
||||||
This improves the performance of the system.
|
This improves the performance of the system.
|
||||||
|
|
||||||
|
#### Router heartbeat
|
||||||
|
The Router periodically heartbeats its state to the State Store.
|
||||||
|
|
||||||
#### NameNode heartbeat
|
#### NameNode heartbeat
|
||||||
For this role, the Router periodically checks the state of a NameNode (usually on the same server) and reports their high availability (HA) state and load/space status to the State Store.
|
For this role, the Router periodically checks the state of a NameNode (usually on the same server) and reports their high availability (HA) state and load/space status to the State Store.
|
||||||
Note that this is an optional role, as a Router can be independent of any subcluster.
|
Note that this is an optional role, as a Router can be independent of any subcluster.
|
||||||
|
@ -433,7 +437,8 @@ Monitor the namenodes in the subclusters for forwarding the client requests.
|
||||||
|
|
||||||
| Property | Default | Description|
|
| Property | Default | Description|
|
||||||
|:---- |:---- |:---- |
|
|:---- |:---- |:---- |
|
||||||
| dfs.federation.router.heartbeat.enable | `true` | If `true`, the Router heartbeats into the State Store. |
|
| dfs.federation.router.heartbeat.enable | `true` | If `true`, the Router periodically heartbeats its state to the State Store. |
|
||||||
|
| dfs.federation.router.namenode.heartbeat.enable | | If `true`, the Router gets namenode heartbeats and send to the State Store. If not explicitly specified takes the same value as for `dfs.federation.router.heartbeat.enable`. |
|
||||||
| dfs.federation.router.heartbeat.interval | 5000 | How often the Router should heartbeat into the State Store in milliseconds. |
|
| dfs.federation.router.heartbeat.interval | 5000 | How often the Router should heartbeat into the State Store in milliseconds. |
|
||||||
| dfs.federation.router.monitor.namenode | | The identifier of the namenodes to monitor and heartbeat. |
|
| dfs.federation.router.monitor.namenode | | The identifier of the namenodes to monitor and heartbeat. |
|
||||||
| dfs.federation.router.monitor.localnamenode.enable | `true` | If `true`, the Router should monitor the namenode in the local machine. |
|
| dfs.federation.router.monitor.localnamenode.enable | `true` | If `true`, the Router should monitor the namenode in the local machine. |
|
||||||
|
|
|
@ -21,11 +21,13 @@ import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
||||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
@ -217,4 +219,45 @@ public class TestRouter {
|
||||||
router.stop();
|
router.stop();
|
||||||
router.close();
|
router.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSwitchRouter() throws IOException {
|
||||||
|
assertRouterHeartbeater(true, true);
|
||||||
|
assertRouterHeartbeater(true, false);
|
||||||
|
assertRouterHeartbeater(false, true);
|
||||||
|
assertRouterHeartbeater(false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute the test by specify the routerHeartbeat and nnHeartbeat switch.
|
||||||
|
*
|
||||||
|
* @param expectedRouterHeartbeat expect the routerHeartbeat enable state.
|
||||||
|
* @param expectedNNHeartbeat expect the nnHeartbeat enable state.
|
||||||
|
*/
|
||||||
|
private void assertRouterHeartbeater(boolean expectedRouterHeartbeat,
|
||||||
|
boolean expectedNNHeartbeat) throws IOException {
|
||||||
|
final Router router = new Router();
|
||||||
|
Configuration baseCfg = new RouterConfigBuilder(conf).rpc().build();
|
||||||
|
baseCfg.setBoolean(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
|
||||||
|
expectedRouterHeartbeat);
|
||||||
|
baseCfg.setBoolean(RBFConfigKeys.DFS_ROUTER_NAMENODE_HEARTBEAT_ENABLE,
|
||||||
|
expectedNNHeartbeat);
|
||||||
|
router.init(baseCfg);
|
||||||
|
RouterHeartbeatService routerHeartbeatService =
|
||||||
|
router.getRouterHeartbeatService();
|
||||||
|
if (expectedRouterHeartbeat) {
|
||||||
|
assertNotNull(routerHeartbeatService);
|
||||||
|
} else {
|
||||||
|
assertNull(routerHeartbeatService);
|
||||||
|
}
|
||||||
|
Collection<NamenodeHeartbeatService> namenodeHeartbeatServices =
|
||||||
|
router.getNamenodeHeartbeatServices();
|
||||||
|
if (expectedNNHeartbeat) {
|
||||||
|
assertNotNull(namenodeHeartbeatServices);
|
||||||
|
} else {
|
||||||
|
assertNull(namenodeHeartbeatServices);
|
||||||
|
}
|
||||||
|
router.close();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue