diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index 8d5660e5ed1..91833444d94 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -57,9 +57,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
-
 /**
  * Worker class for Disk Balancer.
  *
@@ -864,7 +861,8 @@ public class DiskBalancer {
    * @param item DiskBalancerWorkItem
    * @return sleep delay in Milliseconds.
    */
-  private long computeDelay(long bytesCopied, long timeUsed,
+  @VisibleForTesting
+  public long computeDelay(long bytesCopied, long timeUsed,
                             DiskBalancerWorkItem item) {
 
     // we had an overflow, ignore this reading and continue.
@@ -872,11 +870,15 @@ public class DiskBalancer {
       return 0;
     }
     final int megaByte = 1024 * 1024;
+    if (bytesCopied < megaByte) {
+      return 0;
+    }
     long bytesInMB = bytesCopied / megaByte;
-    long lastThroughput = bytesInMB / SECONDS.convert(timeUsed,
-        TimeUnit.MILLISECONDS);
-    long delay = (bytesInMB / getDiskBandwidth(item)) - lastThroughput;
-    return (delay <= 0) ? 0 : MILLISECONDS.convert(delay, TimeUnit.SECONDS);
+
+    // converting disk bandwidth to MB/millisec
+    float bandwidth = getDiskBandwidth(item) / 1000f;
+    float delay = ((long) (bytesInMB / bandwidth) - timeUsed);
+    return (delay <= 0) ? 0 : (long) delay;
   }
 
   /**
@@ -1112,7 +1114,8 @@ public class DiskBalancer {
           // to make sure that our promise is good on average.
           // Because we sleep, if a shutdown or cancel call comes in
           // we exit via Thread Interrupted exception.
-          Thread.sleep(computeDelay(block.getNumBytes(), timeUsed, item));
+          Thread.sleep(computeDelay(block.getNumBytes(), TimeUnit.NANOSECONDS
+              .toMillis(timeUsed), item));
 
           // We delay updating the info to avoid confusing the user.
           // This way we report the copy only if it is under the
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
index e7896944f64..af17e3fd74b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
@@ -222,6 +222,69 @@ public class TestDiskBalancer {
 
   }
 
+  @Test
+  public void testDiskBalancerComputeDelay() throws Exception {
+
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+
+    final int blockCount = 100;
+    final int blockSize = 11 * 1024 * 1024;
+    final int diskCount = 2;
+    final int dataNodeCount = 1;
+    final int dataNodeIndex = 0;
+    final long cap = blockSize * 2L * blockCount;
+
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
+
+    final MiniDFSCluster cluster = new ClusterBuilder()
+        .setBlockCount(blockCount).setBlockSize(blockSize)
+        .setDiskCount(diskCount).setNumDatanodes(dataNodeCount).setConf(conf)
+        .setCapacities(new long[] {cap, cap }).build();
+
+    try {
+      DataNode node = cluster.getDataNodes().get(dataNodeIndex);
+
+      final FsDatasetSpi<?> fsDatasetSpy = Mockito.spy(node.getFSDataset());
+      DiskBalancerWorkItem item = Mockito.spy(new DiskBalancerWorkItem());
+      // Mocking bandwidth as 10 MB/sec.
+      Mockito.doReturn((long) 10).when(item).getBandwidth();
+
+      doAnswer(new Answer
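Reviewer note: the sketch below is not part of the patch; it only illustrates the revised throttle arithmetic in computeDelay(). The class name DelayMath, the standalone parameter bandwidthMbPerSec, and the overflow guard shown here are assumptions for the example; in the real method the bandwidth comes from getDiskBandwidth(item) in MB/s and timeUsed is passed in milliseconds (converted from nanoseconds at the call site). The idea: convert bandwidth to MB/ms, compute how long the copy was allowed to take, and sleep for whatever part of that budget was not used.

    // Illustrative sketch only (hypothetical DelayMath class), assuming
    // bandwidth is configured in MB/s and timeUsedMs is copy time in ms.
    public final class DelayMath {
      static long computeDelay(long bytesCopied, long timeUsedMs,
          long bandwidthMbPerSec) {
        if (timeUsedMs < 0) {
          return 0; // guard against a bogus reading; the real method has its own overflow check
        }
        final int megaByte = 1024 * 1024;
        if (bytesCopied < megaByte) {
          return 0; // less than 1 MB copied: not worth throttling
        }
        long bytesInMB = bytesCopied / megaByte;
        // bandwidth in MB per millisecond
        float bandwidth = bandwidthMbPerSec / 1000f;
        // time the copy was allowed to take minus time it actually took
        float delay = (long) (bytesInMB / bandwidth) - timeUsedMs;
        return (delay <= 0) ? 0 : (long) delay;
      }

      public static void main(String[] args) {
        // 11 MB copied in 200 ms with a 10 MB/s budget:
        // allowed time = 11 / 0.01 = 1100 ms, so sleep 1100 - 200 = 900 ms.
        System.out.println(computeDelay(11L * 1024 * 1024, 200, 10));
      }
    }

Compared with the old code, which first rounded throughput to whole MB/s and then converted a second-granularity difference back to milliseconds, this keeps the computation in milliseconds throughout, so sub-second copies no longer divide by zero or round the delay away.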