HDFS-10646. Federation admin tool. Contributed by Inigo Goiri.
commit ae27e31fbc
parent 58b97df661
@@ -332,6 +332,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
                  <include>editlog.proto</include>
                  <include>fsimage.proto</include>
                  <include>FederationProtocol.proto</include>
                  <include>RouterProtocol.proto</include>
                </includes>
              </source>
            </configuration>
@@ -31,6 +31,7 @@ function hadoop_usage
  hadoop_add_option "--hosts filename" "list of hosts to use in worker mode"
  hadoop_add_option "--workers" "turn on worker mode"

  hadoop_add_subcommand "balancer" daemon "run a cluster balancing utility"
  hadoop_add_subcommand "cacheadmin" admin "configure the HDFS cache"
  hadoop_add_subcommand "classpath" client "prints the class path needed to get the hadoop jar and the required libraries"
@@ -42,6 +43,7 @@ function hadoop_usage
  hadoop_add_subcommand "diskbalancer" daemon "Distributes data evenly among disks on a given node"
  hadoop_add_subcommand "envvars" client "display computed Hadoop environment variables"
  hadoop_add_subcommand "ec" admin "run a HDFS ErasureCoding CLI"
  hadoop_add_subcommand "federation" admin "manage Router-based federation"
  hadoop_add_subcommand "fetchdt" client "fetch a delegation token from the NameNode"
  hadoop_add_subcommand "fsck" admin "run a DFS filesystem checking utility"
  hadoop_add_subcommand "getconf" client "get config values from configuration"
@@ -181,6 +183,9 @@ function hdfscmd_case
      HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
      HADOOP_CLASSNAME='org.apache.hadoop.hdfs.server.federation.router.Router'
    ;;
    federation)
      HADOOP_CLASSNAME='org.apache.hadoop.hdfs.tools.federation.RouterAdmin'
    ;;
    secondarynamenode)
      HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
      HADOOP_CLASSNAME='org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode'
@@ -59,7 +59,7 @@ if "%1" == "--loglevel" (
    )
  )

  set hdfscommands=dfs namenode secondarynamenode journalnode zkfc datanode dfsadmin haadmin fsck balancer jmxget oiv oev fetchdt getconf groups snapshotDiff lsSnapshottableDir cacheadmin mover storagepolicies classpath crypto router debug
  set hdfscommands=dfs namenode secondarynamenode journalnode zkfc datanode dfsadmin haadmin fsck balancer jmxget oiv oev fetchdt getconf groups snapshotDiff lsSnapshottableDir cacheadmin mover storagepolicies classpath crypto router federation debug
  for %%i in ( %hdfscommands% ) do (
    if %hdfs-command% == %%i set hdfscommand=true
  )
@@ -184,6 +184,11 @@ goto :eof
  set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_ROUTER_OPTS%
  goto :eof

:federation
  set CLASS=org.apache.hadoop.hdfs.tools.federation.RouterAdmin
  set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_ROUTER_OPTS%
  goto :eof

:debug
  set CLASS=org.apache.hadoop.hdfs.tools.DebugAdmin
  goto :eof
@@ -1196,6 +1196,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
  public static final String FEDERATION_STORE_PREFIX =
      FEDERATION_ROUTER_PREFIX + "store.";

  public static final String DFS_ROUTER_STORE_ENABLE =
      FEDERATION_STORE_PREFIX + "enable";
  public static final boolean DFS_ROUTER_STORE_ENABLE_DEFAULT = true;

  public static final String FEDERATION_STORE_SERIALIZER_CLASS =
      DFSConfigKeys.FEDERATION_STORE_PREFIX + "serializer";
  public static final Class<StateStoreSerializerPBImpl>
@@ -1222,6 +1226,21 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
  public static final long FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT =
      TimeUnit.MINUTES.toMillis(5);

  // HDFS Router-based federation admin
  public static final String DFS_ROUTER_ADMIN_HANDLER_COUNT_KEY =
      FEDERATION_ROUTER_PREFIX + "admin.handler.count";
  public static final int DFS_ROUTER_ADMIN_HANDLER_COUNT_DEFAULT = 1;
  public static final int DFS_ROUTER_ADMIN_PORT_DEFAULT = 8111;
  public static final String DFS_ROUTER_ADMIN_ADDRESS_KEY =
      FEDERATION_ROUTER_PREFIX + "admin-address";
  public static final String DFS_ROUTER_ADMIN_ADDRESS_DEFAULT =
      "0.0.0.0:" + DFS_ROUTER_ADMIN_PORT_DEFAULT;
  public static final String DFS_ROUTER_ADMIN_BIND_HOST_KEY =
      FEDERATION_ROUTER_PREFIX + "admin-bind-host";
  public static final String DFS_ROUTER_ADMIN_ENABLE =
      FEDERATION_ROUTER_PREFIX + "admin.enable";
  public static final boolean DFS_ROUTER_ADMIN_ENABLE_DEFAULT = true;

  // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
  @Deprecated
  public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
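For orientation, a minimal sketch of how these new admin keys are consumed; it mirrors what RouterAdmin.run() does later in this patch, and only the wrapper class is illustrative:

import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.net.NetUtils;

public class AdminAddressSketch {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // dfs.federation.router.admin-address, default "0.0.0.0:8111"
    String address = conf.getTrimmed(
        DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
        DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT);
    InetSocketAddress routerSocket = NetUtils.createSocketAddr(address);
    System.out.println("Router admin endpoint: " + routerSocket);
  }
}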
@@ -0,0 +1,44 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.protocolPB;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.proto.RouterProtocolProtos.RouterAdminProtocolService;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.token.TokenInfo;

/**
 * Protocol that clients use to communicate with the Router admin server.
 * Note: This extends the protocolbuffer service based interface to
 * add annotations required for security.
 */
@InterfaceAudience.Private
@InterfaceStability.Stable
@KerberosInfo(
    serverPrincipal = DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY)
@TokenInfo(DelegationTokenSelector.class)
@ProtocolInfo(protocolName = HdfsConstants.CLIENT_NAMENODE_PROTOCOL_NAME,
    protocolVersion = 1)
public interface RouterAdminProtocolPB extends
    RouterAdminProtocolService.BlockingInterface {
}
@@ -0,0 +1,151 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.protocolPB;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryResponseProto;
import org.apache.hadoop.hdfs.server.federation.router.RouterAdminServer;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntryRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntryResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryResponsePBImpl;

import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;

/**
 * This class is used on the server side. Calls come across the wire for the
 * protocol {@link RouterAdminProtocolPB}. This class translates the PB data
 * types to the native data types used inside the HDFS Router as specified in
 * the generic RouterAdminProtocol.
 */
@InterfaceAudience.Private
@InterfaceStability.Stable
public class RouterAdminProtocolServerSideTranslatorPB implements
    RouterAdminProtocolPB {

  private final RouterAdminServer server;

  /**
   * Constructor.
   * @param server The Router admin server.
   * @throws IOException
   */
  public RouterAdminProtocolServerSideTranslatorPB(RouterAdminServer server)
      throws IOException {
    this.server = server;
  }

  @Override
  public AddMountTableEntryResponseProto addMountTableEntry(
      RpcController controller, AddMountTableEntryRequestProto request)
      throws ServiceException {

    try {
      AddMountTableEntryRequest req =
          new AddMountTableEntryRequestPBImpl(request);
      AddMountTableEntryResponse response = server.addMountTableEntry(req);
      AddMountTableEntryResponsePBImpl responsePB =
          (AddMountTableEntryResponsePBImpl)response;
      return responsePB.getProto();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  /**
   * Remove an entry from the mount table.
   */
  @Override
  public RemoveMountTableEntryResponseProto removeMountTableEntry(
      RpcController controller, RemoveMountTableEntryRequestProto request)
      throws ServiceException {
    try {
      RemoveMountTableEntryRequest req =
          new RemoveMountTableEntryRequestPBImpl(request);
      RemoveMountTableEntryResponse response =
          server.removeMountTableEntry(req);
      RemoveMountTableEntryResponsePBImpl responsePB =
          (RemoveMountTableEntryResponsePBImpl)response;
      return responsePB.getProto();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  /**
   * Get matching mount table entries.
   */
  @Override
  public GetMountTableEntriesResponseProto getMountTableEntries(
      RpcController controller, GetMountTableEntriesRequestProto request)
      throws ServiceException {
    try {
      GetMountTableEntriesRequest req =
          new GetMountTableEntriesRequestPBImpl(request);
      GetMountTableEntriesResponse response = server.getMountTableEntries(req);
      GetMountTableEntriesResponsePBImpl responsePB =
          (GetMountTableEntriesResponsePBImpl)response;
      return responsePB.getProto();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  /**
   * Update a single mount table entry.
   */
  @Override
  public UpdateMountTableEntryResponseProto updateMountTableEntry(
      RpcController controller, UpdateMountTableEntryRequestProto request)
      throws ServiceException {
    try {
      UpdateMountTableEntryRequest req =
          new UpdateMountTableEntryRequestPBImpl(request);
      UpdateMountTableEntryResponse response =
          server.updateMountTableEntry(req);
      UpdateMountTableEntryResponsePBImpl responsePB =
          (UpdateMountTableEntryResponsePBImpl)response;
      return responsePB.getProto();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }
}
@@ -0,0 +1,150 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.protocolPB;

import java.io.Closeable;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryResponseProto;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntryRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntryResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryResponsePBImpl;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;

import com.google.protobuf.ServiceException;

/**
 * This class forwards the Router admin protocol calls as RPC calls to the
 * Router admin server while translating from the native request/response
 * types used in {@link MountTableManager} to the PB types.
 */
@InterfaceAudience.Private
@InterfaceStability.Stable
public class RouterAdminProtocolTranslatorPB
    implements ProtocolMetaInterface, MountTableManager,
    Closeable, ProtocolTranslator {
  final private RouterAdminProtocolPB rpcProxy;

  public RouterAdminProtocolTranslatorPB(RouterAdminProtocolPB proxy) {
    rpcProxy = proxy;
  }

  @Override
  public void close() {
    RPC.stopProxy(rpcProxy);
  }

  @Override
  public Object getUnderlyingProxyObject() {
    return rpcProxy;
  }

  @Override
  public boolean isMethodSupported(String methodName) throws IOException {
    return RpcClientUtil.isMethodSupported(rpcProxy,
        RouterAdminProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
        RPC.getProtocolVersion(RouterAdminProtocolPB.class), methodName);
  }

  @Override
  public AddMountTableEntryResponse addMountTableEntry(
      AddMountTableEntryRequest request) throws IOException {
    AddMountTableEntryRequestPBImpl requestPB =
        (AddMountTableEntryRequestPBImpl)request;
    AddMountTableEntryRequestProto proto = requestPB.getProto();
    try {
      AddMountTableEntryResponseProto response =
          rpcProxy.addMountTableEntry(null, proto);
      return new AddMountTableEntryResponsePBImpl(response);
    } catch (ServiceException e) {
      throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
    }
  }

  @Override
  public UpdateMountTableEntryResponse updateMountTableEntry(
      UpdateMountTableEntryRequest request) throws IOException {
    UpdateMountTableEntryRequestPBImpl requestPB =
        (UpdateMountTableEntryRequestPBImpl)request;
    UpdateMountTableEntryRequestProto proto = requestPB.getProto();
    try {
      UpdateMountTableEntryResponseProto response =
          rpcProxy.updateMountTableEntry(null, proto);
      return new UpdateMountTableEntryResponsePBImpl(response);
    } catch (ServiceException e) {
      throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
    }
  }

  @Override
  public RemoveMountTableEntryResponse removeMountTableEntry(
      RemoveMountTableEntryRequest request) throws IOException {
    RemoveMountTableEntryRequestPBImpl requestPB =
        (RemoveMountTableEntryRequestPBImpl)request;
    RemoveMountTableEntryRequestProto proto = requestPB.getProto();
    try {
      RemoveMountTableEntryResponseProto responseProto =
          rpcProxy.removeMountTableEntry(null, proto);
      return new RemoveMountTableEntryResponsePBImpl(responseProto);
    } catch (ServiceException e) {
      throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
    }
  }

  @Override
  public GetMountTableEntriesResponse getMountTableEntries(
      GetMountTableEntriesRequest request) throws IOException {
    GetMountTableEntriesRequestPBImpl requestPB =
        (GetMountTableEntriesRequestPBImpl)request;
    GetMountTableEntriesRequestProto proto = requestPB.getProto();
    try {
      GetMountTableEntriesResponseProto response =
          rpcProxy.getMountTableEntries(null, proto);
      return new GetMountTableEntriesResponsePBImpl(response);
    } catch (ServiceException e) {
      throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
    }
  }
}
@@ -61,7 +61,7 @@ public class MembershipNamenodeResolver
  /** Reference to the State Store. */
  private final StateStoreService stateStore;
  /** Membership State Store interface. */
  private final MembershipStore membershipInterface;
  private MembershipStore membershipInterface;

  /** Parent router ID. */
  private String routerId;

@@ -82,25 +82,27 @@ public class MembershipNamenodeResolver
    if (this.stateStore != null) {
      // Request cache updates from the state store
      this.stateStore.registerCacheExternal(this);
    }
  }

  // Initialize the interface to get the membership
  private synchronized MembershipStore getMembershipStore() throws IOException {
    if (this.membershipInterface == null) {
      this.membershipInterface = this.stateStore.getRegisteredRecordStore(
          MembershipStore.class);
    } else {
      this.membershipInterface = null;
    }

    if (this.membershipInterface == null) {
      throw new IOException("State Store does not have an interface for " +
          MembershipStore.class.getSimpleName());
      if (this.membershipInterface == null) {
        throw new IOException("State Store does not have an interface for " +
            MembershipStore.class.getSimpleName());
      }
    }
    return this.membershipInterface;
  }

  @Override
  public boolean loadCache(boolean force) {
    // Our cache depends on the store, update it first
    try {
      this.membershipInterface.loadCache(force);
      MembershipStore membership = getMembershipStore();
      membership.loadCache(force);
    } catch (IOException e) {
      LOG.error("Cannot update membership from the State Store", e);
    }

@@ -126,8 +128,9 @@ public class MembershipNamenodeResolver
    GetNamenodeRegistrationsRequest request =
        GetNamenodeRegistrationsRequest.newInstance(partial);

    MembershipStore membership = getMembershipStore();
    GetNamenodeRegistrationsResponse response =
        this.membershipInterface.getNamenodeRegistrations(request);
        membership.getNamenodeRegistrations(request);
    List<MembershipState> records = response.getNamenodeMemberships();

    if (records != null && records.size() == 1) {

@@ -135,7 +138,7 @@ public class MembershipNamenodeResolver
        UpdateNamenodeRegistrationRequest updateRequest =
            UpdateNamenodeRegistrationRequest.newInstance(
                record.getNameserviceId(), record.getNamenodeId(), ACTIVE);
        this.membershipInterface.updateNamenodeRegistration(updateRequest);
        membership.updateNamenodeRegistration(updateRequest);
      }
    } catch (StateStoreUnavailableException e) {
      LOG.error("Cannot update {} as active, State Store unavailable", address);

@@ -226,14 +229,14 @@ public class MembershipNamenodeResolver

    NamenodeHeartbeatRequest request = NamenodeHeartbeatRequest.newInstance();
    request.setNamenodeMembership(record);
    return this.membershipInterface.namenodeHeartbeat(request).getResult();
    return getMembershipStore().namenodeHeartbeat(request).getResult();
  }

  @Override
  public Set<FederationNamespaceInfo> getNamespaces() throws IOException {
    GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
    GetNamespaceInfoResponse response =
        this.membershipInterface.getNamespaceInfo(request);
        getMembershipStore().getNamespaceInfo(request);
    return response.getNamespaceInfo();
  }

@@ -259,8 +262,9 @@ public class MembershipNamenodeResolver
    // Retrieve a list of all registrations that match this query.
    // This may include all NN records for a namespace/blockpool, including
    // duplicate records for the same NN from different routers.
    MembershipStore membershipStore = getMembershipStore();
    GetNamenodeRegistrationsResponse response =
        this.membershipInterface.getNamenodeRegistrations(request);
        membershipStore.getNamenodeRegistrations(request);

    List<MembershipState> memberships = response.getNamenodeMemberships();
    if (!addExpired || !addUnavailable) {
@@ -81,6 +81,10 @@ public class Router extends CompositeService {
  private RouterRpcServer rpcServer;
  private InetSocketAddress rpcAddress;

  /** RPC interface for the admin. */
  private RouterAdminServer adminServer;
  private InetSocketAddress adminAddress;

  /** Interface with the State Store. */
  private StateStoreService stateStore;

@@ -116,6 +120,14 @@ public class Router extends CompositeService {
  protected void serviceInit(Configuration configuration) throws Exception {
    this.conf = configuration;

    if (conf.getBoolean(
        DFSConfigKeys.DFS_ROUTER_STORE_ENABLE,
        DFSConfigKeys.DFS_ROUTER_STORE_ENABLE_DEFAULT)) {
      // Service that maintains the State Store connection
      this.stateStore = new StateStoreService();
      addService(this.stateStore);
    }

    // Resolver to track active NNs
    this.namenodeResolver = newActiveNamenodeResolver(
        this.conf, this.stateStore);

@@ -138,6 +150,14 @@ public class Router extends CompositeService {
      this.setRpcServerAddress(rpcServer.getRpcAddress());
    }

    if (conf.getBoolean(
        DFSConfigKeys.DFS_ROUTER_ADMIN_ENABLE,
        DFSConfigKeys.DFS_ROUTER_ADMIN_ENABLE_DEFAULT)) {
      // Create admin server
      this.adminServer = createAdminServer();
      addService(this.adminServer);
    }

    if (conf.getBoolean(
        DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
        DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT)) {

@@ -263,6 +283,38 @@ public class Router extends CompositeService {
    return this.rpcAddress;
  }

  /////////////////////////////////////////////////////////
  // Admin server
  /////////////////////////////////////////////////////////

  /**
   * Create a new router admin server to handle the router admin interface.
   *
   * @return RouterAdminServer
   * @throws IOException If the admin server was not successfully started.
   */
  protected RouterAdminServer createAdminServer() throws IOException {
    return new RouterAdminServer(this.conf, this);
  }

  /**
   * Set the current Admin socket for the router.
   *
   * @param address Admin RPC address.
   */
  protected void setAdminServerAddress(InetSocketAddress address) {
    this.adminAddress = address;
  }

  /**
   * Get the current Admin socket address for the router.
   *
   * @return InetSocketAddress Admin address.
   */
  public InetSocketAddress getAdminServerAddress() {
    return adminAddress;
  }

  /////////////////////////////////////////////////////////
  // Namenode heartbeat monitors
  /////////////////////////////////////////////////////////
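The conf.getBoolean() blocks above make the State Store client, the admin server, and the heartbeat service individually optional. A hedged sketch of what that means when embedding a Router (the init()/start() lifecycle comes from the standard Hadoop service API; the wrapper class is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.router.Router;

public class RouterToggleSketch {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_ROUTER_STORE_ENABLE, true);   // State Store on
    conf.setBoolean(DFSConfigKeys.DFS_ROUTER_ADMIN_ENABLE, false);  // admin RPC off
    Router router = new Router();
    router.init(conf);  // serviceInit() above only wires the enabled services
    router.start();
  }
}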
@@ -0,0 +1,183 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.proto.RouterProtocolProtos.RouterAdminProtocolService;
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.service.AbstractService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;

/**
 * This class is responsible for handling all of the Admin calls to the HDFS
 * router. It is created, started, and stopped by {@link Router}.
 */
public class RouterAdminServer extends AbstractService
    implements MountTableManager {

  private static final Logger LOG =
      LoggerFactory.getLogger(RouterAdminServer.class);

  private Configuration conf;

  private final Router router;

  private MountTableStore mountTableStore;

  /** The Admin server that listens to requests from clients. */
  private final Server adminServer;
  private final InetSocketAddress adminAddress;

  public RouterAdminServer(Configuration conf, Router router)
      throws IOException {
    super(RouterAdminServer.class.getName());

    this.conf = conf;
    this.router = router;

    int handlerCount = this.conf.getInt(
        DFSConfigKeys.DFS_ROUTER_ADMIN_HANDLER_COUNT_KEY,
        DFSConfigKeys.DFS_ROUTER_ADMIN_HANDLER_COUNT_DEFAULT);

    RPC.setProtocolEngine(this.conf, RouterAdminProtocolPB.class,
        ProtobufRpcEngine.class);

    RouterAdminProtocolServerSideTranslatorPB routerAdminProtocolTranslator =
        new RouterAdminProtocolServerSideTranslatorPB(this);
    BlockingService clientNNPbService = RouterAdminProtocolService.
        newReflectiveBlockingService(routerAdminProtocolTranslator);

    InetSocketAddress confRpcAddress = conf.getSocketAddr(
        DFSConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY,
        DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
        DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT,
        DFSConfigKeys.DFS_ROUTER_ADMIN_PORT_DEFAULT);

    String bindHost = conf.get(
        DFSConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY,
        confRpcAddress.getHostName());
    LOG.info("Admin server binding to {}:{}",
        bindHost, confRpcAddress.getPort());

    this.adminServer = new RPC.Builder(this.conf)
        .setProtocol(RouterAdminProtocolPB.class)
        .setInstance(clientNNPbService)
        .setBindAddress(bindHost)
        .setPort(confRpcAddress.getPort())
        .setNumHandlers(handlerCount)
        .setVerbose(false)
        .build();

    // The RPC-server port can be ephemeral... ensure we have the correct info
    InetSocketAddress listenAddress = this.adminServer.getListenerAddress();
    this.adminAddress = new InetSocketAddress(
        confRpcAddress.getHostName(), listenAddress.getPort());
    router.setAdminServerAddress(this.adminAddress);
  }

  /** Allow access to the admin RPC server for testing. */
  @VisibleForTesting
  Server getAdminServer() {
    return this.adminServer;
  }

  private MountTableStore getMountTableStore() throws IOException {
    if (this.mountTableStore == null) {
      this.mountTableStore = router.getStateStore().getRegisteredRecordStore(
          MountTableStore.class);
      if (this.mountTableStore == null) {
        throw new IOException("Mount table state store is not available.");
      }
    }
    return this.mountTableStore;
  }

  /**
   * Get the RPC address of the admin service.
   * @return Administration service RPC address.
   */
  public InetSocketAddress getRpcAddress() {
    return this.adminAddress;
  }

  @Override
  protected void serviceInit(Configuration configuration) throws Exception {
    this.conf = configuration;
    super.serviceInit(conf);
  }

  @Override
  protected void serviceStart() throws Exception {
    this.adminServer.start();
    super.serviceStart();
  }

  @Override
  protected void serviceStop() throws Exception {
    if (this.adminServer != null) {
      this.adminServer.stop();
    }
    super.serviceStop();
  }

  @Override
  public AddMountTableEntryResponse addMountTableEntry(
      AddMountTableEntryRequest request) throws IOException {
    return getMountTableStore().addMountTableEntry(request);
  }

  @Override
  public UpdateMountTableEntryResponse updateMountTableEntry(
      UpdateMountTableEntryRequest request) throws IOException {
    return getMountTableStore().updateMountTableEntry(request);
  }

  @Override
  public RemoveMountTableEntryResponse removeMountTableEntry(
      RemoveMountTableEntryRequest request) throws IOException {
    return getMountTableStore().removeMountTableEntry(request);
  }

  @Override
  public GetMountTableEntriesResponse getMountTableEntries(
      GetMountTableEntriesRequest request) throws IOException {
    return getMountTableStore().getMountTableEntries(request);
  }
}
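Because the constructor re-reads the bound address from getListenerAddress() and publishes it back through Router.setAdminServerAddress(), a test can request an ephemeral admin port. A hypothetical sketch; the Router wiring is assumed from the previous file:

import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.router.Router;

public class EphemeralAdminPortSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    // Port 0 asks the OS for an ephemeral port; RouterAdminServer then
    // records the real port and publishes it via setAdminServerAddress().
    conf.set(DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, "0.0.0.0:0");
    Router router = new Router();
    router.init(conf);
    router.start();
    InetSocketAddress actual = router.getAdminServerAddress();
    System.out.println("Admin server bound to " + actual);
  }
}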
@@ -0,0 +1,76 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * Client to connect to the {@link Router} via the admin protocol.
 */
@Private
public class RouterClient implements Closeable {

  private final RouterAdminProtocolTranslatorPB proxy;
  private final UserGroupInformation ugi;

  private static RouterAdminProtocolTranslatorPB createRouterProxy(
      InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
      throws IOException {

    RPC.setProtocolEngine(
        conf, RouterAdminProtocolPB.class, ProtobufRpcEngine.class);

    AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
    final long version = RPC.getProtocolVersion(RouterAdminProtocolPB.class);
    RouterAdminProtocolPB proxy = RPC.getProtocolProxy(
        RouterAdminProtocolPB.class, version, address, ugi, conf,
        NetUtils.getDefaultSocketFactory(conf),
        RPC.getRpcTimeout(conf), null,
        fallbackToSimpleAuth).getProxy();

    return new RouterAdminProtocolTranslatorPB(proxy);
  }

  public RouterClient(InetSocketAddress address, Configuration conf)
      throws IOException {
    this.ugi = UserGroupInformation.getCurrentUser();
    this.proxy = createRouterProxy(address, conf, ugi);
  }

  public MountTableManager getMountTableManager() {
    return proxy;
  }

  @Override
  public synchronized void close() throws IOException {
    RPC.stopProxy(proxy);
  }
}
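A hedged usage sketch for this client; the host name is made up, but the API is exactly what RouterClient exposes above, and Closeable makes try-with-resources work:

import java.net.InetSocketAddress;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.net.NetUtils;

public class RouterClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    InetSocketAddress routerSocket =
        NetUtils.createSocketAddr("router0.example.com:8111"); // hypothetical host
    try (RouterClient client = new RouterClient(routerSocket, conf)) {
      MountTableManager mountTable = client.getMountTableManager();
      GetMountTableEntriesRequest request =
          GetMountTableEntriesRequest.newInstance("/");
      List<MountTable> entries =
          mountTable.getMountTableEntries(request).getEntries();
      System.out.println(entries.size() + " mount table entries");
    }
  }
}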
@@ -0,0 +1,341 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.tools.federation;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class provides some Federation administrative access shell commands.
 */
@Private
public class RouterAdmin extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(RouterAdmin.class);

  private RouterClient client;

  public static void main(String[] argv) throws Exception {
    Configuration conf = new HdfsConfiguration();
    RouterAdmin admin = new RouterAdmin(conf);

    int res = ToolRunner.run(admin, argv);
    System.exit(res);
  }

  public RouterAdmin(Configuration conf) {
    super(conf);
  }

  /**
   * Print the usage message.
   */
  public void printUsage() {
    String usage = "Federation Admin Tools:\n"
        + "\t[-add <source> <nameservice> <destination> "
        + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL]]\n"
        + "\t[-rm <source>]\n"
        + "\t[-ls <path>]\n";
    System.out.println(usage);
  }

  @Override
  public int run(String[] argv) throws Exception {
    if (argv.length < 1) {
      System.err.println("Not enough parameters specified");
      printUsage();
      return -1;
    }

    int exitCode = -1;
    int i = 0;
    String cmd = argv[i++];

    // Verify that we have enough command line parameters
    if ("-add".equals(cmd)) {
      if (argv.length < 4) {
        System.err.println("Not enough parameters specified for cmd " + cmd);
        printUsage();
        return exitCode;
      }
    } else if ("-rm".equalsIgnoreCase(cmd)) {
      if (argv.length < 2) {
        System.err.println("Not enough parameters specified for cmd " + cmd);
        printUsage();
        return exitCode;
      }
    }

    // Initialize RouterClient
    try {
      String address = getConf().getTrimmed(
          DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
          DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT);
      InetSocketAddress routerSocket = NetUtils.createSocketAddr(address);
      client = new RouterClient(routerSocket, getConf());
    } catch (RPC.VersionMismatch v) {
      System.err.println(
          "Version mismatch between client and server... command aborted");
      return exitCode;
    } catch (IOException e) {
      System.err.println("Bad connection to Router... command aborted");
      return exitCode;
    }

    Exception debugException = null;
    exitCode = 0;
    try {
      if ("-add".equals(cmd)) {
        if (addMount(argv, i)) {
          System.err.println("Successfully added mount point " + argv[i]);
        }
      } else if ("-rm".equals(cmd)) {
        if (removeMount(argv[i])) {
          System.err.println("Successfully removed mount point " + argv[i]);
        }
      } else if ("-ls".equals(cmd)) {
        if (argv.length > 1) {
          listMounts(argv[i]);
        } else {
          listMounts("/");
        }
      } else {
        printUsage();
        return exitCode;
      }
    } catch (IllegalArgumentException arge) {
      debugException = arge;
      exitCode = -1;
      System.err.println(cmd.substring(1) + ": " + arge.getLocalizedMessage());
      printUsage();
    } catch (RemoteException e) {
      // This is an error returned by the server.
      // Print out the first line of the error message, ignore the stack trace.
      exitCode = -1;
      debugException = e;
      try {
        String[] content;
        content = e.getLocalizedMessage().split("\n");
        System.err.println(cmd.substring(1) + ": " + content[0]);
        e.printStackTrace();
      } catch (Exception ex) {
        System.err.println(cmd.substring(1) + ": " + ex.getLocalizedMessage());
        e.printStackTrace();
        debugException = ex;
      }
    } catch (Exception e) {
      exitCode = -1;
      debugException = e;
      System.err.println(cmd.substring(1) + ": " + e.getLocalizedMessage());
      e.printStackTrace();
    }
    if (debugException != null) {
      LOG.debug("Exception encountered", debugException);
    }
    return exitCode;
  }

  /**
   * Add a mount table entry or update if it exists.
   *
   * @param parameters Parameters for the mount point.
   * @param i Index in the parameters.
   */
  public boolean addMount(String[] parameters, int i) throws IOException {
    // Mandatory parameters
    String mount = parameters[i++];
    String[] nss = parameters[i++].split(",");
    String dest = parameters[i++];

    // Optional parameters
    boolean readOnly = false;
    DestinationOrder order = DestinationOrder.HASH;
    while (i < parameters.length) {
      if (parameters[i].equals("-readonly")) {
        readOnly = true;
      } else if (parameters[i].equals("-order")) {
        i++;
        try {
          order = DestinationOrder.valueOf(parameters[i]);
        } catch (Exception e) {
          System.err.println("Cannot parse order: " + parameters[i]);
        }
      }
      i++;
    }

    return addMount(mount, nss, dest, readOnly, order);
  }

  /**
   * Add a mount table entry or update if it exists.
   *
   * @param mount Mount point.
   * @param nss Namespaces where this is mounted to.
   * @param dest Destination path.
   * @param readonly If the mount point is read only.
   * @param order Order of the destination locations.
   * @return If the mount point was added.
   * @throws IOException Error adding the mount point.
   */
  public boolean addMount(String mount, String[] nss, String dest,
      boolean readonly, DestinationOrder order) throws IOException {
    // Get the existing entry
    MountTableManager mountTable = client.getMountTableManager();
    GetMountTableEntriesRequest getRequest =
        GetMountTableEntriesRequest.newInstance(mount);
    GetMountTableEntriesResponse getResponse =
        mountTable.getMountTableEntries(getRequest);
    List<MountTable> results = getResponse.getEntries();
    MountTable existingEntry = null;
    for (MountTable result : results) {
      if (mount.equals(result.getSourcePath())) {
        existingEntry = result;
      }
    }

    if (existingEntry == null) {
      // Create and add the entry if it doesn't exist
      Map<String, String> destMap = new LinkedHashMap<>();
      for (String ns : nss) {
        destMap.put(ns, dest);
      }
      MountTable newEntry = MountTable.newInstance(mount, destMap);
      if (readonly) {
        newEntry.setReadOnly(true);
      }
      if (order != null) {
        newEntry.setDestOrder(order);
      }
      AddMountTableEntryRequest request =
          AddMountTableEntryRequest.newInstance(newEntry);
      AddMountTableEntryResponse addResponse =
          mountTable.addMountTableEntry(request);
      boolean added = addResponse.getStatus();
      if (!added) {
        System.err.println("Cannot add mount point " + mount);
      }
      return added;
    } else {
      // Update the existing entry if it exists
      for (String nsId : nss) {
        if (!existingEntry.addDestination(nsId, dest)) {
          System.err.println("Cannot add destination at " + nsId + " " + dest);
        }
      }
      if (readonly) {
        existingEntry.setReadOnly(true);
      }
      if (order != null) {
        existingEntry.setDestOrder(order);
      }
      UpdateMountTableEntryRequest updateRequest =
          UpdateMountTableEntryRequest.newInstance(existingEntry);
      UpdateMountTableEntryResponse updateResponse =
          mountTable.updateMountTableEntry(updateRequest);
      boolean updated = updateResponse.getStatus();
      if (!updated) {
        System.err.println("Cannot update mount point " + mount);
      }
      return updated;
    }
  }

  /**
   * Remove mount point.
   *
   * @param path Path to remove.
   * @throws IOException If it cannot be removed.
   */
  public boolean removeMount(String path) throws IOException {
    MountTableManager mountTable = client.getMountTableManager();
    RemoveMountTableEntryRequest request =
        RemoveMountTableEntryRequest.newInstance(path);
    RemoveMountTableEntryResponse response =
        mountTable.removeMountTableEntry(request);
    boolean removed = response.getStatus();
    if (!removed) {
      System.out.println("Cannot remove mount point " + path);
    }
    return removed;
  }

  /**
   * List mount points.
   *
   * @param path Path to list.
   * @throws IOException If it cannot be listed.
   */
  public void listMounts(String path) throws IOException {
    MountTableManager mountTable = client.getMountTableManager();
    GetMountTableEntriesRequest request =
        GetMountTableEntriesRequest.newInstance(path);
    GetMountTableEntriesResponse response =
        mountTable.getMountTableEntries(request);
    List<MountTable> entries = response.getEntries();
    printMounts(entries);
  }

  private static void printMounts(List<MountTable> entries) {
    System.out.println("Mount Table Entries:");
    System.out.println(String.format(
        "%-25s %-25s",
        "Source", "Destinations"));
    for (MountTable entry : entries) {
      StringBuilder destBuilder = new StringBuilder();
      for (RemoteLocation location : entry.getDestinations()) {
        if (destBuilder.length() > 0) {
          destBuilder.append(",");
        }
        destBuilder.append(String.format("%s->%s", location.getNameserviceId(),
            location.getDest()));
      }
      System.out.println(String.format("%-25s %-25s", entry.getSourcePath(),
          destBuilder.toString()));
    }
  }
}
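End to end, the "hdfs federation" shell entry added earlier dispatches to main() here. A sketch of driving the same tool programmatically; the mount path and nameservice ID are made up:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.tools.federation.RouterAdmin;
import org.apache.hadoop.util.ToolRunner;

public class RouterAdminSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    RouterAdmin admin = new RouterAdmin(conf);
    // Equivalent to: hdfs federation -add /data ns1 /data -order RANDOM
    int rc = ToolRunner.run(admin, new String[] {
        "-add", "/data", "ns1", "/data", "-order", "RANDOM"});
    // Equivalent to: hdfs federation -ls /
    rc = ToolRunner.run(admin, new String[] {"-ls", "/"});
    System.exit(rc);
  }
}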
@@ -0,0 +1,28 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * Tools to manage the Router-based federation, including the utilities to
 * add and remove mount table entries.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
package org.apache.hadoop.hdfs.tools.federation;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -0,0 +1,47 @@
|
|||
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

option java_package = "org.apache.hadoop.hdfs.protocol.proto";
option java_outer_classname = "RouterProtocolProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.hdfs.router;

import "FederationProtocol.proto";

service RouterAdminProtocolService {
  /**
   * Add a mount table entry.
   */
  rpc addMountTableEntry(AddMountTableEntryRequestProto) returns(AddMountTableEntryResponseProto);

  /**
   * Update an existing mount table entry without copying files.
   */
  rpc updateMountTableEntry(UpdateMountTableEntryRequestProto) returns(UpdateMountTableEntryResponseProto);

  /**
   * Get matching mount entries.
   */
  rpc removeMountTableEntry(RemoveMountTableEntryRequestProto) returns(RemoveMountTableEntryResponseProto);

  /**
   * Get matching mount entries.
   */
  rpc getMountTableEntries(GetMountTableEntriesRequestProto) returns(GetMountTableEntriesResponseProto);
}
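As a sketch of how a client exercises RouterAdminProtocolService, the fragment below drives the addMountTableEntry RPC through the Java-side wrappers introduced elsewhere in this patch. The class name MountTableAdminExample, the router-host1 host, and the /data paths are illustrative assumptions; the 8111 port matches the dfs.federation.router.admin-address default below.

import java.net.InetSocketAddress;
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;

public class MountTableAdminExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical admin endpoint; 8111 is the default admin port.
    InetSocketAddress adminAddress =
        new InetSocketAddress("router-host1", 8111);
    RouterClient client = new RouterClient(adminAddress, new Configuration());
    MountTableManager mountTable = client.getMountTableManager();

    // Mount /data onto /data in nameservice ns0.
    MountTable entry = MountTable.newInstance(
        "/data", Collections.singletonMap("ns0", "/data"));
    AddMountTableEntryRequest request =
        AddMountTableEntryRequest.newInstance(entry);
    AddMountTableEntryResponse response =
        mountTable.addMountTableEntry(request);
    System.out.println("Added: " + response.getStatus());
  }
}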
@@ -4745,6 +4745,44 @@
  </description>
</property>

<property>
  <name>dfs.federation.router.admin.enable</name>
  <value>true</value>
  <description>
    Whether the RPC admin service that handles client requests in the
    router is enabled.
  </description>
</property>

<property>
  <name>dfs.federation.router.admin-address</name>
  <value>0.0.0.0:8111</value>
  <description>
    RPC address that handles the admin requests.
    The value of this property will take the form of router-host1:rpc-port.
  </description>
</property>

<property>
  <name>dfs.federation.router.admin-bind-host</name>
  <value></value>
  <description>
    The actual address the RPC admin server will bind to. If this optional
    address is set, it overrides only the hostname portion of
    dfs.federation.router.admin-address. This is useful for making the
    router listen on all interfaces by setting it to 0.0.0.0.
  </description>
</property>

<property>
  <name>dfs.federation.router.admin.handler.count</name>
  <value>1</value>
  <description>
    The number of server threads for the router to handle admin RPC
    requests.
  </description>
</property>

<property>
  <name>dfs.federation.router.file.resolver.client.class</name>
  <value>org.apache.hadoop.hdfs.server.federation.MockResolver</value>
@@ -4761,6 +4799,14 @@
  </description>
</property>

<property>
  <name>dfs.federation.router.store.enable</name>
  <value>true</value>
  <description>
    Whether the Router connects to the State Store.
  </description>
</property>

<property>
  <name>dfs.federation.router.store.serializer</name>
  <value>org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl</value>
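A minimal sketch of overriding these keys programmatically; the host and handler count are illustrative values, and in a deployment they would normally be set in hdfs-site.xml instead:

import org.apache.hadoop.conf.Configuration;

public class RouterAdminConfExample {
  public static Configuration adminConf() {
    Configuration conf = new Configuration();
    // Enable the admin RPC service and the State Store connection.
    conf.setBoolean("dfs.federation.router.admin.enable", true);
    conf.setBoolean("dfs.federation.router.store.enable", true);
    // router-host1:rpc-port form, per the description above.
    conf.set("dfs.federation.router.admin-address", "router-host1:8111");
    // Listen on all interfaces.
    conf.set("dfs.federation.router.admin-bind-host", "0.0.0.0");
    conf.setInt("dfs.federation.router.admin.handler.count", 4);
    return conf;
  }
}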
@@ -28,8 +28,10 @@ public class RouterConfigBuilder {
  private Configuration conf;

  private boolean enableRpcServer = false;
  private boolean enableAdminServer = false;
  private boolean enableHeartbeat = false;
  private boolean enableLocalHeartbeat = false;
  private boolean enableStateStore = false;

  public RouterConfigBuilder(Configuration configuration) {
    this.conf = configuration;
@@ -41,8 +43,10 @@ public class RouterConfigBuilder {

  public RouterConfigBuilder all() {
    this.enableRpcServer = true;
    this.enableAdminServer = true;
    this.enableHeartbeat = true;
    this.enableLocalHeartbeat = true;
    this.enableStateStore = true;
    return this;
  }

@@ -56,21 +60,43 @@
    return this;
  }

  public RouterConfigBuilder admin(boolean enable) {
    this.enableAdminServer = enable;
    return this;
  }

  public RouterConfigBuilder heartbeat(boolean enable) {
    this.enableHeartbeat = enable;
    return this;
  }

  public RouterConfigBuilder stateStore(boolean enable) {
    this.enableStateStore = enable;
    return this;
  }

  public RouterConfigBuilder rpc() {
    return this.rpc(true);
  }

  public RouterConfigBuilder admin() {
    return this.admin(true);
  }

  public RouterConfigBuilder heartbeat() {
    return this.heartbeat(true);
  }

  public RouterConfigBuilder stateStore() {
    return this.stateStore(true);
  }

  public Configuration build() {
    conf.setBoolean(DFSConfigKeys.DFS_ROUTER_STORE_ENABLE,
        this.enableStateStore);
    conf.setBoolean(DFSConfigKeys.DFS_ROUTER_RPC_ENABLE, this.enableRpcServer);
    conf.setBoolean(DFSConfigKeys.DFS_ROUTER_ADMIN_ENABLE,
        this.enableAdminServer);
    conf.setBoolean(DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
        this.enableHeartbeat);
    conf.setBoolean(DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE,
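The no-argument form of the builder is how the tests in this patch use it: start from an empty configuration and switch on only the services a test needs. A minimal sketch, mirroring the setup in TestRouterAdmin below:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;

public class RouterConfigBuilderExample {
  public static Configuration stateStoreAdminRpc() {
    // State Store + admin + RPC enabled; heartbeats left off.
    return new RouterConfigBuilder()
        .stateStore()
        .admin()
        .rpc()
        .build();
  }
}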
@@ -25,8 +25,12 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS;
@@ -46,6 +50,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
@@ -67,6 +72,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServi
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -101,6 +107,15 @@ public class RouterDFSCluster {
  /** Mini cluster. */
  private MiniDFSCluster cluster;

  protected static final long DEFAULT_HEARTBEAT_INTERVAL_MS =
      TimeUnit.SECONDS.toMillis(5);
  protected static final long DEFAULT_CACHE_INTERVAL_MS =
      TimeUnit.SECONDS.toMillis(5);
  /** Heartbeat interval in milliseconds. */
  private long heartbeatInterval;
  /** Cache flush interval in milliseconds. */
  private long cacheFlushInterval;

  /** Router configuration overrides. */
  private Configuration routerOverrides;
  /** Namenode configuration overrides. */
@@ -118,6 +133,7 @@
    private int rpcPort;
    private DFSClient client;
    private Configuration conf;
    private RouterClient adminClient;
    private URI fileSystemUri;

    public RouterContext(Configuration conf, String nsId, String nnId)
@@ -183,6 +199,15 @@
      });
    }

    public RouterClient getAdminClient() throws IOException {
      if (adminClient == null) {
        InetSocketAddress routerSocket = router.getAdminServerAddress();
        LOG.info("Connecting to router admin at {}", routerSocket);
        adminClient = new RouterClient(routerSocket, conf);
      }
      return adminClient;
    }
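
    // Usage sketch (illustrative, not part of the class): the admin client
    // returned here is the entry point to the mount table RPCs, e.g.
    //   RouterClient admin = routerContext.getAdminClient();
    //   MountTableManager mounts = admin.getMountTableManager();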

    public DFSClient getClient() throws IOException, URISyntaxException {
      if (client == null) {
        LOG.info("Connecting to router at {}", fileSystemUri);
@@ -304,13 +329,22 @@
    }
  }

  public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes,
      long heartbeatInterval, long cacheFlushInterval) {
    this.highAvailability = ha;
    this.heartbeatInterval = heartbeatInterval;
    this.cacheFlushInterval = cacheFlushInterval;
    configureNameservices(numNameservices, numNamenodes);
  }

  public RouterDFSCluster(boolean ha, int numNameservices) {
    this(ha, numNameservices, 2,
        DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
  }

  public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes) {
    this(ha, numNameservices, numNamenodes,
        DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
  }

  /**
@@ -404,7 +438,12 @@
    conf.set(DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0");
    conf.set(DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0");

    conf.set(DFS_ROUTER_ADMIN_ADDRESS_KEY, "127.0.0.1:0");
    conf.set(DFS_ROUTER_ADMIN_BIND_HOST_KEY, "0.0.0.0");

    conf.set(DFS_ROUTER_DEFAULT_NAMESERVICE, nameservices.get(0));
    conf.setLong(DFS_ROUTER_HEARTBEAT_INTERVAL_MS, heartbeatInterval);
    conf.setLong(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, cacheFlushInterval);

    // Use mock resolver classes
    conf.setClass(FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
@@ -0,0 +1,148 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation;

import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockRegistrationForNamenode;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;

/**
 * Test utility to mimic a federated HDFS cluster with a router and a state
 * store.
 */
public class StateStoreDFSCluster extends RouterDFSCluster {

  private static final Class<?> DEFAULT_FILE_RESOLVER =
      MountTableResolver.class;
  private static final Class<?> DEFAULT_NAMENODE_RESOLVER =
      MembershipNamenodeResolver.class;

  public StateStoreDFSCluster(boolean ha, int numNameservices, int numNamenodes,
      long heartbeatInterval, long cacheFlushInterval)
      throws IOException, InterruptedException {
    this(ha, numNameservices, numNamenodes, heartbeatInterval,
        cacheFlushInterval, DEFAULT_FILE_RESOLVER);
  }

  public StateStoreDFSCluster(boolean ha, int numNameservices, int numNamenodes,
      long heartbeatInterval, long cacheFlushInterval, Class<?> fileResolver)
      throws IOException, InterruptedException {
    super(ha, numNameservices, numNamenodes, heartbeatInterval,
        cacheFlushInterval);

    // Attach state store and resolvers to router
    Configuration stateStoreConfig = getStateStoreConfiguration();
    // Use state store backed resolvers
    stateStoreConfig.setClass(
        DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
        DEFAULT_NAMENODE_RESOLVER, ActiveNamenodeResolver.class);
    stateStoreConfig.setClass(
        DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
        fileResolver, FileSubclusterResolver.class);
    this.addRouterOverrides(stateStoreConfig);
  }

  public StateStoreDFSCluster(boolean ha, int numNameservices,
      Class<?> fileResolver) throws IOException, InterruptedException {
    this(ha, numNameservices, 2,
        DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS, fileResolver);
  }

  public StateStoreDFSCluster(boolean ha, int numNameservices)
      throws IOException, InterruptedException {
    this(ha, numNameservices, 2,
        DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
  }

  public StateStoreDFSCluster(boolean ha, int numNameservices,
      int numNamenodes) throws IOException, InterruptedException {
    this(ha, numNameservices, numNamenodes,
        DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
  }

  /////////////////////////////////////////////////////////////////////////////
  // State Store Test Fixtures
  /////////////////////////////////////////////////////////////////////////////

  /**
   * Adds a mock NN registration fixture for each NN in the cluster:
   * nameservice -> NS, namenode -> NN, rpcAddress -> 0.0.0.0:0,
   * webAddress -> 0.0.0.0:0, state -> STANDBY, safeMode -> false,
   * blockPool -> test.
   *
   * @param stateStore State Store.
   * @throws IOException If it cannot register.
   */
  public void createTestRegistration(StateStoreService stateStore)
      throws IOException {
    List<MembershipState> entries = new ArrayList<MembershipState>();
    for (NamenodeContext nn : this.getNamenodes()) {
      MembershipState entry = createMockRegistrationForNamenode(
          nn.getNameserviceId(), nn.getNamenodeId(),
          FederationNamenodeServiceState.STANDBY);
      entries.add(entry);
    }
    synchronizeRecords(
        stateStore, entries, MembershipState.class);
  }

  public void createTestMountTable(StateStoreService stateStore)
      throws IOException {
    List<MountTable> mounts = generateMockMountTable();
    synchronizeRecords(stateStore, mounts, MountTable.class);
    stateStore.refreshCaches();
  }

  public List<MountTable> generateMockMountTable() throws IOException {
    // Create the mount table entries
    List<MountTable> entries = new ArrayList<>();
    for (String ns : this.getNameservices()) {
      Map<String, String> destMap = new HashMap<>();
      destMap.put(ns, getNamenodePathForNS(ns));

      // Direct path
      String fedPath = getFederatedPathForNS(ns);
      MountTable entry = MountTable.newInstance(fedPath, destMap);
      entries.add(entry);
    }

    // The root path goes to the first nameservice
    Map<String, String> destMap = new HashMap<>();
    String ns0 = this.getNameservices().get(0);
    destMap.put(ns0, "/");
    MountTable entry = MountTable.newInstance("/", destMap);
    entries.add(entry);
    return entries;
  }
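
  // Illustration of the generated table (hypothetical nameservices ns0/ns1;
  // the /ns0 form matches TestRouterAdmin#testGetSingleMountTableEntry):
  //   /ns0 -> ns0:<namenode path for ns0>
  //   /ns1 -> ns1:<namenode path for ns1>
  //   /    -> ns0:/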
}
@@ -0,0 +1,261 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.util.Time;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Tests the administrator interface of the {@link Router} implemented by
 * {@link RouterAdminServer}.
 */
public class TestRouterAdmin {

  private static StateStoreDFSCluster cluster;
  private static RouterContext routerContext;
  public static final String RPC_BEAN =
      "Hadoop:service=Router,name=FederationRPC";
  private static List<MountTable> mockMountTable;
  private static StateStoreService stateStore;

  @BeforeClass
  public static void globalSetUp() throws Exception {
    cluster = new StateStoreDFSCluster(false, 1);
    // Build and start a router with State Store + admin + RPC
    Configuration conf = new RouterConfigBuilder()
        .stateStore()
        .admin()
        .rpc()
        .build();
    cluster.addRouterOverrides(conf);
    cluster.startRouters();
    routerContext = cluster.getRandomRouter();
    mockMountTable = cluster.generateMockMountTable();
    Router router = routerContext.getRouter();
    stateStore = router.getStateStore();
  }

  @AfterClass
  public static void tearDown() {
    cluster.stopRouter(routerContext);
  }

  @Before
  public void testSetup() throws Exception {
    assertTrue(
        synchronizeRecords(stateStore, mockMountTable, MountTable.class));
  }

  @Test
  public void testAddMountTable() throws IOException {
    MountTable newEntry = MountTable.newInstance(
        "/testpath", Collections.singletonMap("ns0", "/testdir"),
        Time.now(), Time.now());

    RouterClient client = routerContext.getAdminClient();
    MountTableManager mountTable = client.getMountTableManager();

    // Existing mount table size
    List<MountTable> records = getMountTableEntries(mountTable);
    assertEquals(records.size(), mockMountTable.size());

    // Add
    AddMountTableEntryRequest addRequest =
        AddMountTableEntryRequest.newInstance(newEntry);
    AddMountTableEntryResponse addResponse =
        mountTable.addMountTableEntry(addRequest);
    assertTrue(addResponse.getStatus());

    // New mount table size
    List<MountTable> records2 = getMountTableEntries(mountTable);
    assertEquals(records2.size(), mockMountTable.size() + 1);
  }

  @Test
  public void testAddDuplicateMountTable() throws IOException {
    MountTable newEntry = MountTable.newInstance("/testpath",
        Collections.singletonMap("ns0", "/testdir"), Time.now(), Time.now());

    RouterClient client = routerContext.getAdminClient();
    MountTableManager mountTable = client.getMountTableManager();

    // Existing mount table size
    List<MountTable> entries1 = getMountTableEntries(mountTable);
    assertEquals(entries1.size(), mockMountTable.size());

    // Add
    AddMountTableEntryRequest addRequest =
        AddMountTableEntryRequest.newInstance(newEntry);
    AddMountTableEntryResponse addResponse =
        mountTable.addMountTableEntry(addRequest);
    assertTrue(addResponse.getStatus());

    // New mount table size
    List<MountTable> entries2 = getMountTableEntries(mountTable);
    assertEquals(entries2.size(), mockMountTable.size() + 1);

    // Add again, should fail
    AddMountTableEntryResponse addResponse2 =
        mountTable.addMountTableEntry(addRequest);
    assertFalse(addResponse2.getStatus());
  }

  @Test
  public void testRemoveMountTable() throws IOException {
    RouterClient client = routerContext.getAdminClient();
    MountTableManager mountTable = client.getMountTableManager();

    // Existing mount table size
    List<MountTable> entries1 = getMountTableEntries(mountTable);
    assertEquals(entries1.size(), mockMountTable.size());

    // Remove an entry
    RemoveMountTableEntryRequest removeRequest =
        RemoveMountTableEntryRequest.newInstance("/");
    mountTable.removeMountTableEntry(removeRequest);

    // New mount table size
    List<MountTable> entries2 = getMountTableEntries(mountTable);
    assertEquals(entries2.size(), mockMountTable.size() - 1);
  }

  @Test
  public void testEditMountTable() throws IOException {
    RouterClient client = routerContext.getAdminClient();
    MountTableManager mountTable = client.getMountTableManager();

    // Verify starting condition
    MountTable entry = getMountTableEntry("/");
    assertEquals(
        Collections.singletonList(new RemoteLocation("ns0", "/")),
        entry.getDestinations());

    // Edit the entry for /
    MountTable updatedEntry = MountTable.newInstance(
        "/", Collections.singletonMap("ns1", "/"), Time.now(), Time.now());
    UpdateMountTableEntryRequest updateRequest =
        UpdateMountTableEntryRequest.newInstance(updatedEntry);
    mountTable.updateMountTableEntry(updateRequest);

    // Verify edited condition
    entry = getMountTableEntry("/");
    assertEquals(
        Collections.singletonList(new RemoteLocation("ns1", "/")),
        entry.getDestinations());
  }

  @Test
  public void testGetMountTable() throws IOException {
    RouterClient client = routerContext.getAdminClient();
    MountTableManager mountTable = client.getMountTableManager();

    // Verify size of table
    List<MountTable> entries = getMountTableEntries(mountTable);
    assertEquals(mockMountTable.size(), entries.size());

    // Verify all entries are present
    int matches = 0;
    for (MountTable e : entries) {
      for (MountTable entry : mockMountTable) {
        assertEquals(e.getDestinations().size(), 1);
        assertNotNull(e.getDateCreated());
        assertNotNull(e.getDateModified());
        if (entry.getSourcePath().equals(e.getSourcePath())) {
          matches++;
        }
      }
    }
    assertEquals(matches, mockMountTable.size());
  }

  @Test
  public void testGetSingleMountTableEntry() throws IOException {
    MountTable entry = getMountTableEntry("/ns0");
    assertNotNull(entry);
    assertEquals(entry.getSourcePath(), "/ns0");
  }

  /**
   * Gets an existing mount table record from the state store.
   *
   * @param mount The mount point of the record to get.
   * @return The matching record if found, null if it is not found.
   * @throws IOException If the state store could not be accessed.
   */
  private MountTable getMountTableEntry(final String mount) throws IOException {
    // Refresh the cache
    stateStore.loadCache(MountTableStoreImpl.class, true);

    GetMountTableEntriesRequest request =
        GetMountTableEntriesRequest.newInstance(mount);
    RouterClient client = routerContext.getAdminClient();
    MountTableManager mountTable = client.getMountTableManager();
    List<MountTable> results = getMountTableEntries(mountTable, request);
    if (results.size() > 0) {
      // The first result is sorted to have the shortest mount string length
      return results.get(0);
    }
    return null;
  }

  private List<MountTable> getMountTableEntries(MountTableManager mountTable)
      throws IOException {
    GetMountTableEntriesRequest request =
        GetMountTableEntriesRequest.newInstance("/");
    return getMountTableEntries(mountTable, request);
  }

  private List<MountTable> getMountTableEntries(MountTableManager mountTable,
      GetMountTableEntriesRequest request) throws IOException {
    stateStore.loadCache(MountTableStoreImpl.class, true);
    GetMountTableEntriesResponse response =
        mountTable.getMountTableEntries(request);
    return response.getEntries();
  }
}