diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt index 605e7e37d2f..0f42b77cb6f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt @@ -113,3 +113,5 @@ HDFS-2772. On transition to active, standby should not swallow ELIE. (atm) HDFS-2767. ConfiguredFailoverProxyProvider should support NameNodeProtocol. (Uma Maheswara Rao G via todd) HDFS-2795. Standby NN takes a long time to recover from a dead DN starting up. (todd) + +HDFS-2592. Balancer support for HA namenodes. (Uma Maheswara Rao G via todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index 939105871cb..cdeeb23e6ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -22,11 +22,9 @@ import java.io.IOException; import java.io.OutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.URI; import java.util.Collection; import java.util.EnumSet; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.hadoop.classification.InterfaceAudience; @@ -34,11 +32,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; -import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; @@ -46,13 +43,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.io.retry.RetryProxy; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Daemon; @@ -83,13 +74,24 @@ class NameNodeConnector { NameNodeConnector(Collection haNNs, Configuration conf) throws IOException { - InetSocketAddress nn = Lists.newArrayList(haNNs).get(0); - // TODO(HA): need to deal with connecting to HA NN pair here - this.namenodeAddress = nn; - this.namenode = DFSUtil.createNNProxyWithNamenodeProtocol(nn, conf, - UserGroupInformation.getCurrentUser()); - this.client = DFSUtil.createNamenode(conf); - this.fs = FileSystem.get(NameNode.getUri(nn), conf); + this.namenodeAddress = Lists.newArrayList(haNNs).get(0); + URI nameNodeUri = NameNode.getUri(this.namenodeAddress); + NamenodeProtocol failoverNamenode = (NamenodeProtocol) HAUtil + .createFailoverProxy(conf, nameNodeUri, NamenodeProtocol.class); + if (null != failoverNamenode) { + this.namenode = failoverNamenode; + } else { + this.namenode = DFSUtil.createNNProxyWithNamenodeProtocol( + this.namenodeAddress, conf, UserGroupInformation.getCurrentUser()); + } + ClientProtocol failOverClient = (ClientProtocol) HAUtil + .createFailoverProxy(conf, nameNodeUri, ClientProtocol.class); + if (null != failOverClient) { + this.client = failOverClient; + } else { + this.client = DFSUtil.createNamenode(conf); + } + this.fs = FileSystem.get(nameNodeUri, conf); final NamespaceInfo namespaceinfo = namenode.versionRequest(); this.blockpoolID = namespaceinfo.getBlockPoolID(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 8bbcc3f60b8..a1bc504fe02 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -329,7 +329,7 @@ class NameNodeRpcServer implements NamenodeProtocols { throw new IllegalArgumentException( "Unexpected not positive size: "+size); } - + namesystem.checkOperation(OperationCategory.READ); return namesystem.getBlockManager().getBlocks(datanode, size); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 84235112aa5..63b061001b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -42,24 +42,23 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; -import org.apache.hadoop.hdfs.server.namenode.NameNode; /** * This class tests if a balancer schedules tasks correctly. */ public class TestBalancer extends TestCase { private static final Log LOG = LogFactory.getLog( - "org.apache.hadoop.hdfs.TestReplication"); + "org.apache.hadoop.hdfs.TestBalancer"); - final private static long CAPACITY = 500L; - final private static String RACK0 = "/rack0"; - final private static String RACK1 = "/rack1"; - final private static String RACK2 = "/rack2"; - final static private String fileName = "/tmp.txt"; - final static private Path filePath = new Path(fileName); + final static long CAPACITY = 500L; + final static String RACK0 = "/rack0"; + final static String RACK1 = "/rack1"; + final static String RACK2 = "/rack2"; + final private static String fileName = "/tmp.txt"; + final static Path filePath = new Path(fileName); private MiniDFSCluster cluster; ClientProtocol client; @@ -83,9 +82,10 @@ public class TestBalancer extends TestCase { } /* create a file with a length of fileLen */ - private void createFile(long fileLen, short replicationFactor) + static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen, + short replicationFactor, int nnIndex) throws IOException { - FileSystem fs = cluster.getFileSystem(); + FileSystem fs = cluster.getFileSystem(nnIndex); DFSTestUtil.createFile(fs, filePath, fileLen, replicationFactor, r.nextLong()); DFSTestUtil.waitReplication(fs, filePath, replicationFactor); @@ -104,7 +104,7 @@ public class TestBalancer extends TestCase { short replicationFactor = (short)(numNodes-1); long fileLen = size/replicationFactor; - createFile(fileLen, replicationFactor); + createFile(cluster , filePath, fileLen, replicationFactor, 0); List locatedBlocks = client. getBlockLocations(fileName, 0, fileLen).getLocatedBlocks(); @@ -212,7 +212,8 @@ public class TestBalancer extends TestCase { * @throws IOException - if getStats() fails * @throws TimeoutException */ - private void waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace) + static void waitForHeartBeat(long expectedUsedSpace, + long expectedTotalSpace, ClientProtocol client, MiniDFSCluster cluster) throws IOException, TimeoutException { long timeout = TIMEOUT; long failtime = (timeout <= 0L) ? Long.MAX_VALUE @@ -249,7 +250,8 @@ public class TestBalancer extends TestCase { * @throws IOException * @throws TimeoutException */ - private void waitForBalancer(long totalUsedSpace, long totalCapacity) + static void waitForBalancer(long totalUsedSpace, long totalCapacity, + ClientProtocol client, MiniDFSCluster cluster) throws IOException, TimeoutException { long timeout = TIMEOUT; long failtime = (timeout <= 0L) ? Long.MAX_VALUE @@ -312,7 +314,8 @@ public class TestBalancer extends TestCase { // fill up the cluster to be 30% full long totalUsedSpace = totalCapacity*3/10; - createFile(totalUsedSpace/numOfDatanodes, (short)numOfDatanodes); + createFile(cluster, filePath, totalUsedSpace / numOfDatanodes, + (short) numOfDatanodes, 0); // start up an empty node with the same capacity and on the same rack cluster.startDataNodes(conf, 1, true, null, new String[]{newRack}, new long[]{newCapacity}); @@ -328,7 +331,7 @@ public class TestBalancer extends TestCase { private void runBalancer(Configuration conf, long totalUsedSpace, long totalCapacity) throws Exception { - waitForHeartBeat(totalUsedSpace, totalCapacity); + waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); // start rebalancing Map> namenodes = @@ -336,9 +339,9 @@ public class TestBalancer extends TestCase { final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf); assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); - waitForHeartBeat(totalUsedSpace, totalCapacity); + waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); LOG.info("Rebalancing with default ctor."); - waitForBalancer(totalUsedSpace, totalCapacity); + waitForBalancer(totalUsedSpace, totalCapacity, client, cluster); } /** one-node cluster test*/ @@ -403,7 +406,8 @@ public class TestBalancer extends TestCase { // fill up the cluster to be 30% full long totalUsedSpace = totalCapacity * 3 / 10; - createFile(totalUsedSpace / numOfDatanodes, (short) numOfDatanodes); + createFile(cluster, filePath, totalUsedSpace / numOfDatanodes, + (short) numOfDatanodes, 0); // start up an empty node with the same capacity and on the same rack cluster.startDataNodes(conf, 1, true, null, new String[] { newRack }, new long[] { newCapacity }); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java new file mode 100644 index 00000000000..6764213e12d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.balancer; + +import static org.junit.Assert.assertEquals; + +import java.net.InetSocketAddress; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; +import org.junit.Test; + +/** + * Test balancer with HA NameNodes + */ +public class TestBalancerWithHANameNodes { + private MiniDFSCluster cluster; + ClientProtocol client; + + static { + Balancer.setBlockMoveWaitTime(1000L); + } + + /** + * Test a cluster with even distribution, then a new empty node is added to + * the cluster. Test start a cluster with specified number of nodes, and fills + * it to be 30% full (with a single file replicated identically to all + * datanodes); It then adds one new empty node and starts balancing. + */ + @Test(timeout = 60000) + public void testBalancerWithHANameNodes() throws Exception { + Configuration conf = new HdfsConfiguration(); + TestBalancer.initConf(conf); + long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity + String newNodeRack = TestBalancer.RACK2; // new node's rack + // array of racks for original nodes in cluster + String[] racks = new String[] { TestBalancer.RACK0, TestBalancer.RACK1 }; + // array of capacities of original nodes in cluster + long[] capacities = new long[] { TestBalancer.CAPACITY, + TestBalancer.CAPACITY }; + assertEquals(capacities.length, racks.length); + int numOfDatanodes = capacities.length; + NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1"); + nn1Conf.setIpcPort(NameNode.DEFAULT_PORT); + MiniDFSNNTopology simpleHATopology = new MiniDFSNNTopology() + .addNameservice(new MiniDFSNNTopology.NSConf(null).addNN(nn1Conf) + .addNN(new MiniDFSNNTopology.NNConf("nn2"))); + cluster = new MiniDFSCluster.Builder(conf).nnTopology(simpleHATopology) + .numDataNodes(capacities.length).racks(racks).simulatedCapacities( + capacities).build(); + try { + cluster.waitActive(); + cluster.transitionToActive(1); + Thread.sleep(500); + client = DFSUtil.createNamenode(cluster.getNameNode(1) + .getNameNodeAddress(), conf); + long totalCapacity = TestBalancer.sum(capacities); + // fill up the cluster to be 30% full + long totalUsedSpace = totalCapacity * 3 / 10; + TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace + / numOfDatanodes, (short) numOfDatanodes, 1); + + // start up an empty node with the same capacity and on the same rack + cluster.startDataNodes(conf, 1, true, null, new String[] { newNodeRack }, + new long[] { newNodeCapacity }); + + HATestUtil.setFailoverConfigurations(cluster, conf, NameNode.getUri( + cluster.getNameNode(0).getNameNodeAddress()).getHost()); + totalCapacity += newNodeCapacity; + TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client, + cluster); + Map> namenodes = DFSUtil + .getNNServiceRpcAddresses(conf); + final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf); + assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); + TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client, + cluster); + } finally { + cluster.shutdown(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java index ba05da82414..cee846d7620 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java @@ -127,34 +127,36 @@ public abstract class HATestUtil { super(message); } } - + + /** Gets the filesystem instance by setting the failover configurations */ public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf) throws IOException, URISyntaxException { + conf = new Configuration(conf); + String logicalName = getLogicalHostname(cluster); + setFailoverConfigurations(cluster, conf, logicalName); + FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf); + return fs; + } + + /** Sets the required configurations for performing failover */ + public static void setFailoverConfigurations(MiniDFSCluster cluster, + Configuration conf, String logicalName) { InetSocketAddress nnAddr1 = cluster.getNameNode(0).getNameNodeAddress(); InetSocketAddress nnAddr2 = cluster.getNameNode(1).getNameNodeAddress(); - String nsId = "nameserviceId1"; - String nameNodeId1 = "nn1"; String nameNodeId2 = "nn2"; - String logicalName = getLogicalHostname(cluster); - - conf = new Configuration(conf); String address1 = "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort(); String address2 = "hdfs://" + nnAddr2.getHostName() + ":" + nnAddr2.getPort(); conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, nsId, nameNodeId1), address1); conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, nsId, nameNodeId2), address2); - conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, nsId); conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY, nsId), nameNodeId1 + "," + nameNodeId2); conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + logicalName, ConfiguredFailoverProxyProvider.class.getName()); - - FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf); - return fs; }