HDFS-13042. RBF: Heartbeat Router State. Contributed by Inigo Goiri.

(cherry picked from commit 7721fff744)
This commit is contained in:
Yiqun Lin 2018-01-25 15:51:26 +08:00
parent 2a36b4b1ed
commit 52ead8b90a
7 changed files with 458 additions and 1 deletions

View File

@ -1189,6 +1189,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_ROUTER_MONITOR_LOCAL_NAMENODE =
FEDERATION_ROUTER_PREFIX + "monitor.localnamenode.enable";
public static final boolean DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT = true;
public static final String DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS =
FEDERATION_ROUTER_PREFIX + "heartbeat-state.interval";
public static final long DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS_DEFAULT =
TimeUnit.SECONDS.toMillis(5);
// HDFS Router NN client
public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE =
@ -1249,6 +1253,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
FEDERATION_STORE_PREFIX + "membership.expiration";
public static final long FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT =
TimeUnit.MINUTES.toMillis(5);
public static final String FEDERATION_STORE_ROUTER_EXPIRATION_MS =
FEDERATION_STORE_PREFIX + "router.expiration";
public static final long FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT =
TimeUnit.MINUTES.toMillis(5);
// HDFS Router-based federation mount table entries
/** Maximum number of cache entries to have. */

View File

