HDFS-10646. Federation admin tool. Contributed by Inigo Goiri.

(cherry picked from commit ae27e31fbc)
This commit is contained in:
Inigo Goiri 2017-08-08 14:44:43 -07:00
parent 6f0de27318
commit b3e6bd22e3
19 changed files with 1644 additions and 18 deletions

View File

@ -332,6 +332,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<include>editlog.proto</include> <include>editlog.proto</include>
<include>fsimage.proto</include> <include>fsimage.proto</include>
<include>FederationProtocol.proto</include> <include>FederationProtocol.proto</include>
<include>RouterProtocol.proto</include>
</includes> </includes>
</source> </source>
</configuration> </configuration>

View File

@ -31,6 +31,7 @@ function hadoop_usage
hadoop_add_option "--hosts filename" "list of hosts to use in worker mode" hadoop_add_option "--hosts filename" "list of hosts to use in worker mode"
hadoop_add_option "--workers" "turn on worker mode" hadoop_add_option "--workers" "turn on worker mode"
<<<<<<< HEAD
hadoop_add_subcommand "balancer" daemon "run a cluster balancing utility" hadoop_add_subcommand "balancer" daemon "run a cluster balancing utility"
hadoop_add_subcommand "cacheadmin" admin "configure the HDFS cache" hadoop_add_subcommand "cacheadmin" admin "configure the HDFS cache"
hadoop_add_subcommand "classpath" client "prints the class path needed to get the hadoop jar and the required libraries" hadoop_add_subcommand "classpath" client "prints the class path needed to get the hadoop jar and the required libraries"
@ -42,6 +43,7 @@ function hadoop_usage
hadoop_add_subcommand "diskbalancer" daemon "Distributes data evenly among disks on a given node" hadoop_add_subcommand "diskbalancer" daemon "Distributes data evenly among disks on a given node"
hadoop_add_subcommand "envvars" client "display computed Hadoop environment variables" hadoop_add_subcommand "envvars" client "display computed Hadoop environment variables"
hadoop_add_subcommand "ec" admin "run a HDFS ErasureCoding CLI" hadoop_add_subcommand "ec" admin "run a HDFS ErasureCoding CLI"
hadoop_add_subcommand "federation" admin "manage Router-based federation"
hadoop_add_subcommand "fetchdt" client "fetch a delegation token from the NameNode" hadoop_add_subcommand "fetchdt" client "fetch a delegation token from the NameNode"
hadoop_add_subcommand "fsck" admin "run a DFS filesystem checking utility" hadoop_add_subcommand "fsck" admin "run a DFS filesystem checking utility"
hadoop_add_subcommand "getconf" client "get config values from configuration" hadoop_add_subcommand "getconf" client "get config values from configuration"
@ -181,6 +183,9 @@ function hdfscmd_case
HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true" HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
HADOOP_CLASSNAME='org.apache.hadoop.hdfs.server.federation.router.Router' HADOOP_CLASSNAME='org.apache.hadoop.hdfs.server.federation.router.Router'
;; ;;
federation)
HADOOP_CLASSNAME='org.apache.hadoop.hdfs.tools.federation.RouterAdmin'
;;
secondarynamenode) secondarynamenode)
HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true" HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
HADOOP_CLASSNAME='org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode' HADOOP_CLASSNAME='org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode'

View File

@ -59,7 +59,7 @@ if "%1" == "--loglevel" (
) )
) )
set hdfscommands=dfs namenode secondarynamenode journalnode zkfc datanode dfsadmin haadmin fsck balancer jmxget oiv oev fetchdt getconf groups snapshotDiff lsSnapshottableDir cacheadmin mover storagepolicies classpath crypto router debug set hdfscommands=dfs namenode secondarynamenode journalnode zkfc datanode dfsadmin haadmin fsck balancer jmxget oiv oev fetchdt getconf groups snapshotDiff lsSnapshottableDir cacheadmin mover storagepolicies classpath crypto router federation debug
for %%i in ( %hdfscommands% ) do ( for %%i in ( %hdfscommands% ) do (
if %hdfs-command% == %%i set hdfscommand=true if %hdfs-command% == %%i set hdfscommand=true
) )
@ -184,6 +184,11 @@ goto :eof
set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_ROUTER_OPTS% set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_ROUTER_OPTS%
goto :eof goto :eof
:federation
set CLASS=org.apache.hadoop.hdfs.tools.federation.RouterAdmin
set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_ROUTER_OPTS%
goto :eof
:debug :debug
set CLASS=org.apache.hadoop.hdfs.tools.DebugAdmin set CLASS=org.apache.hadoop.hdfs.tools.DebugAdmin
goto :eof goto :eof

View File

