HDFS-14812. RBF: MountTableRefresherService should load cache when refresh. Contributed by xuzq.

This commit is contained in:
Ayush Saxena 2019-09-05 08:29:56 +05:30
parent ae287474c0
commit 2f70b52a5b
2 changed files with 24 additions and 14 deletions

View File

@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
@ -57,7 +58,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
*/ */
public class MountTableRefresherService extends AbstractService { public class MountTableRefresherService extends AbstractService {
private static final String ROUTER_CONNECT_ERROR_MSG = private static final String ROUTER_CONNECT_ERROR_MSG =
"Router {} connection failed. Mount table cache will not refesh."; "Router {} connection failed. Mount table cache will not refresh.";
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(MountTableRefresherService.class); LoggerFactory.getLogger(MountTableRefresherService.class);
@ -66,7 +67,7 @@ public class MountTableRefresherService extends AbstractService {
/** Mount table store. */ /** Mount table store. */
private MountTableStore mountTableStore; private MountTableStore mountTableStore;
/** Local router admin address in the form of host:port. */ /** Local router admin address in the form of host:port. */
private String localAdminAdress; private String localAdminAddress;
/** Timeout in ms to update mount table cache on all the routers. */ /** Timeout in ms to update mount table cache on all the routers. */
private long cacheUpdateTimeout; private long cacheUpdateTimeout;
@ -97,9 +98,9 @@ public class MountTableRefresherService extends AbstractService {
protected void serviceInit(Configuration conf) throws Exception { protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf); super.serviceInit(conf);
this.mountTableStore = getMountTableStore(); this.mountTableStore = getMountTableStore();
// attach this service to mount table store. // Attach this service to mount table store.
this.mountTableStore.setRefreshService(this); this.mountTableStore.setRefreshService(this);
this.localAdminAdress = this.localAdminAddress =
StateStoreUtils.getHostPortString(router.getAdminServerAddress()); StateStoreUtils.getHostPortString(router.getAdminServerAddress());
this.cacheUpdateTimeout = conf.getTimeDuration( this.cacheUpdateTimeout = conf.getTimeDuration(
RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT, RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT,
@ -198,19 +199,27 @@ public class MountTableRefresherService extends AbstractService {
* Refresh mount table cache of this router as well as all other routers. * Refresh mount table cache of this router as well as all other routers.
*/ */
public void refresh() throws StateStoreUnavailableException { public void refresh() throws StateStoreUnavailableException {
List<RouterState> cachedRecords = RouterStore routerStore = router.getRouterStateManager();
router.getRouterStateManager().getCachedRecords();
try {
routerStore.loadCache(true);
} catch (IOException e) {
LOG.warn("RouterStore load cache failed,", e);
}
List<RouterState> cachedRecords = routerStore.getCachedRecords();
List<MountTableRefresherThread> refreshThreads = new ArrayList<>(); List<MountTableRefresherThread> refreshThreads = new ArrayList<>();
for (RouterState routerState : cachedRecords) { for (RouterState routerState : cachedRecords) {
String adminAddress = routerState.getAdminAddress(); String adminAddress = routerState.getAdminAddress();
if (adminAddress == null || adminAddress.length() == 0) { if (adminAddress == null || adminAddress.length() == 0) {
// this router has not enabled router admin // this router has not enabled router admin.
continue; continue;
} }
// No use of calling refresh on router which is not running state // No use of calling refresh on router which is not running state
if (routerState.getStatus() != RouterServiceState.RUNNING) { if (routerState.getStatus() != RouterServiceState.RUNNING) {
LOG.info( LOG.info(
"Router {} is not running. Mount table cache will not refesh."); "Router {} is not running. Mount table cache will not refresh.",
routerState.getAddress());
// remove if RouterClient is cached. // remove if RouterClient is cached.
removeFromCache(adminAddress); removeFromCache(adminAddress);
} else if (isLocalAdmin(adminAddress)) { } else if (isLocalAdmin(adminAddress)) {
@ -268,22 +277,23 @@ public class MountTableRefresherService extends AbstractService {
} }
private boolean isLocalAdmin(String adminAddress) { private boolean isLocalAdmin(String adminAddress) {
return adminAddress.contentEquals(localAdminAdress); return adminAddress.contentEquals(localAdminAddress);
} }
private void logResult(List<MountTableRefresherThread> refreshThreads) { private void logResult(List<MountTableRefresherThread> refreshThreads) {
int succesCount = 0; int successCount = 0;
int failureCount = 0; int failureCount = 0;
for (MountTableRefresherThread mountTableRefreshThread : refreshThreads) { for (MountTableRefresherThread mountTableRefreshThread : refreshThreads) {
if (mountTableRefreshThread.isSuccess()) { if (mountTableRefreshThread.isSuccess()) {
succesCount++; successCount++;
} else { } else {
failureCount++; failureCount++;
// remove RouterClient from cache so that new client is created // remove RouterClient from cache so that new client is created
removeFromCache(mountTableRefreshThread.getAdminAddress()); removeFromCache(mountTableRefreshThread.getAdminAddress());
} }
} }
LOG.info("Mount table entries cache refresh succesCount={},failureCount={}", LOG.info(
succesCount, failureCount); "Mount table entries cache refresh successCount={},failureCount={}",
successCount, failureCount);
} }
} }

View File

@ -285,7 +285,7 @@ public class Router extends CompositeService implements
MountTableRefresherService.class.getSimpleName()); MountTableRefresherService.class.getSimpleName());
} else { } else {
LOG.warn( LOG.warn(
"Service {} not enabled: depenendent service(s) {} not enabled.", "Service {} not enabled: dependent service(s) {} not enabled.",
MountTableRefresherService.class.getSimpleName(), MountTableRefresherService.class.getSimpleName(),
disabledDependentServices); disabledDependentServices);
} }