From 10fe37522c943274d14263fc2f67c0720c26b2ad Mon Sep 17 00:00:00 2001
From: Jing Zhao
Date: Thu, 6 Nov 2014 17:48:36 -0800
Subject: [PATCH] HDFS-7364. Balancer always shows zero Bytes Already Moved.
 Contributed by Tsz Wo Nicholas Sze.

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt        |  3 +
 .../hadoop/hdfs/server/balancer/Balancer.java      | 81 +++++++++++-------
 .../hdfs/server/balancer/Dispatcher.java           | 13 ++-
 .../server/balancer/NameNodeConnector.java         |  7 ++
 .../hdfs/server/balancer/TestBalancer.java         | 84 +++++++++++++++++--
 5 files changed, 143 insertions(+), 45 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 29211b96a0d..7676030503d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -897,6 +897,9 @@ Release 2.6.0 - UNRELEASED
     file descriptors when SASL is enabled on DataTransferProtocol.
     (Chris Nauroth via wheat9)
 
+    HDFS-7364. Balancer always shows zero Bytes Already Moved.
+    (Tsz Wo Nicholas Sze via jing9)
+
   BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
 
     HDFS-6387. HDFS CLI admin tool for creating & deleting an
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 87348b36096..eeac6ee24b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -27,7 +27,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
-import java.util.Formatter;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -163,7 +162,7 @@ import com.google.common.base.Preconditions;
 public class Balancer {
   static final Log LOG = LogFactory.getLog(Balancer.class);
 
-  private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
+  static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
 
   private static final long GB = 1L << 30; //1GB
   private static final long MAX_SIZE_TO_MOVE = 10*GB;
@@ -460,7 +459,7 @@ public class Balancer {
   }
 
   /* reset all fields in a balancer preparing for the next iteration */
-  private void resetData(Configuration conf) {
+  void resetData(Configuration conf) {
     this.overUtilized.clear();
     this.aboveAvgUtilized.clear();
     this.belowAvgUtilized.clear();
@@ -468,16 +467,47 @@ public class Balancer {
     this.policy.reset();
     dispatcher.reset(conf);
   }
-
+
+  static class Result {
+    final ExitStatus exitStatus;
+    final long bytesLeftToMove;
+    final long bytesBeingMoved;
+    final long bytesAlreadyMoved;
+
+    Result(ExitStatus exitStatus, long bytesLeftToMove, long bytesBeingMoved,
+        long bytesAlreadyMoved) {
+      this.exitStatus = exitStatus;
+      this.bytesLeftToMove = bytesLeftToMove;
+      this.bytesBeingMoved = bytesBeingMoved;
+      this.bytesAlreadyMoved = bytesAlreadyMoved;
+    }
+
+    void print(int iteration, PrintStream out) {
+      out.printf("%-24s %10d %19s %18s %17s%n",
+          DateFormat.getDateTimeInstance().format(new Date()), iteration,
+          StringUtils.byteDesc(bytesAlreadyMoved),
+          StringUtils.byteDesc(bytesLeftToMove),
+          StringUtils.byteDesc(bytesBeingMoved));
+    }
+  }
+
+  Result newResult(ExitStatus exitStatus, long bytesLeftToMove, long bytesBeingMoved) {
+    return new Result(exitStatus, bytesLeftToMove, bytesBeingMoved,
+        dispatcher.getBytesMoved());
+  }
+
+  Result newResult(ExitStatus exitStatus) {
+    return new Result(exitStatus, -1, -1, dispatcher.getBytesMoved());
+  }
+
   /** Run an iteration for all datanodes. */
-  private ExitStatus run(int iteration, Formatter formatter,
-      Configuration conf) {
+  Result runOneIteration() {
     try {
       final List<DatanodeStorageReport> reports = dispatcher.init();
       final long bytesLeftToMove = init(reports);
       if (bytesLeftToMove == 0) {
         System.out.println("The cluster is balanced. Exiting...");
-        return ExitStatus.SUCCESS;
+        return newResult(ExitStatus.SUCCESS, bytesLeftToMove, -1);
       } else {
         LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
             + " to make the cluster balanced." );
@@ -488,22 +518,14 @@ public class Balancer {
        * in this iteration. Maximum bytes to be moved per node is
        * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE).
        */
-      final long bytesToMove = chooseStorageGroups();
-      if (bytesToMove == 0) {
+      final long bytesBeingMoved = chooseStorageGroups();
+      if (bytesBeingMoved == 0) {
         System.out.println("No block can be moved. Exiting...");
-        return ExitStatus.NO_MOVE_BLOCK;
+        return newResult(ExitStatus.NO_MOVE_BLOCK, bytesLeftToMove, bytesBeingMoved);
       } else {
-        LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) +
+        LOG.info( "Will move " + StringUtils.byteDesc(bytesBeingMoved) +
             " in this iteration");
       }
-
-      formatter.format("%-24s %10d %19s %18s %17s%n",
-          DateFormat.getDateTimeInstance().format(new Date()),
-          iteration,
-          StringUtils.byteDesc(dispatcher.getBytesMoved()),
-          StringUtils.byteDesc(bytesLeftToMove),
-          StringUtils.byteDesc(bytesToMove)
-          );
 
       /* For each pair of <source, target>, start a thread that repeatedly
        * decide a block to be moved and its proxy source,
@@ -512,19 +534,19 @@ public class Balancer {
        * then initiates the move until all bytes are moved or no more block
        * available to move.
       * Exit if no byte has been moved for 5 consecutive iterations.
        */
       if (!dispatcher.dispatchAndCheckContinue()) {
-        return ExitStatus.NO_MOVE_PROGRESS;
+        return newResult(ExitStatus.NO_MOVE_PROGRESS, bytesLeftToMove, bytesBeingMoved);
       }
 
-      return ExitStatus.IN_PROGRESS;
+      return newResult(ExitStatus.IN_PROGRESS, bytesLeftToMove, bytesBeingMoved);
     } catch (IllegalArgumentException e) {
       System.out.println(e + ". Exiting ...");
-      return ExitStatus.ILLEGAL_ARGUMENTS;
+      return newResult(ExitStatus.ILLEGAL_ARGUMENTS);
     } catch (IOException e) {
       System.out.println(e + ". Exiting ...");
-      return ExitStatus.IO_EXCEPTION;
+      return newResult(ExitStatus.IO_EXCEPTION);
     } catch (InterruptedException e) {
       System.out.println(e + ". Exiting ...");
-      return ExitStatus.INTERRUPTED;
+      return newResult(ExitStatus.INTERRUPTED);
     } finally {
       dispatcher.shutdownNow();
     }
@@ -546,7 +568,6 @@ public class Balancer {
     LOG.info("namenodes = " + namenodes);
     LOG.info("parameters = " + p);
 
-    final Formatter formatter = new Formatter(System.out);
     System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
 
     List<NameNodeConnector> connectors = Collections.emptyList();
@@ -560,14 +581,16 @@ public class Balancer {
       Collections.shuffle(connectors);
       for(NameNodeConnector nnc : connectors) {
         final Balancer b = new Balancer(nnc, p, conf);
-        final ExitStatus r = b.run(iteration, formatter, conf);
+        final Result r = b.runOneIteration();
+        r.print(iteration, System.out);
+
         // clean all lists
         b.resetData(conf);
-        if (r == ExitStatus.IN_PROGRESS) {
+        if (r.exitStatus == ExitStatus.IN_PROGRESS) {
           done = false;
-        } else if (r != ExitStatus.SUCCESS) {
+        } else if (r.exitStatus != ExitStatus.SUCCESS) {
           //must be an error status, return.
-          return r.getExitCode();
+          return r.exitStatus.getExitCode();
         }
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 6682ba38104..ecf1c9da5c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -41,9 +41,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -78,6 +76,7 @@ import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /** Dispatching block replica moves between datanodes. */
@@ -121,8 +120,6 @@ public class Dispatcher {
   /** The maximum number of concurrent blocks moves at a datanode */
   private final int maxConcurrentMovesPerNode;
 
-  private final AtomicLong bytesMoved = new AtomicLong();
-
   private static class GlobalBlockMap {
     private final Map<Block, DBlock> map = new HashMap<Block, DBlock>();
@@ -315,7 +312,7 @@ public class Dispatcher {
         sendRequest(out, eb, accessToken);
         receiveResponse(in);
 
-        bytesMoved.addAndGet(block.getNumBytes());
+        nnc.getBytesMoved().addAndGet(block.getNumBytes());
         LOG.info("Successfully moved " + this);
       } catch (IOException e) {
         LOG.warn("Failed to move " + this + ": " + e.getMessage());
@@ -805,7 +802,7 @@ public class Dispatcher {
   }
 
   long getBytesMoved() {
-    return bytesMoved.get();
+    return nnc.getBytesMoved().get();
   }
 
   long bytesToMove() {
@@ -891,7 +888,7 @@ public class Dispatcher {
    * @return the total number of bytes successfully moved in this iteration.
    */
   private long dispatchBlockMoves() throws InterruptedException {
-    final long bytesLastMoved = bytesMoved.get();
+    final long bytesLastMoved = getBytesMoved();
     final Future<?>[] futures = new Future<?>[sources.size()];
 
     final Iterator<Source> i = sources.iterator();
@@ -917,7 +914,7 @@ public class Dispatcher {
     // wait for all block moving to be done
     waitForMoveCompletion(targets);
 
-    return bytesMoved.get() - bytesLastMoved;
+    return getBytesMoved() - bytesLastMoved;
   }
 
   /** The sleeping period before checking if block move is completed again */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index d2eb85b6ce6..1b34777823e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -28,6 +28,8 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -108,6 +110,7 @@ public class NameNodeConnector implements Closeable {
   private final Path idPath;
   private final OutputStream out;
   private final List<Path> targetPaths;
+  private final AtomicLong bytesMoved = new AtomicLong();
 
   private int notChangedIterations = 0;
 
@@ -148,6 +151,10 @@ public class NameNodeConnector implements Closeable {
     return blockpoolID;
   }
 
+  AtomicLong getBytesMoved() {
+    return bytesMoved;
+  }
+
   /** @return blocks with locations. */
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
       throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 91374060523..b6c99cf4d2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -17,7 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
 import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
 import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
 import static org.junit.Assert.assertEquals;
@@ -31,6 +36,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
@@ -44,7 +50,15 @@ import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -53,8 +67,10 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
+import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -65,8 +81,8 @@ import org.junit.Test;
 * This class tests if a balancer schedules tasks correctly.
 */
 public class TestBalancer {
-  private static final Log LOG = LogFactory.getLog(
-      "org.apache.hadoop.hdfs.TestBalancer");
+  private static final Log LOG = LogFactory.getLog(TestBalancer.class);
+
   static {
     ((Log4JLogger)Balancer.LOG).getLogger().setLevel(Level.ALL);
   }
@@ -478,7 +494,7 @@ public class TestBalancer {
     LOG.info("racks = " + Arrays.asList(racks));
     LOG.info("newCapacity= " + newCapacity);
     LOG.info("newRack = " + newRack);
-    LOG.info("useTool = " + useTool);
+    LOG.info("useTool = " + useTool);
     assertEquals(capacities.length, racks.length);
     int numOfDatanodes = capacities.length;
     cluster = new MiniDFSCluster.Builder(conf)
@@ -584,7 +600,7 @@ public class TestBalancer {
 
     // start rebalancing
     Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
-    final int r = Balancer.run(namenodes, p, conf);
+    final int r = runBalancer(namenodes, p, conf);
     if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) {
       assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
@@ -593,10 +609,63 @@ public class TestBalancer {
       assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
     }
     waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
-    LOG.info("Rebalancing with default ctor.");
+    LOG.info(" .");
     waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, excludedNodes);
   }
 
+  private static int runBalancer(Collection<URI> namenodes, final Parameters p,
+      Configuration conf) throws IOException, InterruptedException {
+    final long sleeptime =
+        conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+            DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 2000 +
+        conf.getLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000;
+    LOG.info("namenodes = " + namenodes);
+    LOG.info("parameters = " + p);
+    LOG.info("Print stack trace", new Throwable());
+
+    System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
+
+    List<NameNodeConnector> connectors = Collections.emptyList();
+    try {
+      connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
+          Balancer.class.getSimpleName(), Balancer.BALANCER_ID_PATH, conf);
+
+      boolean done = false;
+      for(int iteration = 0; !done; iteration++) {
+        done = true;
+        Collections.shuffle(connectors);
+        for(NameNodeConnector nnc : connectors) {
+          final Balancer b = new Balancer(nnc, p, conf);
+          final Result r = b.runOneIteration();
+          r.print(iteration, System.out);
+
+          // clean all lists
+          b.resetData(conf);
+          if (r.exitStatus == ExitStatus.IN_PROGRESS) {
+            done = false;
+          } else if (r.exitStatus != ExitStatus.SUCCESS) {
+            //must be an error status, return.
+            return r.exitStatus.getExitCode();
+          } else {
+            if (iteration > 0) {
+              assertTrue(r.bytesAlreadyMoved > 0);
+            }
+          }
+        }
+
+        if (!done) {
+          Thread.sleep(sleeptime);
+        }
+      }
+    } finally {
+      for(NameNodeConnector nnc : connectors) {
+        IOUtils.cleanup(LOG, nnc);
+      }
+    }
+    return ExitStatus.SUCCESS.getExitCode();
+  }
+
   private void runBalancerCli(Configuration conf, long totalUsedSpace,
       long totalCapacity, Balancer.Parameters p, boolean useFile,
       int expectedExcludedNodes) throws Exception {
@@ -1120,7 +1189,6 @@ public class TestBalancer {
     initConfWithRamDisk(conf);
 
     final int defaultRamDiskCapacity = 10;
-    final int defaultDiskCapacity = 100;
    final long ramDiskStorageLimit =
        ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
        (DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
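
Why the counter had to move: Balancer.run() constructs a fresh Balancer (and with it a fresh Dispatcher) for every iteration, so a bytesMoved counter stored on the Dispatcher restarts at zero each time and the "Bytes Already Moved" column never accumulates. The NameNodeConnector, by contrast, lives across all iterations of a run, which is why the patch parks the AtomicLong there. The following stand-alone sketch demonstrates the pattern in isolation; Connector, IterationBalancer, and BytesMovedDemo are hypothetical stand-ins for NameNodeConnector, Balancer, and the driver loop, not code from the patch.

import java.util.concurrent.atomic.AtomicLong;

class Connector {
  // Long-lived across iterations, like NameNodeConnector.
  private final AtomicLong bytesMoved = new AtomicLong();

  AtomicLong getBytesMoved() {
    return bytesMoved;
  }
}

class IterationBalancer {
  // Short-lived, recreated each iteration, like Balancer and its Dispatcher.
  private final Connector nnc;

  IterationBalancer(Connector nnc) {
    this.nnc = nnc;
  }

  long runOneIteration(long bytesMovedNow) {
    // Accumulate on the long-lived connector. A field on this object would
    // restart from zero every iteration, which is exactly the reported bug.
    return nnc.getBytesMoved().addAndGet(bytesMovedNow);
  }
}

public class BytesMovedDemo {
  public static void main(String[] args) {
    final Connector nnc = new Connector();
    for (int iteration = 0; iteration < 3; iteration++) {
      final IterationBalancer b = new IterationBalancer(nnc);
      // Pretend each iteration moves 1 GB.
      final long total = b.runOneIteration(1L << 30);
      System.out.println("Iteration " + iteration
          + ": bytes already moved = " + total);
    }
  }
}

Run as BytesMovedDemo.java, this prints a cumulative total (1 GB, 2 GB, 3 GB worth of bytes) instead of zero, mirroring what the patched TestBalancer#runBalancer asserts with r.bytesAlreadyMoved > 0 after the first iteration.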