From 4ae543fdcd6dcfbe32257b1e72a405df9aa73e17 Mon Sep 17 00:00:00 2001 From: zhezhang Date: Tue, 2 Feb 2016 12:31:43 -0800 Subject: [PATCH] HDFS-9731. Erasure Coding: Rename BlockECRecoveryCommand to BlockECReconstructionCommand. Contributed by Rakesh R. Change-Id: I405365a8395770e494b92bfe9651f4f0366d8f28 --- .../src/main/proto/erasurecoding.proto | 4 +- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 18 +- .../hadoop/hdfs/protocolPB/PBHelper.java | 90 ++++---- .../blockmanagement/DatanodeDescriptor.java | 11 +- .../blockmanagement/DatanodeManager.java | 12 +- .../blockmanagement/ErasureCodingWork.java | 3 +- .../hdfs/server/datanode/BPOfferService.java | 9 +- .../erasurecode/ErasureCodingWorker.java | 195 +++++++++--------- ...java => BlockECReconstructionCommand.java} | 45 ++-- .../server/protocol/DatanodeProtocol.java | 2 +- .../src/main/proto/DatanodeProtocol.proto | 10 +- .../src/main/resources/hdfs-default.xml | 19 +- ...e.java => TestReconstructStripedFile.java} | 159 +++++++------- .../hadoop/hdfs/protocolPB/TestPBHelper.java | 28 +-- ...java => TestReconstructStripedBlocks.java} | 19 +- 16 files changed, 329 insertions(+), 298 deletions(-) rename hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/{BlockECRecoveryCommand.java => BlockECReconstructionCommand.java} (75%) rename hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/{TestRecoverStripedFile.java => TestReconstructStripedFile.java} (79%) rename hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/{TestRecoverStripedBlocks.java => TestReconstructStripedBlocks.java} (94%) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/erasurecoding.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/erasurecoding.proto index d73c2088bd4..4bb44fb4077 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/erasurecoding.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/erasurecoding.proto @@ -47,9 +47,9 @@ message GetErasureCodingPolicyResponseProto { } /** - * Block erasure coding recovery info + * Block erasure coding reconstruction info */ -message BlockECRecoveryInfoProto { +message BlockECReconstructionInfoProto { required ExtendedBlockProto block = 1; required DatanodeInfosProto sourceDnInfos = 2; required DatanodeInfosProto targetDnInfos = 3; diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 0c9ab6fbf79..1b049475200 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -912,6 +912,9 @@ Trunk (Unreleased) HDFS-9659. EditLogTailerThread to Active Namenode RPC should timeout (surendra singh lilhore via vinayakumarb) + HDFS-9731. Erasure Coding: Rename BlockECRecoveryCommand to + BlockECReconstructionCommand. (Rakesh R via zhz) + Release 2.9.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 76915cb1a9a..df205dbc646 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -465,14 +465,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT = 21600; public static final String DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY = "dfs.datanode.directoryscan.threads"; public static final int DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT = 1; - public static final String DFS_DATANODE_STRIPED_READ_THREADS_KEY = "dfs.datanode.stripedread.threads"; - public static final int DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT = 20; - public static final String DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.stripedread.buffer.size"; - public static final int DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT = 64 * 1024; - public static final String DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_KEY = "dfs.datanode.stripedread.timeout.millis"; - public static final int DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT = 5000; //5s - public static final String DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_KEY = "dfs.datanode.striped.blockrecovery.threads.size"; - public static final int DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_DEFAULT = 8; + + public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_KEY = "dfs.datanode.ec.reconstruction.stripedread.threads"; + public static final int DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_DEFAULT = 20; + public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.ec.reconstruction.stripedread.buffer.size"; + public static final int DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_DEFAULT = 64 * 1024; + public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY = "dfs.datanode.ec.reconstruction.stripedread.timeout.millis"; + public static final int DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT = 5000; //5s + public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_BLK_THREADS_KEY = "dfs.datanode.ec.reconstruction.stripedblock.threads.size"; + public static final int DFS_DN_EC_RECONSTRUCTION_STRIPED_BLK_THREADS_DEFAULT = 8; + public static final String DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY = "dfs.datanode.directoryscan.throttle.limit.ms.per.sec"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index e70cdf0bdde..52ac5d80929 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -36,7 +36,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto; -import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECRecoveryCommandProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECReconstructionCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto; @@ -48,7 +48,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDele import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto; -import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECRecoveryInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; @@ -82,10 +82,10 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; -import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand; import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; -import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; @@ -453,8 +453,8 @@ public class PBHelper { return REG_CMD; case BlockIdCommand: return PBHelper.convert(proto.getBlkIdCmd()); - case BlockECRecoveryCommand: - return PBHelper.convert(proto.getBlkECRecoveryCmd()); + case BlockECReconstructionCommand: + return PBHelper.convert(proto.getBlkECReconstructionCmd()); default: return null; } @@ -584,10 +584,10 @@ public class PBHelper { builder.setCmdType(DatanodeCommandProto.Type.BlockIdCommand). setBlkIdCmd(PBHelper.convert((BlockIdCommand) datanodeCommand)); break; - case DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY: - builder.setCmdType(DatanodeCommandProto.Type.BlockECRecoveryCommand) - .setBlkECRecoveryCmd( - convert((BlockECRecoveryCommand) datanodeCommand)); + case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION: + builder.setCmdType(DatanodeCommandProto.Type.BlockECReconstructionCommand) + .setBlkECReconstructionCmd( + convert((BlockECReconstructionCommand) datanodeCommand)); break; case DatanodeProtocol.DNA_UNKNOWN: //Not expected default: @@ -873,42 +873,42 @@ public class PBHelper { return storageUuids; } - public static BlockECRecoveryInfo convertBlockECRecoveryInfo( - BlockECRecoveryInfoProto blockEcRecoveryInfoProto) { - ExtendedBlockProto blockProto = blockEcRecoveryInfoProto.getBlock(); + public static BlockECReconstructionInfo convertBlockECReconstructionInfo( + BlockECReconstructionInfoProto blockEcReconstructionInfoProto) { + ExtendedBlockProto blockProto = blockEcReconstructionInfoProto.getBlock(); ExtendedBlock block = PBHelperClient.convert(blockProto); - DatanodeInfosProto sourceDnInfosProto = blockEcRecoveryInfoProto + DatanodeInfosProto sourceDnInfosProto = blockEcReconstructionInfoProto .getSourceDnInfos(); DatanodeInfo[] sourceDnInfos = PBHelperClient.convert(sourceDnInfosProto); - DatanodeInfosProto targetDnInfosProto = blockEcRecoveryInfoProto + DatanodeInfosProto targetDnInfosProto = blockEcReconstructionInfoProto .getTargetDnInfos(); DatanodeInfo[] targetDnInfos = PBHelperClient.convert(targetDnInfosProto); - HdfsProtos.StorageUuidsProto targetStorageUuidsProto = blockEcRecoveryInfoProto - .getTargetStorageUuids(); + HdfsProtos.StorageUuidsProto targetStorageUuidsProto = + blockEcReconstructionInfoProto.getTargetStorageUuids(); String[] targetStorageUuids = convert(targetStorageUuidsProto); - StorageTypesProto targetStorageTypesProto = blockEcRecoveryInfoProto + StorageTypesProto targetStorageTypesProto = blockEcReconstructionInfoProto .getTargetStorageTypes(); StorageType[] convertStorageTypes = PBHelperClient.convertStorageTypes( targetStorageTypesProto.getStorageTypesList(), targetStorageTypesProto .getStorageTypesList().size()); - byte[] liveBlkIndices = blockEcRecoveryInfoProto.getLiveBlockIndices() + byte[] liveBlkIndices = blockEcReconstructionInfoProto.getLiveBlockIndices() .toByteArray(); ErasureCodingPolicy ecPolicy = PBHelperClient.convertErasureCodingPolicy( - blockEcRecoveryInfoProto.getEcPolicy()); - return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos, + blockEcReconstructionInfoProto.getEcPolicy()); + return new BlockECReconstructionInfo(block, sourceDnInfos, targetDnInfos, targetStorageUuids, convertStorageTypes, liveBlkIndices, ecPolicy); } - public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo( - BlockECRecoveryInfo blockEcRecoveryInfo) { - BlockECRecoveryInfoProto.Builder builder = BlockECRecoveryInfoProto - .newBuilder(); + public static BlockECReconstructionInfoProto convertBlockECRecoveryInfo( + BlockECReconstructionInfo blockEcRecoveryInfo) { + BlockECReconstructionInfoProto.Builder builder = + BlockECReconstructionInfoProto.newBuilder(); builder.setBlock(PBHelperClient.convert( blockEcRecoveryInfo.getExtendedBlock())); @@ -934,29 +934,31 @@ public class PBHelper { return builder.build(); } - public static BlockECRecoveryCommandProto convert( - BlockECRecoveryCommand blkECRecoveryCmd) { - BlockECRecoveryCommandProto.Builder builder = BlockECRecoveryCommandProto - .newBuilder(); - Collection blockECRecoveryInfos = blkECRecoveryCmd - .getECTasks(); - for (BlockECRecoveryInfo blkECRecoveryInfo : blockECRecoveryInfos) { - builder - .addBlockECRecoveryinfo(convertBlockECRecoveryInfo(blkECRecoveryInfo)); + public static BlockECReconstructionCommandProto convert( + BlockECReconstructionCommand blkECReconstructionCmd) { + BlockECReconstructionCommandProto.Builder builder = + BlockECReconstructionCommandProto.newBuilder(); + Collection blockECRInfos = + blkECReconstructionCmd.getECTasks(); + for (BlockECReconstructionInfo blkECReconstructInfo : blockECRInfos) { + builder.addBlockECReconstructioninfo( + convertBlockECRecoveryInfo(blkECReconstructInfo)); } return builder.build(); } - public static BlockECRecoveryCommand convert( - BlockECRecoveryCommandProto blkECRecoveryCmdProto) { - Collection blkECRecoveryInfos = new ArrayList<>(); - List blockECRecoveryinfoList = blkECRecoveryCmdProto - .getBlockECRecoveryinfoList(); - for (BlockECRecoveryInfoProto blockECRecoveryInfoProto : blockECRecoveryinfoList) { - blkECRecoveryInfos - .add(convertBlockECRecoveryInfo(blockECRecoveryInfoProto)); + public static BlockECReconstructionCommand convert( + BlockECReconstructionCommandProto blkECReconstructionCmdProto) { + Collection blkECReconstructionInfos = + new ArrayList<>(); + List blkECRInfoList = + blkECReconstructionCmdProto.getBlockECReconstructioninfoList(); + for (BlockECReconstructionInfoProto blkECRInfoProto : blkECRInfoList) { + blkECReconstructionInfos + .add(convertBlockECReconstructionInfo(blkECRInfoProto)); } - return new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY, - blkECRecoveryInfos); + return new BlockECReconstructionCommand( + DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION, + blkECReconstructionInfos); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 46f373865a8..9e7ab20e52f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -42,7 +42,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; -import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; @@ -204,7 +204,7 @@ public class DatanodeDescriptor extends DatanodeInfo { private final BlockQueue replicateBlocks = new BlockQueue<>(); /** A queue of blocks to be erasure coded by this datanode */ - private final BlockQueue erasurecodeBlocks = + private final BlockQueue erasurecodeBlocks = new BlockQueue<>(); /** A queue of blocks to be recovered by this datanode */ private final BlockQueue recoverBlocks = new BlockQueue<>(); @@ -605,8 +605,8 @@ public class DatanodeDescriptor extends DatanodeInfo { DatanodeDescriptor[] sources, DatanodeStorageInfo[] targets, byte[] liveBlockIndices, ErasureCodingPolicy ecPolicy) { assert (block != null && sources != null && sources.length > 0); - BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets, - liveBlockIndices, ecPolicy); + BlockECReconstructionInfo task = new BlockECReconstructionInfo(block, + sources, targets, liveBlockIndices, ecPolicy); erasurecodeBlocks.offer(task); BlockManager.LOG.debug("Adding block recovery task " + task + "to " + getName() + ", current queue size is " + erasurecodeBlocks.size()); @@ -655,7 +655,8 @@ public class DatanodeDescriptor extends DatanodeInfo { return replicateBlocks.poll(maxTransfers); } - public List getErasureCodeCommand(int maxTransfers) { + public List getErasureCodeCommand( + int maxTransfers) { return erasurecodeBlocks.poll(maxTransfers); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 53f7043b271..d344ca66225 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; -import static org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY; +import static org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION; import static org.apache.hadoop.util.Time.monotonicNow; import com.google.common.annotations.VisibleForTesting; @@ -40,7 +40,7 @@ import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.protocol.*; -import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock; import org.apache.hadoop.ipc.Server; @@ -1455,11 +1455,11 @@ public class DatanodeManager { pendingList)); } // check pending erasure coding tasks - List pendingECList = nodeinfo.getErasureCodeCommand( - maxTransfers); + List pendingECList = nodeinfo + .getErasureCodeCommand(maxTransfers); if (pendingECList != null) { - cmds.add(new BlockECRecoveryCommand(DNA_ERASURE_CODING_RECOVERY, - pendingECList)); + cmds.add(new BlockECReconstructionCommand( + DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList)); } // check block invalidation Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java index bb2e492e1db..fec669c4f22 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java @@ -35,7 +35,8 @@ class ErasureCodingWork extends BlockRecoveryWork { super(block, bc, srcNodes, containingNodes, liveReplicaStorages, additionalReplRequired, priority); this.liveBlockIndicies = liveBlockIndicies; - BlockManager.LOG.debug("Creating an ErasureCodingWork to recover " + block); + BlockManager.LOG.debug("Creating an ErasureCodingWork to {} reconstruct ", + block); } byte[] getLiveBlockIndicies() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index cd30eadb865..5b5dc4c10f9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.protocol.*; -import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; import org.slf4j.Logger; @@ -725,9 +725,10 @@ class BPOfferService { dxcs.balanceThrottler.setBandwidth(bandwidth); } break; - case DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY: + case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION: LOG.info("DatanodeCommand action: DNA_ERASURE_CODING_RECOVERY"); - Collection ecTasks = ((BlockECRecoveryCommand) cmd).getECTasks(); + Collection ecTasks = + ((BlockECReconstructionCommand) cmd).getECTasks(); dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks); break; default: @@ -759,7 +760,7 @@ class BPOfferService { case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE: case DatanodeProtocol.DNA_CACHE: case DatanodeProtocol.DNA_UNCACHE: - case DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY: + case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION: LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction()); break; default: diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java index 6ad71648586..60c84178cb9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@ -67,7 +67,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactor import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult; import org.apache.hadoop.io.IOUtils; @@ -83,10 +83,10 @@ import com.google.common.base.Preconditions; import org.slf4j.Logger; /** - * ErasureCodingWorker handles the erasure coding recovery work commands. These - * commands would be issued from Namenode as part of Datanode's heart beat - * response. BPOfferService delegates the work to this class for handling EC - * commands. + * ErasureCodingWorker handles the erasure coding reconstruction work commands. + * These commands would be issued from Namenode as part of Datanode's heart + * beat response. BPOfferService delegates the work to this class for handling + * EC commands. */ @InterfaceAudience.Private public final class ErasureCodingWorker { @@ -95,28 +95,28 @@ public final class ErasureCodingWorker { private final DataNode datanode; private final Configuration conf; - private ThreadPoolExecutor STRIPED_BLK_RECOVERY_THREAD_POOL; - private ThreadPoolExecutor STRIPED_READ_THREAD_POOL; - private final int STRIPED_READ_TIMEOUT_MILLIS; - private final int STRIPED_READ_BUFFER_SIZE; + private ThreadPoolExecutor EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL; + private ThreadPoolExecutor EC_RECONSTRUCTION_STRIPED_READ_THREAD_POOL; + private final int EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS; + private final int EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE; public ErasureCodingWorker(Configuration conf, DataNode datanode) { this.datanode = datanode; this.conf = conf; - STRIPED_READ_TIMEOUT_MILLIS = conf.getInt( - DFSConfigKeys.DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_KEY, - DFSConfigKeys.DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT); + EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS = conf.getInt( + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY, + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT); initializeStripedReadThreadPool(conf.getInt( - DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_KEY, - DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT)); - STRIPED_READ_BUFFER_SIZE = conf.getInt( - DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY, - DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT); + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_KEY, + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_DEFAULT)); + EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE = conf.getInt( + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY, + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_DEFAULT); - initializeStripedBlkRecoveryThreadPool(conf.getInt( - DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_KEY, - DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_DEFAULT)); + initializeStripedBlkReconstructionThreadPool(conf.getInt( + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_BLK_THREADS_KEY, + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_BLK_THREADS_DEFAULT)); } private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) { @@ -126,8 +126,8 @@ public final class ErasureCodingWorker { private void initializeStripedReadThreadPool(int num) { LOG.debug("Using striped reads; pool threads=" + num); - STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60, - TimeUnit.SECONDS, new SynchronousQueue(), + EC_RECONSTRUCTION_STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, + 60, TimeUnit.SECONDS, new SynchronousQueue(), new Daemon.DaemonFactory() { private final AtomicInteger threadIndex = new AtomicInteger(0); @@ -146,48 +146,50 @@ public final class ErasureCodingWorker { super.rejectedExecution(runnable, e); } }); - STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true); + EC_RECONSTRUCTION_STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true); } - private void initializeStripedBlkRecoveryThreadPool(int num) { - LOG.debug("Using striped block recovery; pool threads=" + num); - STRIPED_BLK_RECOVERY_THREAD_POOL = new ThreadPoolExecutor(2, num, 60, - TimeUnit.SECONDS, new LinkedBlockingQueue(), + private void initializeStripedBlkReconstructionThreadPool(int num) { + LOG.debug("Using striped block reconstruction; pool threads=" + num); + EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL = new ThreadPoolExecutor(2, num, + 60, TimeUnit.SECONDS, new LinkedBlockingQueue(), new Daemon.DaemonFactory() { private final AtomicInteger threadIdx = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { Thread t = super.newThread(r); - t.setName("stripedBlockRecovery-" + threadIdx.getAndIncrement()); + t.setName( + "stripedBlockReconstruction-" + threadIdx.getAndIncrement()); return t; } }); - STRIPED_BLK_RECOVERY_THREAD_POOL.allowCoreThreadTimeOut(true); + EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL.allowCoreThreadTimeOut(true); } /** - * Handles the Erasure Coding recovery work commands. - * + * Handles the Erasure Coding reconstruction work commands. + * * @param ecTasks - * BlockECRecoveryInfo + * BlockECReconstructionInfo */ - public void processErasureCodingTasks(Collection ecTasks) { - for (BlockECRecoveryInfo recoveryInfo : ecTasks) { + public void processErasureCodingTasks( + Collection ecTasks) { + for (BlockECReconstructionInfo reconstructionInfo : ecTasks) { try { - STRIPED_BLK_RECOVERY_THREAD_POOL - .submit(new ReconstructAndTransferBlock(recoveryInfo)); + EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL + .submit(new ReconstructAndTransferBlock(reconstructionInfo)); } catch (Throwable e) { - LOG.warn("Failed to recover striped block " - + recoveryInfo.getExtendedBlock().getLocalBlock(), e); + LOG.warn("Failed to reconstruct striped block " + + reconstructionInfo.getExtendedBlock().getLocalBlock(), e); } } } /** - * ReconstructAndTransferBlock recover one or more missed striped block in the - * striped block group, the minimum number of live striped blocks should be - * no less than data block number. + * ReconstructAndTransferBlock reconstruct one or more missed striped block + * in the striped block group, the minimum number of live striped blocks + * should be no less than data block number. * * | <- Striped Block Group -> | * blk_0 blk_1 blk_2(*) blk_3 ... <- A striped block group @@ -203,12 +205,12 @@ public final class ErasureCodingWorker { * ... ... ... ... * * - * We use following steps to recover striped block group, in each round, we - * recover bufferSize data until finish, the + * We use following steps to reconstruct striped block group, in each round, + * we reconstruct bufferSize data until finish, the * bufferSize is configurable and may be less or larger than * cell size: * step1: read bufferSize data from minimum number of sources - * required by recovery. + * required by reconstruction. * step2: decode data for targets. * step3: transfer data to targets. * @@ -217,25 +219,25 @@ public final class ErasureCodingWorker { * will be scheduled. The best sources are remembered for next round and * may be updated in each round. * - * In step2, typically if source blocks we read are all data blocks, we + * In step2, typically if source blocks we read are all data blocks, we * need to call encode, and if there is one parity block, we need to call - * decode. Notice we only read once and recover all missed striped block + * decode. Notice we only read once and reconstruct all missed striped block * if they are more than one. * - * In step3, send the recovered data to targets by constructing packet - * and send them directly. Same as continuous block replication, we - * don't check the packet ack. Since the datanode doing the recovery work - * are one of the source datanodes, so the recovered data are sent + * In step3, send the reconstructed data to targets by constructing packet + * and send them directly. Same as continuous block replication, we + * don't check the packet ack. Since the datanode doing the reconstruction + * work are one of the source datanodes, so the reconstructed data are sent * remotely. * * There are some points we can do further improvements in next phase: * 1. we can read the block file directly on the local datanode, * currently we use remote block reader. (Notice short-circuit is not * a good choice, see inline comments). - * 2. We need to check the packet ack for EC recovery? Since EC recovery - * is more expensive than continuous block replication, it needs to - * read from several other datanodes, should we make sure the - * recovered result received by targets? + * 2. We need to check the packet ack for EC reconstruction? Since EC + * reconstruction is more expensive than continuous block replication, + * it needs to read from several other datanodes, should we make sure + * the reconstructed result received by targets? */ private class ReconstructAndTransferBlock implements Runnable { private final int dataBlkNum; @@ -288,20 +290,22 @@ public final class ErasureCodingWorker { private final Map, Integer> futures = new HashMap<>(); private final CompletionService readService = - new ExecutorCompletionService<>(STRIPED_READ_THREAD_POOL); + new ExecutorCompletionService<>( + EC_RECONSTRUCTION_STRIPED_READ_THREAD_POOL); - ReconstructAndTransferBlock(BlockECRecoveryInfo recoveryInfo) { - ErasureCodingPolicy ecPolicy = recoveryInfo.getErasureCodingPolicy(); + ReconstructAndTransferBlock(BlockECReconstructionInfo reconstructionInfo) { + ErasureCodingPolicy ecPolicy = reconstructionInfo + .getErasureCodingPolicy(); dataBlkNum = ecPolicy.getNumDataUnits(); parityBlkNum = ecPolicy.getNumParityUnits(); cellSize = ecPolicy.getCellSize(); - blockGroup = recoveryInfo.getExtendedBlock(); + blockGroup = reconstructionInfo.getExtendedBlock(); final int cellsNum = (int)((blockGroup.getNumBytes() - 1) / cellSize + 1); minRequiredSources = Math.min(cellsNum, dataBlkNum); - liveIndices = recoveryInfo.getLiveBlockIndices(); - sources = recoveryInfo.getSourceDnInfos(); + liveIndices = reconstructionInfo.getLiveBlockIndices(); + sources = reconstructionInfo.getSourceDnInfos(); stripedReaders = new ArrayList<>(sources.length); Preconditions.checkArgument(liveIndices.length >= minRequiredSources, @@ -315,8 +319,8 @@ public final class ErasureCodingWorker { zeroStripeIndices = new short[dataBlkNum - minRequiredSources]; } - targets = recoveryInfo.getTargetDnInfos(); - targetStorageTypes = recoveryInfo.getTargetStorageTypes(); + targets = reconstructionInfo.getTargetDnInfos(); + targetStorageTypes = reconstructionInfo.getTargetStorageTypes(); targetIndices = new short[targets.length]; targetBuffers = new ByteBuffer[targets.length]; @@ -402,7 +406,7 @@ public final class ErasureCodingWorker { if (nsuccess < minRequiredSources) { String error = "Can't find minimum sources required by " - + "recovery, block id: " + blockGroup.getBlockId(); + + "reconstruction, block id: " + blockGroup.getBlockId(); throw new IOException(error); } @@ -441,21 +445,21 @@ public final class ErasureCodingWorker { getBlockLen(blockGroup, targetIndex)); } while (positionInBlock < maxTargetLength) { - final int toRecover = (int) Math.min( + final int toReconstruct = (int) Math.min( bufferSize, maxTargetLength - positionInBlock); // step1: read from minimum source DNs required for reconstruction. // The returned success list is the source DNs we do real read from Map> corruptionMap = new HashMap<>(); try { - success = readMinimumStripedData4Recovery(success, toRecover, - corruptionMap); + success = readMinimumStripedData4Reconstruction(success, + toReconstruct, corruptionMap); } finally { // report corrupted blocks to NN reportCorruptedBlocks(corruptionMap); } // step2: decode to reconstruct targets - recoverTargets(success, targetsStatus, toRecover); + reconstructTargets(success, targetsStatus, toReconstruct); // step3: transfer data if (transferData2Targets(targetsStatus) == 0) { @@ -464,7 +468,7 @@ public final class ErasureCodingWorker { } clearBuffers(); - positionInBlock += toRecover; + positionInBlock += toReconstruct; } endTargetBlocks(targetsStatus); @@ -472,7 +476,7 @@ public final class ErasureCodingWorker { // Currently we don't check the acks for packets, this is similar as // block replication. } catch (Throwable e) { - LOG.warn("Failed to recover striped block: " + blockGroup, e); + LOG.warn("Failed to reconstruct striped block: " + blockGroup, e); } finally { datanode.decrementXmitsInProgress(); // close block readers @@ -493,7 +497,7 @@ public final class ErasureCodingWorker { checksum = blockReader.getDataChecksum(); bytesPerChecksum = checksum.getBytesPerChecksum(); // The bufferSize is flat to divide bytesPerChecksum - int readBufferSize = STRIPED_READ_BUFFER_SIZE; + int readBufferSize = EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE; bufferSize = readBufferSize < bytesPerChecksum ? bytesPerChecksum : readBufferSize - readBufferSize % bytesPerChecksum; } else { @@ -521,11 +525,11 @@ public final class ErasureCodingWorker { } } - /** the reading length should not exceed the length for recovery */ - private int getReadLength(int index, int recoverLength) { + /** the reading length should not exceed the length for reconstruction. */ + private int getReadLength(int index, int reconstructLength) { long blockLen = getBlockLen(blockGroup, index); long remaining = blockLen - positionInBlock; - return (int) Math.min(remaining, recoverLength); + return (int) Math.min(remaining, reconstructLength); } /** @@ -538,15 +542,16 @@ public final class ErasureCodingWorker { * operations and next iteration read. * * @param success the initial success list of source DNs we think best - * @param recoverLength the length to recover. + * @param reconstructLength the length to reconstruct. * @return updated success list of source DNs we do real read * @throws IOException */ - private int[] readMinimumStripedData4Recovery(final int[] success, - int recoverLength, Map> corruptionMap) - throws IOException { - Preconditions.checkArgument(recoverLength >= 0 && - recoverLength <= bufferSize); + private int[] readMinimumStripedData4Reconstruction(final int[] success, + int reconstructLength, + Map> corruptionMap) + throws IOException { + Preconditions.checkArgument(reconstructLength >= 0 && + reconstructLength <= bufferSize); int nsuccess = 0; int[] newSuccess = new int[minRequiredSources]; BitSet used = new BitSet(sources.length); @@ -557,7 +562,7 @@ public final class ErasureCodingWorker { for (int i = 0; i < minRequiredSources; i++) { StripedReader reader = stripedReaders.get(success[i]); final int toRead = getReadLength(liveIndices[success[i]], - recoverLength); + reconstructLength); if (toRead > 0) { Callable readCallable = readFromBlock(reader, reader.buffer, toRead, corruptionMap); @@ -573,9 +578,9 @@ public final class ErasureCodingWorker { while (!futures.isEmpty()) { try { - StripingChunkReadResult result = - StripedBlockUtil.getNextCompletedStripedRead( - readService, futures, STRIPED_READ_TIMEOUT_MILLIS); + StripingChunkReadResult result = StripedBlockUtil + .getNextCompletedStripedRead(readService, futures, + EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS); int resultIndex = -1; if (result.state == StripingChunkReadResult.SUCCESSFUL) { resultIndex = result.index; @@ -585,10 +590,12 @@ public final class ErasureCodingWorker { StripedReader failedReader = stripedReaders.get(result.index); closeBlockReader(failedReader.blockReader); failedReader.blockReader = null; - resultIndex = scheduleNewRead(used, recoverLength, corruptionMap); + resultIndex = scheduleNewRead(used, reconstructLength, + corruptionMap); } else if (result.state == StripingChunkReadResult.TIMEOUT) { // If timeout, we also schedule a new read. - resultIndex = scheduleNewRead(used, recoverLength, corruptionMap); + resultIndex = scheduleNewRead(used, reconstructLength, + corruptionMap); } if (resultIndex >= 0) { newSuccess[nsuccess++] = resultIndex; @@ -643,20 +650,20 @@ public final class ErasureCodingWorker { return Arrays.copyOf(result, m); } - private void recoverTargets(int[] success, boolean[] targetsStatus, - int toRecoverLen) { + private void reconstructTargets(int[] success, boolean[] targetsStatus, + int toReconstructLen) { initDecoderIfNecessary(); ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum]; for (int i = 0; i < success.length; i++) { StripedReader reader = stripedReaders.get(success[i]); ByteBuffer buffer = reader.buffer; - paddingBufferToLen(buffer, toRecoverLen); + paddingBufferToLen(buffer, toReconstructLen); inputs[reader.index] = (ByteBuffer)buffer.flip(); } if (success.length < dataBlkNum) { for (int i = 0; i < zeroStripeBuffers.length; i++) { ByteBuffer buffer = zeroStripeBuffers[i]; - paddingBufferToLen(buffer, toRecoverLen); + paddingBufferToLen(buffer, toReconstructLen); int index = zeroStripeIndices[i]; inputs[index] = (ByteBuffer)buffer.flip(); } @@ -666,7 +673,7 @@ public final class ErasureCodingWorker { int m = 0; for (int i = 0; i < targetBuffers.length; i++) { if (targetsStatus[i]) { - targetBuffers[i].limit(toRecoverLen); + targetBuffers[i].limit(toReconstructLen); outputs[m++] = targetBuffers[i]; } } @@ -678,7 +685,7 @@ public final class ErasureCodingWorker { long remaining = blockLen - positionInBlock; if (remaining <= 0) { targetBuffers[i].limit(0); - } else if (remaining < toRecoverLen) { + } else if (remaining < toReconstructLen) { targetBuffers[i].limit((int)remaining); } } @@ -696,7 +703,7 @@ public final class ErasureCodingWorker { * @param used the used source DNs in this iteration. * @return the array index of source DN if don't need to do real read. */ - private int scheduleNewRead(BitSet used, int recoverLength, + private int scheduleNewRead(BitSet used, int reconstructLen, Map> corruptionMap) { StripedReader reader = null; // step1: initially we may only have minRequiredSources @@ -707,7 +714,7 @@ public final class ErasureCodingWorker { int toRead = 0; while (reader == null && m < sources.length) { reader = addStripedReader(m, positionInBlock); - toRead = getReadLength(liveIndices[m], recoverLength); + toRead = getReadLength(liveIndices[m], reconstructLen); if (toRead > 0) { if (reader.blockReader == null) { reader = null; @@ -727,7 +734,7 @@ public final class ErasureCodingWorker { for (int i = 0; reader == null && i < stripedReaders.size(); i++) { if (!used.get(i)) { StripedReader r = stripedReaders.get(i); - toRead = getReadLength(liveIndices[i], recoverLength); + toRead = getReadLength(liveIndices[i], reconstructLen); if (toRead > 0) { closeBlockReader(r.blockReader); r.blockReader = newBlockReader( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECReconstructionCommand.java similarity index 75% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECReconstructionCommand.java index d0c1786dbe5..6e9c55be2c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECReconstructionCommand.java @@ -31,8 +31,8 @@ import java.util.Arrays; import java.util.Collection; /** - * A BlockECRecoveryCommand is an instruction to a DataNode to reconstruct a - * striped block group with missing blocks. + * A BlockECReconstructionCommand is an instruction to a DataNode to + * reconstruct a striped block group with missing blocks. * * Upon receiving this command, the DataNode pulls data from other DataNodes * hosting blocks in this group and reconstructs the lost blocks through codec @@ -45,23 +45,24 @@ import java.util.Collection; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class BlockECRecoveryCommand extends DatanodeCommand { - final Collection ecTasks; +public class BlockECReconstructionCommand extends DatanodeCommand { + private final Collection ecTasks; /** - * Create BlockECRecoveryCommand from a collection of - * {@link BlockECRecoveryInfo}, each representing a recovery task + * Create BlockECReconstructionCommand from a collection of + * {@link BlockECReconstructionInfo}, each representing a reconstruction + * task */ - public BlockECRecoveryCommand(int action, - Collection blockECRecoveryInfoList) { + public BlockECReconstructionCommand(int action, + Collection blockECReconstructionInfoList) { super(action); - this.ecTasks = blockECRecoveryInfoList; + this.ecTasks = blockECReconstructionInfoList; } @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("BlockECRecoveryCommand(\n "); + sb.append("BlockECReconstructionCommand(\n "); Joiner.on("\n ").appendTo(sb, ecTasks); sb.append("\n)"); return sb.toString(); @@ -70,7 +71,7 @@ public class BlockECRecoveryCommand extends DatanodeCommand { /** Block and targets pair */ @InterfaceAudience.Private @InterfaceStability.Evolving - public static class BlockECRecoveryInfo { + public static class BlockECReconstructionInfo { private final ExtendedBlock block; private final DatanodeInfo[] sources; private DatanodeInfo[] targets; @@ -79,19 +80,19 @@ public class BlockECRecoveryCommand extends DatanodeCommand { private final byte[] liveBlockIndices; private final ErasureCodingPolicy ecPolicy; - public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources, - DatanodeStorageInfo[] targetDnStorageInfo, byte[] liveBlockIndices, - ErasureCodingPolicy ecPolicy) { + public BlockECReconstructionInfo(ExtendedBlock block, + DatanodeInfo[] sources, DatanodeStorageInfo[] targetDnStorageInfo, + byte[] liveBlockIndices, ErasureCodingPolicy ecPolicy) { this(block, sources, DatanodeStorageInfo .toDatanodeInfos(targetDnStorageInfo), DatanodeStorageInfo .toStorageIDs(targetDnStorageInfo), DatanodeStorageInfo .toStorageTypes(targetDnStorageInfo), liveBlockIndices, ecPolicy); } - public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources, - DatanodeInfo[] targets, String[] targetStorageIDs, - StorageType[] targetStorageTypes, byte[] liveBlockIndices, - ErasureCodingPolicy ecPolicy) { + public BlockECReconstructionInfo(ExtendedBlock block, + DatanodeInfo[] sources, DatanodeInfo[] targets, + String[] targetStorageIDs, StorageType[] targetStorageTypes, + byte[] liveBlockIndices, ErasureCodingPolicy ecPolicy) { this.block = block; this.sources = sources; this.targets = targets; @@ -117,7 +118,7 @@ public class BlockECRecoveryCommand extends DatanodeCommand { public String[] getTargetStorageIDs() { return targetStorageIDs; } - + public StorageType[] getTargetStorageTypes() { return targetStorageTypes; } @@ -125,14 +126,14 @@ public class BlockECRecoveryCommand extends DatanodeCommand { public byte[] getLiveBlockIndices() { return liveBlockIndices; } - + public ErasureCodingPolicy getErasureCodingPolicy() { return ecPolicy; } @Override public String toString() { - return new StringBuilder().append("BlockECRecoveryInfo(\n ") + return new StringBuilder().append("BlockECReconstructionInfo(\n ") .append("Recovering ").append(block).append(" From: ") .append(Arrays.asList(sources)).append(" To: [") .append(Arrays.asList(targets)).append(")\n") @@ -141,7 +142,7 @@ public class BlockECRecoveryCommand extends DatanodeCommand { } } - public Collection getECTasks() { + public Collection getECTasks() { return this.ecTasks; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index b9628557951..8c4359f756c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -76,7 +76,7 @@ public interface DatanodeProtocol { final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth final static int DNA_CACHE = 9; // cache blocks final static int DNA_UNCACHE = 10; // uncache blocks - final static int DNA_ERASURE_CODING_RECOVERY = 11; // erasure coding recovery command + final static int DNA_ERASURE_CODING_RECONSTRUCTION = 11; // erasure coding reconstruction command /** * Register Datanode. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index 02d5b81d697..711118530cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -60,7 +60,7 @@ message DatanodeCommandProto { UnusedUpgradeCommand = 6; NullDatanodeCommand = 7; BlockIdCommand = 8; - BlockECRecoveryCommand = 9; + BlockECReconstructionCommand = 9; } required Type cmdType = 1; // Type of the command @@ -74,7 +74,7 @@ message DatanodeCommandProto { optional KeyUpdateCommandProto keyUpdateCmd = 6; optional RegisterCommandProto registerCmd = 7; optional BlockIdCommandProto blkIdCmd = 8; - optional BlockECRecoveryCommandProto blkECRecoveryCmd = 9; + optional BlockECReconstructionCommandProto blkECReconstructionCmd = 9; } /** @@ -149,10 +149,10 @@ message RegisterCommandProto { } /** - * Block Erasure coding recovery command + * Block Erasure coding reconstruction command */ -message BlockECRecoveryCommandProto { - repeated BlockECRecoveryInfoProto blockECRecoveryinfo = 1; +message BlockECReconstructionCommandProto { + repeated BlockECReconstructionInfoProto blockECReconstructioninfo = 1; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 7607c32221b..4889bc32318 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2654,26 +2654,37 @@ - dfs.datanode.stripedread.timeout.millis + dfs.datanode.ec.reconstruction.stripedread.timeout.millis 5000 Datanode striped read timeout in milliseconds. - dfs.datanode.stripedread.threads + dfs.datanode.ec.reconstruction.stripedread.threads 20 - Number of threads used by the Datanode for background recovery work. + + Number of threads used by the Datanode to read striped block + during background reconstruction work. - dfs.datanode.stripedread.buffer.size + dfs.datanode.ec.reconstruction.stripedread.buffer.size 65536 Datanode striped read buffer size. + + dfs.datanode.ec.reconstruction.stripedblock.threads.size + 8 + + Number of threads used by the Datanode for background + reconstruction work. + + + dfs.namenode.quota.init-threads 4 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java similarity index 79% rename from hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java rename to hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java index ca9d933fa95..97edaf1f68a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java @@ -45,7 +45,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; -import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.test.GenericTestUtils; import org.apache.log4j.Level; @@ -54,9 +54,9 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -public class TestRecoverStripedFile { - public static final Log LOG = LogFactory.getLog(TestRecoverStripedFile.class); - +public class TestReconstructStripedFile { + public static final Log LOG = LogFactory.getLog(TestReconstructStripedFile.class); + private static final int dataBlkNum = StripedFileTestUtil.NUM_DATA_BLOCKS; private static final int parityBlkNum = StripedFileTestUtil.NUM_PARITY_BLOCKS; private static final int cellSize = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE; @@ -70,7 +70,7 @@ public class TestRecoverStripedFile { GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.ALL); } - enum RecoveryType { + enum ReconstructionType { DataOnly, ParityOnly, Any @@ -86,14 +86,14 @@ public class TestRecoverStripedFile { public void setup() throws IOException { final Configuration conf = new Configuration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); - conf.setInt(DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY, + conf.setInt(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY, cellSize - 1); conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dnNum).build(); cluster.waitActive(); - + fs = cluster.getFileSystem(); fs.getClient().setErasureCodingPolicy("/", null); @@ -110,100 +110,100 @@ public class TestRecoverStripedFile { cluster = null; } } - + @Test(timeout = 120000) public void testRecoverOneParityBlock() throws Exception { int fileLen = 10 * blockSize + blockSize/10; - assertFileBlocksRecovery("/testRecoverOneParityBlock", fileLen, - RecoveryType.ParityOnly, 1); + assertFileBlocksReconstruction("/testRecoverOneParityBlock", fileLen, + ReconstructionType.ParityOnly, 1); } - + @Test(timeout = 120000) public void testRecoverOneParityBlock1() throws Exception { int fileLen = cellSize + cellSize/10; - assertFileBlocksRecovery("/testRecoverOneParityBlock1", fileLen, - RecoveryType.ParityOnly, 1); + assertFileBlocksReconstruction("/testRecoverOneParityBlock1", fileLen, + ReconstructionType.ParityOnly, 1); } - + @Test(timeout = 120000) public void testRecoverOneParityBlock2() throws Exception { int fileLen = 1; - assertFileBlocksRecovery("/testRecoverOneParityBlock2", fileLen, - RecoveryType.ParityOnly, 1); + assertFileBlocksReconstruction("/testRecoverOneParityBlock2", fileLen, + ReconstructionType.ParityOnly, 1); } - + @Test(timeout = 120000) public void testRecoverOneParityBlock3() throws Exception { int fileLen = 3 * blockSize + blockSize/10; - assertFileBlocksRecovery("/testRecoverOneParityBlock3", fileLen, - RecoveryType.ParityOnly, 1); + assertFileBlocksReconstruction("/testRecoverOneParityBlock3", fileLen, + ReconstructionType.ParityOnly, 1); } - + @Test(timeout = 120000) public void testRecoverThreeParityBlocks() throws Exception { int fileLen = 10 * blockSize + blockSize/10; - assertFileBlocksRecovery("/testRecoverThreeParityBlocks", fileLen, - RecoveryType.ParityOnly, 3); + assertFileBlocksReconstruction("/testRecoverThreeParityBlocks", fileLen, + ReconstructionType.ParityOnly, 3); } - + @Test(timeout = 120000) public void testRecoverThreeDataBlocks() throws Exception { int fileLen = 10 * blockSize + blockSize/10; - assertFileBlocksRecovery("/testRecoverThreeDataBlocks", fileLen, - RecoveryType.DataOnly, 3); + assertFileBlocksReconstruction("/testRecoverThreeDataBlocks", fileLen, + ReconstructionType.DataOnly, 3); } - + @Test(timeout = 120000) public void testRecoverThreeDataBlocks1() throws Exception { int fileLen = 3 * blockSize + blockSize/10; - assertFileBlocksRecovery("/testRecoverThreeDataBlocks1", fileLen, - RecoveryType.DataOnly, 3); + assertFileBlocksReconstruction("/testRecoverThreeDataBlocks1", fileLen, + ReconstructionType.DataOnly, 3); } - + @Test(timeout = 120000) public void testRecoverOneDataBlock() throws Exception { int fileLen = 10 * blockSize + blockSize/10; - assertFileBlocksRecovery("/testRecoverOneDataBlock", fileLen, - RecoveryType.DataOnly, 1); + assertFileBlocksReconstruction("/testRecoverOneDataBlock", fileLen, + ReconstructionType.DataOnly, 1); } - + @Test(timeout = 120000) public void testRecoverOneDataBlock1() throws Exception { int fileLen = cellSize + cellSize/10; - assertFileBlocksRecovery("/testRecoverOneDataBlock1", fileLen, - RecoveryType.DataOnly, 1); + assertFileBlocksReconstruction("/testRecoverOneDataBlock1", fileLen, + ReconstructionType.DataOnly, 1); } - + @Test(timeout = 120000) public void testRecoverOneDataBlock2() throws Exception { int fileLen = 1; - assertFileBlocksRecovery("/testRecoverOneDataBlock2", fileLen, - RecoveryType.DataOnly, 1); + assertFileBlocksReconstruction("/testRecoverOneDataBlock2", fileLen, + ReconstructionType.DataOnly, 1); } - + @Test(timeout = 120000) public void testRecoverAnyBlocks() throws Exception { int fileLen = 3 * blockSize + blockSize/10; - assertFileBlocksRecovery("/testRecoverAnyBlocks", fileLen, - RecoveryType.Any, 2); + assertFileBlocksReconstruction("/testRecoverAnyBlocks", fileLen, + ReconstructionType.Any, 2); } - + @Test(timeout = 120000) public void testRecoverAnyBlocks1() throws Exception { int fileLen = 10 * blockSize + blockSize/10; - assertFileBlocksRecovery("/testRecoverAnyBlocks1", fileLen, - RecoveryType.Any, 3); + assertFileBlocksReconstruction("/testRecoverAnyBlocks1", fileLen, + ReconstructionType.Any, 3); } - private int[] generateDeadDnIndices(RecoveryType type, int deadNum, + private int[] generateDeadDnIndices(ReconstructionType type, int deadNum, byte[] indices) { List deadList = new ArrayList<>(deadNum); while (deadList.size() < deadNum) { int dead = random.nextInt(indices.length); boolean isOfType = true; - if (type == RecoveryType.DataOnly) { + if (type == ReconstructionType.DataOnly) { isOfType = indices[dead] < dataBlkNum; - } else if (type == RecoveryType.ParityOnly) { + } else if (type == ReconstructionType.ParityOnly) { isOfType = indices[dead] >= dataBlkNum; } if (isOfType && !deadList.contains(dead)) { @@ -228,13 +228,13 @@ public class TestRecoverStripedFile { } private int generateErrors(Map corruptTargets, - RecoveryType type) + ReconstructionType type) throws IOException { int stoppedDN = 0; for (Map.Entry target : corruptTargets.entrySet()) { - if (stoppedDN == 0 || type != RecoveryType.DataOnly + if (stoppedDN == 0 || type != ReconstructionType.DataOnly || random.nextBoolean()) { - // stop at least one DN to trigger recovery + // stop at least one DN to trigger reconstruction LOG.info("Note: stop DataNode " + target.getValue().getDisplayName() + " with internal block " + target.getKey()); shutdownDataNodes(target.getValue()); @@ -249,17 +249,17 @@ public class TestRecoverStripedFile { } /** - * Test the file blocks recovery. - * 1. Check the replica is recovered in the target datanode, + * Test the file blocks reconstruction. + * 1. Check the replica is reconstructed in the target datanode, * and verify the block replica length, generationStamp and content. - * 2. Read the file and verify content. + * 2. Read the file and verify content. */ - private void assertFileBlocksRecovery(String fileName, int fileLen, - RecoveryType type, int toRecoverBlockNum) throws Exception { + private void assertFileBlocksReconstruction(String fileName, int fileLen, + ReconstructionType type, int toRecoverBlockNum) throws Exception { if (toRecoverBlockNum < 1 || toRecoverBlockNum > parityBlkNum) { Assert.fail("toRecoverBlockNum should be between 1 ~ " + parityBlkNum); } - + Path file = new Path(fileName); final byte[] data = new byte[fileLen]; @@ -269,13 +269,13 @@ public class TestRecoverStripedFile { LocatedBlocks locatedBlocks = getLocatedBlocks(file); assertEquals(locatedBlocks.getFileLength(), fileLen); - - LocatedStripedBlock lastBlock = + + LocatedStripedBlock lastBlock = (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock(); - + DatanodeInfo[] storageInfos = lastBlock.getLocations(); byte[] indices = lastBlock.getBlockIndices(); - + BitSet bitset = new BitSet(dnNum); for (DatanodeInfo storageInfo : storageInfos) { bitset.set(dnMap.get(storageInfo)); @@ -284,7 +284,7 @@ public class TestRecoverStripedFile { int[] dead = generateDeadDnIndices(type, toRecoverBlockNum, indices); LOG.info("Note: indices == " + Arrays.toString(indices) + ". Generate errors on datanodes: " + Arrays.toString(dead)); - + DatanodeInfo[] dataDNs = new DatanodeInfo[toRecoverBlockNum]; int[] deadDnIndices = new int[toRecoverBlockNum]; ExtendedBlock[] blocks = new ExtendedBlock[toRecoverBlockNum]; @@ -303,7 +303,7 @@ public class TestRecoverStripedFile { replicas[i] = cluster.getBlockFile(deadDnIndices[i], blocks[i]); metadatas[i] = cluster.getBlockMetadataFile(deadDnIndices[i], blocks[i]); // the block replica on the datanode should be the same as expected - assertEquals(replicas[i].length(), + assertEquals(replicas[i].length(), StripedBlockUtil.getInternalBlockLength( lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[dead[i]])); assertTrue(metadatas[i].getName(). @@ -311,7 +311,7 @@ public class TestRecoverStripedFile { LOG.info("replica " + i + " locates in file: " + replicas[i]); replicaContents[i] = DFSTestUtil.readFileAsBytes(replicas[i]); } - + int cellsNum = (fileLen - 1) / cellSize + 1; int groupSize = Math.min(cellsNum, dataBlkNum) + parityBlkNum; @@ -331,25 +331,25 @@ public class TestRecoverStripedFile { targetDNs[n++] = i; } } - - waitForRecoveryFinished(file, groupSize); + + waitForReconstructionFinished(file, groupSize); targetDNs = sortTargetsByReplicas(blocks, targetDNs); // Check the replica on the new target node. for (int i = 0; i < toRecoverBlockNum; i++) { - File replicaAfterRecovery = cluster.getBlockFile(targetDNs[i], blocks[i]); - LOG.info("replica after recovery " + replicaAfterRecovery); - File metadataAfterRecovery = + File replicaAfterReconstruction = cluster.getBlockFile(targetDNs[i], blocks[i]); + LOG.info("replica after reconstruction " + replicaAfterReconstruction); + File metadataAfterReconstruction = cluster.getBlockMetadataFile(targetDNs[i], blocks[i]); - assertEquals(replicaAfterRecovery.length(), replicas[i].length()); + assertEquals(replicaAfterReconstruction.length(), replicas[i].length()); LOG.info("replica before " + replicas[i]); - assertTrue(metadataAfterRecovery.getName(). + assertTrue(metadataAfterReconstruction.getName(). endsWith(blocks[i].getGenerationStamp() + ".meta")); - byte[] replicaContentAfterRecovery = - DFSTestUtil.readFileAsBytes(replicaAfterRecovery); + byte[] replicaContentAfterReconstruction = + DFSTestUtil.readFileAsBytes(replicaAfterReconstruction); - Assert.assertArrayEquals(replicaContents[i], replicaContentAfterRecovery); + Assert.assertArrayEquals(replicaContents[i], replicaContentAfterReconstruction); } } @@ -368,18 +368,19 @@ public class TestRecoverStripedFile { } } if (result[i] == -1) { - Assert.fail("Failed to recover striped block: " + blocks[i].getBlockId()); + Assert.fail("Failed to reconstruct striped block: " + + blocks[i].getBlockId()); } } return result; } - private LocatedBlocks waitForRecoveryFinished(Path file, int groupSize) + private LocatedBlocks waitForReconstructionFinished(Path file, int groupSize) throws Exception { final int ATTEMPTS = 60; for (int i = 0; i < ATTEMPTS; i++) { LocatedBlocks locatedBlocks = getLocatedBlocks(file); - LocatedStripedBlock lastBlock = + LocatedStripedBlock lastBlock = (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock(); DatanodeInfo[] storageInfos = lastBlock.getLocations(); if (storageInfos.length >= groupSize) { @@ -387,9 +388,9 @@ public class TestRecoverStripedFile { } Thread.sleep(1000); } - throw new IOException ("Time out waiting for EC block recovery."); + throw new IOException ("Time out waiting for EC block reconstruction."); } - + private LocatedBlocks getLocatedBlocks(Path file) throws IOException { return fs.getClient().getLocatedBlocks(file.toString(), 0, Long.MAX_VALUE); } @@ -415,10 +416,10 @@ public class TestRecoverStripedFile { DatanodeStorageInfo[] dnStorageInfo = new DatanodeStorageInfo[] { targetDnInfos_1 }; - BlockECRecoveryInfo invalidECInfo = new BlockECRecoveryInfo( + BlockECReconstructionInfo invalidECInfo = new BlockECReconstructionInfo( new ExtendedBlock("bp-id", 123456), dataDNs, dnStorageInfo, liveIndices, ErasureCodingPolicyManager.getSystemDefaultPolicy()); - List ecTasks = new ArrayList<>(); + List ecTasks = new ArrayList<>(); ecTasks.add(invalidECInfo); dataNode.getErasureCodingWorker().processErasureCodingTasks(ecTasks); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java index 70614a8667d..a4f330267a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java @@ -43,7 +43,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto; -import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECRecoveryCommandProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECReconstructionCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; @@ -75,13 +75,13 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; -import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations; -import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; @@ -689,7 +689,7 @@ public class TestPBHelper { DatanodeStorageInfo[] targetDnInfos0 = new DatanodeStorageInfo[] { targetDnInfos_0, targetDnInfos_1 }; byte[] liveBlkIndices0 = new byte[2]; - BlockECRecoveryInfo blkECRecoveryInfo0 = new BlockECRecoveryInfo( + BlockECReconstructionInfo blkECRecoveryInfo0 = new BlockECReconstructionInfo( new ExtendedBlock("bp1", 1234), dnInfos0, targetDnInfos0, liveBlkIndices0, ErasureCodingPolicyManager.getSystemDefaultPolicy()); DatanodeInfo[] dnInfos1 = new DatanodeInfo[] { @@ -703,26 +703,26 @@ public class TestPBHelper { DatanodeStorageInfo[] targetDnInfos1 = new DatanodeStorageInfo[] { targetDnInfos_2, targetDnInfos_3 }; byte[] liveBlkIndices1 = new byte[2]; - BlockECRecoveryInfo blkECRecoveryInfo1 = new BlockECRecoveryInfo( + BlockECReconstructionInfo blkECRecoveryInfo1 = new BlockECReconstructionInfo( new ExtendedBlock("bp2", 3256), dnInfos1, targetDnInfos1, liveBlkIndices1, ErasureCodingPolicyManager.getSystemDefaultPolicy()); - List blkRecoveryInfosList = new ArrayList(); + List blkRecoveryInfosList = new ArrayList(); blkRecoveryInfosList.add(blkECRecoveryInfo0); blkRecoveryInfosList.add(blkECRecoveryInfo1); - BlockECRecoveryCommand blkECRecoveryCmd = new BlockECRecoveryCommand( - DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY, blkRecoveryInfosList); - BlockECRecoveryCommandProto blkECRecoveryCmdProto = PBHelper - .convert(blkECRecoveryCmd); - blkECRecoveryCmd = PBHelper.convert(blkECRecoveryCmdProto); - Iterator iterator = blkECRecoveryCmd.getECTasks() + BlockECReconstructionCommand blkECReconstructionCmd = new BlockECReconstructionCommand( + DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION, blkRecoveryInfosList); + BlockECReconstructionCommandProto blkECRecoveryCmdProto = PBHelper + .convert(blkECReconstructionCmd); + blkECReconstructionCmd = PBHelper.convert(blkECRecoveryCmdProto); + Iterator iterator = blkECReconstructionCmd.getECTasks() .iterator(); assertBlockECRecoveryInfoEquals(blkECRecoveryInfo0, iterator.next()); assertBlockECRecoveryInfoEquals(blkECRecoveryInfo1, iterator.next()); } private void assertBlockECRecoveryInfoEquals( - BlockECRecoveryInfo blkECRecoveryInfo1, - BlockECRecoveryInfo blkECRecoveryInfo2) { + BlockECReconstructionInfo blkECRecoveryInfo1, + BlockECReconstructionInfo blkECRecoveryInfo2) { assertEquals(blkECRecoveryInfo1.getExtendedBlock(), blkECRecoveryInfo2.getExtendedBlock()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java similarity index 94% rename from hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java rename to hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java index 3a5c135578d..6ed982f4279 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java @@ -37,7 +37,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.junit.Test; @@ -50,7 +50,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -public class TestRecoverStripedBlocks { +public class TestReconstructStripedBlocks { private static final int cellSize = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE; private final short GROUP_SIZE = (short) (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS); @@ -90,9 +90,9 @@ public class TestRecoverStripedBlocks { * Start GROUP_SIZE + 1 datanodes. * Inject striped blocks to first GROUP_SIZE datanodes. * Then make numOfBusy datanodes busy, make numOfMissed datanodes missed. - * Then trigger BlockManager to compute recovery works. (so all recovery work - * will be scheduled to the last datanode) - * Finally, verify the recovery work of the last datanode. + * Then trigger BlockManager to compute reconstruction works. (so all + * reconstruction work will be scheduled to the last datanode) + * Finally, verify the reconstruction work of the last datanode. */ private void doTestMissingStripedBlock(int numOfMissed, int numOfBusy) throws Exception { @@ -148,22 +148,23 @@ public class TestRecoverStripedBlocks { BlockManagerTestUtil.getComputedDatanodeWork(bm); - // all the recovery work will be scheduled on the last DN + // all the reconstruction work will be scheduled on the last DN DataNode lastDn = cluster.getDataNodes().get(GROUP_SIZE); DatanodeDescriptor last = bm.getDatanodeManager().getDatanode(lastDn.getDatanodeId()); assertEquals("Counting the number of outstanding EC tasks", numBlocks, last.getNumberOfBlocksToBeErasureCoded()); - List recovery = + List reconstruction = last.getErasureCodeCommand(numBlocks); - for (BlockECRecoveryInfo info : recovery) { + for (BlockECReconstructionInfo info : reconstruction) { assertEquals(1, info.getTargetDnInfos().length); assertEquals(last, info.getTargetDnInfos()[0]); assertEquals(info.getSourceDnInfos().length, info.getLiveBlockIndices().length); if (GROUP_SIZE - numOfMissed == NUM_DATA_BLOCKS) { // It's a QUEUE_HIGHEST_PRIORITY block, so the busy DNs will be chosen - // to make sure we have NUM_DATA_BLOCKS DNs to do recovery work. + // to make sure we have NUM_DATA_BLOCKS DNs to do reconstruction + // work. assertEquals(NUM_DATA_BLOCKS, info.getSourceDnInfos().length); } else { // The block has no highest priority, so we don't use the busy DNs as