diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd index 2181e4726c9..b9853d622be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd @@ -59,7 +59,7 @@ if "%1" == "--loglevel" ( ) ) - set hdfscommands=dfs namenode secondarynamenode journalnode zkfc datanode dfsadmin haadmin fsck balancer jmxget oiv oev fetchdt getconf groups snapshotDiff lsSnapshottableDir cacheadmin mover storagepolicies classpath crypto debug + set hdfscommands=dfs namenode secondarynamenode journalnode zkfc datanode dfsadmin haadmin fsck balancer jmxget oiv oev fetchdt getconf groups snapshotDiff lsSnapshottableDir cacheadmin mover storagepolicies classpath crypto router debug for %%i in ( %hdfscommands% ) do ( if %hdfs-command% == %%i set hdfscommand=true ) @@ -179,6 +179,11 @@ goto :eof set CLASS=org.apache.hadoop.hdfs.tools.CryptoAdmin goto :eof +:router + set CLASS=org.apache.hadoop.hdfs.server.federation.router.Router + set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_ROUTER_OPTS% + goto :eof + :debug set CLASS=org.apache.hadoop.hdfs.tools.DebugAdmin goto :eof @@ -219,6 +224,7 @@ goto :eof @echo secondarynamenode run the DFS secondary namenode @echo namenode run the DFS namenode @echo journalnode run the DFS journalnode + @echo router run the DFS router @echo zkfc run the ZK Failover Controller daemon @echo datanode run a DFS datanode @echo dfsadmin run a DFS admin client diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index e4c02c21b92..912307fb8a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1001,6 +1001,23 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.use.dfs.network.topology"; public static final boolean DFS_USE_DFS_NETWORK_TOPOLOGY_DEFAULT = true; + // HDFS federation + public static final String FEDERATION_PREFIX = "dfs.federation."; + + // HDFS Router-based federation + public static final String FEDERATION_ROUTER_PREFIX = + "dfs.federation.router."; + + // HDFS Router State Store connection + public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS = + FEDERATION_ROUTER_PREFIX + "file.resolver.client.class"; + public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT = + "org.apache.hadoop.hdfs.server.federation.MockResolver"; + public static final String FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS = + FEDERATION_ROUTER_PREFIX + "namenode.resolver.client.class"; + public static final String FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS_DEFAULT = + "org.apache.hadoop.hdfs.server.federation.MockResolver"; + // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry @Deprecated public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java new file mode 100644 index 00000000000..477053d5d47 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Locates the most active NN for a given nameservice ID or blockpool ID. This + * interface is used by the {@link org.apache.hadoop.hdfs.server.federation. + * router.RouterRpcServer RouterRpcServer} to: + * + * The interface is also used by the {@link org.apache.hadoop.hdfs.server. + * federation.router.NamenodeHeartbeatService NamenodeHeartbeatService} to + * register a discovered NN. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface ActiveNamenodeResolver { + + /** + * Report a successful, active NN address for a nameservice or blockPool. + * + * @param ns Nameservice identifier. + * @param successfulAddress The address the successful responded to the + * command. + * @throws IOException If the state store cannot be accessed. + */ + void updateActiveNamenode( + String ns, InetSocketAddress successfulAddress) throws IOException; + + /** + * Returns a prioritized list of the most recent cached registration entries + * for a single nameservice ID. + * Returns an empty list if none are found. Returns entries in preference of: + * + * + * @param nameserviceId Nameservice identifier. + * @return Prioritized list of namenode contexts. + * @throws IOException If the state store cannot be accessed. + */ + List + getNamenodesForNameserviceId(String nameserviceId) throws IOException; + + /** + * Returns a prioritized list of the most recent cached registration entries + * for a single block pool ID. + * Returns an empty list if none are found. Returns entries in preference of: + * + * + * @param blockPoolId Block pool identifier for the nameservice. + * @return Prioritized list of namenode contexts. + * @throws IOException If the state store cannot be accessed. + */ + List + getNamenodesForBlockPoolId(String blockPoolId) throws IOException; + + /** + * Register a namenode in the State Store. + * + * @param report Namenode status report. + * @return True if the node was registered and successfully committed to the + * data store. + * @throws IOException Throws exception if the namenode could not be + * registered. + */ + boolean registerNamenode(NamenodeStatusReport report) throws IOException; + + /** + * Get a list of all namespaces that are registered and active in the + * federation. + * + * @return List of name spaces in the federation + * @throws Throws exception if the namespace list is not available. + */ + Set getNamespaces() throws IOException; + + /** + * Assign a unique identifier for the parent router service. + * Required to report the status to the namenode resolver. + * + * @param router Unique string identifier for the router. + */ + void setRouterId(String routerId); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamenodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamenodeContext.java new file mode 100644 index 00000000000..68ef02adcad --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamenodeContext.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +/** + * Interface for a discovered NN and its current server endpoints. + */ +public interface FederationNamenodeContext { + + /** + * Get the RPC server address of the namenode. + * + * @return RPC server address in the form of host:port. + */ + String getRpcAddress(); + + /** + * Get the Service RPC server address of the namenode. + * + * @return Service RPC server address in the form of host:port. + */ + String getServiceAddress(); + + /** + * Get the Lifeline RPC server address of the namenode. + * + * @return Lifeline RPC server address in the form of host:port. + */ + String getLifelineAddress(); + + /** + * Get the HTTP server address of the namenode. + * + * @return HTTP address in the form of host:port. + */ + String getWebAddress(); + + /** + * Get the unique key representing the namenode. + * + * @return Combination of the nameservice and the namenode IDs. + */ + String getNamenodeKey(); + + /** + * Identifier for the nameservice/namespace. + * + * @return Namenode nameservice identifier. + */ + String getNameserviceId(); + + /** + * Identifier for the namenode. + * + * @return String + */ + String getNamenodeId(); + + /** + * The current state of the namenode (active, standby, etc). + * + * @return FederationNamenodeServiceState State of the namenode. + */ + FederationNamenodeServiceState getState(); + + /** + * The update date. + * + * @return Long with the update date. + */ + long getDateModified(); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamenodeServiceState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamenodeServiceState.java new file mode 100644 index 00000000000..c773f820dc9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamenodeServiceState.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; + +/** + * Namenode state in the federation. The order of this enum is used to evaluate + * NN priority for RPC calls. + */ +public enum FederationNamenodeServiceState { + ACTIVE, // HAServiceState.ACTIVE or operational. + STANDBY, // HAServiceState.STANDBY. + UNAVAILABLE, // When the namenode cannot be reached. + EXPIRED; // When the last update is too old. + + public static FederationNamenodeServiceState getState(HAServiceState state) { + switch(state) { + case ACTIVE: + return FederationNamenodeServiceState.ACTIVE; + case STANDBY: + return FederationNamenodeServiceState.STANDBY; + case INITIALIZING: + return FederationNamenodeServiceState.UNAVAILABLE; + case STOPPING: + return FederationNamenodeServiceState.UNAVAILABLE; + default: + return FederationNamenodeServiceState.UNAVAILABLE; + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamespaceInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamespaceInfo.java new file mode 100644 index 00000000000..bbaeca36b23 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamespaceInfo.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import org.apache.hadoop.hdfs.server.federation.router.RemoteLocationContext; + +/** + * Represents information about a single nameservice/namespace in a federated + * HDFS cluster. + */ +public class FederationNamespaceInfo + implements Comparable, RemoteLocationContext { + + /** Block pool identifier. */ + private String blockPoolId; + /** Cluster identifier. */ + private String clusterId; + /** Nameservice identifier. */ + private String nameserviceId; + + public FederationNamespaceInfo(String bpId, String clId, String nsId) { + this.blockPoolId = bpId; + this.clusterId = clId; + this.nameserviceId = nsId; + } + + /** + * The HDFS nameservice id for this namespace. + * + * @return Nameservice identifier. + */ + public String getNameserviceId() { + return this.nameserviceId; + } + + /** + * The HDFS cluster id for this namespace. + * + * @return Cluster identifier. + */ + public String getClusterId() { + return this.clusterId; + } + + /** + * The HDFS block pool id for this namespace. + * + * @return Block pool identifier. + */ + public String getBlockPoolId() { + return this.blockPoolId; + } + + @Override + public int hashCode() { + return this.nameserviceId.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } else if (obj instanceof FederationNamespaceInfo) { + return this.compareTo((FederationNamespaceInfo) obj) == 0; + } else { + return false; + } + } + + @Override + public int compareTo(FederationNamespaceInfo info) { + return this.nameserviceId.compareTo(info.getNameserviceId()); + } + + @Override + public String toString() { + return this.nameserviceId + "->" + this.blockPoolId + ":" + this.clusterId; + } + + @Override + public String getDest() { + return this.nameserviceId; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FileSubclusterResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FileSubclusterResolver.java new file mode 100644 index 00000000000..af9f4938765 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FileSubclusterResolver.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.resolver; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Interface to map a file path in the global name space to a specific + * subcluster and path in an HDFS name space. + *