@ -1196,6 +1196,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String FEDERATION_STORE_PREFIX = public static final String FEDERATION_STORE_PREFIX =
FEDERATION_ROUTER_PREFIX + "store."; FEDERATION_ROUTER_PREFIX + "store.";
public static final String DFS_ROUTER_STORE_ENABLE =
FEDERATION_STORE_PREFIX + "enable";
public static final boolean DFS_ROUTER_STORE_ENABLE_DEFAULT = true;
public static final String FEDERATION_STORE_SERIALIZER_CLASS = public static final String FEDERATION_STORE_SERIALIZER_CLASS =
DFSConfigKeys.FEDERATION_STORE_PREFIX + "serializer"; DFSConfigKeys.FEDERATION_STORE_PREFIX + "serializer";
public static final Class<StateStoreSerializerPBImpl> public static final Class<StateStoreSerializerPBImpl>
@ -1222,6 +1226,21 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT = public static final long FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT =
TimeUnit.MINUTES.toMillis(5); TimeUnit.MINUTES.toMillis(5);
// HDFS Router-based federation admin
public static final String DFS_ROUTER_ADMIN_HANDLER_COUNT_KEY =
FEDERATION_ROUTER_PREFIX + "admin.handler.count";
public static final int DFS_ROUTER_ADMIN_HANDLER_COUNT_DEFAULT = 1;
public static final int DFS_ROUTER_ADMIN_PORT_DEFAULT = 8111;
public static final String DFS_ROUTER_ADMIN_ADDRESS_KEY =
FEDERATION_ROUTER_PREFIX + "admin-address";
public static final String DFS_ROUTER_ADMIN_ADDRESS_DEFAULT =
"0.0.0.0:" + DFS_ROUTER_ADMIN_PORT_DEFAULT;
public static final String DFS_ROUTER_ADMIN_BIND_HOST_KEY =
FEDERATION_ROUTER_PREFIX + "admin-bind-host";
public static final String DFS_ROUTER_ADMIN_ENABLE =
FEDERATION_ROUTER_PREFIX + "admin.enable";
public static final boolean DFS_ROUTER_ADMIN_ENABLE_DEFAULT = true;
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated @Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.proto.RouterProtocolProtos.RouterAdminProtocolService;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.token.TokenInfo;
/**
* Protocol that a clients use to communicate with the NameNode.
* Note: This extends the protocolbuffer service based interface to
* add annotations required for security.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
@KerberosInfo(
serverPrincipal = DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY)
@TokenInfo(DelegationTokenSelector.class)
@ProtocolInfo(protocolName = HdfsConstants.CLIENT_NAMENODE_PROTOCOL_NAME,
protocolVersion = 1)
public interface RouterAdminProtocolPB extends
RouterAdminProtocolService.BlockingInterface {
}

View File

@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryResponseProto;
import org.apache.hadoop.hdfs.server.federation.router.RouterAdminServer;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntryRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntryResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryResponsePBImpl;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* This class is used on the server side. Calls come across the wire for the for
* protocol {@link RouterAdminProtocolPB}. This class translates the PB data
* types to the native data types used inside the HDFS Router as specified in
* the generic RouterAdminProtocol.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public class RouterAdminProtocolServerSideTranslatorPB implements
RouterAdminProtocolPB {
private final RouterAdminServer server;
/**
* Constructor.
* @param server The NN server.
* @throws IOException
*/
public RouterAdminProtocolServerSideTranslatorPB(RouterAdminServer server)
throws IOException {
this.server = server;
}
@Override
public AddMountTableEntryResponseProto addMountTableEntry(
RpcController controller, AddMountTableEntryRequestProto request)
throws ServiceException {
try {
AddMountTableEntryRequest req =
new AddMountTableEntryRequestPBImpl(request);
AddMountTableEntryResponse response = server.addMountTableEntry(req);
AddMountTableEntryResponsePBImpl responsePB =
(AddMountTableEntryResponsePBImpl)response;
return responsePB.getProto();
} catch (IOException e) {
throw new ServiceException(e);
}
}
/**
* Remove an entry from the mount table.
*/
@Override
public RemoveMountTableEntryResponseProto removeMountTableEntry(
RpcController controller, RemoveMountTableEntryRequestProto request)
throws ServiceException {
try {
RemoveMountTableEntryRequest req =
new RemoveMountTableEntryRequestPBImpl(request);
RemoveMountTableEntryResponse response =
server.removeMountTableEntry(req);
RemoveMountTableEntryResponsePBImpl responsePB =
(RemoveMountTableEntryResponsePBImpl)response;
return responsePB.getProto();
} catch (IOException e) {
throw new ServiceException(e);
}
}
/**
* Get matching mount table entries.
*/
@Override
public GetMountTableEntriesResponseProto getMountTableEntries(
RpcController controller, GetMountTableEntriesRequestProto request)
throws ServiceException {
try {
GetMountTableEntriesRequest req =
new GetMountTableEntriesRequestPBImpl(request);
GetMountTableEntriesResponse response = server.getMountTableEntries(req);
GetMountTableEntriesResponsePBImpl responsePB =
(GetMountTableEntriesResponsePBImpl)response;
return responsePB.getProto();
} catch (IOException e) {
throw new ServiceException(e);
}
}
/**
* Update a single mount table entry.
*/
@Override
public UpdateMountTableEntryResponseProto updateMountTableEntry(
RpcController controller, UpdateMountTableEntryRequestProto request)
throws ServiceException {
try {
UpdateMountTableEntryRequest req =
new UpdateMountTableEntryRequestPBImpl(request);
UpdateMountTableEntryResponse response =
server.updateMountTableEntry(req);
UpdateMountTableEntryResponsePBImpl responsePB =
(UpdateMountTableEntryResponsePBImpl)response;
return responsePB.getProto();
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryResponseProto;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntryRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntryResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryResponsePBImpl;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
import com.google.protobuf.ServiceException;
/**
* This class forwards NN's ClientProtocol calls as RPC calls to the NN server
* while translating from the parameter types used in ClientProtocol to the
* new PB types.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public class RouterAdminProtocolTranslatorPB
implements ProtocolMetaInterface, MountTableManager,
Closeable, ProtocolTranslator {
final private RouterAdminProtocolPB rpcProxy;
public RouterAdminProtocolTranslatorPB(RouterAdminProtocolPB proxy) {
rpcProxy = proxy;
}
@Override
public void close() {
RPC.stopProxy(rpcProxy);
}
@Override
public Object getUnderlyingProxyObject() {
return rpcProxy;
}
@Override
public boolean isMethodSupported(String methodName) throws IOException {
return RpcClientUtil.isMethodSupported(rpcProxy,
RouterAdminProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(RouterAdminProtocolPB.class), methodName);
}
@Override
public AddMountTableEntryResponse addMountTableEntry(
AddMountTableEntryRequest request) throws IOException {
AddMountTableEntryRequestPBImpl requestPB =
(AddMountTableEntryRequestPBImpl)request;
AddMountTableEntryRequestProto proto = requestPB.getProto();
try {
AddMountTableEntryResponseProto response =
rpcProxy.addMountTableEntry(null, proto);
return new AddMountTableEntryResponsePBImpl(response);
} catch (ServiceException e) {
throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
}
}
@Override
public UpdateMountTableEntryResponse updateMountTableEntry(
UpdateMountTableEntryRequest request) throws IOException {
UpdateMountTableEntryRequestPBImpl requestPB =
(UpdateMountTableEntryRequestPBImpl)request;
UpdateMountTableEntryRequestProto proto = requestPB.getProto();
try {
UpdateMountTableEntryResponseProto response =
rpcProxy.updateMountTableEntry(null, proto);
return new UpdateMountTableEntryResponsePBImpl(response);
} catch (ServiceException e) {
throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
}
}
@Override
public RemoveMountTableEntryResponse removeMountTableEntry(
RemoveMountTableEntryRequest request) throws IOException {
RemoveMountTableEntryRequestPBImpl requestPB =
(RemoveMountTableEntryRequestPBImpl)request;
RemoveMountTableEntryRequestProto proto = requestPB.getProto();
try {
RemoveMountTableEntryResponseProto responseProto =
rpcProxy.removeMountTableEntry(null, proto);
return new RemoveMountTableEntryResponsePBImpl(responseProto);
} catch (ServiceException e) {
throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
}
}
@Override
public GetMountTableEntriesResponse getMountTableEntries(
GetMountTableEntriesRequest request) throws IOException {
GetMountTableEntriesRequestPBImpl requestPB =
(GetMountTableEntriesRequestPBImpl)request;
GetMountTableEntriesRequestProto proto = requestPB.getProto();
try {
GetMountTableEntriesResponseProto response =
rpcProxy.getMountTableEntries(null, proto);
return new GetMountTableEntriesResponsePBImpl(response);
} catch (ServiceException e) {
throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
}
}
}

View File

@ -61,7 +61,7 @@ public class MembershipNamenodeResolver
/** Reference to the State Store. */ /** Reference to the State Store. */
private final StateStoreService stateStore; private final StateStoreService stateStore;
/** Membership State Store interface. */ /** Membership State Store interface. */
private final MembershipStore membershipInterface; private MembershipStore membershipInterface;
/** Parent router ID. */ /** Parent router ID. */
private String routerId; private String routerId;
@ -82,25 +82,27 @@ public MembershipNamenodeResolver(
if (this.stateStore != null) { if (this.stateStore != null) {
// Request cache updates from the state store // Request cache updates from the state store
this.stateStore.registerCacheExternal(this); this.stateStore.registerCacheExternal(this);
}
// Initialize the interface to get the membership
this.membershipInterface = this.stateStore.getRegisteredRecordStore(
MembershipStore.class);
} else {
this.membershipInterface = null;
} }
private synchronized MembershipStore getMembershipStore() throws IOException {
if (this.membershipInterface == null) {
this.membershipInterface = this.stateStore.getRegisteredRecordStore(
MembershipStore.class);
if (this.membershipInterface == null) { if (this.membershipInterface == null) {
throw new IOException("State Store does not have an interface for " + throw new IOException("State Store does not have an interface for " +
MembershipStore.class.getSimpleName()); MembershipStore.class.getSimpleName());
} }
} }
return this.membershipInterface;
}
@Override @Override
public boolean loadCache(boolean force) { public boolean loadCache(boolean force) {
// Our cache depends on the store, update it first // Our cache depends on the store, update it first
try { try {
this.membershipInterface.loadCache(force); MembershipStore membership = getMembershipStore();
membership.loadCache(force);
} catch (IOException e) { } catch (IOException e) {
LOG.error("Cannot update membership from the State Store", e); LOG.error("Cannot update membership from the State Store", e);
} }
@ -126,8 +128,9 @@ public void updateActiveNamenode(
GetNamenodeRegistrationsRequest request = GetNamenodeRegistrationsRequest request =
GetNamenodeRegistrationsRequest.newInstance(partial); GetNamenodeRegistrationsRequest.newInstance(partial);
MembershipStore membership = getMembershipStore();
GetNamenodeRegistrationsResponse response = GetNamenodeRegistrationsResponse response =
this.membershipInterface.getNamenodeRegistrations(request); membership.getNamenodeRegistrations(request);
List<MembershipState> records = response.getNamenodeMemberships(); List<MembershipState> records = response.getNamenodeMemberships();
if (records != null && records.size() == 1) { if (records != null && records.size() == 1) {
@ -135,7 +138,7 @@ public void updateActiveNamenode(
UpdateNamenodeRegistrationRequest updateRequest = UpdateNamenodeRegistrationRequest updateRequest =
UpdateNamenodeRegistrationRequest.newInstance( UpdateNamenodeRegistrationRequest.newInstance(
record.getNameserviceId(), record.getNamenodeId(), ACTIVE); record.getNameserviceId(), record.getNamenodeId(), ACTIVE);
this.membershipInterface.updateNamenodeRegistration(updateRequest); membership.updateNamenodeRegistration(updateRequest);
} }
} catch (StateStoreUnavailableException e) { } catch (StateStoreUnavailableException e) {
LOG.error("Cannot update {} as active, State Store unavailable", address); LOG.error("Cannot update {} as active, State Store unavailable", address);
@ -226,14 +229,14 @@ public boolean registerNamenode(NamenodeStatusReport report)
NamenodeHeartbeatRequest request = NamenodeHeartbeatRequest.newInstance(); NamenodeHeartbeatRequest request = NamenodeHeartbeatRequest.newInstance();
request.setNamenodeMembership(record); request.setNamenodeMembership(record);
return this.membershipInterface.namenodeHeartbeat(request).getResult(); return getMembershipStore().namenodeHeartbeat(request).getResult();
} }
@Override @Override
public Set<FederationNamespaceInfo> getNamespaces() throws IOException { public Set<FederationNamespaceInfo> getNamespaces() throws IOException {
GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance(); GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
GetNamespaceInfoResponse response = GetNamespaceInfoResponse response =
this.membershipInterface.getNamespaceInfo(request); getMembershipStore().getNamespaceInfo(request);
return response.getNamespaceInfo(); return response.getNamespaceInfo();
} }
@ -259,8 +262,9 @@ private List<MembershipState> getRecentRegistrationForQuery(
// Retrieve a list of all registrations that match this query. // Retrieve a list of all registrations that match this query.
// This may include all NN records for a namespace/blockpool, including // This may include all NN records for a namespace/blockpool, including
// duplicate records for the same NN from different routers. // duplicate records for the same NN from different routers.
MembershipStore membershipStore = getMembershipStore();
GetNamenodeRegistrationsResponse response = GetNamenodeRegistrationsResponse response =
this.membershipInterface.getNamenodeRegistrations(request); membershipStore.getNamenodeRegistrations(request);
List<MembershipState> memberships = response.getNamenodeMemberships(); List<MembershipState> memberships = response.getNamenodeMemberships();
if (!addExpired || !addUnavailable) { if (!addExpired || !addUnavailable) {

View File

@ -81,6 +81,10 @@ public class Router extends CompositeService {
private RouterRpcServer rpcServer; private RouterRpcServer rpcServer;
private InetSocketAddress rpcAddress; private InetSocketAddress rpcAddress;
/** RPC interface for the admin. */
private RouterAdminServer adminServer;
private InetSocketAddress adminAddress;
/** Interface with the State Store. */ /** Interface with the State Store. */
private StateStoreService stateStore; private StateStoreService stateStore;
@ -116,6 +120,14 @@ public Router() {
protected void serviceInit(Configuration configuration) throws Exception { protected void serviceInit(Configuration configuration) throws Exception {
this.conf = configuration; this.conf = configuration;
if (conf.getBoolean(
DFSConfigKeys.DFS_ROUTER_STORE_ENABLE,
DFSConfigKeys.DFS_ROUTER_STORE_ENABLE_DEFAULT)) {
// Service that maintains the State Store connection
this.stateStore = new StateStoreService();
addService(this.stateStore);
}
// Resolver to track active NNs // Resolver to track active NNs
this.namenodeResolver = newActiveNamenodeResolver( this.namenodeResolver = newActiveNamenodeResolver(
this.conf, this.stateStore); this.conf, this.stateStore);
@ -138,6 +150,14 @@ protected void serviceInit(Configuration configuration) throws Exception {
this.setRpcServerAddress(rpcServer.getRpcAddress()); this.setRpcServerAddress(rpcServer.getRpcAddress());
} }
if (conf.getBoolean(
DFSConfigKeys.DFS_ROUTER_ADMIN_ENABLE,
DFSConfigKeys.DFS_ROUTER_ADMIN_ENABLE_DEFAULT)) {
// Create admin server
this.adminServer = createAdminServer();
addService(this.adminServer);
}
if (conf.getBoolean( if (conf.getBoolean(
DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE, DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT)) { DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT)) {
@ -263,6 +283,38 @@ public InetSocketAddress getRpcServerAddress() {
return this.rpcAddress; return this.rpcAddress;
} }
/////////////////////////////////////////////////////////
// Admin server
/////////////////////////////////////////////////////////
/**
* Create a new router admin server to handle the router admin interface.
*
* @return RouterAdminServer
* @throws IOException If the admin server was not successfully started.
*/
protected RouterAdminServer createAdminServer() throws IOException {
return new RouterAdminServer(this.conf, this);
}
/**
* Set the current Admin socket for the router.
*
* @param adminAddress Admin RPC address.
*/
protected void setAdminServerAddress(InetSocketAddress address) {
this.adminAddress = address;
}
/**
* Get the current Admin socket address for the router.
*
* @return InetSocketAddress Admin address.
*/
public InetSocketAddress getAdminServerAddress() {
return adminAddress;
}
///////////////////////////////////////////////////////// /////////////////////////////////////////////////////////
// Namenode heartbeat monitors // Namenode heartbeat monitors
///////////////////////////////////////////////////////// /////////////////////////////////////////////////////////

View File

@ -0,0 +1,183 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.proto.RouterProtocolProtos.RouterAdminProtocolService;
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.service.AbstractService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
/**
* This class is responsible for handling all of the Admin calls to the HDFS
* router. It is created, started, and stopped by {@link Router}.
*/
public class RouterAdminServer extends AbstractService
implements MountTableManager {
private static final Logger LOG =
LoggerFactory.getLogger(RouterAdminServer.class);
private Configuration conf;
private final Router router;
private MountTableStore mountTableStore;
/** The Admin server that listens to requests from clients. */
private final Server adminServer;
private final InetSocketAddress adminAddress;
public RouterAdminServer(Configuration conf, Router router)
throws IOException {
super(RouterAdminServer.class.getName());
this.conf = conf;
this.router = router;
int handlerCount = this.conf.getInt(
DFSConfigKeys.DFS_ROUTER_ADMIN_HANDLER_COUNT_KEY,
DFSConfigKeys.DFS_ROUTER_ADMIN_HANDLER_COUNT_DEFAULT);
RPC.setProtocolEngine(this.conf, RouterAdminProtocolPB.class,
ProtobufRpcEngine.class);
RouterAdminProtocolServerSideTranslatorPB routerAdminProtocolTranslator =
new RouterAdminProtocolServerSideTranslatorPB(this);
BlockingService clientNNPbService = RouterAdminProtocolService.
newReflectiveBlockingService(routerAdminProtocolTranslator);
InetSocketAddress confRpcAddress = conf.getSocketAddr(
DFSConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY,
DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT,
DFSConfigKeys.DFS_ROUTER_ADMIN_PORT_DEFAULT);
String bindHost = conf.get(
DFSConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY,
confRpcAddress.getHostName());
LOG.info("Admin server binding to {}:{}",
bindHost, confRpcAddress.getPort());
this.adminServer = new RPC.Builder(this.conf)
.setProtocol(RouterAdminProtocolPB.class)
.setInstance(clientNNPbService)
.setBindAddress(bindHost)
.setPort(confRpcAddress.getPort())
.setNumHandlers(handlerCount)
.setVerbose(false)
.build();
// The RPC-server port can be ephemeral... ensure we have the correct info
InetSocketAddress listenAddress = this.adminServer.getListenerAddress();
this.adminAddress = new InetSocketAddress(
confRpcAddress.getHostName(), listenAddress.getPort());
router.setAdminServerAddress(this.adminAddress);
}
/** Allow access to the client RPC server for testing. */
@VisibleForTesting
Server getAdminServer() {
return this.adminServer;
}
private MountTableStore getMountTableStore() throws IOException {
if (this.mountTableStore == null) {
this.mountTableStore = router.getStateStore().getRegisteredRecordStore(
MountTableStore.class);
if (this.mountTableStore == null) {
throw new IOException("Mount table state store is not available.");
}
}
return this.mountTableStore;
}
/**
* Get the RPC address of the admin service.
* @return Administration service RPC address.
*/
public InetSocketAddress getRpcAddress() {
return this.adminAddress;
}
@Override
protected void serviceInit(Configuration configuration) throws Exception {
this.conf = configuration;
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
this.adminServer.start();
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
if (this.adminServer != null) {
this.adminServer.stop();
}
super.serviceStop();
}
@Override
public AddMountTableEntryResponse addMountTableEntry(
AddMountTableEntryRequest request) throws IOException {
return getMountTableStore().addMountTableEntry(request);
}
@Override
public UpdateMountTableEntryResponse updateMountTableEntry(
UpdateMountTableEntryRequest request) throws IOException {
return getMountTableStore().updateMountTableEntry(request);
}
@Override
public RemoveMountTableEntryResponse removeMountTableEntry(
RemoveMountTableEntryRequest request) throws IOException {
return getMountTableStore().removeMountTableEntry(request);
}
@Override
public GetMountTableEntriesResponse getMountTableEntries(
GetMountTableEntriesRequest request) throws IOException {
return getMountTableStore().getMountTableEntries(request);
}
}

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
/**
* Client to connect to the {@link Router} via the admin protocol.
*/
@Private
public class RouterClient implements Closeable {
private final RouterAdminProtocolTranslatorPB proxy;
private final UserGroupInformation ugi;
private static RouterAdminProtocolTranslatorPB createRouterProxy(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
throws IOException {
RPC.setProtocolEngine(
conf, RouterAdminProtocolPB.class, ProtobufRpcEngine.class);
AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
final long version = RPC.getProtocolVersion(RouterAdminProtocolPB.class);
RouterAdminProtocolPB proxy = RPC.getProtocolProxy(
RouterAdminProtocolPB.class, version, address, ugi, conf,
NetUtils.getDefaultSocketFactory(conf),
RPC.getRpcTimeout(conf), null,
fallbackToSimpleAuth).getProxy();
return new RouterAdminProtocolTranslatorPB(proxy);
}
public RouterClient(InetSocketAddress address, Configuration conf)
throws IOException {
this.ugi = UserGroupInformation.getCurrentUser();
this.proxy = createRouterProxy(address, conf, ugi);
}
public MountTableManager getMountTableManager() {
return proxy;
}
@Override
public synchronized void close() throws IOException {
RPC.stopProxy(proxy);
}
}

View File

@ -0,0 +1,341 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.tools.federation;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class provides some Federation administrative access shell commands.
*/
@Private
public class RouterAdmin extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(RouterAdmin.class);
private RouterClient client;
public static void main(String[] argv) throws Exception {
Configuration conf = new HdfsConfiguration();
RouterAdmin admin = new RouterAdmin(conf);
int res = ToolRunner.run(admin, argv);
System.exit(res);
}
public RouterAdmin(Configuration conf) {
super(conf);
}
/**
* Print the usage message.
*/
public void printUsage() {
String usage = "Federation Admin Tools:\n"
+ "\t[-add <source> <nameservice> <destination> "
+ "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL]]\n"
+ "\t[-rm <source>]\n"
+ "\t[-ls <path>]\n";
System.out.println(usage);
}
@Override
public int run(String[] argv) throws Exception {
if (argv.length < 1) {
System.err.println("Not enough parameters specificed");
printUsage();
return -1;
}
int exitCode = -1;
int i = 0;
String cmd = argv[i++];
// Verify that we have enough command line parameters
if ("-add".equals(cmd)) {
if (argv.length < 4) {
System.err.println("Not enough parameters specificed for cmd " + cmd);
printUsage();
return exitCode;
}
} else if ("-rm".equalsIgnoreCase(cmd)) {
if (argv.length < 2) {
System.err.println("Not enough parameters specificed for cmd " + cmd);
printUsage();
return exitCode;
}
}
// Initialize RouterClient
try {
String address = getConf().getTrimmed(
DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT);
InetSocketAddress routerSocket = NetUtils.createSocketAddr(address);
client = new RouterClient(routerSocket, getConf());
} catch (RPC.VersionMismatch v) {
System.err.println(
"Version mismatch between client and server... command aborted");
return exitCode;
} catch (IOException e) {
System.err.println("Bad connection to Router... command aborted");
return exitCode;
}
Exception debugException = null;
exitCode = 0;
try {
if ("-add".equals(cmd)) {
if (addMount(argv, i)) {
System.err.println("Successfuly added mount point " + argv[i]);
}
} else if ("-rm".equals(cmd)) {
if (removeMount(argv[i])) {
System.err.println("Successfully removed mount point " + argv[i]);
}
} else if ("-ls".equals(cmd)) {
if (argv.length > 1) {
listMounts(argv[i]);
} else {
listMounts("/");
}
} else {
printUsage();
return exitCode;
}
} catch (IllegalArgumentException arge) {
debugException = arge;
exitCode = -1;
System.err.println(cmd.substring(1) + ": " + arge.getLocalizedMessage());
printUsage();
} catch (RemoteException e) {
// This is a error returned by the server.
// Print out the first line of the error message, ignore the stack trace.
exitCode = -1;
debugException = e;
try {
String[] content;
content = e.getLocalizedMessage().split("\n");
System.err.println(cmd.substring(1) + ": " + content[0]);
e.printStackTrace();
} catch (Exception ex) {
System.err.println(cmd.substring(1) + ": " + ex.getLocalizedMessage());
e.printStackTrace();
debugException = ex;
}
} catch (Exception e) {
exitCode = -1;
debugException = e;
System.err.println(cmd.substring(1) + ": " + e.getLocalizedMessage());
e.printStackTrace();
}
if (debugException != null) {
LOG.debug("Exception encountered", debugException);
}
return exitCode;
}
/**
* Add a mount table entry or update if it exists.
*
* @param parameters Parameters for the mount point.
* @param i Index in the parameters.
*/
public boolean addMount(String[] parameters, int i) throws IOException {
// Mandatory parameters
String mount = parameters[i++];
String[] nss = parameters[i++].split(",");
String dest = parameters[i++];
// Optional parameters
boolean readOnly = false;
DestinationOrder order = DestinationOrder.HASH;
while (i < parameters.length) {
if (parameters[i].equals("-readonly")) {
readOnly = true;
} else if (parameters[i].equals("-order")) {
i++;
try {
order = DestinationOrder.valueOf(parameters[i]);
} catch(Exception e) {
System.err.println("Cannot parse order: " + parameters[i]);
}
}
i++;
}
return addMount(mount, nss, dest, readOnly, order);
}
/**
* Add a mount table entry or update if it exists.
*
* @param mount Mount point.
* @param nss Namespaces where this is mounted to.
* @param dest Destination path.
* @param readonly If the mount point is read only.
* @param order Order of the destination locations.
* @return If the mount point was added.
* @throws IOException Error adding the mount point.
*/
public boolean addMount(String mount, String[] nss, String dest,
boolean readonly, DestinationOrder order) throws IOException {
// Get the existing entry
MountTableManager mountTable = client.getMountTableManager();
GetMountTableEntriesRequest getRequest =
GetMountTableEntriesRequest.newInstance(mount);
GetMountTableEntriesResponse getResponse =
mountTable.getMountTableEntries(getRequest);
List<MountTable> results = getResponse.getEntries();
MountTable existingEntry = null;
for (MountTable result : results) {
if (mount.equals(result.getSourcePath())) {
existingEntry = result;
}
}
if (existingEntry == null) {
// Create and add the entry if it doesn't exist
Map<String, String> destMap = new LinkedHashMap<>();
for (String ns : nss) {
destMap.put(ns, dest);
}
MountTable newEntry = MountTable.newInstance(mount, destMap);
if (readonly) {
newEntry.setReadOnly(true);
}
if (order != null) {
newEntry.setDestOrder(order);
}
AddMountTableEntryRequest request =
AddMountTableEntryRequest.newInstance(newEntry);
AddMountTableEntryResponse addResponse =
mountTable.addMountTableEntry(request);
boolean added = addResponse.getStatus();
if (!added) {
System.err.println("Cannot add mount point " + mount);
}
return added;
} else {
// Update the existing entry if it exists
for (String nsId : nss) {
if (!existingEntry.addDestination(nsId, dest)) {
System.err.println("Cannot add destination at " + nsId + " " + dest);
}
}
if (readonly) {
existingEntry.setReadOnly(true);
}
if (order != null) {
existingEntry.setDestOrder(order);
}
UpdateMountTableEntryRequest updateRequest =
UpdateMountTableEntryRequest.newInstance(existingEntry);
UpdateMountTableEntryResponse updateResponse =
mountTable.updateMountTableEntry(updateRequest);
boolean updated = updateResponse.getStatus();
if (!updated) {
System.err.println("Cannot update mount point " + mount);
}
return updated;
}
}
/**
* Remove mount point.
*
* @param path Path to remove.
* @throws IOException If it cannot be removed.
*/
public boolean removeMount(String path) throws IOException {
MountTableManager mountTable = client.getMountTableManager();
RemoveMountTableEntryRequest request =
RemoveMountTableEntryRequest.newInstance(path);
RemoveMountTableEntryResponse response =
mountTable.removeMountTableEntry(request);
boolean removed = response.getStatus();
if (!removed) {
System.out.println("Cannot remove mount point " + path);
}
return removed;
}
/**
* List mount points.
*
* @param path Path to list.
* @throws IOException If it cannot be listed.
*/
public void listMounts(String path) throws IOException {
MountTableManager mountTable = client.getMountTableManager();
GetMountTableEntriesRequest request =
GetMountTableEntriesRequest.newInstance(path);
GetMountTableEntriesResponse response =
mountTable.getMountTableEntries(request);
List<MountTable> entries = response.getEntries();
printMounts(entries);
}
private static void printMounts(List<MountTable> entries) {
System.out.println("Mount Table Entries:");
System.out.println(String.format(
"%-25s %-25s",
"Source", "Destinations"));
for (MountTable entry : entries) {
StringBuilder destBuilder = new StringBuilder();
for (RemoteLocation location : entry.getDestinations()) {
if (destBuilder.length() > 0) {
destBuilder.append(",");
}
destBuilder.append(String.format("%s->%s", location.getNameserviceId(),
location.getDest()));
}
System.out.println(String.format("%-25s %-25s", entry.getSourcePath(),
destBuilder.toString()));
}
}
}

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* It includes the tools to manage the Router-based federation. Includes the
* utilities to add and remove mount table entries.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
package org.apache.hadoop.hdfs.tools.federation;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.hdfs.protocol.proto";
option java_outer_classname = "RouterProtocolProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.hdfs.router;
import "FederationProtocol.proto";
service RouterAdminProtocolService {
/**
* Add a mount table entry.
*/
rpc addMountTableEntry(AddMountTableEntryRequestProto) returns(AddMountTableEntryResponseProto);
/**
* Update an existing mount table entry without copying files.
*/
rpc updateMountTableEntry(UpdateMountTableEntryRequestProto) returns(UpdateMountTableEntryResponseProto);
/**
* Remove a mount table entry.
*/
rpc removeMountTableEntry(RemoveMountTableEntryRequestProto) returns(RemoveMountTableEntryResponseProto);
/**
* Get matching mount entries
*/
rpc getMountTableEntries(GetMountTableEntriesRequestProto) returns(GetMountTableEntriesResponseProto);
}

View File

@ -4745,6 +4745,44 @@
</description> </description>
</property> </property>
<property>
<name>dfs.federation.router.admin.enable</name>
<value>true</value>
<description>
If the RPC admin service to handle client requests in the router is
enabled.
</description>
</property>
<property>
<name>dfs.federation.router.admin-address</name>
<value>0.0.0.0:8111</value>
<description>
RPC address that handles the admin requests.
The value of this property will take the form of router-host1:rpc-port.
</description>
</property>
<property>
<name>dfs.federation.router.admin-bind-host</name>
<value></value>
<description>
The actual address the RPC admin server will bind to. If this optional
address is set, it overrides only the hostname portion of
dfs.federation.router.admin-address. This is useful for making the name
node listen on all interfaces by setting it to 0.0.0.0.
</description>
</property>
<property>
<name>dfs.federation.router.admin.handler.count</name>
<value>1</value>
<description>
The number of server threads for the router to handle RPC requests from
admin.
</description>
</property>
<property> <property>
<name>dfs.federation.router.file.resolver.client.class</name> <name>dfs.federation.router.file.resolver.client.class</name>
<value>org.apache.hadoop.hdfs.server.federation.MockResolver</value> <value>org.apache.hadoop.hdfs.server.federation.MockResolver</value>
@ -4761,6 +4799,14 @@
</description> </description>
</property> </property>
<property>
<name>dfs.federation.router.store.enable</name>
<value>true</value>
<description>
If the Router connects to the State Store.
</description>
</property>
<property> <property>
<name>dfs.federation.router.store.serializer</name> <name>dfs.federation.router.store.serializer</name>
<value>org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl</value> <value>org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl</value>

View File

@ -28,8 +28,10 @@ public class RouterConfigBuilder {
private Configuration conf; private Configuration conf;
private boolean enableRpcServer = false; private boolean enableRpcServer = false;
private boolean enableAdminServer = false;
private boolean enableHeartbeat = false; private boolean enableHeartbeat = false;
private boolean enableLocalHeartbeat = false; private boolean enableLocalHeartbeat = false;
private boolean enableStateStore = false;
public RouterConfigBuilder(Configuration configuration) { public RouterConfigBuilder(Configuration configuration) {
this.conf = configuration; this.conf = configuration;
@ -41,8 +43,10 @@ public RouterConfigBuilder() {
public RouterConfigBuilder all() { public RouterConfigBuilder all() {
this.enableRpcServer = true; this.enableRpcServer = true;
this.enableAdminServer = true;
this.enableHeartbeat = true; this.enableHeartbeat = true;
this.enableLocalHeartbeat = true; this.enableLocalHeartbeat = true;
this.enableStateStore = true;
return this; return this;
} }
@ -56,21 +60,43 @@ public RouterConfigBuilder rpc(boolean enable) {
return this; return this;
} }
public RouterConfigBuilder admin(boolean enable) {
this.enableAdminServer = enable;
return this;
}
public RouterConfigBuilder heartbeat(boolean enable) { public RouterConfigBuilder heartbeat(boolean enable) {
this.enableHeartbeat = enable; this.enableHeartbeat = enable;
return this; return this;
} }
public RouterConfigBuilder stateStore(boolean enable) {
this.enableStateStore = enable;
return this;
}
public RouterConfigBuilder rpc() { public RouterConfigBuilder rpc() {
return this.rpc(true); return this.rpc(true);
} }
public RouterConfigBuilder admin() {
return this.admin(true);
}
public RouterConfigBuilder heartbeat() { public RouterConfigBuilder heartbeat() {
return this.heartbeat(true); return this.heartbeat(true);
} }
public RouterConfigBuilder stateStore() {
return this.stateStore(true);
}
public Configuration build() { public Configuration build() {
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_STORE_ENABLE,
this.enableStateStore);
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_RPC_ENABLE, this.enableRpcServer); conf.setBoolean(DFSConfigKeys.DFS_ROUTER_RPC_ENABLE, this.enableRpcServer);
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_ADMIN_ENABLE,
this.enableAdminServer);
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE, conf.setBoolean(DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
this.enableHeartbeat); this.enableHeartbeat);
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE, conf.setBoolean(DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE,

View File

@ -25,8 +25,12 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS; import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS;
@ -46,6 +50,7 @@
import java.util.List; import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
@ -67,6 +72,7 @@
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.federation.router.Router; import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@ -101,6 +107,15 @@ public class RouterDFSCluster {
/** Mini cluster. */ /** Mini cluster. */
private MiniDFSCluster cluster; private MiniDFSCluster cluster;
protected static final long DEFAULT_HEARTBEAT_INTERVAL_MS =
TimeUnit.SECONDS.toMillis(5);
protected static final long DEFAULT_CACHE_INTERVAL_MS =
TimeUnit.SECONDS.toMillis(5);
/** Heartbeat interval in milliseconds. */
private long heartbeatInterval;
/** Cache flush interval in milliseconds. */
private long cacheFlushInterval;
/** Router configuration overrides. */ /** Router configuration overrides. */
private Configuration routerOverrides; private Configuration routerOverrides;
/** Namenode configuration overrides. */ /** Namenode configuration overrides. */
@ -118,6 +133,7 @@ public class RouterContext {
private int rpcPort; private int rpcPort;
private DFSClient client; private DFSClient client;
private Configuration conf; private Configuration conf;
private RouterClient adminClient;
private URI fileSystemUri; private URI fileSystemUri;
public RouterContext(Configuration conf, String nsId, String nnId) public RouterContext(Configuration conf, String nsId, String nnId)
@ -183,6 +199,15 @@ public DFSClient run() throws IOException {
}); });
} }
public RouterClient getAdminClient() throws IOException {
if (adminClient == null) {
InetSocketAddress routerSocket = router.getAdminServerAddress();
LOG.info("Connecting to router admin at {}", routerSocket);
adminClient = new RouterClient(routerSocket, conf);
}
return adminClient;
}
public DFSClient getClient() throws IOException, URISyntaxException { public DFSClient getClient() throws IOException, URISyntaxException {
if (client == null) { if (client == null) {
LOG.info("Connecting to router at {}", fileSystemUri); LOG.info("Connecting to router at {}", fileSystemUri);
@ -304,13 +329,22 @@ public String getConfSuffix() {
} }
} }
public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes) { public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes,
long heartbeatInterval, long cacheFlushInterval) {
this.highAvailability = ha; this.highAvailability = ha;
this.heartbeatInterval = heartbeatInterval;
this.cacheFlushInterval = cacheFlushInterval;
configureNameservices(numNameservices, numNamenodes); configureNameservices(numNameservices, numNamenodes);
} }
public RouterDFSCluster(boolean ha, int numNameservices) { public RouterDFSCluster(boolean ha, int numNameservices) {
this(ha, numNameservices, 2); this(ha, numNameservices, 2,
DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
}
public RouterDFSCluster(boolean ha, int numNameservices, int numNamnodes) {
this(ha, numNameservices, numNamnodes,
DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
} }
/** /**
@ -404,7 +438,12 @@ public Configuration generateRouterConfiguration(String nsId, String nnId) {
conf.set(DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0"); conf.set(DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0");
conf.set(DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0"); conf.set(DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0");
conf.set(DFS_ROUTER_ADMIN_ADDRESS_KEY, "127.0.0.1:0");
conf.set(DFS_ROUTER_ADMIN_BIND_HOST_KEY, "0.0.0.0");
conf.set(DFS_ROUTER_DEFAULT_NAMESERVICE, nameservices.get(0)); conf.set(DFS_ROUTER_DEFAULT_NAMESERVICE, nameservices.get(0));
conf.setLong(DFS_ROUTER_HEARTBEAT_INTERVAL_MS, heartbeatInterval);
conf.setLong(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, cacheFlushInterval);
// Use mock resolver classes // Use mock resolver classes
conf.setClass(FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, conf.setClass(FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,

View File

@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockRegistrationForNamenode;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
/**
* Test utility to mimic a federated HDFS cluster with a router and a state
* store.
*/
public class StateStoreDFSCluster extends RouterDFSCluster {
private static final Class<?> DEFAULT_FILE_RESOLVER =
MountTableResolver.class;
private static final Class<?> DEFAULT_NAMENODE_RESOLVER =
MembershipNamenodeResolver.class;
public StateStoreDFSCluster(boolean ha, int numNameservices, int numNamenodes,
long heartbeatInterval, long cacheFlushInterval)
throws IOException, InterruptedException {
this(ha, numNameservices, numNamenodes, heartbeatInterval,
cacheFlushInterval, DEFAULT_FILE_RESOLVER);
}
public StateStoreDFSCluster(boolean ha, int numNameservices, int numNamenodes,
long heartbeatInterval, long cacheFlushInterval, Class<?> fileResolver)
throws IOException, InterruptedException {
super(ha, numNameservices, numNamenodes, heartbeatInterval,
cacheFlushInterval);
// Attach state store and resolvers to router
Configuration stateStoreConfig = getStateStoreConfiguration();
// Use state store backed resolvers
stateStoreConfig.setClass(
DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
DEFAULT_NAMENODE_RESOLVER, ActiveNamenodeResolver.class);
stateStoreConfig.setClass(
DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
fileResolver, FileSubclusterResolver.class);
this.addRouterOverrides(stateStoreConfig);
}
public StateStoreDFSCluster(boolean ha, int numNameservices,
Class<?> fileResolver) throws IOException, InterruptedException {
this(ha, numNameservices, 2,
DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS, fileResolver);
}
public StateStoreDFSCluster(boolean ha, int numNameservices)
throws IOException, InterruptedException {
this(ha, numNameservices, 2,
DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
}
public StateStoreDFSCluster(boolean ha, int numNameservices,
int numNamnodes) throws IOException, InterruptedException {
this(ha, numNameservices, numNamnodes,
DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
}
/////////////////////////////////////////////////////////////////////////////
// State Store Test Fixtures
/////////////////////////////////////////////////////////////////////////////
/**
* Adds test fixtures for NN registation for each NN nameservice -> NS
* namenode -> NN rpcAddress -> 0.0.0.0:0 webAddress -> 0.0.0.0:0 state ->
* STANDBY safeMode -> false blockPool -> test.
*
* @param stateStore State Store.
* @throws IOException If it cannot register.
*/
public void createTestRegistration(StateStoreService stateStore)
throws IOException {
List<MembershipState> entries = new ArrayList<MembershipState>();
for (NamenodeContext nn : this.getNamenodes()) {
MembershipState entry = createMockRegistrationForNamenode(
nn.getNameserviceId(), nn.getNamenodeId(),
FederationNamenodeServiceState.STANDBY);
entries.add(entry);
}
synchronizeRecords(
stateStore, entries, MembershipState.class);
}
public void createTestMountTable(StateStoreService stateStore)
throws IOException {
List<MountTable> mounts = generateMockMountTable();
synchronizeRecords(stateStore, mounts, MountTable.class);
stateStore.refreshCaches();
}
public List<MountTable> generateMockMountTable() throws IOException {
// create table entries
List<MountTable> entries = new ArrayList<>();
for (String ns : this.getNameservices()) {
Map<String, String> destMap = new HashMap<>();
destMap.put(ns, getNamenodePathForNS(ns));
// Direct path
String fedPath = getFederatedPathForNS(ns);
MountTable entry = MountTable.newInstance(fedPath, destMap);
entries.add(entry);
}
// Root path goes to nameservice 1
Map<String, String> destMap = new HashMap<>();
String ns0 = this.getNameservices().get(0);
destMap.put(ns0, "/");
MountTable entry = MountTable.newInstance("/", destMap);
entries.add(entry);
return entries;
}
}

View File

@ -0,0 +1,261 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.util.Time;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* The administrator interface of the {@link Router} implemented by
* {@link RouterAdminServer}.
*/
public class TestRouterAdmin {
private static StateStoreDFSCluster cluster;
private static RouterContext routerContext;
public static final String RPC_BEAN =
"Hadoop:service=Router,name=FederationRPC";
private static List<MountTable> mockMountTable;
private static StateStoreService stateStore;
@BeforeClass
public static void globalSetUp() throws Exception {
cluster = new StateStoreDFSCluster(false, 1);
// Build and start a router with State Store + admin + RPC
Configuration conf = new RouterConfigBuilder()
.stateStore()
.admin()
.rpc()
.build();
cluster.addRouterOverrides(conf);
cluster.startRouters();
routerContext = cluster.getRandomRouter();
mockMountTable = cluster.generateMockMountTable();
Router router = routerContext.getRouter();
stateStore = router.getStateStore();
}
@AfterClass
public static void tearDown() {
cluster.stopRouter(routerContext);
}
@Before
public void testSetup() throws Exception {
assertTrue(
synchronizeRecords(stateStore, mockMountTable, MountTable.class));
}
@Test
public void testAddMountTable() throws IOException {
MountTable newEntry = MountTable.newInstance(
"/testpath", Collections.singletonMap("ns0", "/testdir"),
Time.now(), Time.now());
RouterClient client = routerContext.getAdminClient();
MountTableManager mountTable = client.getMountTableManager();
// Existing mount table size
List<MountTable> records = getMountTableEntries(mountTable);
assertEquals(records.size(), mockMountTable.size());
// Add
AddMountTableEntryRequest addRequest =
AddMountTableEntryRequest.newInstance(newEntry);
AddMountTableEntryResponse addResponse =
mountTable.addMountTableEntry(addRequest);
assertTrue(addResponse.getStatus());
// New mount table size
List<MountTable> records2 = getMountTableEntries(mountTable);
assertEquals(records2.size(), mockMountTable.size() + 1);
}
@Test
public void testAddDuplicateMountTable() throws IOException {
MountTable newEntry = MountTable.newInstance("/testpath",
Collections.singletonMap("ns0", "/testdir"), Time.now(), Time.now());
RouterClient client = routerContext.getAdminClient();
MountTableManager mountTable = client.getMountTableManager();
// Existing mount table size
List<MountTable> entries1 = getMountTableEntries(mountTable);
assertEquals(entries1.size(), mockMountTable.size());
// Add
AddMountTableEntryRequest addRequest =
AddMountTableEntryRequest.newInstance(newEntry);
AddMountTableEntryResponse addResponse =
mountTable.addMountTableEntry(addRequest);
assertTrue(addResponse.getStatus());
// New mount table size
List<MountTable> entries2 = getMountTableEntries(mountTable);
assertEquals(entries2.size(), mockMountTable.size() + 1);
// Add again, should fail
AddMountTableEntryResponse addResponse2 =
mountTable.addMountTableEntry(addRequest);
assertFalse(addResponse2.getStatus());
}
@Test
public void testRemoveMountTable() throws IOException {
RouterClient client = routerContext.getAdminClient();
MountTableManager mountTable = client.getMountTableManager();
// Existing mount table size
List<MountTable> entries1 = getMountTableEntries(mountTable);
assertEquals(entries1.size(), mockMountTable.size());
// Remove an entry
RemoveMountTableEntryRequest removeRequest =
RemoveMountTableEntryRequest.newInstance("/");
mountTable.removeMountTableEntry(removeRequest);
// New mount table size
List<MountTable> entries2 = getMountTableEntries(mountTable);
assertEquals(entries2.size(), mockMountTable.size() - 1);
}
@Test
public void testEditMountTable() throws IOException {
RouterClient client = routerContext.getAdminClient();
MountTableManager mountTable = client.getMountTableManager();
// Verify starting condition
MountTable entry = getMountTableEntry("/");
assertEquals(
Collections.singletonList(new RemoteLocation("ns0", "/")),
entry.getDestinations());
// Edit the entry for /
MountTable updatedEntry = MountTable.newInstance(
"/", Collections.singletonMap("ns1", "/"), Time.now(), Time.now());
UpdateMountTableEntryRequest updateRequest =
UpdateMountTableEntryRequest.newInstance(updatedEntry);
mountTable.updateMountTableEntry(updateRequest);
// Verify edited condition
entry = getMountTableEntry("/");
assertEquals(
Collections.singletonList(new RemoteLocation("ns1", "/")),
entry.getDestinations());
}
@Test
public void testGetMountTable() throws IOException {
RouterClient client = routerContext.getAdminClient();
MountTableManager mountTable = client.getMountTableManager();
// Verify size of table
List<MountTable> entries = getMountTableEntries(mountTable);
assertEquals(mockMountTable.size(), entries.size());
// Verify all entries are present
int matches = 0;
for (MountTable e : entries) {
for (MountTable entry : mockMountTable) {
assertEquals(e.getDestinations().size(), 1);
assertNotNull(e.getDateCreated());
assertNotNull(e.getDateModified());
if (entry.getSourcePath().equals(e.getSourcePath())) {
matches++;
}
}
}
assertEquals(matches, mockMountTable.size());
}
@Test
public void testGetSingleMountTableEntry() throws IOException {
MountTable entry = getMountTableEntry("/ns0");
assertNotNull(entry);
assertEquals(entry.getSourcePath(), "/ns0");
}
/**
* Gets an existing mount table record in the state store.
*
* @param mount The mount point of the record to remove.
* @return The matching record if found, null if it is not found.
* @throws IOException If the state store could not be accessed.
*/
private MountTable getMountTableEntry(final String mount) throws IOException {
// Refresh the cache
stateStore.loadCache(MountTableStoreImpl.class, true);
GetMountTableEntriesRequest request =
GetMountTableEntriesRequest.newInstance(mount);
RouterClient client = routerContext.getAdminClient();
MountTableManager mountTable = client.getMountTableManager();
List<MountTable> results = getMountTableEntries(mountTable, request);
if (results.size() > 0) {
// First result is sorted to have the shortest mount string length
return results.get(0);
}
return null;
}
private List<MountTable> getMountTableEntries(MountTableManager mountTable)
throws IOException {
GetMountTableEntriesRequest request =
GetMountTableEntriesRequest.newInstance("/");
return getMountTableEntries(mountTable, request);
}
private List<MountTable> getMountTableEntries(MountTableManager mountTable,
GetMountTableEntriesRequest request) throws IOException {
stateStore.loadCache(MountTableStoreImpl.class, true);
GetMountTableEntriesResponse response =
mountTable.getMountTableEntries(request);
return response.getEntries();
}
}