HDFS-14202. dfs.disk.balancer.max.disk.throughputInMBperSec property is not working as per set value. Contributed by Ranith Sardar.

(cherry picked from commit 0e79a86582)
This commit is contained in:
Inigo Goiri 2019-02-04 11:59:48 -08:00 committed by Wei-Chiu Chuang
parent 06b2eceb76
commit fc8a7a9e5b
2 changed files with 75 additions and 9 deletions

View File

@ -57,9 +57,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
* Worker class for Disk Balancer.
* <p>
@ -864,7 +861,8 @@ public class DiskBalancer {
* @param item DiskBalancerWorkItem
* @return sleep delay in Milliseconds.
*/
private long computeDelay(long bytesCopied, long timeUsed,
@VisibleForTesting
public long computeDelay(long bytesCopied, long timeUsed,
DiskBalancerWorkItem item) {
// we had an overflow, ignore this reading and continue.
@ -872,11 +870,15 @@ public class DiskBalancer {
return 0;
}
final int megaByte = 1024 * 1024;
if (bytesCopied < megaByte) {
return 0;
}
long bytesInMB = bytesCopied / megaByte;
long lastThroughput = bytesInMB / SECONDS.convert(timeUsed,
TimeUnit.MILLISECONDS);
long delay = (bytesInMB / getDiskBandwidth(item)) - lastThroughput;
return (delay <= 0) ? 0 : MILLISECONDS.convert(delay, TimeUnit.SECONDS);
// converting disk bandwidth in MB/millisec
float bandwidth = getDiskBandwidth(item) / 1000f;
float delay = ((long) (bytesInMB / bandwidth) - timeUsed);
return (delay <= 0) ? 0 : (long) delay;
}
/**
@ -1112,7 +1114,8 @@ public class DiskBalancer {
// to make sure that our promise is good on average.
// Because we sleep, if a shutdown or cancel call comes in
// we exit via Thread Interrupted exception.
Thread.sleep(computeDelay(block.getNumBytes(), timeUsed, item));
Thread.sleep(computeDelay(block.getNumBytes(), TimeUnit.NANOSECONDS
.toMillis(timeUsed), item));
// We delay updating the info to avoid confusing the user.
// This way we report the copy only if it is under the

View File

@ -222,6 +222,69 @@ public class TestDiskBalancer {
}
@Test
public void testDiskBalancerComputeDelay() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
final int blockCount = 100;
final int blockSize = 11 * 1024 * 1024;
final int diskCount = 2;
final int dataNodeCount = 1;
final int dataNodeIndex = 0;
final long cap = blockSize * 2L * blockCount;
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
final MiniDFSCluster cluster = new ClusterBuilder()
.setBlockCount(blockCount).setBlockSize(blockSize)
.setDiskCount(diskCount).setNumDatanodes(dataNodeCount).setConf(conf)
.setCapacities(new long[] {cap, cap }).build();
try {
DataNode node = cluster.getDataNodes().get(dataNodeIndex);
final FsDatasetSpi<?> fsDatasetSpy = Mockito.spy(node.getFSDataset());
DiskBalancerWorkItem item = Mockito.spy(new DiskBalancerWorkItem());
// Mocking bandwidth as 10mb/sec.
Mockito.doReturn((long) 10).when(item).getBandwidth();
doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation) {
try {
node.getFSDataset().moveBlockAcrossVolumes(
(ExtendedBlock) invocation.getArguments()[0],
(FsVolumeSpi) invocation.getArguments()[1]);
} catch (Exception e) {
LOG.error(e.getMessage());
}
return null;
}
}).when(fsDatasetSpy).moveBlockAcrossVolumes(any(ExtendedBlock.class),
any(FsVolumeSpi.class));
DiskBalancerMover diskBalancerMover = new DiskBalancerMover(fsDatasetSpy,
conf);
diskBalancerMover.setRunnable();
// bytesCopied - 20 * 1024 *1024 byteCopied.
// timeUsed - 1200 in milliseconds
// item - set DiskBalancerWorkItem bandwidth as 10
// Expect return sleep delay in Milliseconds. sleep value = bytesCopied /
// (1024*1024*bandwidth in MB/milli) - timeUsed;
long val = diskBalancerMover.computeDelay(20 * 1024 * 1024, 1200, item);
Assert.assertEquals(val, (long) 800);
} catch (Exception e) {
Assert.fail("Unexpected exception: " + e);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testDiskBalancerWithFedClusterWithOneNameServiceEmpty() throws