HDFS-13443. RBF: Update mount table cache immediately after changing (add/update/remove) mount table entries. Contributed by Mohammad Arshad.
This commit is contained in:
parent
7d8cc5d12c
commit
cd73cb8d00
|
@ -37,6 +37,8 @@ import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProt
|
|||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetSafeModeResponseProto;
|
||||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.LeaveSafeModeRequestProto;
|
||||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.LeaveSafeModeResponseProto;
|
||||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesRequestProto;
|
||||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesResponseProto;
|
||||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryRequestProto;
|
||||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProto;
|
||||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryRequestProto;
|
||||
|
@ -58,6 +60,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeReques
|
|||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
|
||||
|
@ -78,6 +82,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetSafeMo
|
|||
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetSafeModeResponsePBImpl;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.LeaveSafeModeRequestPBImpl;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.LeaveSafeModeResponsePBImpl;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RefreshMountTableEntriesRequestPBImpl;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RefreshMountTableEntriesResponsePBImpl;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryRequestPBImpl;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryResponsePBImpl;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryRequestPBImpl;
|
||||
|
@ -275,4 +281,21 @@ public class RouterAdminProtocolServerSideTranslatorPB implements
|
|||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshMountTableEntriesResponseProto refreshMountTableEntries(
|
||||
RpcController controller, RefreshMountTableEntriesRequestProto request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
RefreshMountTableEntriesRequest req =
|
||||
new RefreshMountTableEntriesRequestPBImpl(request);
|
||||
RefreshMountTableEntriesResponse response =
|
||||
server.refreshMountTableEntries(req);
|
||||
RefreshMountTableEntriesResponsePBImpl responsePB =
|
||||
(RefreshMountTableEntriesResponsePBImpl) response;
|
||||
return responsePB.getProto();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,6 +38,8 @@ import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProt
|
|||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetSafeModeResponseProto;
|
||||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.LeaveSafeModeRequestProto;
|
||||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.LeaveSafeModeResponseProto;
|
||||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesRequestProto;
|
||||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesResponseProto;
|
||||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryRequestProto;
|
||||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProto;
|
||||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryRequestProto;
|
||||
|
@ -61,6 +63,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeReques
|
|||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
|
||||
|
@ -77,6 +81,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountT
|
|||
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesResponsePBImpl;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetSafeModeResponsePBImpl;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.LeaveSafeModeResponsePBImpl;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RefreshMountTableEntriesRequestPBImpl;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RefreshMountTableEntriesResponsePBImpl;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryRequestPBImpl;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryResponsePBImpl;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryRequestPBImpl;
|
||||
|
@ -267,4 +273,19 @@ public class RouterAdminProtocolTranslatorPB
|
|||
throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshMountTableEntriesResponse refreshMountTableEntries(
|
||||
RefreshMountTableEntriesRequest request) throws IOException {
|
||||
RefreshMountTableEntriesRequestPBImpl requestPB =
|
||||
(RefreshMountTableEntriesRequestPBImpl) request;
|
||||
RefreshMountTableEntriesRequestProto proto = requestPB.getProto();
|
||||
try {
|
||||
RefreshMountTableEntriesResponseProto response =
|
||||
rpcProxy.refreshMountTableEntries(null, proto);
|
||||
return new RefreshMountTableEntriesResponsePBImpl(response);
|
||||
} catch (ServiceException e) {
|
||||
throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntr
|
|||
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
|
||||
|
@ -77,4 +79,18 @@ public interface MountTableManager {
|
|||
*/
|
||||
GetMountTableEntriesResponse getMountTableEntries(
|
||||
GetMountTableEntriesRequest request) throws IOException;
|
||||
|
||||
/**
|
||||
* Refresh mount table entries cache from the state store. Cache is updated
|
||||
* periodically but with this API cache can be refreshed immediately. This API
|
||||
* is primarily meant to be called from the Admin Server. Admin Server will
|
||||
* call this API and refresh mount table cache of all the routers while
|
||||
* changing mount table entries.
|
||||
*
|
||||
* @param request Fully populated request object.
|
||||
* @return True the mount table entry was updated without any error.
|
||||
* @throws IOException Throws exception if the data store is not initialized.
|
||||
*/
|
||||
RefreshMountTableEntriesResponse refreshMountTableEntries(
|
||||
RefreshMountTableEntriesRequest request) throws IOException;
|
||||
}
|
|
@ -0,0 +1,289 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.CacheLoader;
|
||||
import com.google.common.cache.LoadingCache;
|
||||
import com.google.common.cache.RemovalListener;
|
||||
import com.google.common.cache.RemovalNotification;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* This service is invoked from {@link MountTableStore} when there is change in
|
||||
* mount table entries and it updates mount table entry cache on local router as
|
||||
* well as on all remote routers. Refresh on local router is done by calling
|
||||
* {@link MountTableStore#loadCache(boolean)}} API directly, no RPC call
|
||||
* involved, but on remote routers refresh is done through RouterClient(RPC
|
||||
* call). To improve performance, all routers are refreshed in separate thread
|
||||
* and all connection are cached. Cached connections are removed from
|
||||
* cache and closed when their max live time is elapsed.
|
||||
*/
|
||||
public class MountTableRefresherService extends AbstractService {
|
||||
private static final String ROUTER_CONNECT_ERROR_MSG =
|
||||
"Router {} connection failed. Mount table cache will not refesh.";
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(MountTableRefresherService.class);
|
||||
|
||||
/** Local router. */
|
||||
private final Router router;
|
||||
/** Mount table store. */
|
||||
private MountTableStore mountTableStore;
|
||||
/** Local router admin address in the form of host:port. */
|
||||
private String localAdminAdress;
|
||||
/** Timeout in ms to update mount table cache on all the routers. */
|
||||
private long cacheUpdateTimeout;
|
||||
|
||||
/**
|
||||
* All router admin clients cached. So no need to create the client again and
|
||||
* again. Router admin address(host:port) is used as key to cache RouterClient
|
||||
* objects.
|
||||
*/
|
||||
private LoadingCache<String, RouterClient> routerClientsCache;
|
||||
|
||||
/**
|
||||
* Removes expired RouterClient from routerClientsCache.
|
||||
*/
|
||||
private ScheduledExecutorService clientCacheCleanerScheduler;
|
||||
|
||||
/**
|
||||
* Create a new service to refresh mount table cache when there is change in
|
||||
* mount table entries.
|
||||
*
|
||||
* @param router whose mount table cache will be refreshed
|
||||
*/
|
||||
public MountTableRefresherService(Router router) {
|
||||
super(MountTableRefresherService.class.getSimpleName());
|
||||
this.router = router;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
super.serviceInit(conf);
|
||||
this.mountTableStore = getMountTableStore();
|
||||
// attach this service to mount table store.
|
||||
this.mountTableStore.setRefreshService(this);
|
||||
this.localAdminAdress =
|
||||
StateStoreUtils.getHostPortString(router.getAdminServerAddress());
|
||||
this.cacheUpdateTimeout = conf.getTimeDuration(
|
||||
RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT,
|
||||
RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT_DEFAULT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
long routerClientMaxLiveTime = conf.getTimeDuration(
|
||||
RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME,
|
||||
RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME_DEFAULT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
routerClientsCache = CacheBuilder.newBuilder()
|
||||
.expireAfterWrite(routerClientMaxLiveTime, TimeUnit.MILLISECONDS)
|
||||
.removalListener(getClientRemover()).build(getClientCreator());
|
||||
|
||||
initClientCacheCleaner(routerClientMaxLiveTime);
|
||||
}
|
||||
|
||||
private void initClientCacheCleaner(long routerClientMaxLiveTime) {
|
||||
clientCacheCleanerScheduler =
|
||||
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
|
||||
.setNameFormat("MountTableRefresh_ClientsCacheCleaner")
|
||||
.setDaemon(true).build());
|
||||
/*
|
||||
* When cleanUp() method is called, expired RouterClient will be removed and
|
||||
* closed.
|
||||
*/
|
||||
clientCacheCleanerScheduler.scheduleWithFixedDelay(
|
||||
() -> routerClientsCache.cleanUp(), routerClientMaxLiveTime,
|
||||
routerClientMaxLiveTime, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create cache entry remove listener.
|
||||
*/
|
||||
private RemovalListener<String, RouterClient> getClientRemover() {
|
||||
return new RemovalListener<String, RouterClient>() {
|
||||
@Override
|
||||
public void onRemoval(
|
||||
RemovalNotification<String, RouterClient> notification) {
|
||||
closeRouterClient(notification.getValue());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected void closeRouterClient(RouterClient client) {
|
||||
try {
|
||||
client.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error while closing RouterClient", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates RouterClient and caches it.
|
||||
*/
|
||||
private CacheLoader<String, RouterClient> getClientCreator() {
|
||||
return new CacheLoader<String, RouterClient>() {
|
||||
public RouterClient load(String adminAddress) throws IOException {
|
||||
InetSocketAddress routerSocket =
|
||||
NetUtils.createSocketAddr(adminAddress);
|
||||
Configuration config = getConfig();
|
||||
return createRouterClient(routerSocket, config);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected RouterClient createRouterClient(InetSocketAddress routerSocket,
|
||||
Configuration config) throws IOException {
|
||||
return new RouterClient(routerSocket, config);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
super.serviceStop();
|
||||
clientCacheCleanerScheduler.shutdown();
|
||||
// remove and close all admin clients
|
||||
routerClientsCache.invalidateAll();
|
||||
}
|
||||
|
||||
private MountTableStore getMountTableStore() throws IOException {
|
||||
MountTableStore mountTblStore =
|
||||
router.getStateStore().getRegisteredRecordStore(MountTableStore.class);
|
||||
if (mountTblStore == null) {
|
||||
throw new IOException("Mount table state store is not available.");
|
||||
}
|
||||
return mountTblStore;
|
||||
}
|
||||
|
||||
/**
|
||||
* Refresh mount table cache of this router as well as all other routers.
|
||||
*/
|
||||
public void refresh() throws StateStoreUnavailableException {
|
||||
List<RouterState> cachedRecords =
|
||||
router.getRouterStateManager().getCachedRecords();
|
||||
List<MountTableRefresherThread> refreshThreads = new ArrayList<>();
|
||||
for (RouterState routerState : cachedRecords) {
|
||||
String adminAddress = routerState.getAdminAddress();
|
||||
if (adminAddress == null || adminAddress.length() == 0) {
|
||||
// this router has not enabled router admin
|
||||
continue;
|
||||
}
|
||||
// No use of calling refresh on router which is not running state
|
||||
if (routerState.getStatus() != RouterServiceState.RUNNING) {
|
||||
LOG.info(
|
||||
"Router {} is not running. Mount table cache will not refesh.");
|
||||
// remove if RouterClient is cached.
|
||||
removeFromCache(adminAddress);
|
||||
} else if (isLocalAdmin(adminAddress)) {
|
||||
/*
|
||||
* Local router's cache update does not require RPC call, so no need for
|
||||
* RouterClient
|
||||
*/
|
||||
refreshThreads.add(getLocalRefresher(adminAddress));
|
||||
} else {
|
||||
try {
|
||||
RouterClient client = routerClientsCache.get(adminAddress);
|
||||
refreshThreads.add(new MountTableRefresherThread(
|
||||
client.getMountTableManager(), adminAddress));
|
||||
} catch (ExecutionException execExcep) {
|
||||
// Can not connect, seems router is stopped now.
|
||||
LOG.warn(ROUTER_CONNECT_ERROR_MSG, adminAddress, execExcep);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!refreshThreads.isEmpty()) {
|
||||
invokeRefresh(refreshThreads);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected MountTableRefresherThread getLocalRefresher(String adminAddress) {
|
||||
return new MountTableRefresherThread(router.getAdminServer(), adminAddress);
|
||||
}
|
||||
|
||||
private void removeFromCache(String adminAddress) {
|
||||
routerClientsCache.invalidate(adminAddress);
|
||||
}
|
||||
|
||||
private void invokeRefresh(List<MountTableRefresherThread> refreshThreads) {
|
||||
CountDownLatch countDownLatch = new CountDownLatch(refreshThreads.size());
|
||||
// start all the threads
|
||||
for (MountTableRefresherThread refThread : refreshThreads) {
|
||||
refThread.setCountDownLatch(countDownLatch);
|
||||
refThread.start();
|
||||
}
|
||||
try {
|
||||
/*
|
||||
* Wait for all the thread to complete, await method returns false if
|
||||
* refresh is not finished within specified time
|
||||
*/
|
||||
boolean allReqCompleted =
|
||||
countDownLatch.await(cacheUpdateTimeout, TimeUnit.MILLISECONDS);
|
||||
if (!allReqCompleted) {
|
||||
LOG.warn("Not all router admins updated their cache");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error("Mount table cache refresher was interrupted.", e);
|
||||
}
|
||||
logResult(refreshThreads);
|
||||
}
|
||||
|
||||
private boolean isLocalAdmin(String adminAddress) {
|
||||
return adminAddress.contentEquals(localAdminAdress);
|
||||
}
|
||||
|
||||
private void logResult(List<MountTableRefresherThread> refreshThreads) {
|
||||
int succesCount = 0;
|
||||
int failureCount = 0;
|
||||
for (MountTableRefresherThread mountTableRefreshThread : refreshThreads) {
|
||||
if (mountTableRefreshThread.isSuccess()) {
|
||||
succesCount++;
|
||||
} else {
|
||||
failureCount++;
|
||||
// remove RouterClient from cache so that new client is created
|
||||
removeFromCache(mountTableRefreshThread.getAdminAddress());
|
||||
}
|
||||
}
|
||||
LOG.info("Mount table entries cache refresh succesCount={},failureCount={}",
|
||||
succesCount, failureCount);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,96 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Base class for updating mount table cache on all the router.
|
||||
*/
|
||||
public class MountTableRefresherThread extends Thread {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(MountTableRefresherThread.class);
|
||||
private boolean success;
|
||||
/** Admin server on which refreshed to be invoked. */
|
||||
private String adminAddress;
|
||||
private CountDownLatch countDownLatch;
|
||||
private MountTableManager manager;
|
||||
|
||||
public MountTableRefresherThread(MountTableManager manager,
|
||||
String adminAddress) {
|
||||
this.manager = manager;
|
||||
this.adminAddress = adminAddress;
|
||||
setName("MountTableRefresh_" + adminAddress);
|
||||
setDaemon(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Refresh mount table cache of local and remote routers. Local and remote
|
||||
* routers will be refreshed differently. Lets understand what are the
|
||||
* local and remote routers and refresh will be done differently on these
|
||||
* routers. Suppose there are three routers R1, R2 and R3. User want to add
|
||||
* new mount table entry. He will connect to only one router, not all the
|
||||
* routers. Suppose He connects to R1 and calls add mount table entry through
|
||||
* API or CLI. Now in this context R1 is local router, R2 and R3 are remote
|
||||
* routers. Because add mount table entry is invoked on R1, R1 will update the
|
||||
* cache locally it need not to make RPC call. But R1 will make RPC calls to
|
||||
* update cache on R2 and R3.
|
||||
*/
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
RefreshMountTableEntriesResponse refreshMountTableEntries =
|
||||
manager.refreshMountTableEntries(
|
||||
RefreshMountTableEntriesRequest.newInstance());
|
||||
success = refreshMountTableEntries.getResult();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to refresh mount table entries cache at router {}",
|
||||
adminAddress, e);
|
||||
} finally {
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if cache was refreshed successfully.
|
||||
*/
|
||||
public boolean isSuccess() {
|
||||
return success;
|
||||
}
|
||||
|
||||
public void setCountDownLatch(CountDownLatch countDownLatch) {
|
||||
this.countDownLatch = countDownLatch;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "MountTableRefreshThread [success=" + success + ", adminAddress="
|
||||
+ adminAddress + "]";
|
||||
}
|
||||
|
||||
public String getAdminAddress() {
|
||||
return adminAddress;
|
||||
}
|
||||
}
|
|
@ -204,6 +204,31 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
|
|||
FEDERATION_ROUTER_PREFIX + "mount-table.max-cache-size";
|
||||
/** Remove cache entries if we have more than 10k. */
|
||||
public static final int FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE_DEFAULT = 10000;
|
||||
/**
|
||||
* If true then cache updated immediately after mount table entry change
|
||||
* otherwise it is updated periodically based configuration.
|
||||
*/
|
||||
public static final String MOUNT_TABLE_CACHE_UPDATE =
|
||||
FEDERATION_ROUTER_PREFIX + "mount-table.cache.update";
|
||||
public static final boolean MOUNT_TABLE_CACHE_UPDATE_DEFAULT =
|
||||
false;
|
||||
/**
|
||||
* Timeout to update mount table cache on all the routers.
|
||||
*/
|
||||
public static final String MOUNT_TABLE_CACHE_UPDATE_TIMEOUT =
|
||||
FEDERATION_ROUTER_PREFIX + "mount-table.cache.update.timeout";
|
||||
public static final long MOUNT_TABLE_CACHE_UPDATE_TIMEOUT_DEFAULT =
|
||||
TimeUnit.MINUTES.toMillis(1);
|
||||
/**
|
||||
* Remote router mount table cache is updated through RouterClient(RPC call).
|
||||
* To improve performance, RouterClient connections are cached but it should
|
||||
* not be kept in cache forever. This property defines the max time a
|
||||
* connection can be cached.
|
||||
*/
|
||||
public static final String MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME =
|
||||
FEDERATION_ROUTER_PREFIX + "mount-table.cache.update.client.max.time";
|
||||
public static final long MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME_DEFAULT =
|
||||
TimeUnit.MINUTES.toMillis(5);
|
||||
public static final String FEDERATION_MOUNT_TABLE_CACHE_ENABLE =
|
||||
FEDERATION_ROUTER_PREFIX + "mount-table.cache.enable";
|
||||
public static final boolean FEDERATION_MOUNT_TABLE_CACHE_ENABLE_DEFAULT =
|
||||
|
|
|
@ -254,9 +254,50 @@ public class Router extends CompositeService {
|
|||
addService(this.safemodeService);
|
||||
}
|
||||
|
||||
/*
|
||||
* Refresh mount table cache immediately after adding, modifying or deleting
|
||||
* the mount table entries. If this service is not enabled mount table cache
|
||||
* are refreshed periodically by StateStoreCacheUpdateService
|
||||
*/
|
||||
if (conf.getBoolean(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE,
|
||||
RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_DEFAULT)) {
|
||||
// There is no use of starting refresh service if state store and admin
|
||||
// servers are not enabled
|
||||
String disabledDependentServices = getDisabledDependentServices();
|
||||
/*
|
||||
* disabledDependentServices null means all dependent services are
|
||||
* enabled.
|
||||
*/
|
||||
if (disabledDependentServices == null) {
|
||||
|
||||
MountTableRefresherService refreshService =
|
||||
new MountTableRefresherService(this);
|
||||
addService(refreshService);
|
||||
LOG.info("Service {} is enabled.",
|
||||
MountTableRefresherService.class.getSimpleName());
|
||||
} else {
|
||||
LOG.warn(
|
||||
"Service {} not enabled: depenendent service(s) {} not enabled.",
|
||||
MountTableRefresherService.class.getSimpleName(),
|
||||
disabledDependentServices);
|
||||
}
|
||||
}
|
||||
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
private String getDisabledDependentServices() {
|
||||
if (this.stateStore == null && this.adminServer == null) {
|
||||
return StateStoreService.class.getSimpleName() + ","
|
||||
+ RouterAdminServer.class.getSimpleName();
|
||||
} else if (this.stateStore == null) {
|
||||
return StateStoreService.class.getSimpleName();
|
||||
} else if (this.adminServer == null) {
|
||||
return RouterAdminServer.class.getSimpleName();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the hostname for this Router. If the hostname is not
|
||||
* explicitly configured in the given config, then it is determined.
|
||||
|
@ -696,9 +737,19 @@ public class Router extends CompositeService {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the Router safe mode service
|
||||
* Get the Router safe mode service.
|
||||
*/
|
||||
RouterSafemodeService getSafemodeService() {
|
||||
return this.safemodeService;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get router admin server.
|
||||
*
|
||||
* @return Null if admin is not enabled.
|
||||
*/
|
||||
public RouterAdminServer getAdminServer() {
|
||||
return adminServer;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo
|
|||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.DisabledNameserviceStore;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.DisableNameserviceRequest;
|
||||
|
@ -55,6 +56,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeReques
|
|||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
|
||||
|
@ -102,6 +105,7 @@ public class RouterAdminServer extends AbstractService
|
|||
private static String routerOwner;
|
||||
private static String superGroup;
|
||||
private static boolean isPermissionEnabled;
|
||||
private boolean iStateStoreCache;
|
||||
|
||||
public RouterAdminServer(Configuration conf, Router router)
|
||||
throws IOException {
|
||||
|
@ -154,6 +158,8 @@ public class RouterAdminServer extends AbstractService
|
|||
this.adminAddress = new InetSocketAddress(
|
||||
confRpcAddress.getHostName(), listenAddress.getPort());
|
||||
router.setAdminServerAddress(this.adminAddress);
|
||||
iStateStoreCache =
|
||||
router.getSubclusterResolver() instanceof StateStoreCache;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -243,7 +249,7 @@ public class RouterAdminServer extends AbstractService
|
|||
getMountTableStore().updateMountTableEntry(request);
|
||||
|
||||
MountTable mountTable = request.getEntry();
|
||||
if (mountTable != null) {
|
||||
if (mountTable != null && router.isQuotaEnabled()) {
|
||||
synchronizeQuota(mountTable);
|
||||
}
|
||||
return response;
|
||||
|
@ -331,6 +337,26 @@ public class RouterAdminServer extends AbstractService
|
|||
return GetSafeModeResponse.newInstance(isInSafeMode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshMountTableEntriesResponse refreshMountTableEntries(
|
||||
RefreshMountTableEntriesRequest request) throws IOException {
|
||||
if (iStateStoreCache) {
|
||||
/*
|
||||
* MountTableResolver updates MountTableStore cache also. Expecting other
|
||||
* SubclusterResolver implementations to update MountTableStore cache also
|
||||
* apart from updating its cache.
|
||||
*/
|
||||
boolean result = ((StateStoreCache) this.router.getSubclusterResolver())
|
||||
.loadCache(true);
|
||||
RefreshMountTableEntriesResponse response =
|
||||
RefreshMountTableEntriesResponse.newInstance();
|
||||
response.setResult(result);
|
||||
return response;
|
||||
} else {
|
||||
return getMountTableStore().refreshMountTableEntries(request);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify if Router set safe mode state correctly.
|
||||
* @param isInSafeMode Expected state to be set.
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
|
|||
import org.apache.hadoop.hdfs.server.federation.store.RecordStore;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
||||
|
@ -91,6 +92,10 @@ public class RouterHeartbeatService extends PeriodicService {
|
|||
getStateStoreVersion(MembershipStore.class),
|
||||
getStateStoreVersion(MountTableStore.class));
|
||||
record.setStateStoreVersion(stateStoreVersion);
|
||||
// if admin server not started then hostPort will be empty
|
||||
String hostPort =
|
||||
StateStoreUtils.getHostPortString(router.getAdminServerAddress());
|
||||
record.setAdminAddress(hostPort);
|
||||
RouterHeartbeatRequest request =
|
||||
RouterHeartbeatRequest.newInstance(record);
|
||||
RouterHeartbeatResponse response = routerStore.routerHeartbeat(request);
|
||||
|
|
|
@ -20,8 +20,11 @@ package org.apache.hadoop.hdfs.server.federation.store;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
|
||||
import org.apache.hadoop.hdfs.server.federation.router.MountTableRefresherService;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Management API for the HDFS mount table information stored in
|
||||
|
@ -42,8 +45,29 @@ import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
|||
@InterfaceStability.Evolving
|
||||
public abstract class MountTableStore extends CachedRecordStore<MountTable>
|
||||
implements MountTableManager {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(MountTableStore.class);
|
||||
private MountTableRefresherService refreshService;
|
||||
|
||||
public MountTableStore(StateStoreDriver driver) {
|
||||
super(MountTable.class, driver);
|
||||
}
|
||||
|
||||
public void setRefreshService(MountTableRefresherService refreshService) {
|
||||
this.refreshService = refreshService;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update mount table cache of this router as well as all other routers.
|
||||
*/
|
||||
protected void updateCacheAllRouters() {
|
||||
if (refreshService != null) {
|
||||
try {
|
||||
refreshService.refresh();
|
||||
} catch (StateStoreUnavailableException e) {
|
||||
LOG.error("Cannot refresh mount table: state store not available", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -17,6 +17,9 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.store;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -110,4 +113,27 @@ public final class StateStoreUtils {
|
|||
}
|
||||
return matchingList;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns address in form of host:port, empty string if address is null.
|
||||
*
|
||||
* @param address address
|
||||
* @return host:port
|
||||
*/
|
||||
public static String getHostPortString(InetSocketAddress address) {
|
||||
if (null == address) {
|
||||
return "";
|
||||
}
|
||||
String hostName = address.getHostName();
|
||||
if (hostName.equals("0.0.0.0")) {
|
||||
try {
|
||||
hostName = InetAddress.getLocalHost().getHostName();
|
||||
} catch (UnknownHostException e) {
|
||||
LOG.error("Failed to get local host name", e);
|
||||
return "";
|
||||
}
|
||||
}
|
||||
return hostName + ":" + address.getPort();
|
||||
}
|
||||
|
||||
}
|
|
@ -33,6 +33,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntr
|
|||
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
|
||||
|
@ -68,6 +70,7 @@ public class MountTableStoreImpl extends MountTableStore {
|
|||
AddMountTableEntryResponse response =
|
||||
AddMountTableEntryResponse.newInstance();
|
||||
response.setStatus(status);
|
||||
updateCacheAllRouters();
|
||||
return response;
|
||||
}
|
||||
|
||||
|
@ -86,6 +89,7 @@ public class MountTableStoreImpl extends MountTableStore {
|
|||
UpdateMountTableEntryResponse response =
|
||||
UpdateMountTableEntryResponse.newInstance();
|
||||
response.setStatus(status);
|
||||
updateCacheAllRouters();
|
||||
return response;
|
||||
}
|
||||
|
||||
|
@ -110,6 +114,7 @@ public class MountTableStoreImpl extends MountTableStore {
|
|||
RemoveMountTableEntryResponse response =
|
||||
RemoveMountTableEntryResponse.newInstance();
|
||||
response.setStatus(status);
|
||||
updateCacheAllRouters();
|
||||
return response;
|
||||
}
|
||||
|
||||
|
@ -151,4 +156,17 @@ public class MountTableStoreImpl extends MountTableStore {
|
|||
response.setTimestamp(Time.now());
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshMountTableEntriesResponse refreshMountTableEntries(
|
||||
RefreshMountTableEntriesRequest request) throws IOException {
|
||||
// Because this refresh is done through admin API, it should always be force
|
||||
// refresh.
|
||||
boolean result = loadCache(true);
|
||||
RefreshMountTableEntriesResponse response =
|
||||
RefreshMountTableEntriesResponse.newInstance();
|
||||
response.setResult(result);
|
||||
return response;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.store.protocol;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
|
||||
|
||||
/**
|
||||
* API request for refreshing mount table cached entries from state store.
|
||||
*/
|
||||
public abstract class RefreshMountTableEntriesRequest {
|
||||
|
||||
public static RefreshMountTableEntriesRequest newInstance()
|
||||
throws IOException {
|
||||
return StateStoreSerializer
|
||||
.newRecord(RefreshMountTableEntriesRequest.class);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.store.protocol;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
|
||||
|
||||
/**
|
||||
* API response for refreshing mount table entries cache from state store.
|
||||
*/
|
||||
public abstract class RefreshMountTableEntriesResponse {
|
||||
|
||||
public static RefreshMountTableEntriesResponse newInstance()
|
||||
throws IOException {
|
||||
return StateStoreSerializer
|
||||
.newRecord(RefreshMountTableEntriesResponse.class);
|
||||
}
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract boolean getResult();
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void setResult(boolean result);
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesRequestProto;
|
||||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesRequestProto.Builder;
|
||||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesRequestProtoOrBuilder;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
|
||||
|
||||
import com.google.protobuf.Message;
|
||||
|
||||
/**
|
||||
* Protobuf implementation of the state store API object
|
||||
* RefreshMountTableEntriesRequest.
|
||||
*/
|
||||
public class RefreshMountTableEntriesRequestPBImpl
|
||||
extends RefreshMountTableEntriesRequest implements PBRecord {
|
||||
|
||||
private FederationProtocolPBTranslator<RefreshMountTableEntriesRequestProto,
|
||||
Builder, RefreshMountTableEntriesRequestProtoOrBuilder> translator =
|
||||
new FederationProtocolPBTranslator<>(
|
||||
RefreshMountTableEntriesRequestProto.class);
|
||||
|
||||
public RefreshMountTableEntriesRequestPBImpl() {
|
||||
}
|
||||
|
||||
public RefreshMountTableEntriesRequestPBImpl(
|
||||
RefreshMountTableEntriesRequestProto proto) {
|
||||
this.translator.setProto(proto);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshMountTableEntriesRequestProto getProto() {
|
||||
// if builder is null build() returns null, calling getBuilder() to
|
||||
// instantiate builder
|
||||
this.translator.getBuilder();
|
||||
return this.translator.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setProto(Message proto) {
|
||||
this.translator.setProto(proto);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readInstance(String base64String) throws IOException {
|
||||
this.translator.readInstance(base64String);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,74 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesResponseProto;
|
||||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesResponseProto.Builder;
|
||||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesResponseProtoOrBuilder;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
|
||||
|
||||
import com.google.protobuf.Message;
|
||||
|
||||
/**
|
||||
* Protobuf implementation of the state store API object
|
||||
* RefreshMountTableEntriesResponse.
|
||||
*/
|
||||
public class RefreshMountTableEntriesResponsePBImpl
|
||||
extends RefreshMountTableEntriesResponse implements PBRecord {
|
||||
|
||||
private FederationProtocolPBTranslator<RefreshMountTableEntriesResponseProto,
|
||||
Builder, RefreshMountTableEntriesResponseProtoOrBuilder> translator =
|
||||
new FederationProtocolPBTranslator<>(
|
||||
RefreshMountTableEntriesResponseProto.class);
|
||||
|
||||
public RefreshMountTableEntriesResponsePBImpl() {
|
||||
}
|
||||
|
||||
public RefreshMountTableEntriesResponsePBImpl(
|
||||
RefreshMountTableEntriesResponseProto proto) {
|
||||
this.translator.setProto(proto);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshMountTableEntriesResponseProto getProto() {
|
||||
return this.translator.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setProto(Message proto) {
|
||||
this.translator.setProto(proto);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readInstance(String base64String) throws IOException {
|
||||
this.translator.readInstance(base64String);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getResult() {
|
||||
return this.translator.getProtoOrBuilder().getResult();
|
||||
};
|
||||
|
||||
@Override
|
||||
public void setResult(boolean result) {
|
||||
this.translator.getBuilder().setResult(result);
|
||||
}
|
||||
}
|
|
@ -88,6 +88,10 @@ public abstract class RouterState extends BaseRecord {
|
|||
|
||||
public abstract long getDateStarted();
|
||||
|
||||
public abstract void setAdminAddress(String adminAddress);
|
||||
|
||||
public abstract String getAdminAddress();
|
||||
|
||||
/**
|
||||
* Get the identifier for the Router. It uses the address.
|
||||
*
|
||||
|
|
|
@ -199,4 +199,14 @@ public class RouterStatePBImpl extends RouterState implements PBRecord {
|
|||
public long getDateCreated() {
|
||||
return this.translator.getProtoOrBuilder().getDateCreated();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAdminAddress(String adminAddress) {
|
||||
this.translator.getBuilder().setAdminAddress(adminAddress);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getAdminAddress() {
|
||||
return this.translator.getProtoOrBuilder().getAdminAddress();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,6 +54,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeReques
|
|||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
|
||||
|
@ -107,7 +109,8 @@ public class RouterAdmin extends Configured implements Tool {
|
|||
if (cmd == null) {
|
||||
String[] commands =
|
||||
{"-add", "-update", "-rm", "-ls", "-setQuota", "-clrQuota",
|
||||
"-safemode", "-nameservice", "-getDisabledNameservices"};
|
||||
"-safemode", "-nameservice", "-getDisabledNameservices",
|
||||
"-refresh"};
|
||||
StringBuilder usage = new StringBuilder();
|
||||
usage.append("Usage: hdfs dfsrouteradmin :\n");
|
||||
for (int i = 0; i < commands.length; i++) {
|
||||
|
@ -142,6 +145,8 @@ public class RouterAdmin extends Configured implements Tool {
|
|||
return "\t[-nameservice enable | disable <nameservice>]";
|
||||
} else if (cmd.equals("-getDisabledNameservices")) {
|
||||
return "\t[-getDisabledNameservices]";
|
||||
} else if (cmd.equals("-refresh")) {
|
||||
return "\t[-refresh]";
|
||||
}
|
||||
return getUsage(null);
|
||||
}
|
||||
|
@ -230,9 +235,10 @@ public class RouterAdmin extends Configured implements Tool {
|
|||
printUsage(cmd);
|
||||
return exitCode;
|
||||
}
|
||||
String address = null;
|
||||
// Initialize RouterClient
|
||||
try {
|
||||
String address = getConf().getTrimmed(
|
||||
address = getConf().getTrimmed(
|
||||
RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
|
||||
RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT);
|
||||
InetSocketAddress routerSocket = NetUtils.createSocketAddr(address);
|
||||
|
@ -302,6 +308,8 @@ public class RouterAdmin extends Configured implements Tool {
|
|||
manageNameservice(subcmd, nsId);
|
||||
} else if ("-getDisabledNameservices".equals(cmd)) {
|
||||
getDisabledNameservices();
|
||||
} else if ("-refresh".equals(cmd)) {
|
||||
refresh(address);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unknown Command: " + cmd);
|
||||
}
|
||||
|
@ -337,6 +345,27 @@ public class RouterAdmin extends Configured implements Tool {
|
|||
return exitCode;
|
||||
}
|
||||
|
||||
private void refresh(String address) throws IOException {
|
||||
if (refreshRouterCache()) {
|
||||
System.out.println(
|
||||
"Successfully updated mount table cache on router " + address);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Refresh mount table cache on connected router.
|
||||
*
|
||||
* @return true if cache refreshed successfully
|
||||
* @throws IOException
|
||||
*/
|
||||
private boolean refreshRouterCache() throws IOException {
|
||||
RefreshMountTableEntriesResponse response =
|
||||
client.getMountTableManager().refreshMountTableEntries(
|
||||
RefreshMountTableEntriesRequest.newInstance());
|
||||
return response.getResult();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Add a mount table entry or update if it exists.
|
||||
*
|
||||
|
|
|
@ -193,6 +193,7 @@ message RouterRecordProto {
|
|||
optional string version = 6;
|
||||
optional string compileInfo = 7;
|
||||
optional uint64 dateStarted = 8;
|
||||
optional string adminAddress = 9;
|
||||
}
|
||||
|
||||
message GetRouterRegistrationRequestProto {
|
||||
|
@ -219,6 +220,13 @@ message RouterHeartbeatResponseProto {
|
|||
optional bool status = 1;
|
||||
}
|
||||
|
||||
message RefreshMountTableEntriesRequestProto {
|
||||
}
|
||||
|
||||
message RefreshMountTableEntriesResponseProto {
|
||||
optional bool result = 1;
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////
|
||||
// Route State
|
||||
/////////////////////////////////////////////////
|
||||
|
|
|
@ -74,4 +74,9 @@ service RouterAdminProtocolService {
|
|||
* Get the list of disabled name services.
|
||||
*/
|
||||
rpc getDisabledNameservices(GetDisabledNameservicesRequestProto) returns (GetDisabledNameservicesResponseProto);
|
||||
|
||||
/**
|
||||
* Refresh mount entries
|
||||
*/
|
||||
rpc refreshMountTableEntries(RefreshMountTableEntriesRequestProto) returns(RefreshMountTableEntriesResponseProto);
|
||||
}
|
|
@ -547,4 +547,38 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.federation.router.mount-table.cache.update</name>
|
||||
<value>false</value>
|
||||
<description>Set true to enable MountTableRefreshService. This service
|
||||
updates mount table cache immediately after adding, modifying or
|
||||
deleting the mount table entries. If this service is not enabled
|
||||
mount table cache are refreshed periodically by
|
||||
StateStoreCacheUpdateService
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.federation.router.mount-table.cache.update.timeout</name>
|
||||
<value>1m</value>
|
||||
<description>This property defines how long to wait for all the
|
||||
admin servers to finish their mount table cache update. This setting
|
||||
supports multiple time unit suffixes as described in
|
||||
dfs.federation.router.safemode.extension.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.federation.router.mount-table.cache.update.client.max.time
|
||||
</name>
|
||||
<value>5m</value>
|
||||
<description>Remote router mount table cache is updated through
|
||||
RouterClient(RPC call). To improve performance, RouterClient
|
||||
connections are cached but it should not be kept in cache forever.
|
||||
This property defines the max time a connection can be cached. This
|
||||
setting supports multiple time unit suffixes as described in
|
||||
dfs.federation.router.safemode.extension.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
</configuration>
|
|
@ -230,6 +230,12 @@ Ls command will show below information for each mount table entry:
|
|||
Source Destinations Owner Group Mode Quota/Usage
|
||||
/path ns0->/path root supergroup rwxr-xr-x [NsQuota: 50/0, SsQuota: 100 B/0 B]
|
||||
|
||||
Mount table cache is refreshed periodically but it can also be refreshed by executing refresh command:
|
||||
|
||||
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -refresh
|
||||
|
||||
The above command will refresh cache of the connected router. This command is redundant when mount table refresh service is enabled as the service will always keep the cache updated.
|
||||
|
||||
#### Multiple subclusters
|
||||
A mount point also supports mapping multiple subclusters.
|
||||
For example, to create a mount point that stores files in subclusters `ns1` and `ns2`.
|
||||
|
@ -380,6 +386,9 @@ The connection to the State Store and the internal caching at the Router.
|
|||
| dfs.federation.router.store.connection.test | 60000 | How often to check for the connection to the State Store in milliseconds. |
|
||||
| dfs.federation.router.cache.ttl | 60000 | How often to refresh the State Store caches in milliseconds. |
|
||||
| dfs.federation.router.store.membership.expiration | 300000 | Expiration time in milliseconds for a membership record. |
|
||||
| dfs.federation.router.mount-table.cache.update | false | If true, Mount table cache is updated whenever a mount table entry is added, modified or removed for all the routers. |
|
||||
| dfs.federation.router.mount-table.cache.update.timeout | 1m | Max time to wait for all the routers to finish their mount table cache update. |
|
||||
| dfs.federation.router.mount-table.cache.update.client.max.time | 5m | Max time a RouterClient connection can be cached. |
|
||||
|
||||
### Routing
|
||||
|
||||
|
|
|
@ -56,6 +56,8 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
|||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
|
@ -316,4 +318,29 @@ public final class FederationTestUtils {
|
|||
}).when(spyHAContext).checkOperation(any(OperationCategory.class));
|
||||
Whitebox.setInternalState(namesystem, "haContext", spyHAContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for a number of routers to be registered in state store.
|
||||
*
|
||||
* @param stateManager number of routers to be registered.
|
||||
* @param routerCount number of routers to be registered.
|
||||
* @param tiemout max wait time in ms
|
||||
*/
|
||||
public static void waitRouterRegistered(RouterStore stateManager,
|
||||
long routerCount, int timeout) throws Exception {
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
try {
|
||||
List<RouterState> cachedRecords = stateManager.getCachedRecords();
|
||||
if (cachedRecords.size() == routerCount) {
|
||||
return true;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// Ignore
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}, 100, timeout);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,6 +38,7 @@ public class RouterConfigBuilder {
|
|||
private boolean enableMetrics = false;
|
||||
private boolean enableQuota = false;
|
||||
private boolean enableSafemode = false;
|
||||
private boolean enableCacheRefresh;
|
||||
|
||||
public RouterConfigBuilder(Configuration configuration) {
|
||||
this.conf = configuration;
|
||||
|
@ -104,6 +105,11 @@ public class RouterConfigBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
public RouterConfigBuilder refreshCache(boolean enable) {
|
||||
this.enableCacheRefresh = enable;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RouterConfigBuilder rpc() {
|
||||
return this.rpc(true);
|
||||
}
|
||||
|
@ -140,6 +146,10 @@ public class RouterConfigBuilder {
|
|||
return this.safemode(true);
|
||||
}
|
||||
|
||||
public RouterConfigBuilder refreshCache() {
|
||||
return this.refreshCache(true);
|
||||
}
|
||||
|
||||
public Configuration build() {
|
||||
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE,
|
||||
this.enableStateStore);
|
||||
|
@ -158,6 +168,8 @@ public class RouterConfigBuilder {
|
|||
this.enableQuota);
|
||||
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE,
|
||||
this.enableSafemode);
|
||||
conf.setBoolean(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE,
|
||||
this.enableCacheRefresh);
|
||||
return conf;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -799,6 +799,28 @@ public class TestRouterAdminCLI {
|
|||
assertTrue(err.toString().contains("No arguments allowed"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshMountTableCache() throws Exception {
|
||||
String src = "/refreshMount";
|
||||
|
||||
// create mount table entry
|
||||
String[] argv = new String[] {"-add", src, "refreshNS0", "/refreshDest"};
|
||||
assertEquals(0, ToolRunner.run(admin, argv));
|
||||
|
||||
// refresh the mount table entry cache
|
||||
System.setOut(new PrintStream(out));
|
||||
argv = new String[] {"-refresh"};
|
||||
assertEquals(0, ToolRunner.run(admin, argv));
|
||||
assertTrue(
|
||||
out.toString().startsWith("Successfully updated mount table cache"));
|
||||
|
||||
// Now ls should return that mount table entry
|
||||
out.reset();
|
||||
argv = new String[] {"-ls", src};
|
||||
assertEquals(0, ToolRunner.run(admin, argv));
|
||||
assertTrue(out.toString().contains(src));
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the Router transforming to expected state.
|
||||
* @param expectedState Expected Router state.
|
||||
|
@ -836,8 +858,7 @@ public class TestRouterAdminCLI {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateDestinationForExistingMountTable() throws
|
||||
Exception {
|
||||
public void testUpdateDestinationForExistingMountTable() throws Exception {
|
||||
// Add a mount table firstly
|
||||
String nsId = "ns0";
|
||||
String src = "/test-updateDestinationForExistingMountTable";
|
||||
|
|
|
@ -0,0 +1,396 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.curator.test.TestingServer;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.hdfs.server.federation.FederationTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
||||
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||
import org.apache.hadoop.service.Service.STATE;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* This test class verifies that mount table cache is updated on all the routers
|
||||
* when MountTableRefreshService is enabled and there is a change in mount table
|
||||
* entries.
|
||||
*/
|
||||
public class TestRouterMountTableCacheRefresh {
|
||||
private static TestingServer curatorTestingServer;
|
||||
private static MiniRouterDFSCluster cluster;
|
||||
private static RouterContext routerContext;
|
||||
private static MountTableManager mountTableManager;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
curatorTestingServer = new TestingServer();
|
||||
curatorTestingServer.start();
|
||||
final String connectString = curatorTestingServer.getConnectString();
|
||||
int numNameservices = 2;
|
||||
cluster = new MiniRouterDFSCluster(false, numNameservices);
|
||||
Configuration conf = new RouterConfigBuilder().refreshCache().admin().rpc()
|
||||
.heartbeat().build();
|
||||
conf.setClass(RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
|
||||
RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT,
|
||||
FileSubclusterResolver.class);
|
||||
conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString);
|
||||
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE, true);
|
||||
cluster.addRouterOverrides(conf);
|
||||
cluster.startCluster();
|
||||
cluster.startRouters();
|
||||
cluster.waitClusterUp();
|
||||
routerContext = cluster.getRandomRouter();
|
||||
RouterStore routerStateManager =
|
||||
routerContext.getRouter().getRouterStateManager();
|
||||
mountTableManager = routerContext.getAdminClient().getMountTableManager();
|
||||
// wait for one minute for all the routers to get registered
|
||||
FederationTestUtils.waitRouterRegistered(routerStateManager,
|
||||
numNameservices, 60000);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void destory() {
|
||||
try {
|
||||
curatorTestingServer.close();
|
||||
cluster.shutdown();
|
||||
} catch (IOException e) {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
clearEntries();
|
||||
}
|
||||
|
||||
private void clearEntries() throws IOException {
|
||||
List<MountTable> result = getMountTableEntries();
|
||||
for (MountTable mountTable : result) {
|
||||
RemoveMountTableEntryResponse removeMountTableEntry =
|
||||
mountTableManager.removeMountTableEntry(RemoveMountTableEntryRequest
|
||||
.newInstance(mountTable.getSourcePath()));
|
||||
assertTrue(removeMountTableEntry.getStatus());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* addMountTableEntry API should internally update the cache on all the
|
||||
* routers.
|
||||
*/
|
||||
@Test
|
||||
public void testMountTableEntriesCacheUpdatedAfterAddAPICall()
|
||||
throws IOException {
|
||||
|
||||
// Existing mount table size
|
||||
int existingEntriesCount = getNumMountTableEntries();
|
||||
String srcPath = "/addPath";
|
||||
MountTable newEntry = MountTable.newInstance(srcPath,
|
||||
Collections.singletonMap("ns0", "/addPathDest"), Time.now(),
|
||||
Time.now());
|
||||
addMountTableEntry(mountTableManager, newEntry);
|
||||
|
||||
// When Add entry is done, all the routers must have updated its mount table
|
||||
// entry
|
||||
List<RouterContext> routers = getRouters();
|
||||
for (RouterContext rc : routers) {
|
||||
List<MountTable> result =
|
||||
getMountTableEntries(rc.getAdminClient().getMountTableManager());
|
||||
assertEquals(1 + existingEntriesCount, result.size());
|
||||
MountTable mountTableResult = result.get(0);
|
||||
assertEquals(srcPath, mountTableResult.getSourcePath());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* removeMountTableEntry API should internally update the cache on all the
|
||||
* routers.
|
||||
*/
|
||||
@Test
|
||||
public void testMountTableEntriesCacheUpdatedAfterRemoveAPICall()
|
||||
throws IOException {
|
||||
// add
|
||||
String srcPath = "/removePathSrc";
|
||||
MountTable newEntry = MountTable.newInstance(srcPath,
|
||||
Collections.singletonMap("ns0", "/removePathDest"), Time.now(),
|
||||
Time.now());
|
||||
addMountTableEntry(mountTableManager, newEntry);
|
||||
int addCount = getNumMountTableEntries();
|
||||
assertEquals(1, addCount);
|
||||
|
||||
// remove
|
||||
RemoveMountTableEntryResponse removeMountTableEntry =
|
||||
mountTableManager.removeMountTableEntry(
|
||||
RemoveMountTableEntryRequest.newInstance(srcPath));
|
||||
assertTrue(removeMountTableEntry.getStatus());
|
||||
|
||||
int removeCount = getNumMountTableEntries();
|
||||
assertEquals(addCount - 1, removeCount);
|
||||
}
|
||||
|
||||
/**
|
||||
* updateMountTableEntry API should internally update the cache on all the
|
||||
* routers.
|
||||
*/
|
||||
@Test
|
||||
public void testMountTableEntriesCacheUpdatedAfterUpdateAPICall()
|
||||
throws IOException {
|
||||
// add
|
||||
String srcPath = "/updatePathSrc";
|
||||
MountTable newEntry = MountTable.newInstance(srcPath,
|
||||
Collections.singletonMap("ns0", "/updatePathDest"), Time.now(),
|
||||
Time.now());
|
||||
addMountTableEntry(mountTableManager, newEntry);
|
||||
int addCount = getNumMountTableEntries();
|
||||
assertEquals(1, addCount);
|
||||
|
||||
// update
|
||||
String key = "ns1";
|
||||
String value = "/updatePathDest2";
|
||||
MountTable upateEntry = MountTable.newInstance(srcPath,
|
||||
Collections.singletonMap(key, value), Time.now(), Time.now());
|
||||
UpdateMountTableEntryResponse updateMountTableEntry =
|
||||
mountTableManager.updateMountTableEntry(
|
||||
UpdateMountTableEntryRequest.newInstance(upateEntry));
|
||||
assertTrue(updateMountTableEntry.getStatus());
|
||||
MountTable updatedMountTable = getMountTableEntry(srcPath);
|
||||
assertNotNull("Updated mount table entrty cannot be null",
|
||||
updatedMountTable);
|
||||
assertEquals(1, updatedMountTable.getDestinations().size());
|
||||
assertEquals(key,
|
||||
updatedMountTable.getDestinations().get(0).getNameserviceId());
|
||||
assertEquals(value, updatedMountTable.getDestinations().get(0).getDest());
|
||||
}
|
||||
|
||||
/**
|
||||
* After caching RouterClient if router goes down, refresh should be
|
||||
* successful on other available router. The router which is not running
|
||||
* should be ignored.
|
||||
*/
|
||||
@Test
|
||||
public void testCachedRouterClientBehaviourAfterRouterStoped()
|
||||
throws IOException {
|
||||
String srcPath = "/addPathClientCache";
|
||||
MountTable newEntry = MountTable.newInstance(srcPath,
|
||||
Collections.singletonMap("ns0", "/addPathClientCacheDest"), Time.now(),
|
||||
Time.now());
|
||||
addMountTableEntry(mountTableManager, newEntry);
|
||||
|
||||
// When Add entry is done, all the routers must have updated its mount table
|
||||
// entry
|
||||
List<RouterContext> routers = getRouters();
|
||||
for (RouterContext rc : routers) {
|
||||
List<MountTable> result =
|
||||
getMountTableEntries(rc.getAdminClient().getMountTableManager());
|
||||
assertEquals(1, result.size());
|
||||
MountTable mountTableResult = result.get(0);
|
||||
assertEquals(srcPath, mountTableResult.getSourcePath());
|
||||
}
|
||||
|
||||
// Lets stop one router
|
||||
for (RouterContext rc : routers) {
|
||||
InetSocketAddress adminServerAddress =
|
||||
rc.getRouter().getAdminServerAddress();
|
||||
if (!routerContext.getRouter().getAdminServerAddress()
|
||||
.equals(adminServerAddress)) {
|
||||
cluster.stopRouter(rc);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
srcPath = "/addPathClientCache2";
|
||||
newEntry = MountTable.newInstance(srcPath,
|
||||
Collections.singletonMap("ns0", "/addPathClientCacheDest2"), Time.now(),
|
||||
Time.now());
|
||||
addMountTableEntry(mountTableManager, newEntry);
|
||||
for (RouterContext rc : getRouters()) {
|
||||
List<MountTable> result =
|
||||
getMountTableEntries(rc.getAdminClient().getMountTableManager());
|
||||
assertEquals(2, result.size());
|
||||
}
|
||||
}
|
||||
|
||||
private List<RouterContext> getRouters() {
|
||||
List<RouterContext> result = new ArrayList<>();
|
||||
for (RouterContext rc : cluster.getRouters()) {
|
||||
if (rc.getRouter().getServiceState() == STATE.STARTED) {
|
||||
result.add(rc);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshMountTableEntriesAPI() throws IOException {
|
||||
RefreshMountTableEntriesRequest request =
|
||||
RefreshMountTableEntriesRequest.newInstance();
|
||||
RefreshMountTableEntriesResponse refreshMountTableEntriesRes =
|
||||
mountTableManager.refreshMountTableEntries(request);
|
||||
// refresh should be successful
|
||||
assertTrue(refreshMountTableEntriesRes.getResult());
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify cache update timeouts when any of the router takes more time than
|
||||
* the configured timeout period.
|
||||
*/
|
||||
@Test(timeout = 10000)
|
||||
public void testMountTableEntriesCacheUpdateTimeout() throws IOException {
|
||||
// Resources will be closed when router is closed
|
||||
@SuppressWarnings("resource")
|
||||
MountTableRefresherService mountTableRefresherService =
|
||||
new MountTableRefresherService(routerContext.getRouter()) {
|
||||
@Override
|
||||
protected MountTableRefresherThread getLocalRefresher(
|
||||
String adminAddress) {
|
||||
return new MountTableRefresherThread(null, adminAddress) {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
// Sleep 1 minute
|
||||
Thread.sleep(60000);
|
||||
} catch (InterruptedException e) {
|
||||
// Do nothing
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
Configuration config = routerContext.getRouter().getConfig();
|
||||
config.setTimeDuration(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT, 5,
|
||||
TimeUnit.SECONDS);
|
||||
mountTableRefresherService.init(config);
|
||||
// One router is not responding for 1 minute, still refresh should
|
||||
// finished in 5 second as cache update timeout is set 5 second.
|
||||
mountTableRefresherService.refresh();
|
||||
// Test case timeout is assert for this test case.
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify Cached RouterClient connections are removed from cache and closed
|
||||
* when their max live time is elapsed.
|
||||
*/
|
||||
@Test
|
||||
public void testRouterClientConnectionExpiration() throws Exception {
|
||||
final AtomicInteger createCounter = new AtomicInteger();
|
||||
final AtomicInteger removeCounter = new AtomicInteger();
|
||||
// Resources will be closed when router is closed
|
||||
@SuppressWarnings("resource")
|
||||
MountTableRefresherService mountTableRefresherService =
|
||||
new MountTableRefresherService(routerContext.getRouter()) {
|
||||
@Override
|
||||
protected void closeRouterClient(RouterClient client) {
|
||||
super.closeRouterClient(client);
|
||||
removeCounter.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RouterClient createRouterClient(
|
||||
InetSocketAddress routerSocket, Configuration config)
|
||||
throws IOException {
|
||||
createCounter.incrementAndGet();
|
||||
return super.createRouterClient(routerSocket, config);
|
||||
}
|
||||
};
|
||||
int clientCacheTime = 2000;
|
||||
Configuration config = routerContext.getRouter().getConfig();
|
||||
config.setTimeDuration(
|
||||
RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME, clientCacheTime,
|
||||
TimeUnit.MILLISECONDS);
|
||||
mountTableRefresherService.init(config);
|
||||
// Do refresh to created RouterClient
|
||||
mountTableRefresherService.refresh();
|
||||
assertNotEquals("No RouterClient is created.", 0, createCounter.get());
|
||||
/*
|
||||
* Wait for clients to expire. Lets wait triple the cache eviction period.
|
||||
* After cache eviction period all created client must be removed and
|
||||
* closed.
|
||||
*/
|
||||
GenericTestUtils.waitFor(() -> createCounter.get() == removeCounter.get(),
|
||||
100, 3 * clientCacheTime);
|
||||
}
|
||||
|
||||
private int getNumMountTableEntries() throws IOException {
|
||||
List<MountTable> records = getMountTableEntries();
|
||||
int oldEntriesCount = records.size();
|
||||
return oldEntriesCount;
|
||||
}
|
||||
|
||||
private MountTable getMountTableEntry(String srcPath) throws IOException {
|
||||
List<MountTable> mountTableEntries = getMountTableEntries();
|
||||
for (MountTable mountTable : mountTableEntries) {
|
||||
String sourcePath = mountTable.getSourcePath();
|
||||
if (srcPath.equals(sourcePath)) {
|
||||
return mountTable;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private void addMountTableEntry(MountTableManager mountTableMgr,
|
||||
MountTable newEntry) throws IOException {
|
||||
AddMountTableEntryRequest addRequest =
|
||||
AddMountTableEntryRequest.newInstance(newEntry);
|
||||
AddMountTableEntryResponse addResponse =
|
||||
mountTableMgr.addMountTableEntry(addRequest);
|
||||
assertTrue(addResponse.getStatus());
|
||||
}
|
||||
|
||||
private List<MountTable> getMountTableEntries() throws IOException {
|
||||
return getMountTableEntries(mountTableManager);
|
||||
}
|
||||
|
||||
private List<MountTable> getMountTableEntries(
|
||||
MountTableManager mountTableManagerParam) throws IOException {
|
||||
GetMountTableEntriesRequest request =
|
||||
GetMountTableEntriesRequest.newInstance("/");
|
||||
return mountTableManagerParam.getMountTableEntries(request).getEntries();
|
||||
}
|
||||
}
|
|
@ -437,6 +437,7 @@ Usage:
|
|||
[-safemode enter | leave | get]
|
||||
[-nameservice disable | enable <nameservice>]
|
||||
[-getDisabledNameservices]
|
||||
[-refresh]
|
||||
|
||||
| COMMAND\_OPTION | Description |
|
||||
|:---- |:---- |
|
||||
|
@ -449,6 +450,7 @@ Usage:
|
|||
| `-safemode` `enter` `leave` `get` | Manually set the Router entering or leaving safe mode. The option *get* will be used for verifying if the Router is in safe mode state. |
|
||||
| `-nameservice` `disable` `enable` *nameservice* | Disable/enable a name service from the federation. If disabled, requests will not go to that name service. |
|
||||
| `-getDisabledNameservices` | Get the name services that are disabled in the federation. |
|
||||
| `-refresh` | Update mount table cache of the connected router. |
|
||||
|
||||
The commands for managing Router-based federation. See [Mount table management](../hadoop-hdfs-rbf/HDFSRouterFederation.html#Mount_table_management) for more info.
|
||||
|
||||
|
|
Loading…
Reference in New Issue