HDFS-13443. RBF: Update mount table cache immediately after changing (add/update/remove) mount table entries. Contributed by Mohammad Arshad.

This commit is contained in:
Yiqun Lin 2018-12-19 11:40:00 +08:00 committed by Brahma Reddy Battula
parent 3d97142dff
commit 8f6f9d9c83
28 changed files with 1402 additions and 6 deletions

View File

@ -37,6 +37,8 @@ import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProt
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetSafeModeResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.LeaveSafeModeRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.LeaveSafeModeResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryRequestProto;
@ -58,6 +60,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeReques
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
@ -78,6 +82,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetSafeMo
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetSafeModeResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.LeaveSafeModeRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.LeaveSafeModeResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RefreshMountTableEntriesRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RefreshMountTableEntriesResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryRequestPBImpl;
@ -275,4 +281,21 @@ public class RouterAdminProtocolServerSideTranslatorPB implements
throw new ServiceException(e);
}
}
@Override
public RefreshMountTableEntriesResponseProto refreshMountTableEntries(
RpcController controller, RefreshMountTableEntriesRequestProto request)
throws ServiceException {
try {
RefreshMountTableEntriesRequest req =
new RefreshMountTableEntriesRequestPBImpl(request);
RefreshMountTableEntriesResponse response =
server.refreshMountTableEntries(req);
RefreshMountTableEntriesResponsePBImpl responsePB =
(RefreshMountTableEntriesResponsePBImpl) response;
return responsePB.getProto();
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -38,6 +38,8 @@ import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProt
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetSafeModeResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.LeaveSafeModeRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.LeaveSafeModeResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryRequestProto;
@ -61,6 +63,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeReques
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
@ -77,6 +81,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountT
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetSafeModeResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.LeaveSafeModeResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RefreshMountTableEntriesRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RefreshMountTableEntriesResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryRequestPBImpl;
@ -267,4 +273,19 @@ public class RouterAdminProtocolTranslatorPB
throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
}
}
@Override
public RefreshMountTableEntriesResponse refreshMountTableEntries(
RefreshMountTableEntriesRequest request) throws IOException {
RefreshMountTableEntriesRequestPBImpl requestPB =
(RefreshMountTableEntriesRequestPBImpl) request;
RefreshMountTableEntriesRequestProto proto = requestPB.getProto();
try {
RefreshMountTableEntriesResponseProto response =
rpcProxy.refreshMountTableEntries(null, proto);
return new RefreshMountTableEntriesResponsePBImpl(response);
} catch (ServiceException e) {
throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
}
}
}

View File

@ -23,6 +23,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntr
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
@ -77,4 +79,18 @@ public interface MountTableManager {
*/
GetMountTableEntriesResponse getMountTableEntries(
GetMountTableEntriesRequest request) throws IOException;
/**
* Refresh mount table entries cache from the state store. Cache is updated
* periodically but with this API cache can be refreshed immediately. This API
* is primarily meant to be called from the Admin Server. Admin Server will
* call this API and refresh mount table cache of all the routers while
* changing mount table entries.
*
* @param request Fully populated request object.
* @return True the mount table entry was updated without any error.
* @throws IOException Throws exception if the data store is not initialized.
*/
RefreshMountTableEntriesResponse refreshMountTableEntries(
RefreshMountTableEntriesRequest request) throws IOException;
}

View File

