diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReport.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReport.java index c0bf2d37f01..75e8a2038b3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReport.java @@ -75,7 +75,7 @@ public class StorageReport { } public long getRemaining() { - return remaining; + return Math.max(remaining, 0L); } public long getBlockPoolUsed() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java index a63dacf0fe7..91525c3ea3f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java @@ -104,21 +104,21 @@ abstract class BalancingPolicy { for(StorageReport s : r.getStorageReports()) { final StorageType t = s.getStorage().getStorageType(); totalCapacities.add(t, s.getCapacity()); - totalUsedSpaces.add(t, s.getDfsUsed()); + totalUsedSpaces.add(t, s.getCapacity() - s.getRemaining()); } } @Override Double getUtilization(DatanodeStorageReport r, final StorageType t) { long capacity = 0L; - long dfsUsed = 0L; + long totalUsed = 0L; for(StorageReport s : r.getStorageReports()) { if (s.getStorage().getStorageType() == t) { capacity += s.getCapacity(); - dfsUsed += s.getDfsUsed(); + totalUsed += s.getCapacity() - s.getRemaining(); } } - return capacity == 0L? null: dfsUsed*100.0/capacity; + return capacity == 0L ? null : totalUsed * 100.0 / capacity; } } @@ -138,7 +138,13 @@ abstract class BalancingPolicy { void accumulateSpaces(DatanodeStorageReport r) { for(StorageReport s : r.getStorageReports()) { final StorageType t = s.getStorage().getStorageType(); - totalCapacities.add(t, s.getCapacity()); + // Use s.getRemaining() + s.getBlockPoolUsed() instead of + // s.getCapacity() here to avoid moving blocks towards nodes with + // little actual available space. + // The util is computed as blockPoolUsed/(remaining+blockPoolUsed), + // which means nodes with more remaining space and less blockPoolUsed + // will serve as the recipient during the balancing process. + totalCapacities.add(t, s.getRemaining() + s.getBlockPoolUsed()); totalUsedSpaces.add(t, s.getBlockPoolUsed()); } } @@ -149,11 +155,11 @@ abstract class BalancingPolicy { long blockPoolUsed = 0L; for(StorageReport s : r.getStorageReports()) { if (s.getStorage().getStorageType() == t) { - capacity += s.getCapacity(); + capacity += s.getRemaining() + s.getBlockPoolUsed(); blockPoolUsed += s.getBlockPoolUsed(); } } - return capacity == 0L? null: blockPoolUsed*100.0/capacity; + return capacity == 0L ? null : blockPoolUsed * 100.0 / capacity; } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 5c7ed4b1d92..2a56c25d0d4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -41,6 +41,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBER import java.lang.reflect.Field; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.junit.AfterClass; + +import static org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset.CONFIG_PROPERTY_NONDFSUSED; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -502,8 +504,9 @@ public class TestBalancer { balanced = true; int actualExcludedNodeCount = 0; for (DatanodeInfo datanode : datanodeReport) { - double nodeUtilization = ((double)datanode.getDfsUsed()) - / datanode.getCapacity(); + double nodeUtilization = + ((double) datanode.getDfsUsed() + datanode.getNonDfsUsed()) / + datanode.getCapacity(); if (Dispatcher.Util.isExcluded(p.getExcludedNodes(), datanode)) { if (checkExcludeNodesUtilization) { assertTrue(nodeUtilization == 0); @@ -641,7 +644,7 @@ public class TestBalancer { private void doTest(Configuration conf, long[] capacities, String[] racks, long newCapacity, String newRack, NewNodeInfo nodes, boolean useTool, boolean useFile) throws Exception { - doTest(conf, capacities, racks, newCapacity, newRack, nodes, + doTest(conf, capacities, racks, newCapacity, 0L, newRack, nodes, useTool, useFile, false, 0.3); } @@ -666,8 +669,8 @@ public class TestBalancer { * @throws Exception */ private void doTest(Configuration conf, long[] capacities, - String[] racks, long newCapacity, String newRack, NewNodeInfo nodes, - boolean useTool, boolean useFile, + String[] racks, long newCapacity, long newNonDfsUsed, String newRack, + NewNodeInfo nodes, boolean useTool, boolean useFile, boolean useNamesystemSpy, double clusterUtilization) throws Exception { LOG.info("capacities = " + long2String(capacities)); LOG.info("racks = " + Arrays.asList(racks)); @@ -701,10 +704,11 @@ public class TestBalancer { long totalCapacity = sum(capacities); // fill up the cluster to be `clusterUtilization` full - long totalUsedSpace = (long) (totalCapacity * clusterUtilization); - createFile(cluster, filePath, totalUsedSpace / numOfDatanodes, + long totalDfsUsedSpace = (long) (totalCapacity * clusterUtilization); + createFile(cluster, filePath, totalDfsUsedSpace / numOfDatanodes, (short) numOfDatanodes, 0); + conf.setLong(CONFIG_PROPERTY_NONDFSUSED, newNonDfsUsed); if (nodes == null) { // there is no specification of new nodes. // start up an empty node with the same capacity and on the same rack cluster.startDataNodes(conf, 1, true, null, @@ -774,9 +778,11 @@ public class TestBalancer { // run balancer and validate results if (useTool) { - runBalancerCli(conf, totalUsedSpace, totalCapacity, p, useFile, expectedExcludedNodes); + runBalancerCli(conf, totalDfsUsedSpace, newNonDfsUsed, + totalCapacity, p, useFile, expectedExcludedNodes); } else { - runBalancer(conf, totalUsedSpace, totalCapacity, p, expectedExcludedNodes); + runBalancer(conf, totalDfsUsedSpace, newNonDfsUsed, + totalCapacity, p, expectedExcludedNodes, true); } } finally { if(cluster != null) { @@ -791,16 +797,18 @@ public class TestBalancer { BalancerParameters.DEFAULT, 0); } - private void runBalancer(Configuration conf, long totalUsedSpace, + private void runBalancer(Configuration conf, long totalDfsUsedSpace, long totalCapacity, BalancerParameters p, int excludedNodes) throws Exception { - runBalancer(conf, totalUsedSpace, totalCapacity, p, excludedNodes, true); + runBalancer(conf, totalDfsUsedSpace, 0, totalCapacity, p, excludedNodes, + true); } - private void runBalancer(Configuration conf, long totalUsedSpace, - long totalCapacity, BalancerParameters p, int excludedNodes, - boolean checkExcludeNodesUtilization) throws Exception { - waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); + private void runBalancer(Configuration conf, long totalDfsUsedSpace, + long totalNonDfsUsedSpace, long totalCapacity, BalancerParameters p, + int excludedNodes, boolean checkExcludeNodesUtilization) + throws Exception { + waitForHeartBeat(totalDfsUsedSpace, totalCapacity, client, cluster); int retry = 5; while (retry > 0) { @@ -816,9 +824,10 @@ public class TestBalancer { } else { assertEquals(ExitStatus.SUCCESS.getExitCode(), run); } - waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); + waitForHeartBeat(totalDfsUsedSpace, totalCapacity, client, cluster); LOG.info(" ."); try { + long totalUsedSpace = totalDfsUsedSpace + totalNonDfsUsedSpace; waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, excludedNodes, checkExcludeNodesUtilization); } catch (TimeoutException e) { @@ -892,10 +901,10 @@ public class TestBalancer { return ExitStatus.SUCCESS.getExitCode(); } - private void runBalancerCli(Configuration conf, long totalUsedSpace, - long totalCapacity, BalancerParameters p, boolean useFile, - int expectedExcludedNodes) throws Exception { - waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); + private void runBalancerCli(Configuration conf, long totalDfsUsedSpace, + long totalNonDfsUsedSpace, long totalCapacity, BalancerParameters p, + boolean useFile, int expectedExcludedNodes) throws Exception { + waitForHeartBeat(totalDfsUsedSpace, totalCapacity, client, cluster); List args = new ArrayList(); args.add("-policy"); args.add("datanode"); @@ -939,8 +948,9 @@ public class TestBalancer { final int r = tool.run(args.toArray(new String[0])); // start rebalancing assertEquals("Tools should exit 0 on success", 0, r); - waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); + waitForHeartBeat(totalDfsUsedSpace, totalCapacity, client, cluster); LOG.info("Rebalancing with default ctor."); + long totalUsedSpace = totalDfsUsedSpace + totalNonDfsUsedSpace; waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, expectedExcludedNodes); if (excludeHostsFile != null && excludeHostsFile.exists()) { @@ -1112,6 +1122,16 @@ public class TestBalancer { new String[]{RACK0, RACK1}, CAPACITY, RACK2); } + /** Test a cluster with even distribution, + * then a new node with nonDfsUsed is added to the cluster. */ + @Test(timeout=100000) + public void testBalancer3() throws Exception { + Configuration conf = new HdfsConfiguration(); + initConf(conf); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, + CAPACITY, 1000L, RACK2, null, false, false, false, 0.3); + } + private void testBalancerDefaultConstructor(Configuration conf, long[] capacities, String[] racks, long newCapacity, String newRack) throws Exception { @@ -1504,10 +1524,11 @@ public class TestBalancer { conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L); final int BLOCK_SIZE = 1024*1024; + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); cluster = new MiniDFSCluster .Builder(conf) .numDataNodes(1) - .storageCapacities(new long[] { BLOCK_SIZE * 10 }) + .simulatedCapacities(new long[]{BLOCK_SIZE * 10}) .storageTypes(new StorageType[] { DEFAULT }) .storagesPerDatanode(1) .build(); @@ -1517,11 +1538,12 @@ public class TestBalancer { final Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); DistributedFileSystem fs = cluster.getFileSystem(); - DFSTestUtil.createFile(fs, path1, BLOCK_SIZE, BLOCK_SIZE * 2, BLOCK_SIZE, + DFSTestUtil.createFile(fs, path1, BLOCK_SIZE, BLOCK_SIZE * 3, BLOCK_SIZE, (short) 1, SEED); // Add another DN with the same capacity, cluster is now unbalanced - cluster.startDataNodes(conf, 1, true, null, null); + cluster.startDataNodes(conf, 1, true, null, null, null, + new long[]{BLOCK_SIZE * 10}, false); cluster.triggerHeartbeats(); Collection namenodes = DFSUtil.getInternalNsRpcUris(conf); @@ -1773,7 +1795,7 @@ public class TestBalancer { pBuilder.setExcludedNodes(excludedList); // start balancer and check the failed num of moving task - runBalancer(conf, totalUsedSpace, totalCapacity, pBuilder.build(), + runBalancer(conf, totalUsedSpace, 0, totalCapacity, pBuilder.build(), excludedList.size(), false); // check total blocks, max wait time 60s @@ -1891,7 +1913,7 @@ public class TestBalancer { capacities[i] = CAPACITY; racks[i] = (i < numDNs/2 ? RACK0 : RACK1); } - doTest(conf, capacities, racks, CAPACITY, RACK2, + doTest(conf, capacities, racks, CAPACITY, 0L, RACK2, // Use only 1 node and set the starting capacity to 50% to allow the // balancing to complete in only one iteration. This is necessary // because the startGetBlocksTime and endGetBlocksTime measures across diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 0bb4c2930a4..1d479f34e02 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -162,6 +162,11 @@ public class SimulatedFSDataset implements FsDatasetSpi { private static final DatanodeStorage.State DEFAULT_STATE = DatanodeStorage.State.NORMAL; + public static final String CONFIG_PROPERTY_NONDFSUSED = + "dfs.datanode.simulateddatastorage.nondfsused"; + + public static final long DEFAULT_NONDFSUSED = 0L; + static final byte[] nullCrcFileData; private final DataNodeLockManager datasetLockManager; @@ -467,11 +472,12 @@ public class SimulatedFSDataset implements FsDatasetSpi { new ConcurrentHashMap<>(); private final long capacity; // in bytes + private long nonDfsUsed; private final DatanodeStorage dnStorage; private final SimulatedVolume volume; synchronized long getFree() { - return capacity - getUsed(); + return capacity - getUsed() - getNonDfsUsed(); } long getCapacity() { @@ -486,6 +492,10 @@ public class SimulatedFSDataset implements FsDatasetSpi { return used; } + synchronized long getNonDfsUsed() { + return nonDfsUsed; + } + synchronized long getBlockPoolUsed(String bpid) throws IOException { return getBPStorage(bpid).getUsed(); } @@ -506,7 +516,7 @@ public class SimulatedFSDataset implements FsDatasetSpi { getBPStorage(bpid).free(amount); } - SimulatedStorage(long cap, DatanodeStorage.State state, + SimulatedStorage(long cap, DatanodeStorage.State state, long nonDfsUsed, FileIoProvider fileIoProvider, Configuration conf) { capacity = cap; dnStorage = new DatanodeStorage( @@ -515,6 +525,7 @@ public class SimulatedFSDataset implements FsDatasetSpi { DataNodeVolumeMetrics volumeMetrics = DataNodeVolumeMetrics.create(conf, dnStorage.getStorageID()); this.volume = new SimulatedVolume(this, fileIoProvider, volumeMetrics); + this.nonDfsUsed = nonDfsUsed; } synchronized void addBlockPool(String bpid) { @@ -548,7 +559,7 @@ public class SimulatedFSDataset implements FsDatasetSpi { synchronized StorageReport getStorageReport(String bpid) { return new StorageReport(dnStorage, false, getCapacity(), getUsed(), getFree(), - map.get(bpid).getUsed(), 0L); + map.get(bpid).getUsed(), getNonDfsUsed()); } SimulatedVolume getVolume() { @@ -733,6 +744,7 @@ public class SimulatedFSDataset implements FsDatasetSpi { this.storages.add(new SimulatedStorage( conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY), conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE), + conf.getLong(CONFIG_PROPERTY_NONDFSUSED, DEFAULT_NONDFSUSED), fileIoProvider, conf)); } }