From cf5abf8a7c06ae19ac97094ffba8562462fc72d7 Mon Sep 17 00:00:00 2001 From: Konstantin V Shvachko Date: Wed, 26 Apr 2017 17:28:49 -0700 Subject: [PATCH] HDFS-11384. Balancer disperses getBlocks calls to avoid NameNode's rpc queue saturation. Contributed by Konstantin V Shvachko. (cherry picked from commit 28eb2aabebd15c15a357d86e23ca407d3c85211c) # Conflicts: # hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java # hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java --- .../hdfs/server/balancer/Dispatcher.java | 41 +++++++++++++++- .../hdfs/server/balancer/TestBalancer.java | 47 +++++++++++++++---- .../server/balancer/TestBalancerRPCDelay.java | 32 +++++++++++++ .../blockmanagement/BlockManagerTestUtil.java | 5 ++ .../hdfs/server/namenode/NameNodeAdapter.java | 31 +++++++++++- 5 files changed, 145 insertions(+), 11 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index 60919b89623..cb55d6ae467 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -41,6 +41,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -795,8 +796,11 @@ public class Dispatcher { * namenode for more blocks. It terminates when it has dispatch enough block * move tasks or it has received enough blocks from the namenode, or the * elapsed time of the iteration has exceeded the max time limit. + * + * @param delay - time to sleep before sending getBlocks. Intended to + * disperse Balancer RPCs to NameNode for large clusters. See HDFS-11384. */ - private void dispatchBlocks() { + private void dispatchBlocks(long delay) { this.blocksToReceive = 2 * getScheduledSize(); long previousMoveTimestamp = Time.monotonicNow(); while (getScheduledSize() > 0 && !isIterationOver() @@ -821,11 +825,21 @@ public class Dispatcher { if (shouldFetchMoreBlocks()) { // fetch new blocks try { + if(delay > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Sleeping " + delay + " msec."); + } + Thread.sleep(delay); + } blocksToReceive -= getBlockList(); continue; + } catch (InterruptedException ignored) { + // nothing to do } catch (IOException e) { LOG.warn("Exception while getting block list", e); return; + } finally { + delay = 0L; } } else { // jump out of while-loop after the configured timeout. @@ -1008,6 +1022,12 @@ public class Dispatcher { return nnc.shouldContinue(dispatchBlockMoves()); } + /** + * The best-effort limit on the number of RPCs per second + * the Balancer will send to the NameNode. + */ + final static int BALANCER_NUM_RPC_PER_SEC = 20; + /** * Dispatch block moves for each source. The thread selects blocks to move & * sends request to proxy source to initiate block move. The process is flow @@ -1020,15 +1040,32 @@ public class Dispatcher { final long bytesLastMoved = getBytesMoved(); final Future[] futures = new Future[sources.size()]; + int concurrentThreads = Math.min(sources.size(), + ((ThreadPoolExecutor)dispatchExecutor).getCorePoolSize()); + assert concurrentThreads > 0 : "Number of concurrent threads is 0."; + if (LOG.isDebugEnabled()) { + LOG.debug("Balancer allowed RPCs per sec = " + BALANCER_NUM_RPC_PER_SEC); + LOG.debug("Balancer concurrent threads = " + concurrentThreads); + LOG.debug("Disperse Interval sec = " + + concurrentThreads / BALANCER_NUM_RPC_PER_SEC); + } + long dSec = 0; final Iterator i = sources.iterator(); for (int j = 0; j < futures.length; j++) { final Source s = i.next(); + final long delay = dSec * 1000; futures[j] = dispatchExecutor.submit(new Runnable() { @Override public void run() { - s.dispatchBlocks(); + s.dispatchBlocks(delay); } }); + // Calculate delay in seconds for the next iteration + if(j >= concurrentThreads) { + dSec = 0; + } else if((j + 1) % BALANCER_NUM_RPC_PER_SEC == 0) { + dSec++; + } } // wait for all dispatcher threads to finish diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 50522079d4b..398ad5a18f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -94,6 +94,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli; import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters; import org.apache.hadoop.hdfs.server.balancer.Balancer.Result; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.http.HttpConfig; @@ -118,6 +119,7 @@ public class TestBalancer { static { ((Log4JLogger)Balancer.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger)Dispatcher.LOG).getLogger().setLevel(Level.DEBUG); } final static long CAPACITY = 5000L; @@ -651,6 +653,7 @@ public class TestBalancer { * parsing, etc. Otherwise invoke balancer API directly. * @param useFile - if true, the hosts to included or excluded will be stored in a * file and then later read from the file. + * @param useNamesystemSpy - spy on FSNamesystem if true * @throws Exception */ private void doTest(Configuration conf, long[] capacities, @@ -663,15 +666,21 @@ public class TestBalancer { LOG.info("useTool = " + useTool); assertEquals(capacities.length, racks.length); int numOfDatanodes = capacities.length; - cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(capacities.length) - .racks(racks) - .simulatedCapacities(capacities) - .build(); + try { + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(0) + .build(); + cluster.getConfiguration(0).setInt(DFSConfigKeys.DFS_REPLICATION_KEY, + DFSConfigKeys.DFS_REPLICATION_DEFAULT); + conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, + DFSConfigKeys.DFS_REPLICATION_DEFAULT); + cluster.startDataNodes(conf, numOfDatanodes, true, + StartupOption.REGULAR, racks, null, capacities, false); + cluster.waitClusterUp(); cluster.waitActive(); - client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), - ClientProtocol.class).getProxy(); + client = NameNodeProxies.createProxy(conf, + cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); long totalCapacity = sum(capacities); @@ -751,7 +760,9 @@ public class TestBalancer { runBalancer(conf, totalUsedSpace, totalCapacity, p, expectedExcludedNodes); } } finally { - cluster.shutdown(); + if(cluster != null) { + cluster.shutdown(); + } } } @@ -1593,6 +1604,26 @@ public class TestBalancer { } } + /** + * Test that makes the Balancer to disperse RPCs to the NameNode + * in order to avoid NN's RPC queue saturation. + */ + void testBalancerRPCDelay() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + conf.setInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, 30); + + int numDNs = 40; + long[] capacities = new long[numDNs]; + String[] racks = new String[numDNs]; + for(int i = 0; i < numDNs; i++) { + capacities[i] = CAPACITY; + racks[i] = (i < numDNs/2 ? RACK0 : RACK1); + } + doTest(conf, capacities, racks, CAPACITY, RACK2, + new PortNumberBasedNodes(3, 0, 0), false, false); + } + /** * @param args */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java new file mode 100644 index 00000000000..960ad257663 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.balancer; + +import org.junit.Test; + +/** + * The Balancer ensures that it disperses RPCs to the NameNode + * in order to avoid NN's RPC queue saturation. + */ +public class TestBalancerRPCDelay { + + @Test(timeout=100000) + public void testBalancerRPCDelay() throws Exception { + new TestBalancer().testBalancerRPCDelay(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java index a67d2459f17..497ad58a3c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java @@ -140,6 +140,11 @@ public class BlockManagerTestUtil { } } + public static HeartbeatManager getHeartbeatManager( + final BlockManager blockManager) { + return blockManager.getDatanodeManager().getHeartbeatManager(); + } + /** * @return corruptReplicas from block manager */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java index fa23fbfe63a..f96b545c0a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java @@ -23,6 +23,7 @@ import java.io.File; import java.io.IOException; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.commons.lang.reflect.FieldUtils; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; @@ -172,7 +173,35 @@ public class NameNodeAdapter { public static long[] getStats(final FSNamesystem fsn) { return fsn.getStats(); } - + + public static FSNamesystem spyOnNamesystem(NameNode nn) { + FSNamesystem fsnSpy = Mockito.spy(nn.getNamesystem()); + FSNamesystem fsnOld = nn.namesystem; + fsnOld.writeLock(); + fsnSpy.writeLock(); + nn.namesystem = fsnSpy; + try { + FieldUtils.writeDeclaredField( + (NameNodeRpcServer)nn.getRpcServer(), "namesystem", fsnSpy, true); + FieldUtils.writeDeclaredField( + fsnSpy.getBlockManager(), "namesystem", fsnSpy, true); + FieldUtils.writeDeclaredField( + fsnSpy.getLeaseManager(), "fsnamesystem", fsnSpy, true); + FieldUtils.writeDeclaredField( + fsnSpy.getBlockManager().getDatanodeManager(), + "namesystem", fsnSpy, true); + FieldUtils.writeDeclaredField( + BlockManagerTestUtil.getHeartbeatManager(fsnSpy.getBlockManager()), + "namesystem", fsnSpy, true); + } catch (IllegalAccessException e) { + throw new RuntimeException("Cannot set spy FSNamesystem", e); + } finally { + fsnSpy.writeUnlock(); + fsnOld.writeUnlock(); + } + return fsnSpy; + } + public static ReentrantReadWriteLock spyOnFsLock(FSNamesystem fsn) { ReentrantReadWriteLock spy = Mockito.spy(fsn.getFsLockForTests()); fsn.setFsLockForTests(spy);