@ -0,0 +1,289 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.AbstractService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* This service is invoked from {@link MountTableStore} when there is change in
* mount table entries and it updates mount table entry cache on local router as
* well as on all remote routers. Refresh on local router is done by calling
* {@link MountTableStore#loadCache(boolean)}} API directly, no RPC call
* involved, but on remote routers refresh is done through RouterClient(RPC
* call). To improve performance, all routers are refreshed in separate thread
* and all connection are cached. Cached connections are removed from
* cache and closed when their max live time is elapsed.
*/
public class MountTableRefresherService extends AbstractService {
private static final String ROUTER_CONNECT_ERROR_MSG =
"Router {} connection failed. Mount table cache will not refesh.";
private static final Logger LOG =
LoggerFactory.getLogger(MountTableRefresherService.class);
/** Local router. */
private final Router router;
/** Mount table store. */
private MountTableStore mountTableStore;
/** Local router admin address in the form of host:port. */
private String localAdminAdress;
/** Timeout in ms to update mount table cache on all the routers. */
private long cacheUpdateTimeout;
/**
* All router admin clients cached. So no need to create the client again and
* again. Router admin address(host:port) is used as key to cache RouterClient
* objects.
*/
private LoadingCache<String, RouterClient> routerClientsCache;
/**
* Removes expired RouterClient from routerClientsCache.
*/
private ScheduledExecutorService clientCacheCleanerScheduler;
/**
* Create a new service to refresh mount table cache when there is change in
* mount table entries.
*
* @param router whose mount table cache will be refreshed
*/
public MountTableRefresherService(Router router) {
super(MountTableRefresherService.class.getSimpleName());
this.router = router;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
this.mountTableStore = getMountTableStore();
// attach this service to mount table store.
this.mountTableStore.setRefreshService(this);
this.localAdminAdress =
StateStoreUtils.getHostPortString(router.getAdminServerAddress());
this.cacheUpdateTimeout = conf.getTimeDuration(
RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT,
RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
long routerClientMaxLiveTime = conf.getTimeDuration(
RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME,
RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME_DEFAULT,
TimeUnit.MILLISECONDS);
routerClientsCache = CacheBuilder.newBuilder()
.expireAfterWrite(routerClientMaxLiveTime, TimeUnit.MILLISECONDS)
.removalListener(getClientRemover()).build(getClientCreator());
initClientCacheCleaner(routerClientMaxLiveTime);
}
private void initClientCacheCleaner(long routerClientMaxLiveTime) {
clientCacheCleanerScheduler =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
.setNameFormat("MountTableRefresh_ClientsCacheCleaner")
.setDaemon(true).build());
/*
* When cleanUp() method is called, expired RouterClient will be removed and
* closed.
*/
clientCacheCleanerScheduler.scheduleWithFixedDelay(
() -> routerClientsCache.cleanUp(), routerClientMaxLiveTime,
routerClientMaxLiveTime, TimeUnit.MILLISECONDS);
}
/**
* Create cache entry remove listener.
*/
private RemovalListener<String, RouterClient> getClientRemover() {
return new RemovalListener<String, RouterClient>() {
@Override
public void onRemoval(
RemovalNotification<String, RouterClient> notification) {
closeRouterClient(notification.getValue());
}
};
}
@VisibleForTesting
protected void closeRouterClient(RouterClient client) {
try {
client.close();
} catch (IOException e) {
LOG.error("Error while closing RouterClient", e);
}
}
/**
* Creates RouterClient and caches it.
*/
private CacheLoader<String, RouterClient> getClientCreator() {
return new CacheLoader<String, RouterClient>() {
public RouterClient load(String adminAddress) throws IOException {
InetSocketAddress routerSocket =
NetUtils.createSocketAddr(adminAddress);
Configuration config = getConfig();
return createRouterClient(routerSocket, config);
}
};
}
@VisibleForTesting
protected RouterClient createRouterClient(InetSocketAddress routerSocket,
Configuration config) throws IOException {
return new RouterClient(routerSocket, config);
}
@Override
protected void serviceStart() throws Exception {
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
super.serviceStop();
clientCacheCleanerScheduler.shutdown();
// remove and close all admin clients
routerClientsCache.invalidateAll();
}
private MountTableStore getMountTableStore() throws IOException {
MountTableStore mountTblStore =
router.getStateStore().getRegisteredRecordStore(MountTableStore.class);
if (mountTblStore == null) {
throw new IOException("Mount table state store is not available.");
}
return mountTblStore;
}
/**
* Refresh mount table cache of this router as well as all other routers.
*/
public void refresh() throws StateStoreUnavailableException {
List<RouterState> cachedRecords =
router.getRouterStateManager().getCachedRecords();
List<MountTableRefresherThread> refreshThreads = new ArrayList<>();
for (RouterState routerState : cachedRecords) {
String adminAddress = routerState.getAdminAddress();
if (adminAddress == null || adminAddress.length() == 0) {
// this router has not enabled router admin
continue;
}
// No use of calling refresh on router which is not running state
if (routerState.getStatus() != RouterServiceState.RUNNING) {
LOG.info(
"Router {} is not running. Mount table cache will not refesh.");
// remove if RouterClient is cached.
removeFromCache(adminAddress);
} else if (isLocalAdmin(adminAddress)) {
/*
* Local router's cache update does not require RPC call, so no need for
* RouterClient
*/
refreshThreads.add(getLocalRefresher(adminAddress));
} else {
try {
RouterClient client = routerClientsCache.get(adminAddress);
refreshThreads.add(new MountTableRefresherThread(
client.getMountTableManager(), adminAddress));
} catch (ExecutionException execExcep) {
// Can not connect, seems router is stopped now.
LOG.warn(ROUTER_CONNECT_ERROR_MSG, adminAddress, execExcep);
}
}
}
if (!refreshThreads.isEmpty()) {
invokeRefresh(refreshThreads);
}
}
@VisibleForTesting
protected MountTableRefresherThread getLocalRefresher(String adminAddress) {
return new MountTableRefresherThread(router.getAdminServer(), adminAddress);
}
private void removeFromCache(String adminAddress) {
routerClientsCache.invalidate(adminAddress);
}
private void invokeRefresh(List<MountTableRefresherThread> refreshThreads) {
CountDownLatch countDownLatch = new CountDownLatch(refreshThreads.size());
// start all the threads
for (MountTableRefresherThread refThread : refreshThreads) {
refThread.setCountDownLatch(countDownLatch);
refThread.start();
}
try {
/*
* Wait for all the thread to complete, await method returns false if
* refresh is not finished within specified time
*/
boolean allReqCompleted =
countDownLatch.await(cacheUpdateTimeout, TimeUnit.MILLISECONDS);
if (!allReqCompleted) {
LOG.warn("Not all router admins updated their cache");
}
} catch (InterruptedException e) {
LOG.error("Mount table cache refresher was interrupted.", e);
}
logResult(refreshThreads);
}
private boolean isLocalAdmin(String adminAddress) {
return adminAddress.contentEquals(localAdminAdress);
}
private void logResult(List<MountTableRefresherThread> refreshThreads) {
int succesCount = 0;
int failureCount = 0;
for (MountTableRefresherThread mountTableRefreshThread : refreshThreads) {
if (mountTableRefreshThread.isSuccess()) {
succesCount++;
} else {
failureCount++;
// remove RouterClient from cache so that new client is created
removeFromCache(mountTableRefreshThread.getAdminAddress());
}
}
LOG.info("Mount table entries cache refresh succesCount={},failureCount={}",
succesCount, failureCount);
}
}

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Base class for updating mount table cache on all the router.
*/
public class MountTableRefresherThread extends Thread {
private static final Logger LOG =
LoggerFactory.getLogger(MountTableRefresherThread.class);
private boolean success;
/** Admin server on which refreshed to be invoked. */
private String adminAddress;
private CountDownLatch countDownLatch;
private MountTableManager manager;
public MountTableRefresherThread(MountTableManager manager,
String adminAddress) {
this.manager = manager;
this.adminAddress = adminAddress;
setName("MountTableRefresh_" + adminAddress);
setDaemon(true);
}
/**
* Refresh mount table cache of local and remote routers. Local and remote
* routers will be refreshed differently. Lets understand what are the
* local and remote routers and refresh will be done differently on these
* routers. Suppose there are three routers R1, R2 and R3. User want to add
* new mount table entry. He will connect to only one router, not all the
* routers. Suppose He connects to R1 and calls add mount table entry through
* API or CLI. Now in this context R1 is local router, R2 and R3 are remote
* routers. Because add mount table entry is invoked on R1, R1 will update the
* cache locally it need not to make RPC call. But R1 will make RPC calls to
* update cache on R2 and R3.
*/
@Override
public void run() {
try {
RefreshMountTableEntriesResponse refreshMountTableEntries =
manager.refreshMountTableEntries(
RefreshMountTableEntriesRequest.newInstance());
success = refreshMountTableEntries.getResult();
} catch (IOException e) {
LOG.error("Failed to refresh mount table entries cache at router {}",
adminAddress, e);
} finally {
countDownLatch.countDown();
}
}
/**
* @return true if cache was refreshed successfully.
*/
public boolean isSuccess() {
return success;
}
public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public String toString() {
return "MountTableRefreshThread [success=" + success + ", adminAddress="
+ adminAddress + "]";
}
public String getAdminAddress() {
return adminAddress;
}
}

View File

@ -204,6 +204,31 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
FEDERATION_ROUTER_PREFIX + "mount-table.max-cache-size";
/** Remove cache entries if we have more than 10k. */
public static final int FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE_DEFAULT = 10000;
/**
* If true then cache updated immediately after mount table entry change
* otherwise it is updated periodically based configuration.
*/
public static final String MOUNT_TABLE_CACHE_UPDATE =
FEDERATION_ROUTER_PREFIX + "mount-table.cache.update";
public static final boolean MOUNT_TABLE_CACHE_UPDATE_DEFAULT =
false;
/**
* Timeout to update mount table cache on all the routers.
*/
public static final String MOUNT_TABLE_CACHE_UPDATE_TIMEOUT =
FEDERATION_ROUTER_PREFIX + "mount-table.cache.update.timeout";
public static final long MOUNT_TABLE_CACHE_UPDATE_TIMEOUT_DEFAULT =
TimeUnit.MINUTES.toMillis(1);
/**
* Remote router mount table cache is updated through RouterClient(RPC call).
* To improve performance, RouterClient connections are cached but it should
* not be kept in cache forever. This property defines the max time a
* connection can be cached.
*/
public static final String MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME =
FEDERATION_ROUTER_PREFIX + "mount-table.cache.update.client.max.time";
public static final long MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME_DEFAULT =
TimeUnit.MINUTES.toMillis(5);
public static final String FEDERATION_MOUNT_TABLE_CACHE_ENABLE =
FEDERATION_ROUTER_PREFIX + "mount-table.cache.enable";
public static final boolean FEDERATION_MOUNT_TABLE_CACHE_ENABLE_DEFAULT =

View File

@ -254,9 +254,50 @@ public class Router extends CompositeService {
addService(this.safemodeService);
}
/*
* Refresh mount table cache immediately after adding, modifying or deleting
* the mount table entries. If this service is not enabled mount table cache
* are refreshed periodically by StateStoreCacheUpdateService
*/
if (conf.getBoolean(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE,
RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_DEFAULT)) {
// There is no use of starting refresh service if state store and admin
// servers are not enabled
String disabledDependentServices = getDisabledDependentServices();
/*
* disabledDependentServices null means all dependent services are
* enabled.
*/
if (disabledDependentServices == null) {
MountTableRefresherService refreshService =
new MountTableRefresherService(this);
addService(refreshService);
LOG.info("Service {} is enabled.",
MountTableRefresherService.class.getSimpleName());
} else {
LOG.warn(
"Service {} not enabled: depenendent service(s) {} not enabled.",
MountTableRefresherService.class.getSimpleName(),
disabledDependentServices);
}
}
super.serviceInit(conf);
}
private String getDisabledDependentServices() {
if (this.stateStore == null && this.adminServer == null) {
return StateStoreService.class.getSimpleName() + ","
+ RouterAdminServer.class.getSimpleName();
} else if (this.stateStore == null) {
return StateStoreService.class.getSimpleName();
} else if (this.adminServer == null) {
return RouterAdminServer.class.getSimpleName();
}
return null;
}
/**
* Returns the hostname for this Router. If the hostname is not
* explicitly configured in the given config, then it is determined.
@ -696,9 +737,19 @@ public class Router extends CompositeService {
}
/**
* Get the Router safe mode service
* Get the Router safe mode service.
*/
RouterSafemodeService getSafemodeService() {
return this.safemodeService;
}
/**
* Get router admin server.
*
* @return Null if admin is not enabled.
*/
public RouterAdminServer getAdminServer() {
return adminServer;
}
}

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.store.DisabledNameserviceStore;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.DisableNameserviceRequest;
@ -55,6 +56,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeReques
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
@ -102,6 +105,7 @@ public class RouterAdminServer extends AbstractService
private static String routerOwner;
private static String superGroup;
private static boolean isPermissionEnabled;
private boolean iStateStoreCache;
public RouterAdminServer(Configuration conf, Router router)
throws IOException {
@ -154,6 +158,8 @@ public class RouterAdminServer extends AbstractService
this.adminAddress = new InetSocketAddress(
confRpcAddress.getHostName(), listenAddress.getPort());
router.setAdminServerAddress(this.adminAddress);
iStateStoreCache =
router.getSubclusterResolver() instanceof StateStoreCache;
}
/**
@ -243,7 +249,7 @@ public class RouterAdminServer extends AbstractService
getMountTableStore().updateMountTableEntry(request);
MountTable mountTable = request.getEntry();
if (mountTable != null) {
if (mountTable != null && router.isQuotaEnabled()) {
synchronizeQuota(mountTable);
}
return response;
@ -331,6 +337,26 @@ public class RouterAdminServer extends AbstractService
return GetSafeModeResponse.newInstance(isInSafeMode);
}
@Override
public RefreshMountTableEntriesResponse refreshMountTableEntries(
RefreshMountTableEntriesRequest request) throws IOException {
if (iStateStoreCache) {
/*
* MountTableResolver updates MountTableStore cache also. Expecting other
* SubclusterResolver implementations to update MountTableStore cache also
* apart from updating its cache.
*/
boolean result = ((StateStoreCache) this.router.getSubclusterResolver())
.loadCache(true);
RefreshMountTableEntriesResponse response =
RefreshMountTableEntriesResponse.newInstance();
response.setResult(result);
return response;
} else {
return getMountTableStore().refreshMountTableEntries(request);
}
}
/**
* Verify if Router set safe mode state correctly.
* @param isInSafeMode Expected state to be set.

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.RecordStore;
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
@ -91,6 +92,10 @@ public class RouterHeartbeatService extends PeriodicService {
getStateStoreVersion(MembershipStore.class),
getStateStoreVersion(MountTableStore.class));
record.setStateStoreVersion(stateStoreVersion);
// if admin server not started then hostPort will be empty
String hostPort =
StateStoreUtils.getHostPortString(router.getAdminServerAddress());
record.setAdminAddress(hostPort);
RouterHeartbeatRequest request =
RouterHeartbeatRequest.newInstance(record);
RouterHeartbeatResponse response = routerStore.routerHeartbeat(request);

View File

@ -20,8 +20,11 @@ package org.apache.hadoop.hdfs.server.federation.store;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.router.MountTableRefresherService;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Management API for the HDFS mount table information stored in
@ -42,8 +45,29 @@ import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
@InterfaceStability.Evolving
public abstract class MountTableStore extends CachedRecordStore<MountTable>
implements MountTableManager {
private static final Logger LOG =
LoggerFactory.getLogger(MountTableStore.class);
private MountTableRefresherService refreshService;
public MountTableStore(StateStoreDriver driver) {
super(MountTable.class, driver);
}
public void setRefreshService(MountTableRefresherService refreshService) {
this.refreshService = refreshService;
}
/**
* Update mount table cache of this router as well as all other routers.
*/
protected void updateCacheAllRouters() {
if (refreshService != null) {
try {
refreshService.refresh();
} catch (StateStoreUnavailableException e) {
LOG.error("Cannot refresh mount table: state store not available", e);
}
}
}
}

