diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java index 53db022dbf5..75e5d9ad3d1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java @@ -57,9 +57,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; - /** * Worker class for Disk Balancer. *

@@ -864,7 +861,8 @@ public class DiskBalancer { * @param item DiskBalancerWorkItem * @return sleep delay in Milliseconds. */ - private long computeDelay(long bytesCopied, long timeUsed, + @VisibleForTesting + public long computeDelay(long bytesCopied, long timeUsed, DiskBalancerWorkItem item) { // we had an overflow, ignore this reading and continue. @@ -872,11 +870,15 @@ public class DiskBalancer { return 0; } final int megaByte = 1024 * 1024; + if (bytesCopied < megaByte) { + return 0; + } long bytesInMB = bytesCopied / megaByte; - long lastThroughput = bytesInMB / SECONDS.convert(timeUsed, - TimeUnit.MILLISECONDS); - long delay = (bytesInMB / getDiskBandwidth(item)) - lastThroughput; - return (delay <= 0) ? 0 : MILLISECONDS.convert(delay, TimeUnit.SECONDS); + + // converting disk bandwidth in MB/millisec + float bandwidth = getDiskBandwidth(item) / 1000f; + float delay = ((long) (bytesInMB / bandwidth) - timeUsed); + return (delay <= 0) ? 0 : (long) delay; } /** @@ -1114,7 +1116,8 @@ public class DiskBalancer { // to make sure that our promise is good on average. // Because we sleep, if a shutdown or cancel call comes in // we exit via Thread Interrupted exception. - Thread.sleep(computeDelay(block.getNumBytes(), timeUsed, item)); + Thread.sleep(computeDelay(block.getNumBytes(), TimeUnit.NANOSECONDS + .toMillis(timeUsed), item)); // We delay updating the info to avoid confusing the user. // This way we report the copy only if it is under the diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java index 95984fca749..094da09ae83 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java @@ -222,6 +222,69 @@ public class TestDiskBalancer { } + @Test + public void testDiskBalancerComputeDelay() throws Exception { + + Configuration conf = new HdfsConfiguration(); + conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); + + final int blockCount = 100; + final int blockSize = 11 * 1024 * 1024; + final int diskCount = 2; + final int dataNodeCount = 1; + final int dataNodeIndex = 0; + final long cap = blockSize * 2L * blockCount; + + conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize); + + final MiniDFSCluster cluster = new ClusterBuilder() + .setBlockCount(blockCount).setBlockSize(blockSize) + .setDiskCount(diskCount).setNumDatanodes(dataNodeCount).setConf(conf) + .setCapacities(new long[] {cap, cap }).build(); + + try { + DataNode node = cluster.getDataNodes().get(dataNodeIndex); + + final FsDatasetSpi fsDatasetSpy = Mockito.spy(node.getFSDataset()); + DiskBalancerWorkItem item = Mockito.spy(new DiskBalancerWorkItem()); + // Mocking bandwidth as 10mb/sec. + Mockito.doReturn((long) 10).when(item).getBandwidth(); + + doAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) { + try { + node.getFSDataset().moveBlockAcrossVolumes( + (ExtendedBlock) invocation.getArguments()[0], + (FsVolumeSpi) invocation.getArguments()[1]); + } catch (Exception e) { + LOG.error(e.getMessage()); + } + return null; + } + }).when(fsDatasetSpy).moveBlockAcrossVolumes(any(ExtendedBlock.class), + any(FsVolumeSpi.class)); + + DiskBalancerMover diskBalancerMover = new DiskBalancerMover(fsDatasetSpy, + conf); + + diskBalancerMover.setRunnable(); + + // bytesCopied - 20 * 1024 *1024 byteCopied. + // timeUsed - 1200 in milliseconds + // item - set DiskBalancerWorkItem bandwidth as 10 + // Expect return sleep delay in Milliseconds. sleep value = bytesCopied / + // (1024*1024*bandwidth in MB/milli) - timeUsed; + long val = diskBalancerMover.computeDelay(20 * 1024 * 1024, 1200, item); + Assert.assertEquals(val, (long) 800); + } catch (Exception e) { + Assert.fail("Unexpected exception: " + e); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } @Test public void testDiskBalancerWithFedClusterWithOneNameServiceEmpty() throws