HDFS-9731. Erasure Coding: Rename BlockECRecoveryCommand to BlockECReconstructionCommand. Contributed by Rakesh R.

Change-Id: I405365a8395770e494b92bfe9651f4f0366d8f28
This commit is contained in:
zhezhang 2016-02-02 12:31:43 -08:00
parent 913676dc35
commit 4ae543fdcd
16 changed files with 329 additions and 298 deletions

View File

@ -47,9 +47,9 @@ message GetErasureCodingPolicyResponseProto {
}
/**
* Block erasure coding recovery info
* Block erasure coding reconstruction info
*/
message BlockECRecoveryInfoProto {
message BlockECReconstructionInfoProto {
required ExtendedBlockProto block = 1;
required DatanodeInfosProto sourceDnInfos = 2;
required DatanodeInfosProto targetDnInfos = 3;

View File

@ -912,6 +912,9 @@ Trunk (Unreleased)
HDFS-9659. EditLogTailerThread to Active Namenode RPC should timeout
(surendra singh lilhore via vinayakumarb)
HDFS-9731. Erasure Coding: Rename BlockECRecoveryCommand to
BlockECReconstructionCommand. (Rakesh R via zhz)
Release 2.9.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -465,14 +465,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT = 21600;
public static final String DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY = "dfs.datanode.directoryscan.threads";
public static final int DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT = 1;
public static final String DFS_DATANODE_STRIPED_READ_THREADS_KEY = "dfs.datanode.stripedread.threads";
public static final int DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT = 20;
public static final String DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.stripedread.buffer.size";
public static final int DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT = 64 * 1024;
public static final String DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_KEY = "dfs.datanode.stripedread.timeout.millis";
public static final int DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT = 5000; //5s
public static final String DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_KEY = "dfs.datanode.striped.blockrecovery.threads.size";
public static final int DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_DEFAULT = 8;
public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_KEY = "dfs.datanode.ec.reconstruction.stripedread.threads";
public static final int DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_DEFAULT = 20;
public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.ec.reconstruction.stripedread.buffer.size";
public static final int DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_DEFAULT = 64 * 1024;
public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY = "dfs.datanode.ec.reconstruction.stripedread.timeout.millis";
public static final int DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT = 5000; //5s
public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_BLK_THREADS_KEY = "dfs.datanode.ec.reconstruction.stripedblock.threads.size";
public static final int DFS_DN_EC_RECONSTRUCTION_STRIPED_BLK_THREADS_DEFAULT = 8;
public static final String
DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY =
"dfs.datanode.directoryscan.throttle.limit.ms.per.sec";

View File

@ -36,7 +36,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECReconstructionCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
@ -48,7 +48,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECRecoveryInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
@ -82,10 +82,10 @@
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
@ -453,8 +453,8 @@ public static DatanodeCommand convert(DatanodeCommandProto proto) {
return REG_CMD;
case BlockIdCommand:
return PBHelper.convert(proto.getBlkIdCmd());
case BlockECRecoveryCommand:
return PBHelper.convert(proto.getBlkECRecoveryCmd());
case BlockECReconstructionCommand:
return PBHelper.convert(proto.getBlkECReconstructionCmd());
default:
return null;
}
@ -584,10 +584,10 @@ public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) {
builder.setCmdType(DatanodeCommandProto.Type.BlockIdCommand).
setBlkIdCmd(PBHelper.convert((BlockIdCommand) datanodeCommand));
break;
case DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY:
builder.setCmdType(DatanodeCommandProto.Type.BlockECRecoveryCommand)
.setBlkECRecoveryCmd(
convert((BlockECRecoveryCommand) datanodeCommand));
case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION:
builder.setCmdType(DatanodeCommandProto.Type.BlockECReconstructionCommand)
.setBlkECReconstructionCmd(
convert((BlockECReconstructionCommand) datanodeCommand));
break;
case DatanodeProtocol.DNA_UNKNOWN: //Not expected
default:
@ -873,42 +873,42 @@ private static String[] convert(HdfsProtos.StorageUuidsProto targetStorageUuidsP
return storageUuids;
}
public static BlockECRecoveryInfo convertBlockECRecoveryInfo(
BlockECRecoveryInfoProto blockEcRecoveryInfoProto) {
ExtendedBlockProto blockProto = blockEcRecoveryInfoProto.getBlock();
public static BlockECReconstructionInfo convertBlockECReconstructionInfo(
BlockECReconstructionInfoProto blockEcReconstructionInfoProto) {
ExtendedBlockProto blockProto = blockEcReconstructionInfoProto.getBlock();
ExtendedBlock block = PBHelperClient.convert(blockProto);
DatanodeInfosProto sourceDnInfosProto = blockEcRecoveryInfoProto
DatanodeInfosProto sourceDnInfosProto = blockEcReconstructionInfoProto
.getSourceDnInfos();
DatanodeInfo[] sourceDnInfos = PBHelperClient.convert(sourceDnInfosProto);
DatanodeInfosProto targetDnInfosProto = blockEcRecoveryInfoProto
DatanodeInfosProto targetDnInfosProto = blockEcReconstructionInfoProto
.getTargetDnInfos();
DatanodeInfo[] targetDnInfos = PBHelperClient.convert(targetDnInfosProto);
HdfsProtos.StorageUuidsProto targetStorageUuidsProto = blockEcRecoveryInfoProto
.getTargetStorageUuids();
HdfsProtos.StorageUuidsProto targetStorageUuidsProto =
blockEcReconstructionInfoProto.getTargetStorageUuids();
String[] targetStorageUuids = convert(targetStorageUuidsProto);
StorageTypesProto targetStorageTypesProto = blockEcRecoveryInfoProto
StorageTypesProto targetStorageTypesProto = blockEcReconstructionInfoProto
.getTargetStorageTypes();
StorageType[] convertStorageTypes = PBHelperClient.convertStorageTypes(
targetStorageTypesProto.getStorageTypesList(), targetStorageTypesProto
.getStorageTypesList().size());
byte[] liveBlkIndices = blockEcRecoveryInfoProto.getLiveBlockIndices()
byte[] liveBlkIndices = blockEcReconstructionInfoProto.getLiveBlockIndices()
.toByteArray();
ErasureCodingPolicy ecPolicy =
PBHelperClient.convertErasureCodingPolicy(
blockEcRecoveryInfoProto.getEcPolicy());
return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos,
blockEcReconstructionInfoProto.getEcPolicy());
return new BlockECReconstructionInfo(block, sourceDnInfos, targetDnInfos,
targetStorageUuids, convertStorageTypes, liveBlkIndices, ecPolicy);
}
public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo(
BlockECRecoveryInfo blockEcRecoveryInfo) {
BlockECRecoveryInfoProto.Builder builder = BlockECRecoveryInfoProto
.newBuilder();
public static BlockECReconstructionInfoProto convertBlockECRecoveryInfo(
BlockECReconstructionInfo blockEcRecoveryInfo) {
BlockECReconstructionInfoProto.Builder builder =
BlockECReconstructionInfoProto.newBuilder();
builder.setBlock(PBHelperClient.convert(
blockEcRecoveryInfo.getExtendedBlock()));
@ -934,29 +934,31 @@ public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo(
return builder.build();
}
public static BlockECRecoveryCommandProto convert(
BlockECRecoveryCommand blkECRecoveryCmd) {
BlockECRecoveryCommandProto.Builder builder = BlockECRecoveryCommandProto
.newBuilder();
Collection<BlockECRecoveryInfo> blockECRecoveryInfos = blkECRecoveryCmd
.getECTasks();
for (BlockECRecoveryInfo blkECRecoveryInfo : blockECRecoveryInfos) {
builder
.addBlockECRecoveryinfo(convertBlockECRecoveryInfo(blkECRecoveryInfo));
public static BlockECReconstructionCommandProto convert(
BlockECReconstructionCommand blkECReconstructionCmd) {
BlockECReconstructionCommandProto.Builder builder =
BlockECReconstructionCommandProto.newBuilder();
Collection<BlockECReconstructionInfo> blockECRInfos =
blkECReconstructionCmd.getECTasks();
for (BlockECReconstructionInfo blkECReconstructInfo : blockECRInfos) {
builder.addBlockECReconstructioninfo(
convertBlockECRecoveryInfo(blkECReconstructInfo));
}
return builder.build();
}
public static BlockECRecoveryCommand convert(
BlockECRecoveryCommandProto blkECRecoveryCmdProto) {
Collection<BlockECRecoveryInfo> blkECRecoveryInfos = new ArrayList<>();
List<BlockECRecoveryInfoProto> blockECRecoveryinfoList = blkECRecoveryCmdProto
.getBlockECRecoveryinfoList();
for (BlockECRecoveryInfoProto blockECRecoveryInfoProto : blockECRecoveryinfoList) {
blkECRecoveryInfos
.add(convertBlockECRecoveryInfo(blockECRecoveryInfoProto));
public static BlockECReconstructionCommand convert(
BlockECReconstructionCommandProto blkECReconstructionCmdProto) {
Collection<BlockECReconstructionInfo> blkECReconstructionInfos =
new ArrayList<>();
List<BlockECReconstructionInfoProto> blkECRInfoList =
blkECReconstructionCmdProto.getBlockECReconstructioninfoList();
for (BlockECReconstructionInfoProto blkECRInfoProto : blkECRInfoList) {
blkECReconstructionInfos
.add(convertBlockECReconstructionInfo(blkECRInfoProto));
}
return new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY,
blkECRecoveryInfos);
return new BlockECReconstructionCommand(
DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION,
blkECReconstructionInfos);
}
}

View File

@ -42,7 +42,7 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
@ -204,7 +204,7 @@ public Type getType() {
private final BlockQueue<BlockTargetPair> replicateBlocks =
new BlockQueue<>();
/** A queue of blocks to be erasure coded by this datanode */
private final BlockQueue<BlockECRecoveryInfo> erasurecodeBlocks =
private final BlockQueue<BlockECReconstructionInfo> erasurecodeBlocks =
new BlockQueue<>();
/** A queue of blocks to be recovered by this datanode */
private final BlockQueue<BlockInfo> recoverBlocks = new BlockQueue<>();
@ -605,8 +605,8 @@ void addBlockToBeErasureCoded(ExtendedBlock block,
DatanodeDescriptor[] sources, DatanodeStorageInfo[] targets,
byte[] liveBlockIndices, ErasureCodingPolicy ecPolicy) {
assert (block != null && sources != null && sources.length > 0);
BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets,
liveBlockIndices, ecPolicy);
BlockECReconstructionInfo task = new BlockECReconstructionInfo(block,
sources, targets, liveBlockIndices, ecPolicy);
erasurecodeBlocks.offer(task);
BlockManager.LOG.debug("Adding block recovery task " + task + "to "
+ getName() + ", current queue size is " + erasurecodeBlocks.size());
@ -655,7 +655,8 @@ public List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
return replicateBlocks.poll(maxTransfers);
}
public List<BlockECRecoveryInfo> getErasureCodeCommand(int maxTransfers) {
public List<BlockECReconstructionInfo> getErasureCodeCommand(
int maxTransfers) {
return erasurecodeBlocks.poll(maxTransfers);
}

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY;
import static org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION;
import static org.apache.hadoop.util.Time.monotonicNow;
import com.google.common.annotations.VisibleForTesting;
@ -40,7 +40,7 @@
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
import org.apache.hadoop.ipc.Server;
@ -1455,11 +1455,11 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
pendingList));
}
// check pending erasure coding tasks
List<BlockECRecoveryInfo> pendingECList = nodeinfo.getErasureCodeCommand(
maxTransfers);
List<BlockECReconstructionInfo> pendingECList = nodeinfo
.getErasureCodeCommand(maxTransfers);
if (pendingECList != null) {
cmds.add(new BlockECRecoveryCommand(DNA_ERASURE_CODING_RECOVERY,
pendingECList));
cmds.add(new BlockECReconstructionCommand(
DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList));
}
// check block invalidation
Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);

View File

@ -35,7 +35,8 @@ public ErasureCodingWork(BlockInfo block,
super(block, bc, srcNodes, containingNodes,
liveReplicaStorages, additionalReplRequired, priority);
this.liveBlockIndicies = liveBlockIndicies;
BlockManager.LOG.debug("Creating an ErasureCodingWork to recover " + block);
BlockManager.LOG.debug("Creating an ErasureCodingWork to {} reconstruct ",
block);
}
byte[] getLiveBlockIndicies() {

View File

@ -31,7 +31,7 @@
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
import org.slf4j.Logger;
@ -725,9 +725,10 @@ assert getBlockPoolId().equals(bp) :
dxcs.balanceThrottler.setBandwidth(bandwidth);
}
break;
case DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY:
case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION:
LOG.info("DatanodeCommand action: DNA_ERASURE_CODING_RECOVERY");
Collection<BlockECRecoveryInfo> ecTasks = ((BlockECRecoveryCommand) cmd).getECTasks();
Collection<BlockECReconstructionInfo> ecTasks =
((BlockECReconstructionCommand) cmd).getECTasks();
dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
break;
default:
@ -759,7 +760,7 @@ private boolean processCommandFromStandby(DatanodeCommand cmd,
case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
case DatanodeProtocol.DNA_CACHE:
case DatanodeProtocol.DNA_UNCACHE:
case DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY:
case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION:
LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction());
break;
default:

View File

@ -67,7 +67,7 @@
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
import org.apache.hadoop.io.IOUtils;
@ -83,10 +83,10 @@
import org.slf4j.Logger;
/**
* ErasureCodingWorker handles the erasure coding recovery work commands. These
* commands would be issued from Namenode as part of Datanode's heart beat
* response. BPOfferService delegates the work to this class for handling EC
* commands.
* ErasureCodingWorker handles the erasure coding reconstruction work commands.
* These commands would be issued from Namenode as part of Datanode's heart
* beat response. BPOfferService delegates the work to this class for handling
* EC commands.
*/
@InterfaceAudience.Private
public final class ErasureCodingWorker {
@ -95,28 +95,28 @@ public final class ErasureCodingWorker {
private final DataNode datanode;
private final Configuration conf;
private ThreadPoolExecutor STRIPED_BLK_RECOVERY_THREAD_POOL;
private ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
private final int STRIPED_READ_TIMEOUT_MILLIS;
private final int STRIPED_READ_BUFFER_SIZE;
private ThreadPoolExecutor EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL;
private ThreadPoolExecutor EC_RECONSTRUCTION_STRIPED_READ_THREAD_POOL;
private final int EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS;
private final int EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE;
public ErasureCodingWorker(Configuration conf, DataNode datanode) {
this.datanode = datanode;
this.conf = conf;
STRIPED_READ_TIMEOUT_MILLIS = conf.getInt(
DFSConfigKeys.DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_KEY,
DFSConfigKeys.DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT);
EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS = conf.getInt(
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY,
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT);
initializeStripedReadThreadPool(conf.getInt(
DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_KEY,
DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT));
STRIPED_READ_BUFFER_SIZE = conf.getInt(
DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT);
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_KEY,
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_DEFAULT));
EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE = conf.getInt(
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_DEFAULT);
initializeStripedBlkRecoveryThreadPool(conf.getInt(
DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_KEY,
DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_DEFAULT));
initializeStripedBlkReconstructionThreadPool(conf.getInt(
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_BLK_THREADS_KEY,
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_BLK_THREADS_DEFAULT));
}
private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) {
@ -126,8 +126,8 @@ private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) {
private void initializeStripedReadThreadPool(int num) {
LOG.debug("Using striped reads; pool threads=" + num);
STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
EC_RECONSTRUCTION_STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num,
60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new Daemon.DaemonFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
@ -146,48 +146,50 @@ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
super.rejectedExecution(runnable, e);
}
});
STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
EC_RECONSTRUCTION_STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
}
private void initializeStripedBlkRecoveryThreadPool(int num) {
LOG.debug("Using striped block recovery; pool threads=" + num);
STRIPED_BLK_RECOVERY_THREAD_POOL = new ThreadPoolExecutor(2, num, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
private void initializeStripedBlkReconstructionThreadPool(int num) {
LOG.debug("Using striped block reconstruction; pool threads=" + num);
EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL = new ThreadPoolExecutor(2, num,
60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new Daemon.DaemonFactory() {
private final AtomicInteger threadIdx = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = super.newThread(r);
t.setName("stripedBlockRecovery-" + threadIdx.getAndIncrement());
t.setName(
"stripedBlockReconstruction-" + threadIdx.getAndIncrement());
return t;
}
});
STRIPED_BLK_RECOVERY_THREAD_POOL.allowCoreThreadTimeOut(true);
EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL.allowCoreThreadTimeOut(true);
}
/**
* Handles the Erasure Coding recovery work commands.
*
* Handles the Erasure Coding reconstruction work commands.
*
* @param ecTasks
* BlockECRecoveryInfo
* BlockECReconstructionInfo
*/
public void processErasureCodingTasks(Collection<BlockECRecoveryInfo> ecTasks) {
for (BlockECRecoveryInfo recoveryInfo : ecTasks) {
public void processErasureCodingTasks(
Collection<BlockECReconstructionInfo> ecTasks) {
for (BlockECReconstructionInfo reconstructionInfo : ecTasks) {
try {
STRIPED_BLK_RECOVERY_THREAD_POOL
.submit(new ReconstructAndTransferBlock(recoveryInfo));
EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL
.submit(new ReconstructAndTransferBlock(reconstructionInfo));
} catch (Throwable e) {
LOG.warn("Failed to recover striped block "
+ recoveryInfo.getExtendedBlock().getLocalBlock(), e);
LOG.warn("Failed to reconstruct striped block "
+ reconstructionInfo.getExtendedBlock().getLocalBlock(), e);
}
}
}
/**
* ReconstructAndTransferBlock recover one or more missed striped block in the
* striped block group, the minimum number of live striped blocks should be
* no less than data block number.
* ReconstructAndTransferBlock reconstruct one or more missed striped block
* in the striped block group, the minimum number of live striped blocks
* should be no less than data block number.
*
* | <- Striped Block Group -> |
* blk_0 blk_1 blk_2(*) blk_3 ... <- A striped block group
@ -203,12 +205,12 @@ public void processErasureCodingTasks(Collection<BlockECRecoveryInfo> ecTasks) {
* ... ... ... ...
*
*
* We use following steps to recover striped block group, in each round, we
* recover <code>bufferSize</code> data until finish, the
* We use following steps to reconstruct striped block group, in each round,
* we reconstruct <code>bufferSize</code> data until finish, the
* <code>bufferSize</code> is configurable and may be less or larger than
* cell size:
* step1: read <code>bufferSize</code> data from minimum number of sources
* required by recovery.
* required by reconstruction.
* step2: decode data for targets.
* step3: transfer data to targets.
*
@ -217,25 +219,25 @@ public void processErasureCodingTasks(Collection<BlockECRecoveryInfo> ecTasks) {
* will be scheduled. The best sources are remembered for next round and
* may be updated in each round.
*
* In step2, typically if source blocks we read are all data blocks, we
* In step2, typically if source blocks we read are all data blocks, we
* need to call encode, and if there is one parity block, we need to call
* decode. Notice we only read once and recover all missed striped block
* decode. Notice we only read once and reconstruct all missed striped block
* if they are more than one.
*
* In step3, send the recovered data to targets by constructing packet
* and send them directly. Same as continuous block replication, we
* don't check the packet ack. Since the datanode doing the recovery work
* are one of the source datanodes, so the recovered data are sent
* In step3, send the reconstructed data to targets by constructing packet
* and send them directly. Same as continuous block replication, we
* don't check the packet ack. Since the datanode doing the reconstruction
* work are one of the source datanodes, so the reconstructed data are sent
* remotely.
*
* There are some points we can do further improvements in next phase:
* 1. we can read the block file directly on the local datanode,
* currently we use remote block reader. (Notice short-circuit is not
* a good choice, see inline comments).
* 2. We need to check the packet ack for EC recovery? Since EC recovery
* is more expensive than continuous block replication, it needs to
* read from several other datanodes, should we make sure the
* recovered result received by targets?
* 2. We need to check the packet ack for EC reconstruction? Since EC
* reconstruction is more expensive than continuous block replication,
* it needs to read from several other datanodes, should we make sure
* the reconstructed result received by targets?
*/
private class ReconstructAndTransferBlock implements Runnable {
private final int dataBlkNum;
@ -288,20 +290,22 @@ private class ReconstructAndTransferBlock implements Runnable {
private final Map<Future<Void>, Integer> futures = new HashMap<>();
private final CompletionService<Void> readService =
new ExecutorCompletionService<>(STRIPED_READ_THREAD_POOL);
new ExecutorCompletionService<>(
EC_RECONSTRUCTION_STRIPED_READ_THREAD_POOL);
ReconstructAndTransferBlock(BlockECRecoveryInfo recoveryInfo) {
ErasureCodingPolicy ecPolicy = recoveryInfo.getErasureCodingPolicy();
ReconstructAndTransferBlock(BlockECReconstructionInfo reconstructionInfo) {
ErasureCodingPolicy ecPolicy = reconstructionInfo
.getErasureCodingPolicy();
dataBlkNum = ecPolicy.getNumDataUnits();
parityBlkNum = ecPolicy.getNumParityUnits();
cellSize = ecPolicy.getCellSize();
blockGroup = recoveryInfo.getExtendedBlock();
blockGroup = reconstructionInfo.getExtendedBlock();
final int cellsNum = (int)((blockGroup.getNumBytes() - 1) / cellSize + 1);
minRequiredSources = Math.min(cellsNum, dataBlkNum);
liveIndices = recoveryInfo.getLiveBlockIndices();
sources = recoveryInfo.getSourceDnInfos();
liveIndices = reconstructionInfo.getLiveBlockIndices();
sources = reconstructionInfo.getSourceDnInfos();
stripedReaders = new ArrayList<>(sources.length);
Preconditions.checkArgument(liveIndices.length >= minRequiredSources,
@ -315,8 +319,8 @@ private class ReconstructAndTransferBlock implements Runnable {
zeroStripeIndices = new short[dataBlkNum - minRequiredSources];
}
targets = recoveryInfo.getTargetDnInfos();
targetStorageTypes = recoveryInfo.getTargetStorageTypes();
targets = reconstructionInfo.getTargetDnInfos();
targetStorageTypes = reconstructionInfo.getTargetStorageTypes();
targetIndices = new short[targets.length];
targetBuffers = new ByteBuffer[targets.length];
@ -402,7 +406,7 @@ public void run() {
if (nsuccess < minRequiredSources) {
String error = "Can't find minimum sources required by "
+ "recovery, block id: " + blockGroup.getBlockId();
+ "reconstruction, block id: " + blockGroup.getBlockId();
throw new IOException(error);
}
@ -441,21 +445,21 @@ public void run() {
getBlockLen(blockGroup, targetIndex));
}
while (positionInBlock < maxTargetLength) {
final int toRecover = (int) Math.min(
final int toReconstruct = (int) Math.min(
bufferSize, maxTargetLength - positionInBlock);
// step1: read from minimum source DNs required for reconstruction.
// The returned success list is the source DNs we do real read from
Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap = new HashMap<>();
try {
success = readMinimumStripedData4Recovery(success, toRecover,
corruptionMap);
success = readMinimumStripedData4Reconstruction(success,
toReconstruct, corruptionMap);
} finally {
// report corrupted blocks to NN
reportCorruptedBlocks(corruptionMap);
}
// step2: decode to reconstruct targets
recoverTargets(success, targetsStatus, toRecover);
reconstructTargets(success, targetsStatus, toReconstruct);
// step3: transfer data
if (transferData2Targets(targetsStatus) == 0) {
@ -464,7 +468,7 @@ public void run() {
}
clearBuffers();
positionInBlock += toRecover;
positionInBlock += toReconstruct;
}
endTargetBlocks(targetsStatus);
@ -472,7 +476,7 @@ public void run() {
// Currently we don't check the acks for packets, this is similar as
// block replication.
} catch (Throwable e) {
LOG.warn("Failed to recover striped block: " + blockGroup, e);
LOG.warn("Failed to reconstruct striped block: " + blockGroup, e);
} finally {
datanode.decrementXmitsInProgress();
// close block readers
@ -493,7 +497,7 @@ private void initChecksumAndBufferSizeIfNeeded(BlockReader blockReader) {
checksum = blockReader.getDataChecksum();
bytesPerChecksum = checksum.getBytesPerChecksum();
// The bufferSize is flat to divide bytesPerChecksum
int readBufferSize = STRIPED_READ_BUFFER_SIZE;
int readBufferSize = EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE;
bufferSize = readBufferSize < bytesPerChecksum ? bytesPerChecksum :
readBufferSize - readBufferSize % bytesPerChecksum;
} else {
@ -521,11 +525,11 @@ private void getTargetIndices() {
}
}
/** the reading length should not exceed the length for recovery */
private int getReadLength(int index, int recoverLength) {
/** the reading length should not exceed the length for reconstruction. */
private int getReadLength(int index, int reconstructLength) {
long blockLen = getBlockLen(blockGroup, index);
long remaining = blockLen - positionInBlock;
return (int) Math.min(remaining, recoverLength);
return (int) Math.min(remaining, reconstructLength);
}
/**
@ -538,15 +542,16 @@ private int getReadLength(int index, int recoverLength) {
* operations and next iteration read.
*
* @param success the initial success list of source DNs we think best
* @param recoverLength the length to recover.
* @param reconstructLength the length to reconstruct.
* @return updated success list of source DNs we do real read
* @throws IOException
*/
private int[] readMinimumStripedData4Recovery(final int[] success,
int recoverLength, Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap)
throws IOException {
Preconditions.checkArgument(recoverLength >= 0 &&
recoverLength <= bufferSize);
private int[] readMinimumStripedData4Reconstruction(final int[] success,
int reconstructLength,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap)
throws IOException {
Preconditions.checkArgument(reconstructLength >= 0 &&
reconstructLength <= bufferSize);
int nsuccess = 0;
int[] newSuccess = new int[minRequiredSources];
BitSet used = new BitSet(sources.length);
@ -557,7 +562,7 @@ private int[] readMinimumStripedData4Recovery(final int[] success,
for (int i = 0; i < minRequiredSources; i++) {
StripedReader reader = stripedReaders.get(success[i]);
final int toRead = getReadLength(liveIndices[success[i]],
recoverLength);
reconstructLength);
if (toRead > 0) {
Callable<Void> readCallable = readFromBlock(reader, reader.buffer,
toRead, corruptionMap);
@ -573,9 +578,9 @@ private int[] readMinimumStripedData4Recovery(final int[] success,
while (!futures.isEmpty()) {
try {
StripingChunkReadResult result =
StripedBlockUtil.getNextCompletedStripedRead(
readService, futures, STRIPED_READ_TIMEOUT_MILLIS);
StripingChunkReadResult result = StripedBlockUtil
.getNextCompletedStripedRead(readService, futures,
EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS);
int resultIndex = -1;
if (result.state == StripingChunkReadResult.SUCCESSFUL) {
resultIndex = result.index;
@ -585,10 +590,12 @@ private int[] readMinimumStripedData4Recovery(final int[] success,
StripedReader failedReader = stripedReaders.get(result.index);
closeBlockReader(failedReader.blockReader);
failedReader.blockReader = null;
resultIndex = scheduleNewRead(used, recoverLength, corruptionMap);
resultIndex = scheduleNewRead(used, reconstructLength,
corruptionMap);
} else if (result.state == StripingChunkReadResult.TIMEOUT) {
// If timeout, we also schedule a new read.
resultIndex = scheduleNewRead(used, recoverLength, corruptionMap);
resultIndex = scheduleNewRead(used, reconstructLength,
corruptionMap);
}
if (resultIndex >= 0) {
newSuccess[nsuccess++] = resultIndex;
@ -643,20 +650,20 @@ private int[] getErasedIndices(boolean[] targetsStatus) {
return Arrays.copyOf(result, m);
}
private void recoverTargets(int[] success, boolean[] targetsStatus,
int toRecoverLen) {
private void reconstructTargets(int[] success, boolean[] targetsStatus,
int toReconstructLen) {
initDecoderIfNecessary();
ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum];
for (int i = 0; i < success.length; i++) {
StripedReader reader = stripedReaders.get(success[i]);
ByteBuffer buffer = reader.buffer;
paddingBufferToLen(buffer, toRecoverLen);
paddingBufferToLen(buffer, toReconstructLen);
inputs[reader.index] = (ByteBuffer)buffer.flip();
}
if (success.length < dataBlkNum) {
for (int i = 0; i < zeroStripeBuffers.length; i++) {
ByteBuffer buffer = zeroStripeBuffers[i];
paddingBufferToLen(buffer, toRecoverLen);
paddingBufferToLen(buffer, toReconstructLen);
int index = zeroStripeIndices[i];
inputs[index] = (ByteBuffer)buffer.flip();
}
@ -666,7 +673,7 @@ private void recoverTargets(int[] success, boolean[] targetsStatus,
int m = 0;
for (int i = 0; i < targetBuffers.length; i++) {
if (targetsStatus[i]) {
targetBuffers[i].limit(toRecoverLen);
targetBuffers[i].limit(toReconstructLen);
outputs[m++] = targetBuffers[i];
}
}
@ -678,7 +685,7 @@ private void recoverTargets(int[] success, boolean[] targetsStatus,
long remaining = blockLen - positionInBlock;
if (remaining <= 0) {
targetBuffers[i].limit(0);
} else if (remaining < toRecoverLen) {
} else if (remaining < toReconstructLen) {
targetBuffers[i].limit((int)remaining);
}
}
@ -696,7 +703,7 @@ private void recoverTargets(int[] success, boolean[] targetsStatus,
* @param used the used source DNs in this iteration.
* @return the array index of source DN if don't need to do real read.
*/
private int scheduleNewRead(BitSet used, int recoverLength,
private int scheduleNewRead(BitSet used, int reconstructLen,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) {
StripedReader reader = null;
// step1: initially we may only have <code>minRequiredSources</code>
@ -707,7 +714,7 @@ private int scheduleNewRead(BitSet used, int recoverLength,
int toRead = 0;
while (reader == null && m < sources.length) {
reader = addStripedReader(m, positionInBlock);
toRead = getReadLength(liveIndices[m], recoverLength);
toRead = getReadLength(liveIndices[m], reconstructLen);
if (toRead > 0) {
if (reader.blockReader == null) {
reader = null;
@ -727,7 +734,7 @@ private int scheduleNewRead(BitSet used, int recoverLength,
for (int i = 0; reader == null && i < stripedReaders.size(); i++) {
if (!used.get(i)) {
StripedReader r = stripedReaders.get(i);
toRead = getReadLength(liveIndices[i], recoverLength);
toRead = getReadLength(liveIndices[i], reconstructLen);
if (toRead > 0) {
closeBlockReader(r.blockReader);
r.blockReader = newBlockReader(

View File

@ -31,8 +31,8 @@
import java.util.Collection;
/**
* A BlockECRecoveryCommand is an instruction to a DataNode to reconstruct a
* striped block group with missing blocks.
* A BlockECReconstructionCommand is an instruction to a DataNode to
* reconstruct a striped block group with missing blocks.
*
* Upon receiving this command, the DataNode pulls data from other DataNodes
* hosting blocks in this group and reconstructs the lost blocks through codec
@ -45,23 +45,24 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BlockECRecoveryCommand extends DatanodeCommand {
final Collection<BlockECRecoveryInfo> ecTasks;
public class BlockECReconstructionCommand extends DatanodeCommand {
private final Collection<BlockECReconstructionInfo> ecTasks;
/**
* Create BlockECRecoveryCommand from a collection of
* {@link BlockECRecoveryInfo}, each representing a recovery task
* Create BlockECReconstructionCommand from a collection of
* {@link BlockECReconstructionInfo}, each representing a reconstruction
* task
*/
public BlockECRecoveryCommand(int action,
Collection<BlockECRecoveryInfo> blockECRecoveryInfoList) {
public BlockECReconstructionCommand(int action,
Collection<BlockECReconstructionInfo> blockECReconstructionInfoList) {
super(action);
this.ecTasks = blockECRecoveryInfoList;
this.ecTasks = blockECReconstructionInfoList;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("BlockECRecoveryCommand(\n ");
sb.append("BlockECReconstructionCommand(\n ");
Joiner.on("\n ").appendTo(sb, ecTasks);
sb.append("\n)");
return sb.toString();
@ -70,7 +71,7 @@ public String toString() {
/** Block and targets pair */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public static class BlockECRecoveryInfo {
public static class BlockECReconstructionInfo {
private final ExtendedBlock block;
private final DatanodeInfo[] sources;
private DatanodeInfo[] targets;
@ -79,19 +80,19 @@ public static class BlockECRecoveryInfo {
private final byte[] liveBlockIndices;
private final ErasureCodingPolicy ecPolicy;
public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources,
DatanodeStorageInfo[] targetDnStorageInfo, byte[] liveBlockIndices,
ErasureCodingPolicy ecPolicy) {
public BlockECReconstructionInfo(ExtendedBlock block,
DatanodeInfo[] sources, DatanodeStorageInfo[] targetDnStorageInfo,
byte[] liveBlockIndices, ErasureCodingPolicy ecPolicy) {
this(block, sources, DatanodeStorageInfo
.toDatanodeInfos(targetDnStorageInfo), DatanodeStorageInfo
.toStorageIDs(targetDnStorageInfo), DatanodeStorageInfo
.toStorageTypes(targetDnStorageInfo), liveBlockIndices, ecPolicy);
}
public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources,
DatanodeInfo[] targets, String[] targetStorageIDs,
StorageType[] targetStorageTypes, byte[] liveBlockIndices,
ErasureCodingPolicy ecPolicy) {
public BlockECReconstructionInfo(ExtendedBlock block,
DatanodeInfo[] sources, DatanodeInfo[] targets,
String[] targetStorageIDs, StorageType[] targetStorageTypes,
byte[] liveBlockIndices, ErasureCodingPolicy ecPolicy) {
this.block = block;
this.sources = sources;
this.targets = targets;
@ -117,7 +118,7 @@ public DatanodeInfo[] getTargetDnInfos() {
public String[] getTargetStorageIDs() {
return targetStorageIDs;
}
public StorageType[] getTargetStorageTypes() {
return targetStorageTypes;
}
@ -125,14 +126,14 @@ public StorageType[] getTargetStorageTypes() {
public byte[] getLiveBlockIndices() {
return liveBlockIndices;
}
public ErasureCodingPolicy getErasureCodingPolicy() {
return ecPolicy;
}
@Override
public String toString() {
return new StringBuilder().append("BlockECRecoveryInfo(\n ")
return new StringBuilder().append("BlockECReconstructionInfo(\n ")
.append("Recovering ").append(block).append(" From: ")
.append(Arrays.asList(sources)).append(" To: [")
.append(Arrays.asList(targets)).append(")\n")
@ -141,7 +142,7 @@ public String toString() {
}
}
public Collection<BlockECRecoveryInfo> getECTasks() {
public Collection<BlockECReconstructionInfo> getECTasks() {
return this.ecTasks;
}
}

View File

@ -76,7 +76,7 @@ public interface DatanodeProtocol {
final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth
final static int DNA_CACHE = 9; // cache blocks
final static int DNA_UNCACHE = 10; // uncache blocks
final static int DNA_ERASURE_CODING_RECOVERY = 11; // erasure coding recovery command
final static int DNA_ERASURE_CODING_RECONSTRUCTION = 11; // erasure coding reconstruction command
/**
* Register Datanode.

View File

@ -60,7 +60,7 @@ message DatanodeCommandProto {
UnusedUpgradeCommand = 6;
NullDatanodeCommand = 7;
BlockIdCommand = 8;
BlockECRecoveryCommand = 9;
BlockECReconstructionCommand = 9;
}
required Type cmdType = 1; // Type of the command
@ -74,7 +74,7 @@ message DatanodeCommandProto {
optional KeyUpdateCommandProto keyUpdateCmd = 6;
optional RegisterCommandProto registerCmd = 7;
optional BlockIdCommandProto blkIdCmd = 8;
optional BlockECRecoveryCommandProto blkECRecoveryCmd = 9;
optional BlockECReconstructionCommandProto blkECReconstructionCmd = 9;
}
/**
@ -149,10 +149,10 @@ message RegisterCommandProto {
}
/**
* Block Erasure coding recovery command
* Block Erasure coding reconstruction command
*/
message BlockECRecoveryCommandProto {
repeated BlockECRecoveryInfoProto blockECRecoveryinfo = 1;
message BlockECReconstructionCommandProto {
repeated BlockECReconstructionInfoProto blockECReconstructioninfo = 1;
}
/**

View File

@ -2654,26 +2654,37 @@
</property>
<property>
<name>dfs.datanode.stripedread.timeout.millis</name>
<name>dfs.datanode.ec.reconstruction.stripedread.timeout.millis</name>
<value>5000</value>
<description>Datanode striped read timeout in milliseconds.
</description>
</property>
<property>
<name>dfs.datanode.stripedread.threads</name>
<name>dfs.datanode.ec.reconstruction.stripedread.threads</name>
<value>20</value>
<description>Number of threads used by the Datanode for background recovery work.
<description>
Number of threads used by the Datanode to read striped block
during background reconstruction work.
</description>
</property>
<property>
<name>dfs.datanode.stripedread.buffer.size</name>
<name>dfs.datanode.ec.reconstruction.stripedread.buffer.size</name>
<value>65536</value>
<description>Datanode striped read buffer size.
</description>
</property>
<property>
<name>dfs.datanode.ec.reconstruction.stripedblock.threads.size</name>
<value>8</value>
<description>
Number of threads used by the Datanode for background
reconstruction work.
</description>
</property>
<property>
<name>dfs.namenode.quota.init-threads</name>
<value>4</value>

View File

@ -45,7 +45,7 @@
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
@ -54,9 +54,9 @@
import org.junit.Before;
import org.junit.Test;
public class TestRecoverStripedFile {
public static final Log LOG = LogFactory.getLog(TestRecoverStripedFile.class);
public class TestReconstructStripedFile {
public static final Log LOG = LogFactory.getLog(TestReconstructStripedFile.class);
private static final int dataBlkNum = StripedFileTestUtil.NUM_DATA_BLOCKS;
private static final int parityBlkNum = StripedFileTestUtil.NUM_PARITY_BLOCKS;
private static final int cellSize = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
@ -70,7 +70,7 @@ public class TestRecoverStripedFile {
GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.ALL);
}
enum RecoveryType {
enum ReconstructionType {
DataOnly,
ParityOnly,
Any
@ -86,14 +86,14 @@ enum RecoveryType {
public void setup() throws IOException {
final Configuration conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
conf.setInt(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
cellSize - 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
false);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dnNum).build();
cluster.waitActive();
fs = cluster.getFileSystem();
fs.getClient().setErasureCodingPolicy("/", null);
@ -110,100 +110,100 @@ public void tearDown() {
cluster = null;
}
}
@Test(timeout = 120000)
public void testRecoverOneParityBlock() throws Exception {
int fileLen = 10 * blockSize + blockSize/10;
assertFileBlocksRecovery("/testRecoverOneParityBlock", fileLen,
RecoveryType.ParityOnly, 1);
assertFileBlocksReconstruction("/testRecoverOneParityBlock", fileLen,
ReconstructionType.ParityOnly, 1);
}
@Test(timeout = 120000)
public void testRecoverOneParityBlock1() throws Exception {
int fileLen = cellSize + cellSize/10;
assertFileBlocksRecovery("/testRecoverOneParityBlock1", fileLen,
RecoveryType.ParityOnly, 1);
assertFileBlocksReconstruction("/testRecoverOneParityBlock1", fileLen,
ReconstructionType.ParityOnly, 1);
}
@Test(timeout = 120000)
public void testRecoverOneParityBlock2() throws Exception {
int fileLen = 1;
assertFileBlocksRecovery("/testRecoverOneParityBlock2", fileLen,
RecoveryType.ParityOnly, 1);
assertFileBlocksReconstruction("/testRecoverOneParityBlock2", fileLen,
ReconstructionType.ParityOnly, 1);
}
@Test(timeout = 120000)
public void testRecoverOneParityBlock3() throws Exception {
int fileLen = 3 * blockSize + blockSize/10;
assertFileBlocksRecovery("/testRecoverOneParityBlock3", fileLen,
RecoveryType.ParityOnly, 1);
assertFileBlocksReconstruction("/testRecoverOneParityBlock3", fileLen,
ReconstructionType.ParityOnly, 1);
}
@Test(timeout = 120000)
public void testRecoverThreeParityBlocks() throws Exception {
int fileLen = 10 * blockSize + blockSize/10;
assertFileBlocksRecovery("/testRecoverThreeParityBlocks", fileLen,
RecoveryType.ParityOnly, 3);
assertFileBlocksReconstruction("/testRecoverThreeParityBlocks", fileLen,
ReconstructionType.ParityOnly, 3);
}
@Test(timeout = 120000)
public void testRecoverThreeDataBlocks() throws Exception {
int fileLen = 10 * blockSize + blockSize/10;
assertFileBlocksRecovery("/testRecoverThreeDataBlocks", fileLen,
RecoveryType.DataOnly, 3);
assertFileBlocksReconstruction("/testRecoverThreeDataBlocks", fileLen,
ReconstructionType.DataOnly, 3);
}
@Test(timeout = 120000)
public void testRecoverThreeDataBlocks1() throws Exception {
int fileLen = 3 * blockSize + blockSize/10;
assertFileBlocksRecovery("/testRecoverThreeDataBlocks1", fileLen,
RecoveryType.DataOnly, 3);
assertFileBlocksReconstruction("/testRecoverThreeDataBlocks1", fileLen,
ReconstructionType.DataOnly, 3);
}
@Test(timeout = 120000)
public void testRecoverOneDataBlock() throws Exception {
int fileLen = 10 * blockSize + blockSize/10;
assertFileBlocksRecovery("/testRecoverOneDataBlock", fileLen,
RecoveryType.DataOnly, 1);
assertFileBlocksReconstruction("/testRecoverOneDataBlock", fileLen,
ReconstructionType.DataOnly, 1);
}
@Test(timeout = 120000)
public void testRecoverOneDataBlock1() throws Exception {
int fileLen = cellSize + cellSize/10;
assertFileBlocksRecovery("/testRecoverOneDataBlock1", fileLen,
RecoveryType.DataOnly, 1);
assertFileBlocksReconstruction("/testRecoverOneDataBlock1", fileLen,
ReconstructionType.DataOnly, 1);
}
@Test(timeout = 120000)
public void testRecoverOneDataBlock2() throws Exception {
int fileLen = 1;
assertFileBlocksRecovery("/testRecoverOneDataBlock2", fileLen,
RecoveryType.DataOnly, 1);
assertFileBlocksReconstruction("/testRecoverOneDataBlock2", fileLen,
ReconstructionType.DataOnly, 1);
}
@Test(timeout = 120000)
public void testRecoverAnyBlocks() throws Exception {
int fileLen = 3 * blockSize + blockSize/10;
assertFileBlocksRecovery("/testRecoverAnyBlocks", fileLen,
RecoveryType.Any, 2);
assertFileBlocksReconstruction("/testRecoverAnyBlocks", fileLen,
ReconstructionType.Any, 2);
}
@Test(timeout = 120000)
public void testRecoverAnyBlocks1() throws Exception {
int fileLen = 10 * blockSize + blockSize/10;
assertFileBlocksRecovery("/testRecoverAnyBlocks1", fileLen,
RecoveryType.Any, 3);
assertFileBlocksReconstruction("/testRecoverAnyBlocks1", fileLen,
ReconstructionType.Any, 3);
}
private int[] generateDeadDnIndices(RecoveryType type, int deadNum,
private int[] generateDeadDnIndices(ReconstructionType type, int deadNum,
byte[] indices) {
List<Integer> deadList = new ArrayList<>(deadNum);
while (deadList.size() < deadNum) {
int dead = random.nextInt(indices.length);
boolean isOfType = true;
if (type == RecoveryType.DataOnly) {
if (type == ReconstructionType.DataOnly) {
isOfType = indices[dead] < dataBlkNum;
} else if (type == RecoveryType.ParityOnly) {
} else if (type == ReconstructionType.ParityOnly) {
isOfType = indices[dead] >= dataBlkNum;
}
if (isOfType && !deadList.contains(dead)) {
@ -228,13 +228,13 @@ private void shutdownDataNodes(DataNode dn) throws IOException {
}
private int generateErrors(Map<ExtendedBlock, DataNode> corruptTargets,
RecoveryType type)
ReconstructionType type)
throws IOException {
int stoppedDN = 0;
for (Map.Entry<ExtendedBlock, DataNode> target : corruptTargets.entrySet()) {
if (stoppedDN == 0 || type != RecoveryType.DataOnly
if (stoppedDN == 0 || type != ReconstructionType.DataOnly
|| random.nextBoolean()) {
// stop at least one DN to trigger recovery
// stop at least one DN to trigger reconstruction
LOG.info("Note: stop DataNode " + target.getValue().getDisplayName()
+ " with internal block " + target.getKey());
shutdownDataNodes(target.getValue());
@ -249,17 +249,17 @@ private int generateErrors(Map<ExtendedBlock, DataNode> corruptTargets,
}
/**
* Test the file blocks recovery.
* 1. Check the replica is recovered in the target datanode,
* Test the file blocks reconstruction.
* 1. Check the replica is reconstructed in the target datanode,
* and verify the block replica length, generationStamp and content.
* 2. Read the file and verify content.
* 2. Read the file and verify content.
*/
private void assertFileBlocksRecovery(String fileName, int fileLen,
RecoveryType type, int toRecoverBlockNum) throws Exception {
private void assertFileBlocksReconstruction(String fileName, int fileLen,
ReconstructionType type, int toRecoverBlockNum) throws Exception {
if (toRecoverBlockNum < 1 || toRecoverBlockNum > parityBlkNum) {
Assert.fail("toRecoverBlockNum should be between 1 ~ " + parityBlkNum);
}
Path file = new Path(fileName);
final byte[] data = new byte[fileLen];
@ -269,13 +269,13 @@ private void assertFileBlocksRecovery(String fileName, int fileLen,
LocatedBlocks locatedBlocks = getLocatedBlocks(file);
assertEquals(locatedBlocks.getFileLength(), fileLen);
LocatedStripedBlock lastBlock =
LocatedStripedBlock lastBlock =
(LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
DatanodeInfo[] storageInfos = lastBlock.getLocations();
byte[] indices = lastBlock.getBlockIndices();
BitSet bitset = new BitSet(dnNum);
for (DatanodeInfo storageInfo : storageInfos) {
bitset.set(dnMap.get(storageInfo));
@ -284,7 +284,7 @@ private void assertFileBlocksRecovery(String fileName, int fileLen,
int[] dead = generateDeadDnIndices(type, toRecoverBlockNum, indices);
LOG.info("Note: indices == " + Arrays.toString(indices)
+ ". Generate errors on datanodes: " + Arrays.toString(dead));
DatanodeInfo[] dataDNs = new DatanodeInfo[toRecoverBlockNum];
int[] deadDnIndices = new int[toRecoverBlockNum];
ExtendedBlock[] blocks = new ExtendedBlock[toRecoverBlockNum];
@ -303,7 +303,7 @@ private void assertFileBlocksRecovery(String fileName, int fileLen,
replicas[i] = cluster.getBlockFile(deadDnIndices[i], blocks[i]);
metadatas[i] = cluster.getBlockMetadataFile(deadDnIndices[i], blocks[i]);
// the block replica on the datanode should be the same as expected
assertEquals(replicas[i].length(),
assertEquals(replicas[i].length(),
StripedBlockUtil.getInternalBlockLength(
lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[dead[i]]));
assertTrue(metadatas[i].getName().
@ -311,7 +311,7 @@ private void assertFileBlocksRecovery(String fileName, int fileLen,
LOG.info("replica " + i + " locates in file: " + replicas[i]);
replicaContents[i] = DFSTestUtil.readFileAsBytes(replicas[i]);
}
int cellsNum = (fileLen - 1) / cellSize + 1;
int groupSize = Math.min(cellsNum, dataBlkNum) + parityBlkNum;
@ -331,25 +331,25 @@ private void assertFileBlocksRecovery(String fileName, int fileLen,
targetDNs[n++] = i;
}
}
waitForRecoveryFinished(file, groupSize);
waitForReconstructionFinished(file, groupSize);
targetDNs = sortTargetsByReplicas(blocks, targetDNs);
// Check the replica on the new target node.
for (int i = 0; i < toRecoverBlockNum; i++) {
File replicaAfterRecovery = cluster.getBlockFile(targetDNs[i], blocks[i]);
LOG.info("replica after recovery " + replicaAfterRecovery);
File metadataAfterRecovery =
File replicaAfterReconstruction = cluster.getBlockFile(targetDNs[i], blocks[i]);
LOG.info("replica after reconstruction " + replicaAfterReconstruction);
File metadataAfterReconstruction =
cluster.getBlockMetadataFile(targetDNs[i], blocks[i]);
assertEquals(replicaAfterRecovery.length(), replicas[i].length());
assertEquals(replicaAfterReconstruction.length(), replicas[i].length());
LOG.info("replica before " + replicas[i]);
assertTrue(metadataAfterRecovery.getName().
assertTrue(metadataAfterReconstruction.getName().
endsWith(blocks[i].getGenerationStamp() + ".meta"));
byte[] replicaContentAfterRecovery =
DFSTestUtil.readFileAsBytes(replicaAfterRecovery);
byte[] replicaContentAfterReconstruction =
DFSTestUtil.readFileAsBytes(replicaAfterReconstruction);
Assert.assertArrayEquals(replicaContents[i], replicaContentAfterRecovery);
Assert.assertArrayEquals(replicaContents[i], replicaContentAfterReconstruction);
}
}
@ -368,18 +368,19 @@ private int[] sortTargetsByReplicas(ExtendedBlock[] blocks, int[] targetDNs) {
}
}
if (result[i] == -1) {
Assert.fail("Failed to recover striped block: " + blocks[i].getBlockId());
Assert.fail("Failed to reconstruct striped block: "
+ blocks[i].getBlockId());
}
}
return result;
}
private LocatedBlocks waitForRecoveryFinished(Path file, int groupSize)
private LocatedBlocks waitForReconstructionFinished(Path file, int groupSize)
throws Exception {
final int ATTEMPTS = 60;
for (int i = 0; i < ATTEMPTS; i++) {
LocatedBlocks locatedBlocks = getLocatedBlocks(file);
LocatedStripedBlock lastBlock =
LocatedStripedBlock lastBlock =
(LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
DatanodeInfo[] storageInfos = lastBlock.getLocations();
if (storageInfos.length >= groupSize) {
@ -387,9 +388,9 @@ private LocatedBlocks waitForRecoveryFinished(Path file, int groupSize)
}
Thread.sleep(1000);
}
throw new IOException ("Time out waiting for EC block recovery.");
throw new IOException ("Time out waiting for EC block reconstruction.");
}
private LocatedBlocks getLocatedBlocks(Path file) throws IOException {
return fs.getClient().getLocatedBlocks(file.toString(), 0, Long.MAX_VALUE);
}
@ -415,10 +416,10 @@ public void testProcessErasureCodingTasksSubmitionShouldSucceed()
DatanodeStorageInfo[] dnStorageInfo = new DatanodeStorageInfo[] {
targetDnInfos_1 };
BlockECRecoveryInfo invalidECInfo = new BlockECRecoveryInfo(
BlockECReconstructionInfo invalidECInfo = new BlockECReconstructionInfo(
new ExtendedBlock("bp-id", 123456), dataDNs, dnStorageInfo, liveIndices,
ErasureCodingPolicyManager.getSystemDefaultPolicy());
List<BlockECRecoveryInfo> ecTasks = new ArrayList<>();
List<BlockECReconstructionInfo> ecTasks = new ArrayList<>();
ecTasks.add(invalidECInfo);
dataNode.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
}

View File

@ -43,7 +43,7 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECReconstructionCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
@ -75,13 +75,13 @@
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@ -689,7 +689,7 @@ public void testBlockECRecoveryCommand() {
DatanodeStorageInfo[] targetDnInfos0 = new DatanodeStorageInfo[] {
targetDnInfos_0, targetDnInfos_1 };
byte[] liveBlkIndices0 = new byte[2];
BlockECRecoveryInfo blkECRecoveryInfo0 = new BlockECRecoveryInfo(
BlockECReconstructionInfo blkECRecoveryInfo0 = new BlockECReconstructionInfo(
new ExtendedBlock("bp1", 1234), dnInfos0, targetDnInfos0,
liveBlkIndices0, ErasureCodingPolicyManager.getSystemDefaultPolicy());
DatanodeInfo[] dnInfos1 = new DatanodeInfo[] {
@ -703,26 +703,26 @@ public void testBlockECRecoveryCommand() {
DatanodeStorageInfo[] targetDnInfos1 = new DatanodeStorageInfo[] {
targetDnInfos_2, targetDnInfos_3 };
byte[] liveBlkIndices1 = new byte[2];
BlockECRecoveryInfo blkECRecoveryInfo1 = new BlockECRecoveryInfo(
BlockECReconstructionInfo blkECRecoveryInfo1 = new BlockECReconstructionInfo(
new ExtendedBlock("bp2", 3256), dnInfos1, targetDnInfos1,
liveBlkIndices1, ErasureCodingPolicyManager.getSystemDefaultPolicy());
List<BlockECRecoveryInfo> blkRecoveryInfosList = new ArrayList<BlockECRecoveryInfo>();
List<BlockECReconstructionInfo> blkRecoveryInfosList = new ArrayList<BlockECReconstructionInfo>();
blkRecoveryInfosList.add(blkECRecoveryInfo0);
blkRecoveryInfosList.add(blkECRecoveryInfo1);
BlockECRecoveryCommand blkECRecoveryCmd = new BlockECRecoveryCommand(
DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY, blkRecoveryInfosList);
BlockECRecoveryCommandProto blkECRecoveryCmdProto = PBHelper
.convert(blkECRecoveryCmd);
blkECRecoveryCmd = PBHelper.convert(blkECRecoveryCmdProto);
Iterator<BlockECRecoveryInfo> iterator = blkECRecoveryCmd.getECTasks()
BlockECReconstructionCommand blkECReconstructionCmd = new BlockECReconstructionCommand(
DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION, blkRecoveryInfosList);
BlockECReconstructionCommandProto blkECRecoveryCmdProto = PBHelper
.convert(blkECReconstructionCmd);
blkECReconstructionCmd = PBHelper.convert(blkECRecoveryCmdProto);
Iterator<BlockECReconstructionInfo> iterator = blkECReconstructionCmd.getECTasks()
.iterator();
assertBlockECRecoveryInfoEquals(blkECRecoveryInfo0, iterator.next());
assertBlockECRecoveryInfoEquals(blkECRecoveryInfo1, iterator.next());
}
private void assertBlockECRecoveryInfoEquals(
BlockECRecoveryInfo blkECRecoveryInfo1,
BlockECRecoveryInfo blkECRecoveryInfo2) {
BlockECReconstructionInfo blkECRecoveryInfo1,
BlockECReconstructionInfo blkECRecoveryInfo2) {
assertEquals(blkECRecoveryInfo1.getExtendedBlock(),
blkECRecoveryInfo2.getExtendedBlock());

View File

@ -37,7 +37,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.junit.Test;
@ -50,7 +50,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TestRecoverStripedBlocks {
public class TestReconstructStripedBlocks {
private static final int cellSize = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
private final short GROUP_SIZE =
(short) (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS);
@ -90,9 +90,9 @@ public void testMissingStripedBlockWithBusyNode2() throws Exception {
* Start GROUP_SIZE + 1 datanodes.
* Inject striped blocks to first GROUP_SIZE datanodes.
* Then make numOfBusy datanodes busy, make numOfMissed datanodes missed.
* Then trigger BlockManager to compute recovery works. (so all recovery work
* will be scheduled to the last datanode)
* Finally, verify the recovery work of the last datanode.
* Then trigger BlockManager to compute reconstruction works. (so all
* reconstruction work will be scheduled to the last datanode)
* Finally, verify the reconstruction work of the last datanode.
*/
private void doTestMissingStripedBlock(int numOfMissed, int numOfBusy)
throws Exception {
@ -148,22 +148,23 @@ private void doTestMissingStripedBlock(int numOfMissed, int numOfBusy)
BlockManagerTestUtil.getComputedDatanodeWork(bm);
// all the recovery work will be scheduled on the last DN
// all the reconstruction work will be scheduled on the last DN
DataNode lastDn = cluster.getDataNodes().get(GROUP_SIZE);
DatanodeDescriptor last =
bm.getDatanodeManager().getDatanode(lastDn.getDatanodeId());
assertEquals("Counting the number of outstanding EC tasks", numBlocks,
last.getNumberOfBlocksToBeErasureCoded());
List<BlockECRecoveryInfo> recovery =
List<BlockECReconstructionInfo> reconstruction =
last.getErasureCodeCommand(numBlocks);
for (BlockECRecoveryInfo info : recovery) {
for (BlockECReconstructionInfo info : reconstruction) {
assertEquals(1, info.getTargetDnInfos().length);
assertEquals(last, info.getTargetDnInfos()[0]);
assertEquals(info.getSourceDnInfos().length,
info.getLiveBlockIndices().length);
if (GROUP_SIZE - numOfMissed == NUM_DATA_BLOCKS) {
// It's a QUEUE_HIGHEST_PRIORITY block, so the busy DNs will be chosen
// to make sure we have NUM_DATA_BLOCKS DNs to do recovery work.
// to make sure we have NUM_DATA_BLOCKS DNs to do reconstruction
// work.
assertEquals(NUM_DATA_BLOCKS, info.getSourceDnInfos().length);
} else {
// The block has no highest priority, so we don't use the busy DNs as