View File

@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.federation.store;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
@ -110,4 +113,27 @@ public final class StateStoreUtils {
}
return matchingList;
}
/**
* Returns address in form of host:port, empty string if address is null.
*
* @param address address
* @return host:port
*/
public static String getHostPortString(InetSocketAddress address) {
if (null == address) {
return "";
}
String hostName = address.getHostName();
if (hostName.equals("0.0.0.0")) {
try {
hostName = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
LOG.error("Failed to get local host name", e);
return "";
}
}
return hostName + ":" + address.getPort();
}
}

View File

@ -33,6 +33,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntr
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
@ -68,6 +70,7 @@ public class MountTableStoreImpl extends MountTableStore {
AddMountTableEntryResponse response =
AddMountTableEntryResponse.newInstance();
response.setStatus(status);
updateCacheAllRouters();
return response;
}
@ -86,6 +89,7 @@ public class MountTableStoreImpl extends MountTableStore {
UpdateMountTableEntryResponse response =
UpdateMountTableEntryResponse.newInstance();
response.setStatus(status);
updateCacheAllRouters();
return response;
}
@ -110,6 +114,7 @@ public class MountTableStoreImpl extends MountTableStore {
RemoveMountTableEntryResponse response =
RemoveMountTableEntryResponse.newInstance();
response.setStatus(status);
updateCacheAllRouters();
return response;
}
@ -151,4 +156,17 @@ public class MountTableStoreImpl extends MountTableStore {
response.setTimestamp(Time.now());
return response;
}
@Override
public RefreshMountTableEntriesResponse refreshMountTableEntries(
RefreshMountTableEntriesRequest request) throws IOException {
// Because this refresh is done through admin API, it should always be force
// refresh.
boolean result = loadCache(true);
RefreshMountTableEntriesResponse response =
RefreshMountTableEntriesResponse.newInstance();
response.setResult(result);
return response;
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
/**
* API request for refreshing mount table cached entries from state store.
*/
public abstract class RefreshMountTableEntriesRequest {
public static RefreshMountTableEntriesRequest newInstance()
throws IOException {
return StateStoreSerializer
.newRecord(RefreshMountTableEntriesRequest.class);
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
/**
* API response for refreshing mount table entries cache from state store.
*/
public abstract class RefreshMountTableEntriesResponse {
public static RefreshMountTableEntriesResponse newInstance()
throws IOException {
return StateStoreSerializer
.newRecord(RefreshMountTableEntriesResponse.class);
}
@Public
@Unstable
public abstract boolean getResult();
@Public
@Unstable
public abstract void setResult(boolean result);
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
import java.io.IOException;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesRequestProto.Builder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesRequestProtoOrBuilder;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
import com.google.protobuf.Message;
/**
* Protobuf implementation of the state store API object
* RefreshMountTableEntriesRequest.
*/
public class RefreshMountTableEntriesRequestPBImpl
extends RefreshMountTableEntriesRequest implements PBRecord {
private FederationProtocolPBTranslator<RefreshMountTableEntriesRequestProto,
Builder, RefreshMountTableEntriesRequestProtoOrBuilder> translator =
new FederationProtocolPBTranslator<>(
RefreshMountTableEntriesRequestProto.class);
public RefreshMountTableEntriesRequestPBImpl() {
}
public RefreshMountTableEntriesRequestPBImpl(
RefreshMountTableEntriesRequestProto proto) {
this.translator.setProto(proto);
}
@Override
public RefreshMountTableEntriesRequestProto getProto() {
// if builder is null build() returns null, calling getBuilder() to
// instantiate builder
this.translator.getBuilder();
return this.translator.build();
}
@Override
public void setProto(Message proto) {
this.translator.setProto(proto);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
}

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
import java.io.IOException;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesResponseProto.Builder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesResponseProtoOrBuilder;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
import com.google.protobuf.Message;
/**
* Protobuf implementation of the state store API object
* RefreshMountTableEntriesResponse.
*/
public class RefreshMountTableEntriesResponsePBImpl
extends RefreshMountTableEntriesResponse implements PBRecord {
private FederationProtocolPBTranslator<RefreshMountTableEntriesResponseProto,
Builder, RefreshMountTableEntriesResponseProtoOrBuilder> translator =
new FederationProtocolPBTranslator<>(
RefreshMountTableEntriesResponseProto.class);
public RefreshMountTableEntriesResponsePBImpl() {
}
public RefreshMountTableEntriesResponsePBImpl(
RefreshMountTableEntriesResponseProto proto) {
this.translator.setProto(proto);
}
@Override
public RefreshMountTableEntriesResponseProto getProto() {
return this.translator.build();
}
@Override
public void setProto(Message proto) {
this.translator.setProto(proto);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
@Override
public boolean getResult() {
return this.translator.getProtoOrBuilder().getResult();
};
@Override
public void setResult(boolean result) {
this.translator.getBuilder().setResult(result);
}
}

View File

@ -88,6 +88,10 @@ public abstract class RouterState extends BaseRecord {
public abstract long getDateStarted();
public abstract void setAdminAddress(String adminAddress);
public abstract String getAdminAddress();
/**
* Get the identifier for the Router. It uses the address.
*

View File

@ -199,4 +199,14 @@ public class RouterStatePBImpl extends RouterState implements PBRecord {
public long getDateCreated() {
return this.translator.getProtoOrBuilder().getDateCreated();
}
@Override
public void setAdminAddress(String adminAddress) {
this.translator.getBuilder().setAdminAddress(adminAddress);
}
@Override
public String getAdminAddress() {
return this.translator.getProtoOrBuilder().getAdminAddress();
}
}

View File

@ -54,6 +54,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeReques
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
@ -107,7 +109,8 @@ public class RouterAdmin extends Configured implements Tool {
if (cmd == null) {
String[] commands =
{"-add", "-update", "-rm", "-ls", "-setQuota", "-clrQuota",
"-safemode", "-nameservice", "-getDisabledNameservices"};
"-safemode", "-nameservice", "-getDisabledNameservices",
"-refresh"};
StringBuilder usage = new StringBuilder();
usage.append("Usage: hdfs dfsrouteradmin :\n");
for (int i = 0; i < commands.length; i++) {
@ -142,6 +145,8 @@ public class RouterAdmin extends Configured implements Tool {
return "\t[-nameservice enable | disable <nameservice>]";
} else if (cmd.equals("-getDisabledNameservices")) {
return "\t[-getDisabledNameservices]";
} else if (cmd.equals("-refresh")) {
return "\t[-refresh]";
}
return getUsage(null);
}
@ -230,9 +235,10 @@ public class RouterAdmin extends Configured implements Tool {
printUsage(cmd);
return exitCode;
}
String address = null;
// Initialize RouterClient
try {
String address = getConf().getTrimmed(
address = getConf().getTrimmed(
RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT);
InetSocketAddress routerSocket = NetUtils.createSocketAddr(address);
@ -302,6 +308,8 @@ public class RouterAdmin extends Configured implements Tool {
manageNameservice(subcmd, nsId);
} else if ("-getDisabledNameservices".equals(cmd)) {
getDisabledNameservices();
} else if ("-refresh".equals(cmd)) {
refresh(address);
} else {
throw new IllegalArgumentException("Unknown Command: " + cmd);
}
@ -337,6 +345,27 @@ public class RouterAdmin extends Configured implements Tool {
return exitCode;
}
private void refresh(String address) throws IOException {
if (refreshRouterCache()) {
System.out.println(
"Successfully updated mount table cache on router " + address);
}
}
/**
* Refresh mount table cache on connected router.
*
* @return true if cache refreshed successfully
* @throws IOException
*/
private boolean refreshRouterCache() throws IOException {
RefreshMountTableEntriesResponse response =
client.getMountTableManager().refreshMountTableEntries(
RefreshMountTableEntriesRequest.newInstance());
return response.getResult();
}
/**
* Add a mount table entry or update if it exists.
*

View File

@ -193,6 +193,7 @@ message RouterRecordProto {
optional string version = 6;
optional string compileInfo = 7;
optional uint64 dateStarted = 8;
optional string adminAddress = 9;
}
message GetRouterRegistrationRequestProto {
@ -219,6 +220,13 @@ message RouterHeartbeatResponseProto {
optional bool status = 1;
}
message RefreshMountTableEntriesRequestProto {
}
message RefreshMountTableEntriesResponseProto {
optional bool result = 1;
}
/////////////////////////////////////////////////
// Route State
/////////////////////////////////////////////////

View File

@ -74,4 +74,9 @@ service RouterAdminProtocolService {
* Get the list of disabled name services.
*/
rpc getDisabledNameservices(GetDisabledNameservicesRequestProto) returns (GetDisabledNameservicesResponseProto);
/**
* Refresh mount entries
*/
rpc refreshMountTableEntries(RefreshMountTableEntriesRequestProto) returns(RefreshMountTableEntriesResponseProto);
}

View File

@ -547,4 +547,38 @@
</description>
</property>
<property>
<name>dfs.federation.router.mount-table.cache.update</name>
<value>false</value>
<description>Set true to enable MountTableRefreshService. This service
updates mount table cache immediately after adding, modifying or
deleting the mount table entries. If this service is not enabled
mount table cache are refreshed periodically by
StateStoreCacheUpdateService
</description>
</property>
<property>
<name>dfs.federation.router.mount-table.cache.update.timeout</name>
<value>1m</value>
<description>This property defines how long to wait for all the
admin servers to finish their mount table cache update. This setting
supports multiple time unit suffixes as described in
dfs.federation.router.safemode.extension.
</description>
</property>
<property>
<name>dfs.federation.router.mount-table.cache.update.client.max.time
</name>
<value>5m</value>
<description>Remote router mount table cache is updated through
RouterClient(RPC call). To improve performance, RouterClient
connections are cached but it should not be kept in cache forever.
This property defines the max time a connection can be cached. This
setting supports multiple time unit suffixes as described in
dfs.federation.router.safemode.extension.
</description>
</property>
</configuration>

View File

@ -230,6 +230,12 @@ Ls command will show below information for each mount table entry:
Source Destinations Owner Group Mode Quota/Usage
/path ns0->/path root supergroup rwxr-xr-x [NsQuota: 50/0, SsQuota: 100 B/0 B]
Mount table cache is refreshed periodically but it can also be refreshed by executing refresh command:
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -refresh
The above command will refresh cache of the connected router. This command is redundant when mount table refresh service is enabled as the service will always keep the cache updated.
#### Multiple subclusters
A mount point also supports mapping multiple subclusters.
For example, to create a mount point that stores files in subclusters `ns1` and `ns2`.
@ -380,6 +386,9 @@ The connection to the State Store and the internal caching at the Router.
| dfs.federation.router.store.connection.test | 60000 | How often to check for the connection to the State Store in milliseconds. |
| dfs.federation.router.cache.ttl | 60000 | How often to refresh the State Store caches in milliseconds. |
| dfs.federation.router.store.membership.expiration | 300000 | Expiration time in milliseconds for a membership record. |
| dfs.federation.router.mount-table.cache.update | false | If true, Mount table cache is updated whenever a mount table entry is added, modified or removed for all the routers. |
| dfs.federation.router.mount-table.cache.update.timeout | 1m | Max time to wait for all the routers to finish their mount table cache update. |
| dfs.federation.router.mount-table.cache.update.client.max.time | 5m | Max time a RouterClient connection can be cached. |
### Routing

View File

@ -56,6 +56,8 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.test.GenericTestUtils;
@ -316,4 +318,29 @@ public final class FederationTestUtils {
}).when(spyHAContext).checkOperation(any(OperationCategory.class));
Whitebox.setInternalState(namesystem, "haContext", spyHAContext);
}
/**
* Wait for a number of routers to be registered in state store.
*
* @param stateManager number of routers to be registered.
* @param routerCount number of routers to be registered.
* @param tiemout max wait time in ms
*/
public static void waitRouterRegistered(RouterStore stateManager,
long routerCount, int timeout) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
List<RouterState> cachedRecords = stateManager.getCachedRecords();
if (cachedRecords.size() == routerCount) {
return true;
}
} catch (IOException e) {
// Ignore
}
return false;
}
}, 100, timeout);
}
}

