diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolServerSideTranslatorPB.java index 6341ebd11a3..a31c46d2912 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolServerSideTranslatorPB.java @@ -37,6 +37,8 @@ import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetSafeModeResponseProto; import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.LeaveSafeModeRequestProto; import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.LeaveSafeModeResponseProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesRequestProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesResponseProto; import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryRequestProto; import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProto; import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryRequestProto; @@ -58,6 +60,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; @@ -78,6 +82,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetSafeModeResponsePBImpl; import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.LeaveSafeModeRequestPBImpl; import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.LeaveSafeModeResponsePBImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RefreshMountTableEntriesRequestPBImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RefreshMountTableEntriesResponsePBImpl; import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryRequestPBImpl; import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryResponsePBImpl; import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryRequestPBImpl; @@ -275,4 +281,21 @@ public GetDisabledNameservicesResponseProto getDisabledNameservices( throw new ServiceException(e); } } + + @Override + public RefreshMountTableEntriesResponseProto refreshMountTableEntries( + RpcController controller, RefreshMountTableEntriesRequestProto request) + throws ServiceException { + try { + RefreshMountTableEntriesRequest req = + new RefreshMountTableEntriesRequestPBImpl(request); + RefreshMountTableEntriesResponse response = + server.refreshMountTableEntries(req); + RefreshMountTableEntriesResponsePBImpl responsePB = + (RefreshMountTableEntriesResponsePBImpl) response; + return responsePB.getProto(); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolTranslatorPB.java index 6e244381691..1fbb06d2a7f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolTranslatorPB.java @@ -38,6 +38,8 @@ import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetSafeModeResponseProto; import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.LeaveSafeModeRequestProto; import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.LeaveSafeModeResponseProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesRequestProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesResponseProto; import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryRequestProto; import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProto; import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryRequestProto; @@ -61,6 +63,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; @@ -77,6 +81,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesResponsePBImpl; import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetSafeModeResponsePBImpl; import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.LeaveSafeModeResponsePBImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RefreshMountTableEntriesRequestPBImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RefreshMountTableEntriesResponsePBImpl; import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryRequestPBImpl; import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryResponsePBImpl; import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryRequestPBImpl; @@ -267,4 +273,19 @@ public GetDisabledNameservicesResponse getDisabledNameservices( throw new IOException(ProtobufHelper.getRemoteException(e).getMessage()); } } + + @Override + public RefreshMountTableEntriesResponse refreshMountTableEntries( + RefreshMountTableEntriesRequest request) throws IOException { + RefreshMountTableEntriesRequestPBImpl requestPB = + (RefreshMountTableEntriesRequestPBImpl) request; + RefreshMountTableEntriesRequestProto proto = requestPB.getProto(); + try { + RefreshMountTableEntriesResponseProto response = + rpcProxy.refreshMountTableEntries(null, proto); + return new RefreshMountTableEntriesResponsePBImpl(response); + } catch (ServiceException e) { + throw new IOException(ProtobufHelper.getRemoteException(e).getMessage()); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableManager.java index c2e4a5b4473..9a1e4160245 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableManager.java @@ -23,6 +23,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; @@ -77,4 +79,18 @@ RemoveMountTableEntryResponse removeMountTableEntry( */ GetMountTableEntriesResponse getMountTableEntries( GetMountTableEntriesRequest request) throws IOException; + + /** + * Refresh mount table entries cache from the state store. Cache is updated + * periodically but with this API cache can be refreshed immediately. This API + * is primarily meant to be called from the Admin Server. Admin Server will + * call this API and refresh mount table cache of all the routers while + * changing mount table entries. + * + * @param request Fully populated request object. + * @return True the mount table entry was updated without any error. + * @throws IOException Throws exception if the data store is not initialized. + */ + RefreshMountTableEntriesResponse refreshMountTableEntries( + RefreshMountTableEntriesRequest request) throws IOException; } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherService.java new file mode 100644 index 00000000000..fafcef475a4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherService.java @@ -0,0 +1,289 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.service.AbstractService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * This service is invoked from {@link MountTableStore} when there is change in + * mount table entries and it updates mount table entry cache on local router as + * well as on all remote routers. Refresh on local router is done by calling + * {@link MountTableStore#loadCache(boolean)}} API directly, no RPC call + * involved, but on remote routers refresh is done through RouterClient(RPC + * call). To improve performance, all routers are refreshed in separate thread + * and all connection are cached. Cached connections are removed from + * cache and closed when their max live time is elapsed. + */ +public class MountTableRefresherService extends AbstractService { + private static final String ROUTER_CONNECT_ERROR_MSG = + "Router {} connection failed. Mount table cache will not refesh."; + private static final Logger LOG = + LoggerFactory.getLogger(MountTableRefresherService.class); + + /** Local router. */ + private final Router router; + /** Mount table store. */ + private MountTableStore mountTableStore; + /** Local router admin address in the form of host:port. */ + private String localAdminAdress; + /** Timeout in ms to update mount table cache on all the routers. */ + private long cacheUpdateTimeout; + + /** + * All router admin clients cached. So no need to create the client again and + * again. Router admin address(host:port) is used as key to cache RouterClient + * objects. + */ + private LoadingCache routerClientsCache; + + /** + * Removes expired RouterClient from routerClientsCache. + */ + private ScheduledExecutorService clientCacheCleanerScheduler; + + /** + * Create a new service to refresh mount table cache when there is change in + * mount table entries. + * + * @param router whose mount table cache will be refreshed + */ + public MountTableRefresherService(Router router) { + super(MountTableRefresherService.class.getSimpleName()); + this.router = router; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + this.mountTableStore = getMountTableStore(); + // attach this service to mount table store. + this.mountTableStore.setRefreshService(this); + this.localAdminAdress = + StateStoreUtils.getHostPortString(router.getAdminServerAddress()); + this.cacheUpdateTimeout = conf.getTimeDuration( + RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT, + RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + long routerClientMaxLiveTime = conf.getTimeDuration( + RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME, + RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME_DEFAULT, + TimeUnit.MILLISECONDS); + routerClientsCache = CacheBuilder.newBuilder() + .expireAfterWrite(routerClientMaxLiveTime, TimeUnit.MILLISECONDS) + .removalListener(getClientRemover()).build(getClientCreator()); + + initClientCacheCleaner(routerClientMaxLiveTime); + } + + private void initClientCacheCleaner(long routerClientMaxLiveTime) { + clientCacheCleanerScheduler = + Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() + .setNameFormat("MountTableRefresh_ClientsCacheCleaner") + .setDaemon(true).build()); + /* + * When cleanUp() method is called, expired RouterClient will be removed and + * closed. + */ + clientCacheCleanerScheduler.scheduleWithFixedDelay( + () -> routerClientsCache.cleanUp(), routerClientMaxLiveTime, + routerClientMaxLiveTime, TimeUnit.MILLISECONDS); + } + + /** + * Create cache entry remove listener. + */ + private RemovalListener getClientRemover() { + return new RemovalListener() { + @Override + public void onRemoval( + RemovalNotification notification) { + closeRouterClient(notification.getValue()); + } + }; + } + + @VisibleForTesting + protected void closeRouterClient(RouterClient client) { + try { + client.close(); + } catch (IOException e) { + LOG.error("Error while closing RouterClient", e); + } + } + + /** + * Creates RouterClient and caches it. + */ + private CacheLoader getClientCreator() { + return new CacheLoader() { + public RouterClient load(String adminAddress) throws IOException { + InetSocketAddress routerSocket = + NetUtils.createSocketAddr(adminAddress); + Configuration config = getConfig(); + return createRouterClient(routerSocket, config); + } + }; + } + + @VisibleForTesting + protected RouterClient createRouterClient(InetSocketAddress routerSocket, + Configuration config) throws IOException { + return new RouterClient(routerSocket, config); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + clientCacheCleanerScheduler.shutdown(); + // remove and close all admin clients + routerClientsCache.invalidateAll(); + } + + private MountTableStore getMountTableStore() throws IOException { + MountTableStore mountTblStore = + router.getStateStore().getRegisteredRecordStore(MountTableStore.class); + if (mountTblStore == null) { + throw new IOException("Mount table state store is not available."); + } + return mountTblStore; + } + + /** + * Refresh mount table cache of this router as well as all other routers. + */ + public void refresh() throws StateStoreUnavailableException { + List cachedRecords = + router.getRouterStateManager().getCachedRecords(); + List refreshThreads = new ArrayList<>(); + for (RouterState routerState : cachedRecords) { + String adminAddress = routerState.getAdminAddress(); + if (adminAddress == null || adminAddress.length() == 0) { + // this router has not enabled router admin + continue; + } + // No use of calling refresh on router which is not running state + if (routerState.getStatus() != RouterServiceState.RUNNING) { + LOG.info( + "Router {} is not running. Mount table cache will not refesh."); + // remove if RouterClient is cached. + removeFromCache(adminAddress); + } else if (isLocalAdmin(adminAddress)) { + /* + * Local router's cache update does not require RPC call, so no need for + * RouterClient + */ + refreshThreads.add(getLocalRefresher(adminAddress)); + } else { + try { + RouterClient client = routerClientsCache.get(adminAddress); + refreshThreads.add(new MountTableRefresherThread( + client.getMountTableManager(), adminAddress)); + } catch (ExecutionException execExcep) { + // Can not connect, seems router is stopped now. + LOG.warn(ROUTER_CONNECT_ERROR_MSG, adminAddress, execExcep); + } + } + } + if (!refreshThreads.isEmpty()) { + invokeRefresh(refreshThreads); + } + } + + @VisibleForTesting + protected MountTableRefresherThread getLocalRefresher(String adminAddress) { + return new MountTableRefresherThread(router.getAdminServer(), adminAddress); + } + + private void removeFromCache(String adminAddress) { + routerClientsCache.invalidate(adminAddress); + } + + private void invokeRefresh(List refreshThreads) { + CountDownLatch countDownLatch = new CountDownLatch(refreshThreads.size()); + // start all the threads + for (MountTableRefresherThread refThread : refreshThreads) { + refThread.setCountDownLatch(countDownLatch); + refThread.start(); + } + try { + /* + * Wait for all the thread to complete, await method returns false if + * refresh is not finished within specified time + */ + boolean allReqCompleted = + countDownLatch.await(cacheUpdateTimeout, TimeUnit.MILLISECONDS); + if (!allReqCompleted) { + LOG.warn("Not all router admins updated their cache"); + } + } catch (InterruptedException e) { + LOG.error("Mount table cache refresher was interrupted.", e); + } + logResult(refreshThreads); + } + + private boolean isLocalAdmin(String adminAddress) { + return adminAddress.contentEquals(localAdminAdress); + } + + private void logResult(List refreshThreads) { + int succesCount = 0; + int failureCount = 0; + for (MountTableRefresherThread mountTableRefreshThread : refreshThreads) { + if (mountTableRefreshThread.isSuccess()) { + succesCount++; + } else { + failureCount++; + // remove RouterClient from cache so that new client is created + removeFromCache(mountTableRefreshThread.getAdminAddress()); + } + } + LOG.info("Mount table entries cache refresh succesCount={},failureCount={}", + succesCount, failureCount); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherThread.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherThread.java new file mode 100644 index 00000000000..c9967a20736 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherThread.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class for updating mount table cache on all the router. + */ +public class MountTableRefresherThread extends Thread { + private static final Logger LOG = + LoggerFactory.getLogger(MountTableRefresherThread.class); + private boolean success; + /** Admin server on which refreshed to be invoked. */ + private String adminAddress; + private CountDownLatch countDownLatch; + private MountTableManager manager; + + public MountTableRefresherThread(MountTableManager manager, + String adminAddress) { + this.manager = manager; + this.adminAddress = adminAddress; + setName("MountTableRefresh_" + adminAddress); + setDaemon(true); + } + + /** + * Refresh mount table cache of local and remote routers. Local and remote + * routers will be refreshed differently. Lets understand what are the + * local and remote routers and refresh will be done differently on these + * routers. Suppose there are three routers R1, R2 and R3. User want to add + * new mount table entry. He will connect to only one router, not all the + * routers. Suppose He connects to R1 and calls add mount table entry through + * API or CLI. Now in this context R1 is local router, R2 and R3 are remote + * routers. Because add mount table entry is invoked on R1, R1 will update the + * cache locally it need not to make RPC call. But R1 will make RPC calls to + * update cache on R2 and R3. + */ + @Override + public void run() { + try { + RefreshMountTableEntriesResponse refreshMountTableEntries = + manager.refreshMountTableEntries( + RefreshMountTableEntriesRequest.newInstance()); + success = refreshMountTableEntries.getResult(); + } catch (IOException e) { + LOG.error("Failed to refresh mount table entries cache at router {}", + adminAddress, e); + } finally { + countDownLatch.countDown(); + } + } + + /** + * @return true if cache was refreshed successfully. + */ + public boolean isSuccess() { + return success; + } + + public void setCountDownLatch(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + @Override + public String toString() { + return "MountTableRefreshThread [success=" + success + ", adminAddress=" + + adminAddress + "]"; + } + + public String getAdminAddress() { + return adminAddress; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index 0070de73b48..5e907c8a55e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -204,6 +204,31 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { FEDERATION_ROUTER_PREFIX + "mount-table.max-cache-size"; /** Remove cache entries if we have more than 10k. */ public static final int FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE_DEFAULT = 10000; + /** + * If true then cache updated immediately after mount table entry change + * otherwise it is updated periodically based configuration. + */ + public static final String MOUNT_TABLE_CACHE_UPDATE = + FEDERATION_ROUTER_PREFIX + "mount-table.cache.update"; + public static final boolean MOUNT_TABLE_CACHE_UPDATE_DEFAULT = + false; + /** + * Timeout to update mount table cache on all the routers. + */ + public static final String MOUNT_TABLE_CACHE_UPDATE_TIMEOUT = + FEDERATION_ROUTER_PREFIX + "mount-table.cache.update.timeout"; + public static final long MOUNT_TABLE_CACHE_UPDATE_TIMEOUT_DEFAULT = + TimeUnit.MINUTES.toMillis(1); + /** + * Remote router mount table cache is updated through RouterClient(RPC call). + * To improve performance, RouterClient connections are cached but it should + * not be kept in cache forever. This property defines the max time a + * connection can be cached. + */ + public static final String MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME = + FEDERATION_ROUTER_PREFIX + "mount-table.cache.update.client.max.time"; + public static final long MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME_DEFAULT = + TimeUnit.MINUTES.toMillis(5); public static final String FEDERATION_MOUNT_TABLE_CACHE_ENABLE = FEDERATION_ROUTER_PREFIX + "mount-table.cache.enable"; public static final boolean FEDERATION_MOUNT_TABLE_CACHE_ENABLE_DEFAULT = diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java index 3182e27bcc9..6a7437f29b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java @@ -254,9 +254,50 @@ protected void serviceInit(Configuration configuration) throws Exception { addService(this.safemodeService); } + /* + * Refresh mount table cache immediately after adding, modifying or deleting + * the mount table entries. If this service is not enabled mount table cache + * are refreshed periodically by StateStoreCacheUpdateService + */ + if (conf.getBoolean(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE, + RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_DEFAULT)) { + // There is no use of starting refresh service if state store and admin + // servers are not enabled + String disabledDependentServices = getDisabledDependentServices(); + /* + * disabledDependentServices null means all dependent services are + * enabled. + */ + if (disabledDependentServices == null) { + + MountTableRefresherService refreshService = + new MountTableRefresherService(this); + addService(refreshService); + LOG.info("Service {} is enabled.", + MountTableRefresherService.class.getSimpleName()); + } else { + LOG.warn( + "Service {} not enabled: depenendent service(s) {} not enabled.", + MountTableRefresherService.class.getSimpleName(), + disabledDependentServices); + } + } + super.serviceInit(conf); } + private String getDisabledDependentServices() { + if (this.stateStore == null && this.adminServer == null) { + return StateStoreService.class.getSimpleName() + "," + + RouterAdminServer.class.getSimpleName(); + } else if (this.stateStore == null) { + return StateStoreService.class.getSimpleName(); + } else if (this.adminServer == null) { + return RouterAdminServer.class.getSimpleName(); + } + return null; + } + /** * Returns the hostname for this Router. If the hostname is not * explicitly configured in the given config, then it is determined. @@ -696,9 +737,19 @@ Collection getNamenodeHearbeatServices() { } /** - * Get the Router safe mode service + * Get the Router safe mode service. */ RouterSafemodeService getSafemodeService() { return this.safemodeService; } + + /** + * Get router admin server. + * + * @return Null if admin is not enabled. + */ + public RouterAdminServer getAdminServer() { + return adminServer; + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java index f34dc419eb6..5bb7751cd1e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; import org.apache.hadoop.hdfs.server.federation.store.DisabledNameserviceStore; import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache; import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.DisableNameserviceRequest; @@ -55,6 +56,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; @@ -102,6 +105,7 @@ public class RouterAdminServer extends AbstractService private static String routerOwner; private static String superGroup; private static boolean isPermissionEnabled; + private boolean iStateStoreCache; public RouterAdminServer(Configuration conf, Router router) throws IOException { @@ -154,6 +158,8 @@ public RouterAdminServer(Configuration conf, Router router) this.adminAddress = new InetSocketAddress( confRpcAddress.getHostName(), listenAddress.getPort()); router.setAdminServerAddress(this.adminAddress); + iStateStoreCache = + router.getSubclusterResolver() instanceof StateStoreCache; } /** @@ -243,7 +249,7 @@ public UpdateMountTableEntryResponse updateMountTableEntry( getMountTableStore().updateMountTableEntry(request); MountTable mountTable = request.getEntry(); - if (mountTable != null) { + if (mountTable != null && router.isQuotaEnabled()) { synchronizeQuota(mountTable); } return response; @@ -331,6 +337,26 @@ public GetSafeModeResponse getSafeMode(GetSafeModeRequest request) return GetSafeModeResponse.newInstance(isInSafeMode); } + @Override + public RefreshMountTableEntriesResponse refreshMountTableEntries( + RefreshMountTableEntriesRequest request) throws IOException { + if (iStateStoreCache) { + /* + * MountTableResolver updates MountTableStore cache also. Expecting other + * SubclusterResolver implementations to update MountTableStore cache also + * apart from updating its cache. + */ + boolean result = ((StateStoreCache) this.router.getSubclusterResolver()) + .loadCache(true); + RefreshMountTableEntriesResponse response = + RefreshMountTableEntriesResponse.newInstance(); + response.setResult(result); + return response; + } else { + return getMountTableStore().refreshMountTableEntries(request); + } + } + /** * Verify if Router set safe mode state correctly. * @param isInSafeMode Expected state to be set. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java index a7f02d33bdf..c497d853359 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.server.federation.store.RecordStore; import org.apache.hadoop.hdfs.server.federation.store.RouterStore; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; @@ -91,6 +92,10 @@ synchronized void updateStateStore() { getStateStoreVersion(MembershipStore.class), getStateStoreVersion(MountTableStore.class)); record.setStateStoreVersion(stateStoreVersion); + // if admin server not started then hostPort will be empty + String hostPort = + StateStoreUtils.getHostPortString(router.getAdminServerAddress()); + record.setAdminAddress(hostPort); RouterHeartbeatRequest request = RouterHeartbeatRequest.newInstance(record); RouterHeartbeatResponse response = routerStore.routerHeartbeat(request); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java index b43965997d0..9d4b64b7f4f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java @@ -20,8 +20,11 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.router.MountTableRefresherService; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Management API for the HDFS mount table information stored in @@ -42,8 +45,29 @@ @InterfaceStability.Evolving public abstract class MountTableStore extends CachedRecordStore implements MountTableManager { + private static final Logger LOG = + LoggerFactory.getLogger(MountTableStore.class); + private MountTableRefresherService refreshService; public MountTableStore(StateStoreDriver driver) { super(MountTable.class, driver); } + + public void setRefreshService(MountTableRefresherService refreshService) { + this.refreshService = refreshService; + } + + /** + * Update mount table cache of this router as well as all other routers. + */ + protected void updateCacheAllRouters() { + if (refreshService != null) { + try { + refreshService.refresh(); + } catch (StateStoreUnavailableException e) { + LOG.error("Cannot refresh mount table: state store not available", e); + } + } + } + } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java index 924c96a5dc3..4b932d6d939 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hdfs.server.federation.store; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; @@ -110,4 +113,27 @@ public static List filterMultiple( } return matchingList; } + + /** + * Returns address in form of host:port, empty string if address is null. + * + * @param address address + * @return host:port + */ + public static String getHostPortString(InetSocketAddress address) { + if (null == address) { + return ""; + } + String hostName = address.getHostName(); + if (hostName.equals("0.0.0.0")) { + try { + hostName = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + LOG.error("Failed to get local host name", e); + return ""; + } + } + return hostName + ":" + address.getPort(); + } + } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java index eb117d64424..76c7e781ab9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java @@ -33,6 +33,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; @@ -68,6 +70,7 @@ public AddMountTableEntryResponse addMountTableEntry( AddMountTableEntryResponse response = AddMountTableEntryResponse.newInstance(); response.setStatus(status); + updateCacheAllRouters(); return response; } @@ -86,6 +89,7 @@ public UpdateMountTableEntryResponse updateMountTableEntry( UpdateMountTableEntryResponse response = UpdateMountTableEntryResponse.newInstance(); response.setStatus(status); + updateCacheAllRouters(); return response; } @@ -110,6 +114,7 @@ public RemoveMountTableEntryResponse removeMountTableEntry( RemoveMountTableEntryResponse response = RemoveMountTableEntryResponse.newInstance(); response.setStatus(status); + updateCacheAllRouters(); return response; } @@ -151,4 +156,17 @@ public GetMountTableEntriesResponse getMountTableEntries( response.setTimestamp(Time.now()); return response; } + + @Override + public RefreshMountTableEntriesResponse refreshMountTableEntries( + RefreshMountTableEntriesRequest request) throws IOException { + // Because this refresh is done through admin API, it should always be force + // refresh. + boolean result = loadCache(true); + RefreshMountTableEntriesResponse response = + RefreshMountTableEntriesResponse.newInstance(); + response.setResult(result); + return response; + } + } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RefreshMountTableEntriesRequest.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RefreshMountTableEntriesRequest.java new file mode 100644 index 00000000000..899afe75c08 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RefreshMountTableEntriesRequest.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; + +/** + * API request for refreshing mount table cached entries from state store. + */ +public abstract class RefreshMountTableEntriesRequest { + + public static RefreshMountTableEntriesRequest newInstance() + throws IOException { + return StateStoreSerializer + .newRecord(RefreshMountTableEntriesRequest.class); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RefreshMountTableEntriesResponse.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RefreshMountTableEntriesResponse.java new file mode 100644 index 00000000000..6c9ed77bc08 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RefreshMountTableEntriesResponse.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; + +/** + * API response for refreshing mount table entries cache from state store. + */ +public abstract class RefreshMountTableEntriesResponse { + + public static RefreshMountTableEntriesResponse newInstance() + throws IOException { + return StateStoreSerializer + .newRecord(RefreshMountTableEntriesResponse.class); + } + + @Public + @Unstable + public abstract boolean getResult(); + + @Public + @Unstable + public abstract void setResult(boolean result); +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RefreshMountTableEntriesRequestPBImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RefreshMountTableEntriesRequestPBImpl.java new file mode 100644 index 00000000000..cec0699c183 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RefreshMountTableEntriesRequestPBImpl.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesRequestProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesRequestProto.Builder; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesRequestProtoOrBuilder; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord; + +import com.google.protobuf.Message; + +/** + * Protobuf implementation of the state store API object + * RefreshMountTableEntriesRequest. + */ +public class RefreshMountTableEntriesRequestPBImpl + extends RefreshMountTableEntriesRequest implements PBRecord { + + private FederationProtocolPBTranslator translator = + new FederationProtocolPBTranslator<>( + RefreshMountTableEntriesRequestProto.class); + + public RefreshMountTableEntriesRequestPBImpl() { + } + + public RefreshMountTableEntriesRequestPBImpl( + RefreshMountTableEntriesRequestProto proto) { + this.translator.setProto(proto); + } + + @Override + public RefreshMountTableEntriesRequestProto getProto() { + // if builder is null build() returns null, calling getBuilder() to + // instantiate builder + this.translator.getBuilder(); + return this.translator.build(); + } + + @Override + public void setProto(Message proto) { + this.translator.setProto(proto); + } + + @Override + public void readInstance(String base64String) throws IOException { + this.translator.readInstance(base64String); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RefreshMountTableEntriesResponsePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RefreshMountTableEntriesResponsePBImpl.java new file mode 100644 index 00000000000..5acf47906b2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RefreshMountTableEntriesResponsePBImpl.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesResponseProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesResponseProto.Builder; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesResponseProtoOrBuilder; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord; + +import com.google.protobuf.Message; + +/** + * Protobuf implementation of the state store API object + * RefreshMountTableEntriesResponse. + */ +public class RefreshMountTableEntriesResponsePBImpl + extends RefreshMountTableEntriesResponse implements PBRecord { + + private FederationProtocolPBTranslator translator = + new FederationProtocolPBTranslator<>( + RefreshMountTableEntriesResponseProto.class); + + public RefreshMountTableEntriesResponsePBImpl() { + } + + public RefreshMountTableEntriesResponsePBImpl( + RefreshMountTableEntriesResponseProto proto) { + this.translator.setProto(proto); + } + + @Override + public RefreshMountTableEntriesResponseProto getProto() { + return this.translator.build(); + } + + @Override + public void setProto(Message proto) { + this.translator.setProto(proto); + } + + @Override + public void readInstance(String base64String) throws IOException { + this.translator.readInstance(base64String); + } + + @Override + public boolean getResult() { + return this.translator.getProtoOrBuilder().getResult(); + }; + + @Override + public void setResult(boolean result) { + this.translator.getBuilder().setResult(result); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/RouterState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/RouterState.java index c90abcc155c..2fe6941ba14 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/RouterState.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/RouterState.java @@ -88,6 +88,10 @@ public static RouterState newInstance(String addr, long startTime, public abstract long getDateStarted(); + public abstract void setAdminAddress(String adminAddress); + + public abstract String getAdminAddress(); + /** * Get the identifier for the Router. It uses the address. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/RouterStatePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/RouterStatePBImpl.java index 23a61f92b7d..d837386585f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/RouterStatePBImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/RouterStatePBImpl.java @@ -199,4 +199,14 @@ public void setDateCreated(long time) { public long getDateCreated() { return this.translator.getProtoOrBuilder().getDateCreated(); } + + @Override + public void setAdminAddress(String adminAddress) { + this.translator.getBuilder().setAdminAddress(adminAddress); + } + + @Override + public String getAdminAddress() { + return this.translator.getProtoOrBuilder().getAdminAddress(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java index bdaabe821f8..27c42cd634d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java @@ -54,6 +54,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; @@ -107,7 +109,8 @@ private String getUsage(String cmd) { if (cmd == null) { String[] commands = {"-add", "-update", "-rm", "-ls", "-setQuota", "-clrQuota", - "-safemode", "-nameservice", "-getDisabledNameservices"}; + "-safemode", "-nameservice", "-getDisabledNameservices", + "-refresh"}; StringBuilder usage = new StringBuilder(); usage.append("Usage: hdfs dfsrouteradmin :\n"); for (int i = 0; i < commands.length; i++) { @@ -142,6 +145,8 @@ private String getUsage(String cmd) { return "\t[-nameservice enable | disable ]"; } else if (cmd.equals("-getDisabledNameservices")) { return "\t[-getDisabledNameservices]"; + } else if (cmd.equals("-refresh")) { + return "\t[-refresh]"; } return getUsage(null); } @@ -230,9 +235,10 @@ public int run(String[] argv) throws Exception { printUsage(cmd); return exitCode; } + String address = null; // Initialize RouterClient try { - String address = getConf().getTrimmed( + address = getConf().getTrimmed( RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT); InetSocketAddress routerSocket = NetUtils.createSocketAddr(address); @@ -302,6 +308,8 @@ public int run(String[] argv) throws Exception { manageNameservice(subcmd, nsId); } else if ("-getDisabledNameservices".equals(cmd)) { getDisabledNameservices(); + } else if ("-refresh".equals(cmd)) { + refresh(address); } else { throw new IllegalArgumentException("Unknown Command: " + cmd); } @@ -337,6 +345,27 @@ public int run(String[] argv) throws Exception { return exitCode; } + private void refresh(String address) throws IOException { + if (refreshRouterCache()) { + System.out.println( + "Successfully updated mount table cache on router " + address); + } + } + + /** + * Refresh mount table cache on connected router. + * + * @return true if cache refreshed successfully + * @throws IOException + */ + private boolean refreshRouterCache() throws IOException { + RefreshMountTableEntriesResponse response = + client.getMountTableManager().refreshMountTableEntries( + RefreshMountTableEntriesRequest.newInstance()); + return response.getResult(); + } + + /** * Add a mount table entry or update if it exists. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto index b1a62b1c345..17ae299bcd2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto @@ -193,6 +193,7 @@ message RouterRecordProto { optional string version = 6; optional string compileInfo = 7; optional uint64 dateStarted = 8; + optional string adminAddress = 9; } message GetRouterRegistrationRequestProto { @@ -219,6 +220,13 @@ message RouterHeartbeatResponseProto { optional bool status = 1; } +message RefreshMountTableEntriesRequestProto { +} + +message RefreshMountTableEntriesResponseProto { + optional bool result = 1; +} + ///////////////////////////////////////////////// // Route State ///////////////////////////////////////////////// diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/RouterProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/RouterProtocol.proto index f3a2b6e8abc..34a012acd87 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/RouterProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/RouterProtocol.proto @@ -74,4 +74,9 @@ service RouterAdminProtocolService { * Get the list of disabled name services. */ rpc getDisabledNameservices(GetDisabledNameservicesRequestProto) returns (GetDisabledNameservicesResponseProto); + + /** + * Refresh mount entries + */ + rpc refreshMountTableEntries(RefreshMountTableEntriesRequestProto) returns(RefreshMountTableEntriesResponseProto); } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index afb3c32ba6d..72f6c2f1104 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -547,4 +547,38 @@ + + dfs.federation.router.mount-table.cache.update + false + Set true to enable MountTableRefreshService. This service + updates mount table cache immediately after adding, modifying or + deleting the mount table entries. If this service is not enabled + mount table cache are refreshed periodically by + StateStoreCacheUpdateService + + + + + dfs.federation.router.mount-table.cache.update.timeout + 1m + This property defines how long to wait for all the + admin servers to finish their mount table cache update. This setting + supports multiple time unit suffixes as described in + dfs.federation.router.safemode.extension. + + + + + dfs.federation.router.mount-table.cache.update.client.max.time + + 5m + Remote router mount table cache is updated through + RouterClient(RPC call). To improve performance, RouterClient + connections are cached but it should not be kept in cache forever. + This property defines the max time a connection can be cached. This + setting supports multiple time unit suffixes as described in + dfs.federation.router.safemode.extension. + + + \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md index 72bf6af965e..adc43838fe5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md @@ -230,6 +230,12 @@ Ls command will show below information for each mount table entry: Source Destinations Owner Group Mode Quota/Usage /path ns0->/path root supergroup rwxr-xr-x [NsQuota: 50/0, SsQuota: 100 B/0 B] +Mount table cache is refreshed periodically but it can also be refreshed by executing refresh command: + + [hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -refresh + +The above command will refresh cache of the connected router. This command is redundant when mount table refresh service is enabled as the service will always keep the cache updated. + #### Multiple subclusters A mount point also supports mapping multiple subclusters. For example, to create a mount point that stores files in subclusters `ns1` and `ns2`. @@ -380,6 +386,9 @@ The connection to the State Store and the internal caching at the Router. | dfs.federation.router.store.connection.test | 60000 | How often to check for the connection to the State Store in milliseconds. | | dfs.federation.router.cache.ttl | 60000 | How often to refresh the State Store caches in milliseconds. | | dfs.federation.router.store.membership.expiration | 300000 | Expiration time in milliseconds for a membership record. | +| dfs.federation.router.mount-table.cache.update | false | If true, Mount table cache is updated whenever a mount table entry is added, modified or removed for all the routers. | +| dfs.federation.router.mount-table.cache.update.timeout | 1m | Max time to wait for all the routers to finish their mount table cache update. | +| dfs.federation.router.mount-table.cache.update.client.max.time | 5m | Max time a RouterClient connection can be cached. | ### Routing diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java index c48e6e26b9e..5095c6b139c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java @@ -56,6 +56,8 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; +import org.apache.hadoop.hdfs.server.federation.store.RouterStore; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.test.GenericTestUtils; @@ -316,4 +318,29 @@ public Object answer(InvocationOnMock invocation) throws Throwable { }).when(spyHAContext).checkOperation(any(OperationCategory.class)); Whitebox.setInternalState(namesystem, "haContext", spyHAContext); } + + /** + * Wait for a number of routers to be registered in state store. + * + * @param stateManager number of routers to be registered. + * @param routerCount number of routers to be registered. + * @param tiemout max wait time in ms + */ + public static void waitRouterRegistered(RouterStore stateManager, + long routerCount, int timeout) throws Exception { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + try { + List cachedRecords = stateManager.getCachedRecords(); + if (cachedRecords.size() == routerCount) { + return true; + } + } catch (IOException e) { + // Ignore + } + return false; + } + }, 100, timeout); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java index be0de529b51..6d9b2c08779 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java @@ -38,6 +38,7 @@ public class RouterConfigBuilder { private boolean enableMetrics = false; private boolean enableQuota = false; private boolean enableSafemode = false; + private boolean enableCacheRefresh; public RouterConfigBuilder(Configuration configuration) { this.conf = configuration; @@ -104,6 +105,11 @@ public RouterConfigBuilder safemode(boolean enable) { return this; } + public RouterConfigBuilder refreshCache(boolean enable) { + this.enableCacheRefresh = enable; + return this; + } + public RouterConfigBuilder rpc() { return this.rpc(true); } @@ -140,6 +146,10 @@ public RouterConfigBuilder safemode() { return this.safemode(true); } + public RouterConfigBuilder refreshCache() { + return this.refreshCache(true); + } + public Configuration build() { conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE, this.enableStateStore); @@ -158,6 +168,8 @@ public Configuration build() { this.enableQuota); conf.setBoolean(RBFConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE, this.enableSafemode); + conf.setBoolean(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE, + this.enableCacheRefresh); return conf; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java index d0e3e50fce4..445022bbce9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java @@ -799,6 +799,28 @@ public void testNameserviceManager() throws Exception { assertTrue(err.toString().contains("No arguments allowed")); } + @Test + public void testRefreshMountTableCache() throws Exception { + String src = "/refreshMount"; + + // create mount table entry + String[] argv = new String[] {"-add", src, "refreshNS0", "/refreshDest"}; + assertEquals(0, ToolRunner.run(admin, argv)); + + // refresh the mount table entry cache + System.setOut(new PrintStream(out)); + argv = new String[] {"-refresh"}; + assertEquals(0, ToolRunner.run(admin, argv)); + assertTrue( + out.toString().startsWith("Successfully updated mount table cache")); + + // Now ls should return that mount table entry + out.reset(); + argv = new String[] {"-ls", src}; + assertEquals(0, ToolRunner.run(admin, argv)); + assertTrue(out.toString().contains(src)); + } + /** * Wait for the Router transforming to expected state. * @param expectedState Expected Router state. @@ -836,8 +858,7 @@ public void testUpdateNonExistingMountTable() throws Exception { } @Test - public void testUpdateDestinationForExistingMountTable() throws - Exception { + public void testUpdateDestinationForExistingMountTable() throws Exception { // Add a mount table firstly String nsId = "ns0"; String src = "/test-updateDestinationForExistingMountTable"; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableCacheRefresh.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableCacheRefresh.java new file mode 100644 index 00000000000..c90e614a5cd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableCacheRefresh.java @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.curator.test.TestingServer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.hdfs.server.federation.FederationTestUtils; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.store.RouterStore; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.service.Service.STATE; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * This test class verifies that mount table cache is updated on all the routers + * when MountTableRefreshService is enabled and there is a change in mount table + * entries. + */ +public class TestRouterMountTableCacheRefresh { + private static TestingServer curatorTestingServer; + private static MiniRouterDFSCluster cluster; + private static RouterContext routerContext; + private static MountTableManager mountTableManager; + + @BeforeClass + public static void setUp() throws Exception { + curatorTestingServer = new TestingServer(); + curatorTestingServer.start(); + final String connectString = curatorTestingServer.getConnectString(); + int numNameservices = 2; + cluster = new MiniRouterDFSCluster(false, numNameservices); + Configuration conf = new RouterConfigBuilder().refreshCache().admin().rpc() + .heartbeat().build(); + conf.setClass(RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS, + RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT, + FileSubclusterResolver.class); + conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString); + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE, true); + cluster.addRouterOverrides(conf); + cluster.startCluster(); + cluster.startRouters(); + cluster.waitClusterUp(); + routerContext = cluster.getRandomRouter(); + RouterStore routerStateManager = + routerContext.getRouter().getRouterStateManager(); + mountTableManager = routerContext.getAdminClient().getMountTableManager(); + // wait for one minute for all the routers to get registered + FederationTestUtils.waitRouterRegistered(routerStateManager, + numNameservices, 60000); + } + + @AfterClass + public static void destory() { + try { + curatorTestingServer.close(); + cluster.shutdown(); + } catch (IOException e) { + // do nothing + } + } + + @After + public void tearDown() throws IOException { + clearEntries(); + } + + private void clearEntries() throws IOException { + List result = getMountTableEntries(); + for (MountTable mountTable : result) { + RemoveMountTableEntryResponse removeMountTableEntry = + mountTableManager.removeMountTableEntry(RemoveMountTableEntryRequest + .newInstance(mountTable.getSourcePath())); + assertTrue(removeMountTableEntry.getStatus()); + } + } + + /** + * addMountTableEntry API should internally update the cache on all the + * routers. + */ + @Test + public void testMountTableEntriesCacheUpdatedAfterAddAPICall() + throws IOException { + + // Existing mount table size + int existingEntriesCount = getNumMountTableEntries(); + String srcPath = "/addPath"; + MountTable newEntry = MountTable.newInstance(srcPath, + Collections.singletonMap("ns0", "/addPathDest"), Time.now(), + Time.now()); + addMountTableEntry(mountTableManager, newEntry); + + // When Add entry is done, all the routers must have updated its mount table + // entry + List routers = getRouters(); + for (RouterContext rc : routers) { + List result = + getMountTableEntries(rc.getAdminClient().getMountTableManager()); + assertEquals(1 + existingEntriesCount, result.size()); + MountTable mountTableResult = result.get(0); + assertEquals(srcPath, mountTableResult.getSourcePath()); + } + } + + /** + * removeMountTableEntry API should internally update the cache on all the + * routers. + */ + @Test + public void testMountTableEntriesCacheUpdatedAfterRemoveAPICall() + throws IOException { + // add + String srcPath = "/removePathSrc"; + MountTable newEntry = MountTable.newInstance(srcPath, + Collections.singletonMap("ns0", "/removePathDest"), Time.now(), + Time.now()); + addMountTableEntry(mountTableManager, newEntry); + int addCount = getNumMountTableEntries(); + assertEquals(1, addCount); + + // remove + RemoveMountTableEntryResponse removeMountTableEntry = + mountTableManager.removeMountTableEntry( + RemoveMountTableEntryRequest.newInstance(srcPath)); + assertTrue(removeMountTableEntry.getStatus()); + + int removeCount = getNumMountTableEntries(); + assertEquals(addCount - 1, removeCount); + } + + /** + * updateMountTableEntry API should internally update the cache on all the + * routers. + */ + @Test + public void testMountTableEntriesCacheUpdatedAfterUpdateAPICall() + throws IOException { + // add + String srcPath = "/updatePathSrc"; + MountTable newEntry = MountTable.newInstance(srcPath, + Collections.singletonMap("ns0", "/updatePathDest"), Time.now(), + Time.now()); + addMountTableEntry(mountTableManager, newEntry); + int addCount = getNumMountTableEntries(); + assertEquals(1, addCount); + + // update + String key = "ns1"; + String value = "/updatePathDest2"; + MountTable upateEntry = MountTable.newInstance(srcPath, + Collections.singletonMap(key, value), Time.now(), Time.now()); + UpdateMountTableEntryResponse updateMountTableEntry = + mountTableManager.updateMountTableEntry( + UpdateMountTableEntryRequest.newInstance(upateEntry)); + assertTrue(updateMountTableEntry.getStatus()); + MountTable updatedMountTable = getMountTableEntry(srcPath); + assertNotNull("Updated mount table entrty cannot be null", + updatedMountTable); + assertEquals(1, updatedMountTable.getDestinations().size()); + assertEquals(key, + updatedMountTable.getDestinations().get(0).getNameserviceId()); + assertEquals(value, updatedMountTable.getDestinations().get(0).getDest()); + } + + /** + * After caching RouterClient if router goes down, refresh should be + * successful on other available router. The router which is not running + * should be ignored. + */ + @Test + public void testCachedRouterClientBehaviourAfterRouterStoped() + throws IOException { + String srcPath = "/addPathClientCache"; + MountTable newEntry = MountTable.newInstance(srcPath, + Collections.singletonMap("ns0", "/addPathClientCacheDest"), Time.now(), + Time.now()); + addMountTableEntry(mountTableManager, newEntry); + + // When Add entry is done, all the routers must have updated its mount table + // entry + List routers = getRouters(); + for (RouterContext rc : routers) { + List result = + getMountTableEntries(rc.getAdminClient().getMountTableManager()); + assertEquals(1, result.size()); + MountTable mountTableResult = result.get(0); + assertEquals(srcPath, mountTableResult.getSourcePath()); + } + + // Lets stop one router + for (RouterContext rc : routers) { + InetSocketAddress adminServerAddress = + rc.getRouter().getAdminServerAddress(); + if (!routerContext.getRouter().getAdminServerAddress() + .equals(adminServerAddress)) { + cluster.stopRouter(rc); + break; + } + } + + srcPath = "/addPathClientCache2"; + newEntry = MountTable.newInstance(srcPath, + Collections.singletonMap("ns0", "/addPathClientCacheDest2"), Time.now(), + Time.now()); + addMountTableEntry(mountTableManager, newEntry); + for (RouterContext rc : getRouters()) { + List result = + getMountTableEntries(rc.getAdminClient().getMountTableManager()); + assertEquals(2, result.size()); + } + } + + private List getRouters() { + List result = new ArrayList<>(); + for (RouterContext rc : cluster.getRouters()) { + if (rc.getRouter().getServiceState() == STATE.STARTED) { + result.add(rc); + } + } + return result; + } + + @Test + public void testRefreshMountTableEntriesAPI() throws IOException { + RefreshMountTableEntriesRequest request = + RefreshMountTableEntriesRequest.newInstance(); + RefreshMountTableEntriesResponse refreshMountTableEntriesRes = + mountTableManager.refreshMountTableEntries(request); + // refresh should be successful + assertTrue(refreshMountTableEntriesRes.getResult()); + } + + /** + * Verify cache update timeouts when any of the router takes more time than + * the configured timeout period. + */ + @Test(timeout = 10000) + public void testMountTableEntriesCacheUpdateTimeout() throws IOException { + // Resources will be closed when router is closed + @SuppressWarnings("resource") + MountTableRefresherService mountTableRefresherService = + new MountTableRefresherService(routerContext.getRouter()) { + @Override + protected MountTableRefresherThread getLocalRefresher( + String adminAddress) { + return new MountTableRefresherThread(null, adminAddress) { + @Override + public void run() { + try { + // Sleep 1 minute + Thread.sleep(60000); + } catch (InterruptedException e) { + // Do nothing + } + } + }; + } + }; + Configuration config = routerContext.getRouter().getConfig(); + config.setTimeDuration(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT, 5, + TimeUnit.SECONDS); + mountTableRefresherService.init(config); + // One router is not responding for 1 minute, still refresh should + // finished in 5 second as cache update timeout is set 5 second. + mountTableRefresherService.refresh(); + // Test case timeout is assert for this test case. + } + + /** + * Verify Cached RouterClient connections are removed from cache and closed + * when their max live time is elapsed. + */ + @Test + public void testRouterClientConnectionExpiration() throws Exception { + final AtomicInteger createCounter = new AtomicInteger(); + final AtomicInteger removeCounter = new AtomicInteger(); + // Resources will be closed when router is closed + @SuppressWarnings("resource") + MountTableRefresherService mountTableRefresherService = + new MountTableRefresherService(routerContext.getRouter()) { + @Override + protected void closeRouterClient(RouterClient client) { + super.closeRouterClient(client); + removeCounter.incrementAndGet(); + } + + @Override + protected RouterClient createRouterClient( + InetSocketAddress routerSocket, Configuration config) + throws IOException { + createCounter.incrementAndGet(); + return super.createRouterClient(routerSocket, config); + } + }; + int clientCacheTime = 2000; + Configuration config = routerContext.getRouter().getConfig(); + config.setTimeDuration( + RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME, clientCacheTime, + TimeUnit.MILLISECONDS); + mountTableRefresherService.init(config); + // Do refresh to created RouterClient + mountTableRefresherService.refresh(); + assertNotEquals("No RouterClient is created.", 0, createCounter.get()); + /* + * Wait for clients to expire. Lets wait triple the cache eviction period. + * After cache eviction period all created client must be removed and + * closed. + */ + GenericTestUtils.waitFor(() -> createCounter.get() == removeCounter.get(), + 100, 3 * clientCacheTime); + } + + private int getNumMountTableEntries() throws IOException { + List records = getMountTableEntries(); + int oldEntriesCount = records.size(); + return oldEntriesCount; + } + + private MountTable getMountTableEntry(String srcPath) throws IOException { + List mountTableEntries = getMountTableEntries(); + for (MountTable mountTable : mountTableEntries) { + String sourcePath = mountTable.getSourcePath(); + if (srcPath.equals(sourcePath)) { + return mountTable; + } + } + return null; + } + + private void addMountTableEntry(MountTableManager mountTableMgr, + MountTable newEntry) throws IOException { + AddMountTableEntryRequest addRequest = + AddMountTableEntryRequest.newInstance(newEntry); + AddMountTableEntryResponse addResponse = + mountTableMgr.addMountTableEntry(addRequest); + assertTrue(addResponse.getStatus()); + } + + private List getMountTableEntries() throws IOException { + return getMountTableEntries(mountTableManager); + } + + private List getMountTableEntries( + MountTableManager mountTableManagerParam) throws IOException { + GetMountTableEntriesRequest request = + GetMountTableEntriesRequest.newInstance("/"); + return mountTableManagerParam.getMountTableEntries(request).getEntries(); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md index eba81afe3bf..a967ee4342b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md @@ -437,6 +437,7 @@ Usage: [-safemode enter | leave | get] [-nameservice disable | enable ] [-getDisabledNameservices] + [-refresh] | COMMAND\_OPTION | Description | |:---- |:---- | @@ -449,6 +450,7 @@ Usage: | `-safemode` `enter` `leave` `get` | Manually set the Router entering or leaving safe mode. The option *get* will be used for verifying if the Router is in safe mode state. | | `-nameservice` `disable` `enable` *nameservice* | Disable/enable a name service from the federation. If disabled, requests will not go to that name service. | | `-getDisabledNameservices` | Get the name services that are disabled in the federation. | +| `-refresh` | Update mount table cache of the connected router. | The commands for managing Router-based federation. See [Mount table management](../hadoop-hdfs-rbf/HDFSRouterFederation.html#Mount_table_management) for more info.