@ -37,11 +37,13 @@ import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -107,6 +109,18 @@ public class Router extends CompositeService {
/** Manages the current state of the router. */
private RouterStore routerStateManager;
/** Heartbeat our run status to the router state manager. */
private RouterHeartbeatService routerHeartbeatService;
/** The start time of the namesystem. */
private final long startTime = Time.now();
/** State of the Router. */
private RouterServiceState state = RouterServiceState.UNINITIALIZED;
/////////////////////////////////////////////////////////
// Constructor
/////////////////////////////////////////////////////////
@ -122,6 +136,7 @@ public class Router extends CompositeService {
@Override
protected void serviceInit(Configuration configuration) throws Exception {
this.conf = configuration;
updateRouterState(RouterServiceState.INITIALIZING);
if (conf.getBoolean(
DFSConfigKeys.DFS_ROUTER_STORE_ENABLE,
@ -183,6 +198,10 @@ public class Router extends CompositeService {
if (this.namenodeHearbeatServices.isEmpty()) {
LOG.error("Heartbeat is enabled but there are no namenodes to monitor");
}
// Periodically update the router state
this.routerHeartbeatService = new RouterHeartbeatService(this);
addService(this.routerHeartbeatService);
}
// Router metrics system
@ -206,6 +225,8 @@ public class Router extends CompositeService {
@Override
protected void serviceStart() throws Exception {
updateRouterState(RouterServiceState.RUNNING);
if (this.pauseMonitor != null) {
this.pauseMonitor.start();
JvmMetrics jvmMetrics = this.metrics.getJvmMetrics();
@ -220,6 +241,9 @@ public class Router extends CompositeService {
@Override
protected void serviceStop() throws Exception {
// Update state
updateRouterState(RouterServiceState.SHUTDOWN);
// JVM pause monitor
if (this.pauseMonitor != null) {
this.pauseMonitor.stop();
@ -440,6 +464,31 @@ public class Router extends CompositeService {
return ret;
}
/////////////////////////////////////////////////////////
// Router State Management
/////////////////////////////////////////////////////////
/**
* Update the router state and heartbeat to the state store.
*
* @param state The new router state.
*/
public void updateRouterState(RouterServiceState newState) {
this.state = newState;
if (this.routerHeartbeatService != null) {
this.routerHeartbeatService.updateStateAsync();
}
}
/**
* Get the status of the router.
*
* @return Status of the router.
*/
public RouterServiceState getRouterState() {
return this.state;
}
/////////////////////////////////////////////////////////
// Submodule getters
/////////////////////////////////////////////////////////
@ -495,10 +544,32 @@ public class Router extends CompositeService {
return this.namenodeResolver;
}
/**
* Get the state store interface for the router heartbeats.
*
* @return FederationRouterStateStore state store API handle.
*/
public RouterStore getRouterStateManager() {
if (this.routerStateManager == null && this.stateStore != null) {
this.routerStateManager = this.stateStore.getRegisteredRecordStore(
RouterStore.class);
}
return this.routerStateManager;
}
/////////////////////////////////////////////////////////
// Router info
/////////////////////////////////////////////////////////
/**
* Get the start date of the Router.
*
* @return Start date of the router.
*/
public long getStartTime() {
return this.startTime;
}
/**
* Unique ID for the router, typically the hostname:port string for the
* router's RPC server. This ID may be null on router startup before the RPC

View File

@ -0,0 +1,155 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.CachedRecordStore;
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.RecordStore;
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Service to periodically update the Router current state in the State Store.
*/
public class RouterHeartbeatService extends PeriodicService {
private static final Logger LOG =
LoggerFactory.getLogger(RouterHeartbeatService.class);
/** Router we are hearbeating. */
private final Router router;
/**
* Create a new Router heartbeat service.
*
* @param router Router to heartbeat.
*/
public RouterHeartbeatService(Router router) {
super(RouterHeartbeatService.class.getSimpleName());
this.router = router;
}
/**
* Trigger the update of the Router state asynchronously.
*/
protected void updateStateAsync() {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
updateStateStore();
}
}, "Router Heartbeat Async");
thread.setDaemon(true);
thread.start();
}
/**
* Update the state of the Router in the State Store.
*/
private synchronized void updateStateStore() {
String routerId = router.getRouterId();
if (routerId == null) {
LOG.error("Cannot heartbeat for router: unknown router id");
return;
}
RouterStore routerStore = router.getRouterStateManager();
if (routerStore != null) {
try {
RouterState record = RouterState.newInstance(
routerId, router.getStartTime(), router.getRouterState());
StateStoreVersion stateStoreVersion = StateStoreVersion.newInstance(
getStateStoreVersion(MembershipStore.class),
getStateStoreVersion(MountTableStore.class));
record.setStateStoreVersion(stateStoreVersion);
RouterHeartbeatRequest request =
RouterHeartbeatRequest.newInstance(record);
RouterHeartbeatResponse response = routerStore.routerHeartbeat(request);
if (!response.getStatus()) {
LOG.warn("Cannot heartbeat router {}", routerId);
} else {
LOG.debug("Router heartbeat for router {}", routerId);
}
} catch (IOException e) {
LOG.error("Cannot heartbeat router {}: {}", routerId, e.getMessage());
}
} else {
LOG.warn("Cannot heartbeat router {}: State Store unavailable", routerId);
}
}
/**
* Get the version of the data in the State Store.
*
* @param clazz Class in the State Store.
* @return Version of the data.
*/
private <R extends BaseRecord, S extends RecordStore<R>>
long getStateStoreVersion(final Class<S> clazz) {
long version = -1;
try {
StateStoreService stateStore = router.getStateStore();
S recordStore = stateStore.getRegisteredRecordStore(clazz);
if (recordStore != null) {
if (recordStore instanceof CachedRecordStore) {
CachedRecordStore<R> cachedRecordStore =
(CachedRecordStore<R>) recordStore;
List<R> records = cachedRecordStore.getCachedRecords();
for (BaseRecord record : records) {
if (record.getDateModified() > version) {
version = record.getDateModified();
}
}
}
}
} catch (Exception e) {
LOG.error("Cannot get version for {}: {}", clazz, e.getMessage());
}
return version;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
long interval = conf.getTimeDuration(
DFSConfigKeys.DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS,
DFSConfigKeys.DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS_DEFAULT,
TimeUnit.MILLISECONDS);
this.setIntervalMs(interval);
super.serviceInit(conf);
}
@Override
public void periodicInvoke() {
updateStateStore();
}
}

View File

@ -21,7 +21,7 @@ package org.apache.hadoop.hdfs.server.federation.router;
* States of the Router.
*/
public enum RouterServiceState {
NONE,
UNINITIALIZED,
INITIALIZING,
SAFEMODE,
RUNNING,

View File

@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
@ -38,8 +39,10 @@ import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.impl.MembershipStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.impl.RouterStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.service.CompositeService;
@ -148,6 +151,7 @@ public class StateStoreService extends CompositeService {
// Add supported record stores
addRecordStore(MembershipStoreImpl.class);
addRecordStore(MountTableStoreImpl.class);
addRecordStore(RouterStoreImpl.class);
// Check the connection to the State Store periodically
this.monitorService = new StateStoreConnectionMonitorService(this);
@ -158,6 +162,11 @@ public class StateStoreService extends CompositeService {
DFSConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS,
DFSConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT));
RouterState.setExpirationMs(conf.getTimeDuration(
DFSConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS,
DFSConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT,
TimeUnit.MILLISECONDS));
// Cache update service
this.cacheUpdater = new StateStoreCacheUpdateService(this);
addService(this.cacheUpdater);

View File

@ -4980,6 +4980,26 @@
</description>
</property>
<property>
<name>dfs.federation.router.heartbeat-state.interval</name>
<value>5s</value>
<description>
How often the Router should heartbeat its state into the State Store in
milliseconds. This setting supports multiple time unit suffixes as
described in dfs.federation.router.quota-cache.update.interval.
</description>
</property>
<property>
<name>dfs.federation.router.store.router.expiration</name>
<value>5m</value>
<description>
Expiration time in milliseconds for a router state record. This setting
supports multiple time unit suffixes as described in
dfs.federation.router.quota-cache.update.interval.
</description>
</property>
<property>
<name>dfs.federation.router.monitor.namenode</name>
<value></value>

View File

@ -0,0 +1,194 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.util.Time;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Test the basic {@link StateStoreService} {@link RouterStore} functionality.
*/
public class TestStateStoreRouterState extends TestStateStoreBase {
private static RouterStore routerStore;
@BeforeClass
public static void create() {
// Reduce expirations to 5 seconds
getConf().setTimeDuration(
DFSConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS,
5, TimeUnit.SECONDS);
}
@Before
public void setup() throws IOException, InterruptedException {
if (routerStore == null) {
routerStore =
getStateStore().getRegisteredRecordStore(RouterStore.class);
}
// Clear router status registrations
assertTrue(clearRecords(getStateStore(), RouterState.class));
}
@Test
public void testStateStoreDisconnected() throws Exception {
// Close the data store driver
getStateStore().closeDriver();
assertEquals(false, getStateStore().isDriverReady());
// Test all APIs that access the data store to ensure they throw the correct
// exception.
GetRouterRegistrationRequest getSingleRequest =
GetRouterRegistrationRequest.newInstance();
verifyException(routerStore, "getRouterRegistration",
StateStoreUnavailableException.class,
new Class[] {GetRouterRegistrationRequest.class},
new Object[] {getSingleRequest});
GetRouterRegistrationsRequest getRequest =
GetRouterRegistrationsRequest.newInstance();
routerStore.loadCache(true);
verifyException(routerStore, "getRouterRegistrations",
StateStoreUnavailableException.class,
new Class[] {GetRouterRegistrationsRequest.class},
new Object[] {getRequest});
RouterHeartbeatRequest hbRequest = RouterHeartbeatRequest.newInstance(
RouterState.newInstance("test", 0, RouterServiceState.UNINITIALIZED));
verifyException(routerStore, "routerHeartbeat",
StateStoreUnavailableException.class,
new Class[] {RouterHeartbeatRequest.class},
new Object[] {hbRequest});
}
//
// Router
//
@Test
public void testUpdateRouterStatus()
throws IllegalStateException, IOException {
long dateStarted = Time.now();
String address = "testaddress";
// Set
RouterHeartbeatRequest request = RouterHeartbeatRequest.newInstance(
RouterState.newInstance(
address, dateStarted, RouterServiceState.RUNNING));
assertTrue(routerStore.routerHeartbeat(request).getStatus());
// Verify
GetRouterRegistrationRequest getRequest =
GetRouterRegistrationRequest.newInstance(address);
RouterState record =
routerStore.getRouterRegistration(getRequest).getRouter();
assertNotNull(record);
assertEquals(RouterServiceState.RUNNING, record.getStatus());
assertEquals(address, record.getAddress());
assertEquals(FederationUtil.getCompileInfo(), record.getCompileInfo());
// Build version may vary a bit
assertTrue(record.getBuildVersion().length() > 0);
}
@Test
public void testRouterStateExpired()
throws IOException, InterruptedException {
long dateStarted = Time.now();
String address = "testaddress";
RouterHeartbeatRequest request = RouterHeartbeatRequest.newInstance(
RouterState.newInstance(
address, dateStarted, RouterServiceState.RUNNING));
// Set
assertTrue(routerStore.routerHeartbeat(request).getStatus());
// Verify
GetRouterRegistrationRequest getRequest =
GetRouterRegistrationRequest.newInstance(address);
RouterState record =
routerStore.getRouterRegistration(getRequest).getRouter();
assertNotNull(record);
// Wait past expiration (set to 5 sec in config)
Thread.sleep(6000);
// Verify expired
RouterState r = routerStore.getRouterRegistration(getRequest).getRouter();
assertEquals(RouterServiceState.EXPIRED, r.getStatus());
// Heartbeat again and this shouldn't be EXPIRED anymore
assertTrue(routerStore.routerHeartbeat(request).getStatus());
r = routerStore.getRouterRegistration(getRequest).getRouter();
assertEquals(RouterServiceState.RUNNING, r.getStatus());
}
@Test
public void testGetAllRouterStates()
throws StateStoreUnavailableException, IOException {
// Set 2 entries
RouterHeartbeatRequest heartbeatRequest1 =
RouterHeartbeatRequest.newInstance(
RouterState.newInstance(
"testaddress1", Time.now(), RouterServiceState.RUNNING));
assertTrue(routerStore.routerHeartbeat(heartbeatRequest1).getStatus());
RouterHeartbeatRequest heartbeatRequest2 =
RouterHeartbeatRequest.newInstance(
RouterState.newInstance(
"testaddress2", Time.now(), RouterServiceState.RUNNING));
assertTrue(routerStore.routerHeartbeat(heartbeatRequest2).getStatus());
// Verify
routerStore.loadCache(true);
GetRouterRegistrationsRequest request =
GetRouterRegistrationsRequest.newInstance();
List<RouterState> entries =
routerStore.getRouterRegistrations(request).getRouters();
assertEquals(2, entries.size());
Collections.sort(entries);
assertEquals("testaddress1", entries.get(0).getAddress());
assertEquals("testaddress2", entries.get(1).getAddress());
assertEquals(RouterServiceState.RUNNING, entries.get(0).getStatus());
assertEquals(RouterServiceState.RUNNING, entries.get(1).getStatus());
}
}