View File

@ -38,6 +38,7 @@ public class RouterConfigBuilder {
private boolean enableMetrics = false;
private boolean enableQuota = false;
private boolean enableSafemode = false;
private boolean enableCacheRefresh;
public RouterConfigBuilder(Configuration configuration) {
this.conf = configuration;
@ -104,6 +105,11 @@ public class RouterConfigBuilder {
return this;
}
public RouterConfigBuilder refreshCache(boolean enable) {
this.enableCacheRefresh = enable;
return this;
}
public RouterConfigBuilder rpc() {
return this.rpc(true);
}
@ -140,6 +146,10 @@ public class RouterConfigBuilder {
return this.safemode(true);
}
public RouterConfigBuilder refreshCache() {
return this.refreshCache(true);
}
public Configuration build() {
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE,
this.enableStateStore);
@ -158,6 +168,8 @@ public class RouterConfigBuilder {
this.enableQuota);
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE,
this.enableSafemode);
conf.setBoolean(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE,
this.enableCacheRefresh);
return conf;
}
}

View File

@ -799,6 +799,28 @@ public class TestRouterAdminCLI {
assertTrue(err.toString().contains("No arguments allowed"));
}
@Test
public void testRefreshMountTableCache() throws Exception {
String src = "/refreshMount";
// create mount table entry
String[] argv = new String[] {"-add", src, "refreshNS0", "/refreshDest"};
assertEquals(0, ToolRunner.run(admin, argv));
// refresh the mount table entry cache
System.setOut(new PrintStream(out));
argv = new String[] {"-refresh"};
assertEquals(0, ToolRunner.run(admin, argv));
assertTrue(
out.toString().startsWith("Successfully updated mount table cache"));
// Now ls should return that mount table entry
out.reset();
argv = new String[] {"-ls", src};
assertEquals(0, ToolRunner.run(admin, argv));
assertTrue(out.toString().contains(src));
}
/**
* Wait for the Router transforming to expected state.
* @param expectedState Expected Router state.
@ -836,8 +858,7 @@ public class TestRouterAdminCLI {
}
@Test
public void testUpdateDestinationForExistingMountTable() throws
Exception {
public void testUpdateDestinationForExistingMountTable() throws Exception {
// Add a mount table firstly
String nsId = "ns0";
String src = "/test-updateDestinationForExistingMountTable";

View File

@ -0,0 +1,396 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.server.federation.FederationTestUtils;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* This test class verifies that mount table cache is updated on all the routers
* when MountTableRefreshService is enabled and there is a change in mount table
* entries.
*/
public class TestRouterMountTableCacheRefresh {
private static TestingServer curatorTestingServer;
private static MiniRouterDFSCluster cluster;
private static RouterContext routerContext;
private static MountTableManager mountTableManager;
@BeforeClass
public static void setUp() throws Exception {
curatorTestingServer = new TestingServer();
curatorTestingServer.start();
final String connectString = curatorTestingServer.getConnectString();
int numNameservices = 2;
cluster = new MiniRouterDFSCluster(false, numNameservices);
Configuration conf = new RouterConfigBuilder().refreshCache().admin().rpc()
.heartbeat().build();
conf.setClass(RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT,
FileSubclusterResolver.class);
conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString);
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE, true);
cluster.addRouterOverrides(conf);
cluster.startCluster();
cluster.startRouters();
cluster.waitClusterUp();
routerContext = cluster.getRandomRouter();
RouterStore routerStateManager =
routerContext.getRouter().getRouterStateManager();
mountTableManager = routerContext.getAdminClient().getMountTableManager();
// wait for one minute for all the routers to get registered
FederationTestUtils.waitRouterRegistered(routerStateManager,
numNameservices, 60000);
}
@AfterClass
public static void destory() {
try {
curatorTestingServer.close();
cluster.shutdown();
} catch (IOException e) {
// do nothing
}
}
@After
public void tearDown() throws IOException {
clearEntries();
}
private void clearEntries() throws IOException {
List<MountTable> result = getMountTableEntries();
for (MountTable mountTable : result) {
RemoveMountTableEntryResponse removeMountTableEntry =
mountTableManager.removeMountTableEntry(RemoveMountTableEntryRequest
.newInstance(mountTable.getSourcePath()));
assertTrue(removeMountTableEntry.getStatus());
}
}
/**
* addMountTableEntry API should internally update the cache on all the
* routers.
*/
@Test
public void testMountTableEntriesCacheUpdatedAfterAddAPICall()
throws IOException {
// Existing mount table size
int existingEntriesCount = getNumMountTableEntries();
String srcPath = "/addPath";
MountTable newEntry = MountTable.newInstance(srcPath,
Collections.singletonMap("ns0", "/addPathDest"), Time.now(),
Time.now());
addMountTableEntry(mountTableManager, newEntry);
// When Add entry is done, all the routers must have updated its mount table
// entry
List<RouterContext> routers = getRouters();
for (RouterContext rc : routers) {
List<MountTable> result =
getMountTableEntries(rc.getAdminClient().getMountTableManager());
assertEquals(1 + existingEntriesCount, result.size());
MountTable mountTableResult = result.get(0);
assertEquals(srcPath, mountTableResult.getSourcePath());
}
}
/**
* removeMountTableEntry API should internally update the cache on all the
* routers.
*/
@Test
public void testMountTableEntriesCacheUpdatedAfterRemoveAPICall()
throws IOException {
// add
String srcPath = "/removePathSrc";
MountTable newEntry = MountTable.newInstance(srcPath,
Collections.singletonMap("ns0", "/removePathDest"), Time.now(),
Time.now());
addMountTableEntry(mountTableManager, newEntry);
int addCount = getNumMountTableEntries();
assertEquals(1, addCount);
// remove
RemoveMountTableEntryResponse removeMountTableEntry =
mountTableManager.removeMountTableEntry(
RemoveMountTableEntryRequest.newInstance(srcPath));
assertTrue(removeMountTableEntry.getStatus());
int removeCount = getNumMountTableEntries();
assertEquals(addCount - 1, removeCount);
}
/**
* updateMountTableEntry API should internally update the cache on all the
* routers.
*/
@Test
public void testMountTableEntriesCacheUpdatedAfterUpdateAPICall()
throws IOException {
// add
String srcPath = "/updatePathSrc";
MountTable newEntry = MountTable.newInstance(srcPath,
Collections.singletonMap("ns0", "/updatePathDest"), Time.now(),
Time.now());
addMountTableEntry(mountTableManager, newEntry);
int addCount = getNumMountTableEntries();
assertEquals(1, addCount);
// update
String key = "ns1";
String value = "/updatePathDest2";
MountTable upateEntry = MountTable.newInstance(srcPath,
Collections.singletonMap(key, value), Time.now(), Time.now());
UpdateMountTableEntryResponse updateMountTableEntry =
mountTableManager.updateMountTableEntry(
UpdateMountTableEntryRequest.newInstance(upateEntry));
assertTrue(updateMountTableEntry.getStatus());
MountTable updatedMountTable = getMountTableEntry(srcPath);
assertNotNull("Updated mount table entrty cannot be null",
updatedMountTable);
assertEquals(1, updatedMountTable.getDestinations().size());
assertEquals(key,
updatedMountTable.getDestinations().get(0).getNameserviceId());
assertEquals(value, updatedMountTable.getDestinations().get(0).getDest());
}
/**
* After caching RouterClient if router goes down, refresh should be
* successful on other available router. The router which is not running
* should be ignored.
*/
@Test
public void testCachedRouterClientBehaviourAfterRouterStoped()
throws IOException {
String srcPath = "/addPathClientCache";
MountTable newEntry = MountTable.newInstance(srcPath,
Collections.singletonMap("ns0", "/addPathClientCacheDest"), Time.now(),
Time.now());
addMountTableEntry(mountTableManager, newEntry);
// When Add entry is done, all the routers must have updated its mount table
// entry
List<RouterContext> routers = getRouters();
for (RouterContext rc : routers) {
List<MountTable> result =
getMountTableEntries(rc.getAdminClient().getMountTableManager());
assertEquals(1, result.size());
MountTable mountTableResult = result.get(0);
assertEquals(srcPath, mountTableResult.getSourcePath());
}
// Lets stop one router
for (RouterContext rc : routers) {
InetSocketAddress adminServerAddress =
rc.getRouter().getAdminServerAddress();
if (!routerContext.getRouter().getAdminServerAddress()
.equals(adminServerAddress)) {
cluster.stopRouter(rc);
break;
}
}
srcPath = "/addPathClientCache2";
newEntry = MountTable.newInstance(srcPath,
Collections.singletonMap("ns0", "/addPathClientCacheDest2"), Time.now(),
Time.now());
addMountTableEntry(mountTableManager, newEntry);
for (RouterContext rc : getRouters()) {
List<MountTable> result =
getMountTableEntries(rc.getAdminClient().getMountTableManager());
assertEquals(2, result.size());
}
}
private List<RouterContext> getRouters() {
List<RouterContext> result = new ArrayList<>();
for (RouterContext rc : cluster.getRouters()) {
if (rc.getRouter().getServiceState() == STATE.STARTED) {
result.add(rc);
}
}
return result;
}
@Test
public void testRefreshMountTableEntriesAPI() throws IOException {
RefreshMountTableEntriesRequest request =
RefreshMountTableEntriesRequest.newInstance();
RefreshMountTableEntriesResponse refreshMountTableEntriesRes =
mountTableManager.refreshMountTableEntries(request);
// refresh should be successful
assertTrue(refreshMountTableEntriesRes.getResult());
}
/**
* Verify cache update timeouts when any of the router takes more time than
* the configured timeout period.
*/
@Test(timeout = 10000)
public void testMountTableEntriesCacheUpdateTimeout() throws IOException {
// Resources will be closed when router is closed
@SuppressWarnings("resource")
MountTableRefresherService mountTableRefresherService =
new MountTableRefresherService(routerContext.getRouter()) {
@Override
protected MountTableRefresherThread getLocalRefresher(
String adminAddress) {
return new MountTableRefresherThread(null, adminAddress) {
@Override
public void run() {
try {
// Sleep 1 minute
Thread.sleep(60000);
} catch (InterruptedException e) {
// Do nothing
}
}
};
}
};
Configuration config = routerContext.getRouter().getConfig();
config.setTimeDuration(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT, 5,
TimeUnit.SECONDS);
mountTableRefresherService.init(config);
// One router is not responding for 1 minute, still refresh should
// finished in 5 second as cache update timeout is set 5 second.
mountTableRefresherService.refresh();
// Test case timeout is assert for this test case.
}
/**
* Verify Cached RouterClient connections are removed from cache and closed
* when their max live time is elapsed.
*/
@Test
public void testRouterClientConnectionExpiration() throws Exception {
final AtomicInteger createCounter = new AtomicInteger();
final AtomicInteger removeCounter = new AtomicInteger();
// Resources will be closed when router is closed
@SuppressWarnings("resource")
MountTableRefresherService mountTableRefresherService =
new MountTableRefresherService(routerContext.getRouter()) {
@Override
protected void closeRouterClient(RouterClient client) {
super.closeRouterClient(client);
removeCounter.incrementAndGet();
}
@Override
protected RouterClient createRouterClient(
InetSocketAddress routerSocket, Configuration config)
throws IOException {
createCounter.incrementAndGet();
return super.createRouterClient(routerSocket, config);
}
};
int clientCacheTime = 2000;
Configuration config = routerContext.getRouter().getConfig();
config.setTimeDuration(
RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME, clientCacheTime,
TimeUnit.MILLISECONDS);
mountTableRefresherService.init(config);
// Do refresh to created RouterClient
mountTableRefresherService.refresh();
assertNotEquals("No RouterClient is created.", 0, createCounter.get());
/*
* Wait for clients to expire. Lets wait triple the cache eviction period.
* After cache eviction period all created client must be removed and
* closed.
*/
GenericTestUtils.waitFor(() -> createCounter.get() == removeCounter.get(),
100, 3 * clientCacheTime);
}
private int getNumMountTableEntries() throws IOException {
List<MountTable> records = getMountTableEntries();
int oldEntriesCount = records.size();
return oldEntriesCount;
}
private MountTable getMountTableEntry(String srcPath) throws IOException {
List<MountTable> mountTableEntries = getMountTableEntries();
for (MountTable mountTable : mountTableEntries) {
String sourcePath = mountTable.getSourcePath();
if (srcPath.equals(sourcePath)) {
return mountTable;
}
}
return null;
}
private void addMountTableEntry(MountTableManager mountTableMgr,
MountTable newEntry) throws IOException {
AddMountTableEntryRequest addRequest =
AddMountTableEntryRequest.newInstance(newEntry);
AddMountTableEntryResponse addResponse =
mountTableMgr.addMountTableEntry(addRequest);
assertTrue(addResponse.getStatus());
}
private List<MountTable> getMountTableEntries() throws IOException {
return getMountTableEntries(mountTableManager);
}
private List<MountTable> getMountTableEntries(
MountTableManager mountTableManagerParam) throws IOException {
GetMountTableEntriesRequest request =
GetMountTableEntriesRequest.newInstance("/");
return mountTableManagerParam.getMountTableEntries(request).getEntries();
}
}

View File

@ -437,6 +437,7 @@ Usage:
[-safemode enter | leave | get]
[-nameservice disable | enable <nameservice>]
[-getDisabledNameservices]
[-refresh]
| COMMAND\_OPTION | Description |
|:---- |:---- |
@ -449,6 +450,7 @@ Usage:
| `-safemode` `enter` `leave` `get` | Manually set the Router entering or leaving safe mode. The option *get* will be used for verifying if the Router is in safe mode state. |
| `-nameservice` `disable` `enable` *nameservice* | Disable/enable a name service from the federation. If disabled, requests will not go to that name service. |
| `-getDisabledNameservices` | Get the name services that are disabled in the federation. |
| `-refresh` | Update mount table cache of the connected router. |
The commands for managing Router-based federation. See [Mount table management](../hadoop-hdfs-rbf/HDFSRouterFederation.html#Mount_table_management) for more info.