HDFS-14202. dfs.disk.balancer.max.disk.throughputInMBperSec property is not working as per set value. Contributed by Ranith Sardar.
(cherry picked from commit0e79a86582
) (cherry picked from commitfc8a7a9e5b
)
This commit is contained in:
parent
7a915a0ba3
commit
23eed7b201
|
@ -57,9 +57,6 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
|
||||
/**
|
||||
* Worker class for Disk Balancer.
|
||||
* <p>
|
||||
|
@ -864,7 +861,8 @@ public class DiskBalancer {
|
|||
* @param item DiskBalancerWorkItem
|
||||
* @return sleep delay in Milliseconds.
|
||||
*/
|
||||
private long computeDelay(long bytesCopied, long timeUsed,
|
||||
@VisibleForTesting
|
||||
public long computeDelay(long bytesCopied, long timeUsed,
|
||||
DiskBalancerWorkItem item) {
|
||||
|
||||
// we had an overflow, ignore this reading and continue.
|
||||
|
@ -872,11 +870,15 @@ public class DiskBalancer {
|
|||
return 0;
|
||||
}
|
||||
final int megaByte = 1024 * 1024;
|
||||
if (bytesCopied < megaByte) {
|
||||
return 0;
|
||||
}
|
||||
long bytesInMB = bytesCopied / megaByte;
|
||||
long lastThroughput = bytesInMB / SECONDS.convert(timeUsed,
|
||||
TimeUnit.MILLISECONDS);
|
||||
long delay = (bytesInMB / getDiskBandwidth(item)) - lastThroughput;
|
||||
return (delay <= 0) ? 0 : MILLISECONDS.convert(delay, TimeUnit.SECONDS);
|
||||
|
||||
// converting disk bandwidth in MB/millisec
|
||||
float bandwidth = getDiskBandwidth(item) / 1000f;
|
||||
float delay = ((long) (bytesInMB / bandwidth) - timeUsed);
|
||||
return (delay <= 0) ? 0 : (long) delay;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1112,7 +1114,8 @@ public class DiskBalancer {
|
|||
// to make sure that our promise is good on average.
|
||||
// Because we sleep, if a shutdown or cancel call comes in
|
||||
// we exit via Thread Interrupted exception.
|
||||
Thread.sleep(computeDelay(block.getNumBytes(), timeUsed, item));
|
||||
Thread.sleep(computeDelay(block.getNumBytes(), TimeUnit.NANOSECONDS
|
||||
.toMillis(timeUsed), item));
|
||||
|
||||
// We delay updating the info to avoid confusing the user.
|
||||
// This way we report the copy only if it is under the
|
||||
|
|
|
@ -222,6 +222,69 @@ public class TestDiskBalancer {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDiskBalancerComputeDelay() throws Exception {
|
||||
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
|
||||
|
||||
final int blockCount = 100;
|
||||
final int blockSize = 11 * 1024 * 1024;
|
||||
final int diskCount = 2;
|
||||
final int dataNodeCount = 1;
|
||||
final int dataNodeIndex = 0;
|
||||
final long cap = blockSize * 2L * blockCount;
|
||||
|
||||
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
|
||||
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
|
||||
|
||||
final MiniDFSCluster cluster = new ClusterBuilder()
|
||||
.setBlockCount(blockCount).setBlockSize(blockSize)
|
||||
.setDiskCount(diskCount).setNumDatanodes(dataNodeCount).setConf(conf)
|
||||
.setCapacities(new long[] {cap, cap }).build();
|
||||
|
||||
try {
|
||||
DataNode node = cluster.getDataNodes().get(dataNodeIndex);
|
||||
|
||||
final FsDatasetSpi<?> fsDatasetSpy = Mockito.spy(node.getFSDataset());
|
||||
DiskBalancerWorkItem item = Mockito.spy(new DiskBalancerWorkItem());
|
||||
// Mocking bandwidth as 10mb/sec.
|
||||
Mockito.doReturn((long) 10).when(item).getBandwidth();
|
||||
|
||||
doAnswer(new Answer<Object>() {
|
||||
public Object answer(InvocationOnMock invocation) {
|
||||
try {
|
||||
node.getFSDataset().moveBlockAcrossVolumes(
|
||||
(ExtendedBlock) invocation.getArguments()[0],
|
||||
(FsVolumeSpi) invocation.getArguments()[1]);
|
||||
} catch (Exception e) {
|
||||
LOG.error(e.getMessage());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}).when(fsDatasetSpy).moveBlockAcrossVolumes(any(ExtendedBlock.class),
|
||||
any(FsVolumeSpi.class));
|
||||
|
||||
DiskBalancerMover diskBalancerMover = new DiskBalancerMover(fsDatasetSpy,
|
||||
conf);
|
||||
|
||||
diskBalancerMover.setRunnable();
|
||||
|
||||
// bytesCopied - 20 * 1024 *1024 byteCopied.
|
||||
// timeUsed - 1200 in milliseconds
|
||||
// item - set DiskBalancerWorkItem bandwidth as 10
|
||||
// Expect return sleep delay in Milliseconds. sleep value = bytesCopied /
|
||||
// (1024*1024*bandwidth in MB/milli) - timeUsed;
|
||||
long val = diskBalancerMover.computeDelay(20 * 1024 * 1024, 1200, item);
|
||||
Assert.assertEquals(val, (long) 800);
|
||||
} catch (Exception e) {
|
||||
Assert.fail("Unexpected exception: " + e);
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDiskBalancerWithFedClusterWithOneNameServiceEmpty() throws
|
||||
|
|
Loading…
Reference in New Issue