diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index dc8190135bc..91dc90799e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -42,6 +42,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -901,8 +902,11 @@ public class Dispatcher { * namenode for more blocks. It terminates when it has dispatch enough block * move tasks or it has received enough blocks from the namenode, or the * elapsed time of the iteration has exceeded the max time limit. + * + * @param delay - time to sleep before sending getBlocks. Intended to + * disperse Balancer RPCs to NameNode for large clusters. See HDFS-11384. */ - private void dispatchBlocks() { + private void dispatchBlocks(long delay) { this.blocksToReceive = 2 * getScheduledSize(); long previousMoveTimestamp = Time.monotonicNow(); while (getScheduledSize() > 0 && !isIterationOver() @@ -927,15 +931,25 @@ public class Dispatcher { if (shouldFetchMoreBlocks()) { // fetch new blocks try { + if(delay > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Sleeping " + delay + " msec."); + } + Thread.sleep(delay); + } final long received = getBlockList(); if (received == 0) { return; } blocksToReceive -= received; continue; + } catch (InterruptedException ignored) { + // nothing to do } catch (IOException e) { LOG.warn("Exception while getting reportedBlock list", e); return; + } finally { + delay = 0L; } } else { // jump out of while-loop after the configured timeout. @@ -1124,6 +1138,12 @@ public class Dispatcher { return nnc.shouldContinue(dispatchBlockMoves()); } + /** + * The best-effort limit on the number of RPCs per second + * the Balancer will send to the NameNode. + */ + final static int BALANCER_NUM_RPC_PER_SEC = 20; + /** * Dispatch block moves for each source. The thread selects blocks to move & * sends request to proxy source to initiate block move. The process is flow @@ -1136,15 +1156,32 @@ public class Dispatcher { final long bytesLastMoved = getBytesMoved(); final Future[] futures = new Future[sources.size()]; + int concurrentThreads = Math.min(sources.size(), + ((ThreadPoolExecutor)dispatchExecutor).getCorePoolSize()); + assert concurrentThreads > 0 : "Number of concurrent threads is 0."; + if (LOG.isDebugEnabled()) { + LOG.debug("Balancer allowed RPCs per sec = " + BALANCER_NUM_RPC_PER_SEC); + LOG.debug("Balancer concurrent threads = " + concurrentThreads); + LOG.debug("Disperse Interval sec = " + + concurrentThreads / BALANCER_NUM_RPC_PER_SEC); + } + long dSec = 0; final Iterator i = sources.iterator(); for (int j = 0; j < futures.length; j++) { final Source s = i.next(); + final long delay = dSec * 1000; futures[j] = dispatchExecutor.submit(new Runnable() { @Override public void run() { - s.dispatchBlocks(); + s.dispatchBlocks(delay); } }); + // Calculate delay in seconds for the next iteration + if(j >= concurrentThreads) { + dSec = 0; + } else if((j + 1) % BALANCER_NUM_RPC_PER_SEC == 0) { + dSec++; + } } // wait for all dispatcher threads to finish diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 30a3a327d8c..e177da375a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -50,6 +50,9 @@ import org.junit.AfterClass; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.doAnswer; import java.io.File; import java.io.IOException; @@ -105,9 +108,14 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.minikdc.MiniKdc; @@ -121,6 +129,8 @@ import org.apache.hadoop.util.Tool; import org.apache.log4j.Level; import org.junit.After; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; /** * This class tests if a balancer schedules tasks correctly. @@ -130,6 +140,7 @@ public class TestBalancer { static { GenericTestUtils.setLogLevel(Balancer.LOG, Level.ALL); + GenericTestUtils.setLogLevel(Dispatcher.LOG, Level.DEBUG); } final static long CAPACITY = 5000L; @@ -776,6 +787,13 @@ public class TestBalancer { doTest(conf, capacities, racks, newCapacity, newRack, null, useTool, false); } + private void doTest(Configuration conf, long[] capacities, String[] racks, + long newCapacity, String newRack, NewNodeInfo nodes, + boolean useTool, boolean useFile) throws Exception { + doTest(conf, capacities, racks, newCapacity, newRack, nodes, + useTool, useFile, false); + } + /** This test start a cluster with specified number of nodes, * and fills it to be 30% full (with a single file replicated identically * to all datanodes); @@ -791,11 +809,13 @@ public class TestBalancer { * parsing, etc. Otherwise invoke balancer API directly. * @param useFile - if true, the hosts to included or excluded will be stored in a * file and then later read from the file. + * @param useNamesystemSpy - spy on FSNamesystem if true * @throws Exception */ private void doTest(Configuration conf, long[] capacities, String[] racks, long newCapacity, String newRack, NewNodeInfo nodes, - boolean useTool, boolean useFile) throws Exception { + boolean useTool, boolean useFile, + boolean useNamesystemSpy) throws Exception { LOG.info("capacities = " + long2String(capacities)); LOG.info("racks = " + Arrays.asList(racks)); LOG.info("newCapacity= " + newCapacity); @@ -803,15 +823,25 @@ public class TestBalancer { LOG.info("useTool = " + useTool); assertEquals(capacities.length, racks.length); int numOfDatanodes = capacities.length; - cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(capacities.length) - .racks(racks) - .simulatedCapacities(capacities) - .build(); + try { + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(0) + .build(); + cluster.getConfiguration(0).setInt(DFSConfigKeys.DFS_REPLICATION_KEY, + DFSConfigKeys.DFS_REPLICATION_DEFAULT); + conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, + DFSConfigKeys.DFS_REPLICATION_DEFAULT); + if(useNamesystemSpy) { + LOG.info("Using Spy Namesystem"); + spyFSNamesystem(cluster.getNameNode()); + } + cluster.startDataNodes(conf, numOfDatanodes, true, + StartupOption.REGULAR, racks, null, capacities, false); + cluster.waitClusterUp(); cluster.waitActive(); - client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), - ClientProtocol.class).getProxy(); + client = NameNodeProxies.createProxy(conf, + cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); long totalCapacity = sum(capacities); @@ -891,7 +921,9 @@ public class TestBalancer { runBalancer(conf, totalUsedSpace, totalCapacity, p, expectedExcludedNodes); } } finally { - cluster.shutdown(); + if(cluster != null) { + cluster.shutdown(); + } } } @@ -2004,6 +2036,54 @@ public class TestBalancer { } } + private static int numGetBlocksCalls; + private static long startGetBlocksTime, endGetBlocksTime; + + private void spyFSNamesystem(NameNode nn) throws IOException { + FSNamesystem fsnSpy = NameNodeAdapter.spyOnNamesystem(nn); + numGetBlocksCalls = 0; + endGetBlocksTime = startGetBlocksTime = Time.monotonicNow(); + doAnswer(new Answer() { + @Override + public BlocksWithLocations answer(InvocationOnMock invocation) + throws Throwable { + BlocksWithLocations blk = + (BlocksWithLocations)invocation.callRealMethod(); + endGetBlocksTime = Time.monotonicNow(); + numGetBlocksCalls++; + return blk; + }}).when(fsnSpy).getBlocks(any(DatanodeID.class), anyLong()); + } + + /** + * Test that makes the Balancer to disperse RPCs to the NameNode + * in order to avoid NN's RPC queue saturation. + */ + void testBalancerRPCDelay() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + conf.setInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, 30); + + int numDNs = 40; + long[] capacities = new long[numDNs]; + String[] racks = new String[numDNs]; + for(int i = 0; i < numDNs; i++) { + capacities[i] = CAPACITY; + racks[i] = (i < numDNs/2 ? RACK0 : RACK1); + } + doTest(conf, capacities, racks, CAPACITY, RACK2, + new PortNumberBasedNodes(3, 0, 0), false, false, true); + assertTrue("Number of getBlocks should be not less than " + + Dispatcher.BALANCER_NUM_RPC_PER_SEC, + numGetBlocksCalls > Dispatcher.BALANCER_NUM_RPC_PER_SEC); + long d = 1 + endGetBlocksTime - startGetBlocksTime; + LOG.info("Balancer executed " + numGetBlocksCalls + + " getBlocks in " + d + " msec."); + assertTrue("Expected BALANCER_NUM_RPC_PER_SEC = " + + Dispatcher.BALANCER_NUM_RPC_PER_SEC, + (numGetBlocksCalls * 1000 / d) < Dispatcher.BALANCER_NUM_RPC_PER_SEC); + } + /** * @param args */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java new file mode 100644 index 00000000000..960ad257663 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.balancer; + +import org.junit.Test; + +/** + * The Balancer ensures that it disperses RPCs to the NameNode + * in order to avoid NN's RPC queue saturation. + */ +public class TestBalancerRPCDelay { + + @Test(timeout=100000) + public void testBalancerRPCDelay() throws Exception { + new TestBalancer().testBalancerRPCDelay(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java index ee4e2b0e4c3..77e2ffb1dc3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java @@ -142,6 +142,11 @@ public class BlockManagerTestUtil { } } + public static HeartbeatManager getHeartbeatManager( + final BlockManager blockManager) { + return blockManager.getDatanodeManager().getHeartbeatManager(); + } + /** * @return corruptReplicas from block manager */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java index 33af59e0c3d..242e8f5bb5f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java @@ -24,6 +24,7 @@ import java.io.File; import java.io.IOException; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.commons.lang.reflect.FieldUtils; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; @@ -189,7 +190,35 @@ public class NameNodeAdapter { public static long[] getStats(final FSNamesystem fsn) { return fsn.getStats(); } - + + public static FSNamesystem spyOnNamesystem(NameNode nn) { + FSNamesystem fsnSpy = Mockito.spy(nn.getNamesystem()); + FSNamesystem fsnOld = nn.namesystem; + fsnOld.writeLock(); + fsnSpy.writeLock(); + nn.namesystem = fsnSpy; + try { + FieldUtils.writeDeclaredField( + (NameNodeRpcServer)nn.getRpcServer(), "namesystem", fsnSpy, true); + FieldUtils.writeDeclaredField( + fsnSpy.getBlockManager(), "namesystem", fsnSpy, true); + FieldUtils.writeDeclaredField( + fsnSpy.getLeaseManager(), "fsnamesystem", fsnSpy, true); + FieldUtils.writeDeclaredField( + fsnSpy.getBlockManager().getDatanodeManager(), + "namesystem", fsnSpy, true); + FieldUtils.writeDeclaredField( + BlockManagerTestUtil.getHeartbeatManager(fsnSpy.getBlockManager()), + "namesystem", fsnSpy, true); + } catch (IllegalAccessException e) { + throw new RuntimeException("Cannot set spy FSNamesystem", e); + } finally { + fsnSpy.writeUnlock(); + fsnOld.writeUnlock(); + } + return fsnSpy; + } + public static ReentrantReadWriteLock spyOnFsLock(FSNamesystem fsn) { ReentrantReadWriteLock spy = Mockito.spy(fsn.getFsLockForTests()); fsn.setFsLockForTests(spy);