HDFS-12072. Provide fairness between EC and non-EC recovery tasks. Contributed by Eddy Xu.

This commit is contained in:
Andrew Wang 2017-08-17 15:26:11 -07:00
parent ab1a8ae85f
commit b298948897
3 changed files with 108 additions and 39 deletions

View File

@ -661,7 +661,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
return erasurecodeBlocks.size();
}
public List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
int getNumberOfReplicateBlocks() {
return replicateBlocks.size();
}
List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
return replicateBlocks.poll(maxTransfers);
}

View File

@ -1663,21 +1663,38 @@ public class DatanodeManager {
}
final List<DatanodeCommand> cmds = new ArrayList<>();
// check pending replication
// Allocate _approximately_ maxTransfers pending tasks to DataNode.
// NN chooses pending tasks based on the ratio between the lengths of
// replication and erasure-coded block queues.
int totalReplicateBlocks = nodeinfo.getNumberOfReplicateBlocks();
int totalECBlocks = nodeinfo.getNumberOfBlocksToBeErasureCoded();
int totalBlocks = totalReplicateBlocks + totalECBlocks;
if (totalBlocks > 0) {
int numReplicationTasks = (int) Math.ceil(
(double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
int numECTasks = (int) Math.ceil(
(double) (totalECBlocks * maxTransfers) / totalBlocks);
if (LOG.isDebugEnabled()) {
LOG.debug("Pending replication tasks: " + numReplicationTasks
+ " erasure-coded tasks: " + numECTasks);
}
// check pending replication tasks
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
maxTransfers);
if (pendingList != null) {
numReplicationTasks);
if (pendingList != null && !pendingList.isEmpty()) {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
pendingList));
maxTransfers -= pendingList.size();
}
// check pending erasure coding tasks
List<BlockECReconstructionInfo> pendingECList = nodeinfo
.getErasureCodeCommand(maxTransfers);
if (pendingECList != null) {
.getErasureCodeCommand(numECTasks);
if (pendingECList != null && !pendingECList.isEmpty()) {
cmds.add(new BlockECReconstructionCommand(
DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList));
}
}
// check block invalidation
Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
if (blks != null) {

View File

@ -25,6 +25,7 @@ import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@ -500,46 +501,93 @@ public class TestDatanodeManager {
"127.0.0.1:23456", bothAgain.get(1).getInfoAddr());
}
@Test
public void testPendingRecoveryTasks() throws IOException {
/**
* Verify the correctness of pending recovery process.
*
* @param numReplicationBlocks the number of replication blocks in the queue.
* @param numECBlocks number of EC blocks in the queue.
* @param maxTransfers the maxTransfer value.
* @param numReplicationTasks the number of replication tasks polled from
* the queue.
* @param numECTasks the number of EC tasks polled from the queue.
*
* @throws IOException
*/
private void verifyPendingRecoveryTasks(
int numReplicationBlocks, int numECBlocks,
int maxTransfers, int numReplicationTasks, int numECTasks)
throws IOException {
FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
Mockito.when(fsn.hasWriteLock()).thenReturn(true);
Configuration conf = new Configuration();
DatanodeManager dm = Mockito.spy(mockDatanodeManager(fsn, conf));
int maxTransfers = 20;
int numPendingTasks = 7;
int numECTasks = maxTransfers - numPendingTasks;
DatanodeDescriptor nodeInfo = Mockito.mock(DatanodeDescriptor.class);
Mockito.when(nodeInfo.isRegistered()).thenReturn(true);
Mockito.when(nodeInfo.getStorageInfos())
.thenReturn(new DatanodeStorageInfo[0]);
List<BlockTargetPair> pendingList =
Collections.nCopies(numPendingTasks, new BlockTargetPair(null, null));
Mockito.when(nodeInfo.getReplicationCommand(maxTransfers))
.thenReturn(pendingList);
List<BlockECReconstructionInfo> ecPendingList =
Collections.nCopies(numECTasks, null);
if (numReplicationBlocks > 0) {
Mockito.when(nodeInfo.getNumberOfReplicateBlocks())
.thenReturn(numReplicationBlocks);
List<BlockTargetPair> tasks =
Collections.nCopies(
Math.min(numReplicationTasks, numReplicationBlocks),
new BlockTargetPair(null, null));
Mockito.when(nodeInfo.getReplicationCommand(numReplicationTasks))
.thenReturn(tasks);
}
if (numECBlocks > 0) {
Mockito.when(nodeInfo.getNumberOfBlocksToBeErasureCoded())
.thenReturn(numECBlocks);
List<BlockECReconstructionInfo> tasks =
Collections.nCopies(numECTasks, null);
Mockito.when(nodeInfo.getErasureCodeCommand(numECTasks))
.thenReturn(ecPendingList);
.thenReturn(tasks);
}
DatanodeRegistration dnReg = Mockito.mock(DatanodeRegistration.class);
Mockito.when(dm.getDatanode(dnReg)).thenReturn(nodeInfo);
DatanodeCommand[] cmds = dm.handleHeartbeat(
dnReg, new StorageReport[1], "bp-123", 0, 0, 10, maxTransfers, 0, null,
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
assertEquals(2, cmds.length);
assertTrue(cmds[0] instanceof BlockCommand);
BlockCommand replicaCmd = (BlockCommand) cmds[0];
assertEquals(numPendingTasks, replicaCmd.getBlocks().length);
assertEquals(numPendingTasks, replicaCmd.getTargets().length);
assertTrue(cmds[1] instanceof BlockECReconstructionCommand);
BlockECReconstructionCommand ecRecoveryCmd =
(BlockECReconstructionCommand) cmds[1];
assertEquals(numECTasks, ecRecoveryCmd.getECTasks().size());
long expectedNumCmds = Arrays.stream(
new int[]{numReplicationTasks, numECTasks})
.filter(x -> x > 0)
.count();
assertEquals(expectedNumCmds, cmds.length);
int idx = 0;
if (numReplicationTasks > 0) {
assertTrue(cmds[idx] instanceof BlockCommand);
BlockCommand cmd = (BlockCommand) cmds[0];
assertEquals(numReplicationTasks, cmd.getBlocks().length);
assertEquals(numReplicationTasks, cmd.getTargets().length);
idx++;
}
if (numECTasks > 0) {
assertTrue(cmds[idx] instanceof BlockECReconstructionCommand);
BlockECReconstructionCommand cmd =
(BlockECReconstructionCommand) cmds[idx];
assertEquals(numECTasks, cmd.getECTasks().size());
}
Mockito.verify(nodeInfo).getReplicationCommand(numReplicationTasks);
Mockito.verify(nodeInfo).getErasureCodeCommand(numECTasks);
}
@Test
public void testPendingRecoveryTasks() throws IOException {
// Tasks are slitted according to the ratio between queue lengths.
verifyPendingRecoveryTasks(20, 20, 20, 10, 10);
verifyPendingRecoveryTasks(40, 10, 20, 16, 4);
// Approximately load tasks if the ratio between queue length is large.
verifyPendingRecoveryTasks(400, 1, 20, 20, 1);
}
}