+ * Each path in the global/federated namespace may map to 1-N different HDFS + * locations. Each location specifies a single nameservice and a single HDFS + * path. The behavior is similar to MergeFS and Nfly and allows the merger + * of multiple HDFS locations into a single path. See HADOOP-8298 and + * HADOOP-12077 + *

+ * For example, a directory listing will fetch listings for each destination + * path and combine them into a single set of results. + *

+ * When multiple destinations are available for a path, the destinations are + * prioritized in a consistent manner. This allows the proxy server to + * guess the best/most likely destination and attempt it first. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface FileSubclusterResolver { + + /** + * Get the destinations for a global path. Results are from the mount table + * cache. If multiple destinations are available, the first result is the + * highest priority destination. + * + * @param path Global path. + * @return Location in a destination namespace or null if it does not exist. + * @throws IOException Throws exception if the data is not available. + */ + PathLocation getDestinationForPath(String path) throws IOException; + + /** + * Get a list of mount points for a path. Results are from the mount table + * cache. + * + * @return List of mount points present at this path or zero-length list if + * none are found. + * @throws IOException Throws exception if the data is not available. + */ + List getMountPoints(String path) throws IOException; + + /** + * Get the default namespace for the cluster. + * + * @return Default namespace identifier. + */ + String getDefaultNamespace(); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodePriorityComparator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodePriorityComparator.java new file mode 100644 index 00000000000..fe82f29fbaa --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodePriorityComparator.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import java.util.Comparator; + +/** + * Compares NNs in the same namespace and prioritizes by their status. The + * priorities are: + *

+ * When two NNs have the same state, the last modification date is the tie + * breaker, newest has priority. Expired NNs are excluded. + */ +public class NamenodePriorityComparator + implements Comparator { + + @Override + public int compare(FederationNamenodeContext o1, + FederationNamenodeContext o2) { + FederationNamenodeServiceState state1 = o1.getState(); + FederationNamenodeServiceState state2 = o2.getState(); + + if (state1 == state2) { + // Both have the same state, use mode dates + return compareModDates(o1, o2); + } else { + // Enum is ordered by priority + return state1.compareTo(state2); + } + } + + /** + * Compare the modification dates. + * + * @param o1 Context 1. + * @param o2 Context 2. + * @return Comparison between dates. + */ + private int compareModDates(FederationNamenodeContext o1, + FederationNamenodeContext o2) { + // Reverse sort, lowest position is highest priority. + return (int) (o2.getDateModified() - o1.getDateModified()); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java new file mode 100644 index 00000000000..9259048f26e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; + +/** + * Status of the namenode. + */ +public class NamenodeStatusReport { + + /** Namenode information. */ + private String nameserviceId = ""; + private String namenodeId = ""; + private String clusterId = ""; + private String blockPoolId = ""; + private String rpcAddress = ""; + private String serviceAddress = ""; + private String lifelineAddress = ""; + private String webAddress = ""; + + /** Namenode state. */ + private HAServiceState status = HAServiceState.STANDBY; + private boolean safeMode = false; + + /** If the fields are valid. */ + private boolean registrationValid = false; + private boolean haStateValid = false; + + public NamenodeStatusReport(String ns, String nn, String rpc, String service, + String lifeline, String web) { + this.nameserviceId = ns; + this.namenodeId = nn; + this.rpcAddress = rpc; + this.serviceAddress = service; + this.lifelineAddress = lifeline; + this.webAddress = web; + } + + /** + * If the registration is valid. + * + * @return If the registration is valid. + */ + public boolean registrationValid() { + return this.registrationValid; + } + + /** + * If the HA state is valid. + * + * @return If the HA state is valid. + */ + public boolean haStateValid() { + return this.haStateValid; + } + + /** + * Get the state of the Namenode being monitored. + * + * @return State of the Namenode. + */ + public FederationNamenodeServiceState getState() { + if (!registrationValid) { + return FederationNamenodeServiceState.UNAVAILABLE; + } else if (haStateValid) { + return FederationNamenodeServiceState.getState(status); + } else { + return FederationNamenodeServiceState.ACTIVE; + } + } + + /** + * Get the name service identifier. + * + * @return The name service identifier. + */ + public String getNameserviceId() { + return this.nameserviceId; + } + + /** + * Get the namenode identifier. + * + * @return The namenode identifier. + */ + public String getNamenodeId() { + return this.namenodeId; + } + + /** + * Get the cluster identifier. + * + * @return The cluster identifier. + */ + public String getClusterId() { + return this.clusterId; + } + + /** + * Get the block pool identifier. + * + * @return The block pool identifier. + */ + public String getBlockPoolId() { + return this.blockPoolId; + } + + /** + * Get the RPC address. + * + * @return The RPC address. + */ + public String getRpcAddress() { + return this.rpcAddress; + } + + /** + * Get the Service RPC address. + * + * @return The Service RPC address. + */ + public String getServiceAddress() { + return this.serviceAddress; + } + + /** + * Get the Lifeline RPC address. + * + * @return The Lifeline RPC address. + */ + public String getLifelineAddress() { + return this.lifelineAddress; + } + + /** + * Get the web address. + * + * @return The web address. + */ + public String getWebAddress() { + return this.webAddress; + } + + /** + * Get the HA service state. + * + * @return The HA service state. + */ + public void setHAServiceState(HAServiceState state) { + this.status = state; + this.haStateValid = true; + } + + /** + * Set the namespace information. + * + * @param info Namespace information. + */ + public void setNamespaceInfo(NamespaceInfo info) { + this.clusterId = info.getClusterID(); + this.blockPoolId = info.getBlockPoolID(); + this.registrationValid = true; + } + + public void setSafeMode(boolean safemode) { + this.safeMode = safemode; + } + + public boolean getSafemode() { + return this.safeMode; + } + + @Override + public String toString() { + return String.format("%s-%s:%s", + nameserviceId, namenodeId, serviceAddress); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/PathLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/PathLocation.java new file mode 100644 index 00000000000..d90565c21d4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/PathLocation.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +/** + * A map of the properties and target destinations (name space + path) for + * a path in the global/federated namespace. + * This data is generated from the @see MountTable records. + */ +public class PathLocation { + + /** Source path in global namespace. */ + private final String sourcePath; + + /** Remote paths in the target namespaces. */ + private final List destinations; + + /** List of name spaces present. */ + private final Set namespaces; + + + /** + * Create a new PathLocation. + * + * @param source Source path in the global name space. + * @param dest Destinations of the mount table entry. + * @param namespaces Unique identifier representing the combination of + * name spaces present in the destination list. + */ + public PathLocation( + String source, List dest, Set nss) { + this.sourcePath = source; + this.destinations = dest; + this.namespaces = nss; + } + + /** + * Create a path location from another path. + * + * @param other Other path location to copy from. + */ + public PathLocation(PathLocation other) { + this.sourcePath = other.sourcePath; + this.destinations = new LinkedList(other.destinations); + this.namespaces = new HashSet(other.namespaces); + } + + /** + * Get the source path in the global namespace for this path location. + * + * @return The path in the global namespace. + */ + public String getSourcePath() { + return this.sourcePath; + } + + /** + * Get the list of subclusters defined for the destinations. + */ + public Set getNamespaces() { + return Collections.unmodifiableSet(this.namespaces); + } + + @Override + public String toString() { + RemoteLocation loc = getDefaultLocation(); + return loc.getNameserviceId() + "->" + loc.getDest(); + } + + /** + * Check if this location supports multiple clusters/paths. + * + * @return If it has multiple destinations. + */ + public boolean hasMultipleDestinations() { + return this.destinations.size() > 1; + } + + /** + * Get the list of locations found in the mount table. + * The first result is the highest priority path. + * + * @return List of remote locations. + */ + public List getDestinations() { + return Collections.unmodifiableList(this.destinations); + } + + /** + * Get the default or highest priority location. + * + * @return The default location. + */ + public RemoteLocation getDefaultLocation() { + if (destinations.isEmpty() || destinations.get(0).getDest() == null) { + throw new UnsupportedOperationException( + "Unsupported path " + sourcePath + " please check mount table"); + } + return destinations.get(0); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java new file mode 100644 index 00000000000..eef136d808a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.hdfs.server.federation.router.RemoteLocationContext; + +/** + * A single in a remote namespace consisting of a nameservice ID + * and a HDFS path. + */ +public class RemoteLocation implements RemoteLocationContext { + + /** Identifier of the remote namespace for this location. */ + private String nameserviceId; + /** Path in the remote location. */ + private String path; + + /** + * Create a new remote location. + * + * @param nsId Destination namespace. + * @param pPath Path in the destination namespace. + */ + public RemoteLocation(String nsId, String pPath) { + this.nameserviceId = nsId; + this.path = pPath; + } + + @Override + public String getNameserviceId() { + return this.nameserviceId; + } + + @Override + public String getDest() { + return this.path; + } + + @Override + public String toString() { + return this.nameserviceId + "->" + this.path; + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 31) + .append(this.nameserviceId) + .append(this.path) + .toHashCode(); + } + + @Override + public boolean equals(Object obj) { + return (obj != null && + obj.getClass() == this.getClass() && + obj.hashCode() == this.hashCode()); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/package-info.java new file mode 100644 index 00000000000..d8be9e37aed --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/package-info.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The resolver package contains indepedent data resolvers used in HDFS + * federation. The data resolvers collect data from the cluster, including from + * the state store. The resolvers expose APIs used by HDFS federation to collect + * aggregated, cached data for use in Real-time request processing. The + * resolvers are perf-sensitive and are used in the flow of the + * {@link RouterRpcServer} request path. + *

+ * The principal resolvers are: + *

    + *
  • {@link ActiveNamenodeResolver} Real-time interface for locating the most + * recently active NN for a nameservice. + *
  • {@link FileSubclusterResolver} Real-time interface for determining the NN + * and local file path for a given file/folder based on the global namespace + * path. + *
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +package org.apache.hadoop.hdfs.server.federation.resolver; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java new file mode 100644 index 00000000000..6e7e865a0a0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.lang.reflect.Constructor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; + +/** + * Utilities for managing HDFS federation. + */ +public final class FederationUtil { + + private static final Log LOG = LogFactory.getLog(FederationUtil.class); + + private FederationUtil() { + // Utility Class + } + + /** + * Create an instance of an interface with a constructor using a state store + * constructor. + * + * @param conf Configuration + * @param context Context object to pass to the instance. + * @param contextType Type of the context passed to the constructor. + * @param configurationKeyName Configuration key to retrieve the class to load + * @param defaultClassName Default class to load if the configuration key is + * not set + * @param clazz Class/interface that must be implemented by the instance. + * @return New instance of the specified class that implements the desired + * interface and a single parameter constructor containing a + * StateStore reference. + */ + private static T newInstance(final Configuration conf, + final R context, final Class contextClass, + final String configKeyName, final String defaultClassName, + final Class clazz) { + + String className = conf.get(configKeyName, defaultClassName); + try { + Class instance = conf.getClassByName(className); + if (clazz.isAssignableFrom(instance)) { + if (contextClass == null) { + // Default constructor if no context + @SuppressWarnings("unchecked") + Constructor constructor = + (Constructor) instance.getConstructor(); + return constructor.newInstance(); + } else { + // Constructor with context + @SuppressWarnings("unchecked") + Constructor constructor = (Constructor) instance.getConstructor( + Configuration.class, contextClass); + return constructor.newInstance(conf, context); + } + } else { + throw new RuntimeException("Class " + className + " not instance of " + + clazz.getCanonicalName()); + } + } catch (ReflectiveOperationException e) { + LOG.error("Could not instantiate: " + className, e); + return null; + } + } + + /** + * Creates an instance of a FileSubclusterResolver from the configuration. + * + * @param conf Configuration that defines the file resolver class. + * @param obj Context object passed to class constructor. + * @return FileSubclusterResolver + */ + public static FileSubclusterResolver newFileSubclusterResolver( + Configuration conf, StateStoreService stateStore) { + return newInstance(conf, stateStore, StateStoreService.class, + DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS, + DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT, + FileSubclusterResolver.class); + } + + /** + * Creates an instance of an ActiveNamenodeResolver from the configuration. + * + * @param conf Configuration that defines the namenode resolver class. + * @param obj Context object passed to class constructor. + * @return ActiveNamenodeResolver + */ + public static ActiveNamenodeResolver newActiveNamenodeResolver( + Configuration conf, StateStoreService stateStore) { + return newInstance(conf, stateStore, StateStoreService.class, + DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, + DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS_DEFAULT, + ActiveNamenodeResolver.class); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java new file mode 100644 index 00000000000..da6066b59d5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +/** + * Interface for objects that are unique to a namespace. + */ +public interface RemoteLocationContext { + + /** + * Returns an identifier for a unique namespace. + * + * @return Namespace identifier. + */ + String getNameserviceId(); + + /** + * Destination in this location. For example the path in a remote namespace. + * + * @return Destination in this location. + */ + String getDest(); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java new file mode 100644 index 00000000000..fe0d02adcd8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java @@ -0,0 +1,263 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.newActiveNamenodeResolver; +import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.newFileSubclusterResolver; +import static org.apache.hadoop.util.ExitUtil.terminate; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.StringUtils; + +/** + * Router that provides a unified view of multiple federated HDFS clusters. It + * has two main roles: (1) federated interface and (2) NameNode heartbeat. + *

+ * For the federated interface, the Router receives a client request, checks the + * State Store for the correct subcluster, and forwards the request to the + * active Namenode of that subcluster. The reply from the Namenode then flows in + * the opposite direction. The Routers are stateless and can be behind a load + * balancer. HDFS clients connect to the router using the same interfaces as are + * used to communicate with a namenode, namely the ClientProtocol RPC interface + * and the WebHdfs HTTP interface exposed by the router. {@link RouterRpcServer} + * {@link RouterHttpServer} + *

+ * For NameNode heartbeat, the Router periodically checks the state of a + * NameNode (usually on the same server) and reports their high availability + * (HA) state and load/space status to the State Store. Note that this is an + * optional role as a Router can be independent of any subcluster. + * {@link StateStoreService} {@link NamenodeHeartbeatService} + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class Router extends CompositeService { + + private static final Log LOG = LogFactory.getLog(Router.class); + + + /** Configuration for the Router. */ + private Configuration conf; + + /** Router address/identifier. */ + private String routerId; + + /** RPC interface to the client. */ + private RouterRpcServer rpcServer; + + /** Interface with the State Store. */ + private StateStoreService stateStore; + + /** Interface to map global name space to HDFS subcluster name spaces. */ + private FileSubclusterResolver subclusterResolver; + + /** Interface to identify the active NN for a nameservice or blockpool ID. */ + private ActiveNamenodeResolver namenodeResolver; + + + /** Usage string for help message. */ + private static final String USAGE = "Usage: java Router"; + + /** Priority of the Router shutdown hook. */ + public static final int SHUTDOWN_HOOK_PRIORITY = 30; + + + ///////////////////////////////////////////////////////// + // Constructor + ///////////////////////////////////////////////////////// + + public Router() { + super(Router.class.getName()); + } + + ///////////////////////////////////////////////////////// + // Service management + ///////////////////////////////////////////////////////// + + @Override + protected void serviceInit(Configuration configuration) throws Exception { + this.conf = configuration; + + // TODO Interface to the State Store + this.stateStore = null; + + // Resolver to track active NNs + this.namenodeResolver = newActiveNamenodeResolver( + this.conf, this.stateStore); + if (this.namenodeResolver == null) { + throw new IOException("Cannot find namenode resolver."); + } + + // Lookup interface to map between the global and subcluster name spaces + this.subclusterResolver = newFileSubclusterResolver( + this.conf, this.stateStore); + if (this.subclusterResolver == null) { + throw new IOException("Cannot find subcluster resolver"); + } + + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + + super.serviceStop(); + } + + /** + * Shutdown the router. + */ + public void shutDown() { + new Thread() { + @Override + public void run() { + Router.this.stop(); + } + }.start(); + } + + /** + * Main run loop for the router. + * + * @param argv parameters. + */ + public static void main(String[] argv) { + if (DFSUtil.parseHelpArgument(argv, Router.USAGE, System.out, true)) { + System.exit(0); + } + + try { + StringUtils.startupShutdownMessage(Router.class, argv, LOG); + + Router router = new Router(); + + ShutdownHookManager.get().addShutdownHook( + new CompositeServiceShutdownHook(router), SHUTDOWN_HOOK_PRIORITY); + + Configuration conf = new HdfsConfiguration(); + router.init(conf); + router.start(); + } catch (Throwable e) { + LOG.error("Failed to start router.", e); + terminate(1, e); + } + } + + ///////////////////////////////////////////////////////// + // RPC Server + ///////////////////////////////////////////////////////// + + /** + * Create a new Router RPC server to proxy ClientProtocol requests. + * + * @return RouterRpcServer + * @throws IOException If the router RPC server was not started. + */ + protected RouterRpcServer createRpcServer() throws IOException { + return new RouterRpcServer(this.conf, this, this.getNamenodeResolver(), + this.getSubclusterResolver()); + } + + /** + * Get the Router RPC server. + * + * @return Router RPC server. + */ + public RouterRpcServer getRpcServer() { + return this.rpcServer; + } + + ///////////////////////////////////////////////////////// + // Submodule getters + ///////////////////////////////////////////////////////// + + /** + * Get the State Store service. + * + * @return State Store service. + */ + public StateStoreService getStateStore() { + return this.stateStore; + } + + /** + * Get the subcluster resolver for files. + * + * @return Subcluster resolver for files. + */ + public FileSubclusterResolver getSubclusterResolver() { + return this.subclusterResolver; + } + + /** + * Get the namenode resolver for a subcluster. + * + * @return The namenode resolver for a subcluster. + */ + public ActiveNamenodeResolver getNamenodeResolver() { + return this.namenodeResolver; + } + + ///////////////////////////////////////////////////////// + // Router info + ///////////////////////////////////////////////////////// + + /** + * Unique ID for the router, typically the hostname:port string for the + * router's RPC server. This ID may be null on router startup before the RPC + * server has bound to a port. + * + * @return Router identifier. + */ + public String getRouterId() { + return this.routerId; + } + + /** + * Sets a unique ID for this router. + * + * @param router Identifier of the Router. + */ + public void setRouterId(String id) { + this.routerId = id; + if (this.stateStore != null) { + this.stateStore.setIdentifier(this.routerId); + } + if (this.namenodeResolver != null) { + this.namenodeResolver.setRouterId(this.routerId); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java new file mode 100644 index 00000000000..24792bb0da5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.ipc.RPC.Server; +import org.apache.hadoop.service.AbstractService; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This class is responsible for handling all of the RPC calls to the It is + * created, started, and stopped by {@link Router}. It implements the + * {@link ClientProtocol} to mimic a + * {@link org.apache.hadoop.hdfs.server.namenode.NameNode NameNode} and proxies + * the requests to the active + * {@link org.apache.hadoop.hdfs.server.namenode.NameNode NameNode}. + */ +public class RouterRpcServer extends AbstractService { + + /** The RPC server that listens to requests from clients. */ + private final Server rpcServer; + + /** + * Construct a router RPC server. + * + * @param configuration HDFS Configuration. + * @param nnResolver The NN resolver instance to determine active NNs in HA. + * @param fileResolver File resolver to resolve file paths to subclusters. + * @throws IOException If the RPC server could not be created. + */ + public RouterRpcServer(Configuration configuration, Router router, + ActiveNamenodeResolver nnResolver, FileSubclusterResolver fileResolver) + throws IOException { + super(RouterRpcServer.class.getName()); + + this.rpcServer = null; + } + + /** + * Allow access to the client RPC server for testing. + * + * @return The RPC server. + */ + @VisibleForTesting + public Server getServer() { + return this.rpcServer; + } + + @Override + protected void serviceInit(Configuration configuration) throws Exception { + super.serviceInit(configuration); + } + + /** + * Start client and service RPC servers. + */ + @Override + protected void serviceStart() throws Exception { + if (this.rpcServer != null) { + this.rpcServer.start(); + } + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (this.rpcServer != null) { + this.rpcServer.stop(); + } + super.serviceStop(); + } + + /** + * Wait until the RPC servers have shutdown. + */ + void join() throws InterruptedException { + if (this.rpcServer != null) { + this.rpcServer.join(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/package-info.java new file mode 100644 index 00000000000..327f39bb995 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/package-info.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The router package includes the core services for a HDFS federation router. + * The {@link Router} acts as a transparent proxy in front of a cluster of + * multiple NameNodes and nameservices. The {@link RouterRpcServer} exposes the + * NameNode clientProtocol and is the primary contact point for DFS clients in a + * federated cluster. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java new file mode 100644 index 00000000000..866daa30a2c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.store; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.service.CompositeService; + +/** + * A service to initialize a + * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver + * StateStoreDriver} and maintain the connection to the data store. There + * are multiple state store driver connections supported: + *

    + *
  • File {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl. + * StateStoreFileImpl StateStoreFileImpl} + *
  • ZooKeeper {@link org.apache.hadoop.hdfs.server.federation.store.driver. + * impl.StateStoreZooKeeperImpl StateStoreZooKeeperImpl} + *
+ *

+ * The service also supports the dynamic registration of data interfaces such as + * the following: + *

    + *
  • {@link MembershipStateStore}: state of the Namenodes in the + * federation. + *
  • {@link MountTableStore}: Mount table between to subclusters. + * See {@link org.apache.hadoop.fs.viewfs.ViewFs ViewFs}. + *
  • {@link RouterStateStore}: State of the routers in the federation. + *
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class StateStoreService extends CompositeService { + + /** Identifier for the service. */ + private String identifier; + + // Stub class + public StateStoreService(String name) { + super(name); + } + + /** + * Fetch a unique identifier for this state store instance. Typically it is + * the address of the router. + * + * @return Unique identifier for this store. + */ + public String getIdentifier() { + return this.identifier; + } + + /** + * Set a unique synchronization identifier for this store. + * + * @param id Unique identifier, typically the router's RPC address. + */ + public void setIdentifier(String id) { + this.identifier = id; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/package-info.java new file mode 100644 index 00000000000..949ec7c93e1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/package-info.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The federation state store tracks persistent values that are shared between + * multiple routers. + *

+ * Data is stored in data records that inherit from a common class. Data records + * are serialized when written to the data store using a modular serialization + * implementation. The default is profobuf serialization. Data is stored as rows + * of records of the same type with each data member in a record representing a + * column. + *

+ * The state store uses a modular data storage + * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver + * StateStoreDriver} to handle querying, updating and deleting data records. The + * data storage driver is initialized and maintained by the + * {@link org.apache.hadoop.hdfs.server.federation.store. + * StateStoreService FederationStateStoreService}. The state store + * supports fetching all records of a type, filtering by column values or + * fetching a single record by its primary key. + *

+ * The state store contains several API interfaces, one for each data records + * type. + *

+ *

    + *
  • FederationMembershipStateStore: state of all Namenodes in the federation. + * Uses the MembershipState record. + *
  • FederationMountTableStore: Mount table mapping paths in the global + * namespace to individual subcluster paths. Uses the MountTable record. + *
  • RouterStateStore: State of all routers in the federation. Uses the + * RouterState record. + *
+ *

+ * Each API is defined in a separate interface. The implementations of these + * interfaces are responsible for accessing the + * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver + * StateStoreDriver} to query, update and delete data records. + */ + +@InterfaceAudience.Private +@InterfaceStability.Evolving + +package org.apache.hadoop.hdfs.server.federation.store; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 15fd43487e2..20a096062b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4354,4 +4354,20 @@ + + dfs.federation.router.file.resolver.client.class + org.apache.hadoop.hdfs.server.federation.MockResolver + + Class to resolve files to subclusters. + + + + + dfs.federation.router.namenode.resolver.client.class + org.apache.hadoop.hdfs.server.federation.MockResolver + + Class to resolve the namenode for a subcluster. + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java new file mode 100644 index 00000000000..867468274af --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.BufferedReader; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Date; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.security.AccessControlException; + +/** + * Helper utilities for testing HDFS Federation. + */ +public final class FederationTestUtils { + + public final static String NAMESERVICE1 = "ns0"; + public final static String NAMESERVICE2 = "ns1"; + public final static String NAMENODE1 = "nn0"; + public final static String NAMENODE2 = "nn1"; + public final static String NAMENODE3 = "nn2"; + public final static String NAMENODE4 = "nn3"; + public final static String ROUTER1 = "router0"; + public final static String ROUTER2 = "router1"; + public final static String ROUTER3 = "router2"; + public final static String ROUTER4 = "router3"; + public final static long BLOCK_SIZE_BYTES = 134217728; + + private FederationTestUtils() { + // Utility class + } + + public static void verifyException(Object obj, String methodName, + Class exceptionClass, Class[] parameterTypes, + Object[] arguments) { + + Throwable triggeredException = null; + try { + Method m = obj.getClass().getMethod(methodName, parameterTypes); + m.invoke(obj, arguments); + } catch (InvocationTargetException ex) { + triggeredException = ex.getTargetException(); + } catch (Exception e) { + triggeredException = e; + } + if (exceptionClass != null) { + assertNotNull("No exception was triggered, expected exception - " + + exceptionClass.getName(), triggeredException); + assertEquals(exceptionClass, triggeredException.getClass()); + } else { + assertNull("Exception was triggered but no exception was expected", + triggeredException); + } + } + + public static NamenodeStatusReport createNamenodeReport(String ns, String nn, + HAServiceState state) { + Random rand = new Random(); + NamenodeStatusReport report = new NamenodeStatusReport(ns, nn, + "localhost:" + rand.nextInt(10000), "localhost:" + rand.nextInt(10000), + "localhost:" + rand.nextInt(10000), "testwebaddress-" + ns + nn); + if (state == null) { + // Unavailable, no additional info + return report; + } + report.setHAServiceState(state); + report.setNamespaceInfo(new NamespaceInfo(1, "tesclusterid", ns, 0, + "testbuildvesion", "testsoftwareversion")); + return report; + } + + public static void waitNamenodeRegistered(ActiveNamenodeResolver resolver, + String nameserviceId, String namenodeId, + FederationNamenodeServiceState finalState) + throws InterruptedException, IllegalStateException, IOException { + + for (int loopCount = 0; loopCount < 20; loopCount++) { + + if (loopCount > 0) { + Thread.sleep(1000); + } + + List namenodes; + namenodes = + resolver.getNamenodesForNameserviceId(nameserviceId); + for (FederationNamenodeContext namenode : namenodes) { + if (namenodeId != null + && !namenode.getNamenodeId().equals(namenodeId)) { + // Keep looking + continue; + } + if (finalState != null && !namenode.getState().equals(finalState)) { + // Wrong state, wait a bit more + break; + } + // Found + return; + } + } + assertTrue("Failed to verify state store registration for state - " + + finalState + " - " + " - " + nameserviceId + " - ", false); + } + + public static boolean verifyDate(Date d1, Date d2, long precision) { + if (Math.abs(d1.getTime() - d2.getTime()) < precision) { + return true; + } + return false; + } + + public static boolean addDirectory(FileSystem context, String path) + throws IOException { + context.mkdirs(new Path(path), new FsPermission("777")); + return verifyFileExists(context, path); + } + + public static FileStatus getFileStatus(FileSystem context, String path) + throws IOException { + return context.getFileStatus(new Path(path)); + } + + public static boolean verifyFileExists(FileSystem context, String path) { + try { + FileStatus status = getFileStatus(context, path); + if (status != null) { + return true; + } + } catch (Exception e) { + return false; + } + + return false; + } + + public static boolean checkForFileInDirectory(FileSystem context, + String testPath, String targetFile) throws AccessControlException, + FileNotFoundException, + UnsupportedFileSystemException, IllegalArgumentException, + IOException { + FileStatus[] fileStatus = context.listStatus(new Path(testPath)); + String file = null; + String verifyPath = testPath + "/" + targetFile; + if (testPath.equals("/")) { + verifyPath = testPath + targetFile; + } + + Boolean found = false; + for (int i = 0; i < fileStatus.length; i++) { + FileStatus f = fileStatus[i]; + file = Path.getPathWithoutSchemeAndAuthority(f.getPath()).toString(); + if (file.equals(verifyPath)) { + found = true; + } + } + return found; + } + + public static int countContents(FileSystem context, String testPath) + throws IOException { + FileStatus[] fileStatus = context.listStatus(new Path(testPath)); + return fileStatus.length; + } + + public static void createFile(FileSystem fs, String path, long length) + throws IOException { + FsPermission permissions = new FsPermission("700"); + FSDataOutputStream writeStream = fs.create(new Path(path), permissions, + true, 1000, (short) 1, BLOCK_SIZE_BYTES, null); + for (int i = 0; i < length; i++) { + writeStream.write(i); + } + writeStream.close(); + } + + public static String readFile(FileSystem fs, String path) throws IOException { + // Read the file from the filesystem via the active namenode + Path fileName = new Path(path); + InputStreamReader reader = new InputStreamReader(fs.open(fileName)); + BufferedReader bufferedReader = new BufferedReader(reader); + StringBuilder data = new StringBuilder(); + String line; + + while ((line = bufferedReader.readLine()) != null) { + data.append(line); + } + + bufferedReader.close(); + reader.close(); + return data.toString(); + } + + public static boolean deleteFile(FileSystem fs, String path) + throws IOException { + return fs.delete(new Path(path), true); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java new file mode 100644 index 00000000000..ee6f57dda8e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.NamenodePriorityComparator; +import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; +import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.util.Time; + +/** + * In-memory cache/mock of a namenode and file resolver. Stores the most + * recently updated NN information for each nameservice and block pool. Also + * stores a virtual mount table for resolving global namespace paths to local NN + * paths. + */ +public class MockResolver + implements ActiveNamenodeResolver, FileSubclusterResolver { + + private Map> resolver = + new HashMap>(); + private Map> locations = + new HashMap>(); + private Set namespaces = + new HashSet(); + private String defaultNamespace = null; + + public MockResolver(Configuration conf, StateStoreService store) { + this.cleanRegistrations(); + } + + public void addLocation(String mount, String nameservice, String location) { + RemoteLocation remoteLocation = new RemoteLocation(nameservice, location); + List locationsList = locations.get(mount); + if (locationsList == null) { + locationsList = new LinkedList(); + locations.put(mount, locationsList); + } + if (!locationsList.contains(remoteLocation)) { + locationsList.add(remoteLocation); + } + + if (this.defaultNamespace == null) { + this.defaultNamespace = nameservice; + } + } + + public synchronized void cleanRegistrations() { + this.resolver = + new HashMap>(); + this.namespaces = new HashSet(); + } + + @Override + public void updateActiveNamenode( + String ns, InetSocketAddress successfulAddress) { + + String address = successfulAddress.getHostName() + ":" + + successfulAddress.getPort(); + String key = ns; + if (key != null) { + // Update the active entry + @SuppressWarnings("unchecked") + List iterator = + (List) resolver.get(key); + for (FederationNamenodeContext namenode : iterator) { + if (namenode.getRpcAddress().equals(address)) { + MockNamenodeContext nn = (MockNamenodeContext) namenode; + nn.setState(FederationNamenodeServiceState.ACTIVE); + break; + } + } + Collections.sort(iterator, new NamenodePriorityComparator()); + } + } + + @Override + public List + getNamenodesForNameserviceId(String nameserviceId) { + return resolver.get(nameserviceId); + } + + @Override + public List getNamenodesForBlockPoolId( + String blockPoolId) { + return resolver.get(blockPoolId); + } + + private static class MockNamenodeContext + implements FederationNamenodeContext { + private String webAddress; + private String rpcAddress; + private String serviceAddress; + private String lifelineAddress; + private String namenodeId; + private String nameserviceId; + private FederationNamenodeServiceState state; + private long dateModified; + + MockNamenodeContext( + String rpc, String service, String lifeline, String web, + String ns, String nn, FederationNamenodeServiceState state) { + this.rpcAddress = rpc; + this.serviceAddress = service; + this.lifelineAddress = lifeline; + this.webAddress = web; + this.namenodeId = nn; + this.nameserviceId = ns; + this.state = state; + this.dateModified = Time.now(); + } + + public void setState(FederationNamenodeServiceState newState) { + this.state = newState; + this.dateModified = Time.now(); + } + + @Override + public String getRpcAddress() { + return rpcAddress; + } + + @Override + public String getServiceAddress() { + return serviceAddress; + } + + @Override + public String getLifelineAddress() { + return lifelineAddress; + } + + @Override + public String getWebAddress() { + return webAddress; + } + + @Override + public String getNamenodeKey() { + return nameserviceId + " " + namenodeId + " " + rpcAddress; + } + + @Override + public String getNameserviceId() { + return nameserviceId; + } + + @Override + public String getNamenodeId() { + return namenodeId; + } + + @Override + public FederationNamenodeServiceState getState() { + return state; + } + + @Override + public long getDateModified() { + return dateModified; + } + } + + @Override + public synchronized boolean registerNamenode(NamenodeStatusReport report) + throws IOException { + MockNamenodeContext context = new MockNamenodeContext( + report.getRpcAddress(), report.getServiceAddress(), + report.getLifelineAddress(), report.getWebAddress(), + report.getNameserviceId(), report.getNamenodeId(), report.getState()); + + String nsId = report.getNameserviceId(); + String bpId = report.getBlockPoolId(); + String cId = report.getClusterId(); + @SuppressWarnings("unchecked") + List existingItems = + (List) resolver.get(nsId); + if (existingItems == null) { + existingItems = new ArrayList(); + resolver.put(bpId, existingItems); + resolver.put(nsId, existingItems); + } + boolean added = false; + for (int i=0; i getNamespaces() throws IOException { + return this.namespaces; + } + + @Override + public PathLocation getDestinationForPath(String path) throws IOException { + String finalPath = null; + String nameservice = null; + Set namespaceSet = new HashSet(); + LinkedList remoteLocations = + new LinkedList(); + for(String key : this.locations.keySet()) { + if(path.startsWith(key)) { + for (RemoteLocation location : this.locations.get(key)) { + finalPath = location.getDest() + path.substring(key.length()); + nameservice = location.getNameserviceId(); + RemoteLocation remoteLocation = + new RemoteLocation(nameservice, finalPath); + remoteLocations.add(remoteLocation); + namespaceSet.add(nameservice); + } + break; + } + } + if (remoteLocations.isEmpty()) { + // Path isn't supported, mimic resolver behavior. + return null; + } + return new PathLocation(path, remoteLocations, namespaceSet); + } + + @Override + public List getMountPoints(String path) throws IOException { + List mounts = new ArrayList(); + if (path.equals("/")) { + // Mounts only supported under root level + for (String mount : this.locations.keySet()) { + if (mount.length() > 1) { + // Remove leading slash, this is the behavior of the mount tree, + // return only names. + mounts.add(mount.replace("/", "")); + } + } + } + return mounts; + } + + @Override + public void setRouterId(String router) { + } + + @Override + public String getDefaultNamespace() { + return defaultNamespace; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java new file mode 100644 index 00000000000..16d624cc7e1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation; + +import org.apache.hadoop.conf.Configuration; + +/** + * Constructs a router configuration with individual features enabled/disabled. + */ +public class RouterConfigBuilder { + + private Configuration conf; + + public RouterConfigBuilder(Configuration configuration) { + this.conf = configuration; + } + + public RouterConfigBuilder() { + this.conf = new Configuration(); + } + + public Configuration build() { + return conf; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java new file mode 100644 index 00000000000..55d04ada23f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java @@ -0,0 +1,767 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf; +import org.apache.hadoop.hdfs.MiniDFSNNTopology.NSConf; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.Service.STATE; + +/** + * Test utility to mimic a federated HDFS cluster with a router. + */ +public class RouterDFSCluster { + /** + * Router context. + */ + public class RouterContext { + private Router router; + private FileContext fileContext; + private String nameserviceId; + private String namenodeId; + private int rpcPort; + private DFSClient client; + private Configuration conf; + private URI fileSystemUri; + + public RouterContext(Configuration conf, String ns, String nn) + throws URISyntaxException { + this.namenodeId = nn; + this.nameserviceId = ns; + this.conf = conf; + router = new Router(); + router.init(conf); + } + + public Router getRouter() { + return this.router; + } + + public String getNameserviceId() { + return this.nameserviceId; + } + + public String getNamenodeId() { + return this.namenodeId; + } + + public int getRpcPort() { + return this.rpcPort; + } + + public FileContext getFileContext() { + return this.fileContext; + } + + public void initRouter() throws URISyntaxException { + } + + public DistributedFileSystem getFileSystem() throws IOException { + DistributedFileSystem fs = + (DistributedFileSystem) DistributedFileSystem.get(conf); + return fs; + } + + public DFSClient getClient(UserGroupInformation user) + throws IOException, URISyntaxException, InterruptedException { + + LOG.info("Connecting to router at " + fileSystemUri); + return user.doAs(new PrivilegedExceptionAction() { + @Override + public DFSClient run() throws IOException { + return new DFSClient(fileSystemUri, conf); + } + }); + } + + public DFSClient getClient() throws IOException, URISyntaxException { + + if (client == null) { + LOG.info("Connecting to router at " + fileSystemUri); + client = new DFSClient(fileSystemUri, conf); + } + return client; + } + } + + /** + * Namenode context. + */ + public class NamenodeContext { + private NameNode namenode; + private String nameserviceId; + private String namenodeId; + private FileContext fileContext; + private int rpcPort; + private int servicePort; + private int lifelinePort; + private int httpPort; + private URI fileSystemUri; + private int index; + private Configuration conf; + private DFSClient client; + + public NamenodeContext(Configuration conf, String ns, String nn, + int index) { + this.conf = conf; + this.namenodeId = nn; + this.nameserviceId = ns; + this.index = index; + } + + public NameNode getNamenode() { + return this.namenode; + } + + public String getNameserviceId() { + return this.nameserviceId; + } + + public String getNamenodeId() { + return this.namenodeId; + } + + public FileContext getFileContext() { + return this.fileContext; + } + + public void setNamenode(NameNode n) throws URISyntaxException { + namenode = n; + + // Store the bound ports and override the default FS with the local NN's + // RPC + rpcPort = n.getNameNodeAddress().getPort(); + servicePort = n.getServiceRpcAddress().getPort(); + lifelinePort = n.getServiceRpcAddress().getPort(); + httpPort = n.getHttpAddress().getPort(); + fileSystemUri = new URI("hdfs://" + namenode.getHostAndPort()); + DistributedFileSystem.setDefaultUri(conf, fileSystemUri); + + try { + this.fileContext = FileContext.getFileContext(conf); + } catch (UnsupportedFileSystemException e) { + this.fileContext = null; + } + } + + public String getRpcAddress() { + return namenode.getNameNodeAddress().getHostName() + ":" + rpcPort; + } + + public String getServiceAddress() { + return namenode.getServiceRpcAddress().getHostName() + ":" + servicePort; + } + + public String getLifelineAddress() { + return namenode.getServiceRpcAddress().getHostName() + ":" + lifelinePort; + } + + public String getHttpAddress() { + return namenode.getHttpAddress().getHostName() + ":" + httpPort; + } + + public DistributedFileSystem getFileSystem() throws IOException { + DistributedFileSystem fs = + (DistributedFileSystem) DistributedFileSystem.get(conf); + return fs; + } + + public void resetClient() { + client = null; + } + + public DFSClient getClient(UserGroupInformation user) + throws IOException, URISyntaxException, InterruptedException { + + LOG.info("Connecting to namenode at " + fileSystemUri); + return user.doAs(new PrivilegedExceptionAction() { + @Override + public DFSClient run() throws IOException { + return new DFSClient(fileSystemUri, conf); + } + }); + } + + public DFSClient getClient() throws IOException, URISyntaxException { + if (client == null) { + LOG.info("Connecting to namenode at " + fileSystemUri); + client = new DFSClient(fileSystemUri, conf); + } + return client; + } + + public String getConfSuffix() { + String suffix = nameserviceId; + if (highAvailability) { + suffix += "." + namenodeId; + } + return suffix; + } + } + + public static final String NAMENODE1 = "nn0"; + public static final String NAMENODE2 = "nn1"; + public static final String NAMENODE3 = "nn2"; + public static final String TEST_STRING = "teststring"; + public static final String TEST_DIR = "testdir"; + public static final String TEST_FILE = "testfile"; + + private List nameservices; + private List routers; + private List namenodes; + private static final Log LOG = LogFactory.getLog(RouterDFSCluster.class); + private MiniDFSCluster cluster; + private boolean highAvailability; + + protected static final int DEFAULT_HEARTBEAT_INTERVAL = 5; + protected static final int DEFAULT_CACHE_INTERVAL_SEC = 5; + private Configuration routerOverrides; + private Configuration namenodeOverrides; + + private static final String NAMENODES = NAMENODE1 + "," + NAMENODE2; + + public RouterDFSCluster(boolean ha, int numNameservices) { + this(ha, numNameservices, 2); + } + + public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes) { + this.highAvailability = ha; + configureNameservices(numNameservices, numNamenodes); + } + + public void addRouterOverrides(Configuration conf) { + if (this.routerOverrides == null) { + this.routerOverrides = conf; + } else { + this.routerOverrides.addResource(conf); + } + } + + public void addNamenodeOverrides(Configuration conf) { + if (this.namenodeOverrides == null) { + this.namenodeOverrides = conf; + } else { + this.namenodeOverrides.addResource(conf); + } + } + + public Configuration generateNamenodeConfiguration( + String defaultNameserviceId) { + Configuration c = new HdfsConfiguration(); + + c.set(DFSConfigKeys.DFS_NAMESERVICES, getNameservicesKey()); + c.set("fs.defaultFS", "hdfs://" + defaultNameserviceId); + + for (String ns : nameservices) { + if (highAvailability) { + c.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, NAMENODES); + } + + for (NamenodeContext context : getNamenodes(ns)) { + String suffix = context.getConfSuffix(); + + c.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix, + "127.0.0.1:" + context.rpcPort); + c.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix, + "127.0.0.1:" + context.httpPort); + c.set(DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + suffix, + "0.0.0.0"); + } + } + + if (namenodeOverrides != null) { + c.addResource(namenodeOverrides); + } + return c; + } + + public Configuration generateClientConfiguration() { + Configuration conf = new HdfsConfiguration(); + conf.addResource(generateNamenodeConfiguration(getNameservices().get(0))); + return conf; + } + + public Configuration generateRouterConfiguration(String localNameserviceId, + String localNamenodeId) throws IOException { + Configuration conf = new HdfsConfiguration(); + conf.addResource(generateNamenodeConfiguration(localNameserviceId)); + + // Use mock resolver classes + conf.set(DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, + MockResolver.class.getCanonicalName()); + conf.set(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS, + MockResolver.class.getCanonicalName()); + + // Set the nameservice ID for the default NN monitor + conf.set(DFSConfigKeys.DFS_NAMESERVICE_ID, localNameserviceId); + + if (localNamenodeId != null) { + conf.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, localNamenodeId); + } + + StringBuilder routerBuilder = new StringBuilder(); + for (String ns : nameservices) { + for (NamenodeContext context : getNamenodes(ns)) { + String suffix = context.getConfSuffix(); + + if (routerBuilder.length() != 0) { + routerBuilder.append(","); + } + routerBuilder.append(suffix); + } + } + + return conf; + } + + public void configureNameservices(int numNameservices, int numNamenodes) { + nameservices = new ArrayList(); + for (int i = 0; i < numNameservices; i++) { + nameservices.add("ns" + i); + } + namenodes = new ArrayList(); + int index = 0; + for (String ns : nameservices) { + Configuration nnConf = generateNamenodeConfiguration(ns); + if (highAvailability) { + NamenodeContext context = + new NamenodeContext(nnConf, ns, NAMENODE1, index); + namenodes.add(context); + index++; + + if (numNamenodes > 1) { + context = new NamenodeContext(nnConf, ns, NAMENODE2, index + 1); + namenodes.add(context); + index++; + } + + if (numNamenodes > 2) { + context = new NamenodeContext(nnConf, ns, NAMENODE3, index + 1); + namenodes.add(context); + index++; + } + + } else { + NamenodeContext context = new NamenodeContext(nnConf, ns, null, index); + namenodes.add(context); + index++; + } + } + } + + public String getNameservicesKey() { + StringBuilder ns = new StringBuilder(); + for (int i = 0; i < nameservices.size(); i++) { + if (i > 0) { + ns.append(","); + } + ns.append(nameservices.get(i)); + } + return ns.toString(); + } + + public String getRandomNameservice() { + Random r = new Random(); + return nameservices.get(r.nextInt(nameservices.size())); + } + + public List getNameservices() { + return nameservices; + } + + public List getNamenodes(String nameservice) { + ArrayList nns = new ArrayList(); + for (NamenodeContext c : namenodes) { + if (c.nameserviceId.equals(nameservice)) { + nns.add(c); + } + } + return nns; + } + + public NamenodeContext getRandomNamenode() { + Random rand = new Random(); + return namenodes.get(rand.nextInt(namenodes.size())); + } + + public List getNamenodes() { + return namenodes; + } + + public boolean isHighAvailability() { + return highAvailability; + } + + public NamenodeContext getNamenode(String nameservice, + String namenode) { + for (NamenodeContext c : namenodes) { + if (c.nameserviceId.equals(nameservice)) { + if (namenode == null || c.namenodeId == null || namenode.isEmpty() + || c.namenodeId.isEmpty()) { + return c; + } else if (c.namenodeId.equals(namenode)) { + return c; + } + } + } + return null; + } + + public List getRouters(String nameservice) { + ArrayList nns = new ArrayList(); + for (RouterContext c : routers) { + if (c.nameserviceId.equals(nameservice)) { + nns.add(c); + } + } + return nns; + } + + public RouterContext getRouterContext(String nameservice, + String namenode) { + for (RouterContext c : routers) { + if (namenode == null) { + return c; + } + if (c.namenodeId.equals(namenode) + && c.nameserviceId.equals(nameservice)) { + return c; + } + } + return null; + } + + public RouterContext getRandomRouter() { + Random rand = new Random(); + return routers.get(rand.nextInt(routers.size())); + } + + public List getRouters() { + return routers; + } + + public RouterContext buildRouter(String nameservice, String namenode) + throws URISyntaxException, IOException { + Configuration config = generateRouterConfiguration(nameservice, namenode); + RouterContext rc = new RouterContext(config, nameservice, namenode); + return rc; + } + + public void startCluster() { + startCluster(null); + } + + public void startCluster(Configuration overrideConf) { + try { + MiniDFSNNTopology topology = new MiniDFSNNTopology(); + for (String ns : nameservices) { + + NSConf conf = new MiniDFSNNTopology.NSConf(ns); + if (highAvailability) { + for(int i = 0; i < namenodes.size()/nameservices.size(); i++) { + NNConf nnConf = new MiniDFSNNTopology.NNConf("nn" + i); + conf.addNN(nnConf); + } + } else { + NNConf nnConf = new MiniDFSNNTopology.NNConf(null); + conf.addNN(nnConf); + } + topology.addNameservice(conf); + } + topology.setFederation(true); + + // Start mini DFS cluster + Configuration nnConf = generateNamenodeConfiguration(nameservices.get(0)); + if (overrideConf != null) { + nnConf.addResource(overrideConf); + } + cluster = new MiniDFSCluster.Builder(nnConf).nnTopology(topology).build(); + cluster.waitActive(); + + // Store NN pointers + for (int i = 0; i < namenodes.size(); i++) { + NameNode nn = cluster.getNameNode(i); + namenodes.get(i).setNamenode(nn); + } + + } catch (Exception e) { + LOG.error("Cannot start Router DFS cluster: " + e.getMessage(), e); + cluster.shutdown(); + } + } + + public void startRouters() + throws InterruptedException, URISyntaxException, IOException { + // Create routers + routers = new ArrayList(); + for (String ns : nameservices) { + + for (NamenodeContext context : getNamenodes(ns)) { + routers.add(buildRouter(ns, context.namenodeId)); + } + } + + // Start all routers + for (RouterContext router : routers) { + router.router.start(); + } + // Wait until all routers are active and record their ports + for (RouterContext router : routers) { + waitActive(router); + router.initRouter(); + } + } + + public void waitActive(NamenodeContext nn) throws IOException { + cluster.waitActive(nn.index); + } + + public void waitActive(RouterContext router) + throws InterruptedException { + for (int loopCount = 0; loopCount < 20; loopCount++) { + // Validate connection of routers to NNs + if (router.router.getServiceState() == STATE.STARTED) { + return; + } + Thread.sleep(1000); + } + assertFalse( + "Timeout waiting for " + router.router.toString() + " to activate.", + true); + } + + + public void registerNamenodes() throws IOException { + for (RouterContext r : routers) { + ActiveNamenodeResolver resolver = r.router.getNamenodeResolver(); + for (NamenodeContext nn : namenodes) { + // Generate a report + NamenodeStatusReport report = new NamenodeStatusReport(nn.nameserviceId, + nn.namenodeId, nn.getRpcAddress(), nn.getServiceAddress(), + nn.getLifelineAddress(), nn.getHttpAddress()); + report.setNamespaceInfo(nn.namenode.getNamesystem().getFSImage() + .getStorage().getNamespaceInfo()); + + // Determine HA state from nn public state string + String nnState = nn.namenode.getState(); + HAServiceState haState = HAServiceState.ACTIVE; + for (HAServiceState state : HAServiceState.values()) { + if (nnState.equalsIgnoreCase(state.name())) { + haState = state; + break; + } + } + report.setHAServiceState(haState); + + // Register with the resolver + resolver.registerNamenode(report); + } + } + } + + public void waitNamenodeRegistration() + throws InterruptedException, IllegalStateException, IOException { + for (RouterContext r : routers) { + for (NamenodeContext nn : namenodes) { + FederationTestUtils.waitNamenodeRegistered( + r.router.getNamenodeResolver(), nn.nameserviceId, nn.namenodeId, + null); + } + } + } + + public void waitRouterRegistrationQuorum(RouterContext router, + FederationNamenodeServiceState state, String nameservice, String namenode) + throws InterruptedException, IOException { + LOG.info("Waiting for NN - " + nameservice + ":" + namenode + + " to transition to state - " + state); + FederationTestUtils.waitNamenodeRegistered( + router.router.getNamenodeResolver(), nameservice, namenode, state); + } + + public String getFederatedPathForNameservice(String ns) { + return "/" + ns; + } + + public String getNamenodePathForNameservice(String ns) { + return "/target-" + ns; + } + + /** + * @return example: + *

    + *
  • /ns0/testdir which maps to ns0->/target-ns0/testdir + *
+ */ + public String getFederatedTestDirectoryForNameservice(String ns) { + return getFederatedPathForNameservice(ns) + "/" + TEST_DIR; + } + + /** + * @return example: + *
    + *
  • /target-ns0/testdir + *
+ */ + public String getNamenodeTestDirectoryForNameservice(String ns) { + return getNamenodePathForNameservice(ns) + "/" + TEST_DIR; + } + + /** + * @return example: + *
    + *
  • /ns0/testfile which maps to ns0->/target-ns0/testfile + *
+ */ + public String getFederatedTestFileForNameservice(String ns) { + return getFederatedPathForNameservice(ns) + "/" + TEST_FILE; + } + + /** + * @return example: + *
    + *
  • /target-ns0/testfile + *
+ */ + public String getNamenodeTestFileForNameservice(String ns) { + return getNamenodePathForNameservice(ns) + "/" + TEST_FILE; + } + + public void shutdown() { + cluster.shutdown(); + if (routers != null) { + for (RouterContext context : routers) { + stopRouter(context); + } + } + } + + public void stopRouter(RouterContext router) { + try { + + router.router.shutDown(); + + int loopCount = 0; + while (router.router.getServiceState() != STATE.STOPPED) { + loopCount++; + Thread.sleep(1000); + if (loopCount > 20) { + LOG.error("Unable to shutdown router - " + router.rpcPort); + break; + } + } + } catch (InterruptedException e) { + } + } + + ///////////////////////////////////////////////////////////////////////////// + // Namespace Test Fixtures + ///////////////////////////////////////////////////////////////////////////// + + /** + * Creates test directories via the namenode. + * 1) /target-ns0/testfile + * 2) /target-ns1/testfile + * @throws IOException + */ + public void createTestDirectoriesNamenode() throws IOException { + // Add a test dir to each NS and verify + for (String ns : getNameservices()) { + NamenodeContext context = getNamenode(ns, null); + if (!createTestDirectoriesNamenode(context)) { + throw new IOException("Unable to create test directory for ns - " + ns); + } + } + } + + public boolean createTestDirectoriesNamenode(NamenodeContext nn) + throws IOException { + return FederationTestUtils.addDirectory(nn.getFileSystem(), + getNamenodeTestDirectoryForNameservice(nn.nameserviceId)); + } + + public void deleteAllFiles() throws IOException { + // Delete all files via the NNs and verify + for (NamenodeContext context : getNamenodes()) { + FileStatus[] status = context.getFileSystem().listStatus(new Path("/")); + for(int i = 0; i + *
  • / -> [ns0->/]. + *
  • /nso -> ns0->/target-ns0. + *
  • /ns1 -> ns1->/target-ns1. + * + */ + public void installMockLocations() { + for (RouterContext r : routers) { + MockResolver resolver = + (MockResolver) r.router.getSubclusterResolver(); + // create table entries + for (String ns : nameservices) { + // Direct path + resolver.addLocation(getFederatedPathForNameservice(ns), ns, + getNamenodePathForNameservice(ns)); + } + + // Root path goes to both NS1 + resolver.addLocation("/", nameservices.get(0), "/"); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java new file mode 100644 index 00000000000..8c720c70232 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.net.URISyntaxException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.MockResolver; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.service.Service.STATE; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * The the safe mode for the {@link Router} controlled by + * {@link SafeModeTimer}. + */ +public class TestRouter { + + private static Configuration conf; + + @BeforeClass + public static void create() throws IOException { + // Basic configuration without the state store + conf = new Configuration(); + // Mock resolver classes + conf.set(DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, + MockResolver.class.getCanonicalName()); + conf.set(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS, + MockResolver.class.getCanonicalName()); + + // Simulate a co-located NN + conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns0"); + conf.set("fs.defaultFS", "hdfs://" + "ns0"); + conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + "ns0", + "127.0.0.1:0" + 0); + conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + "ns0", + "127.0.0.1:" + 0); + conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + "ns0", + "0.0.0.0"); + } + + @AfterClass + public static void destroy() { + } + + @Before + public void setup() throws IOException, URISyntaxException { + } + + @After + public void cleanup() { + } + + private static void testRouterStartup(Configuration routerConfig) + throws InterruptedException, IOException { + Router router = new Router(); + assertEquals(STATE.NOTINITED, router.getServiceState()); + router.init(routerConfig); + assertEquals(STATE.INITED, router.getServiceState()); + router.start(); + assertEquals(STATE.STARTED, router.getServiceState()); + router.stop(); + assertEquals(STATE.STOPPED, router.getServiceState()); + router.close(); + } + + @Test + public void testRouterService() throws InterruptedException, IOException { + + // Run with all services + testRouterStartup((new RouterConfigBuilder(conf)).build()); + } +}