HDFS-9255. Consolidate block recovery related implementation into a single class. Contributed by Walter Su.
Change-Id: I31c5d4778656053e60a4263970ec5fda441e9725
This commit is contained in:
@ -760,6 +760,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9311. Support optional offload of NameNode HA service health checks to
a separate RPC server. (cnauroth)
HDFS-9255. Consolidate block recovery related implementation into a single
class. (Walter Su via zhz)
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
@ -343,11 +343,16 @@ public static RecoveringBlockProto convert(RecoveringBlock b) {
public static RecoveringBlock convert(RecoveringBlockProto b) {
ExtendedBlock block = PBHelperClient.convert(b.getBlock().getB());
DatanodeInfo[] locs = PBHelperClient.convert(b.getBlock().getLocsList());
return (b.hasTruncateBlock()) ?
new RecoveringBlock(block, locs, PBHelperClient.convert(b.getTruncateBlock())) :
new RecoveringBlock(block, locs, b.getNewGenStamp());
LocatedBlock lb = PBHelperClient.convert(b.getBlock());
RecoveringBlock rBlock;
if (b.hasTruncateBlock()) {
rBlock = new RecoveringBlock(lb.getBlock(), lb.getLocations(),
} else {
rBlock = new RecoveringBlock(lb.getBlock(), lb.getLocations(),
return rBlock;
public static ReplicaState convert(ReplicaStateProto state) {
@ -1385,15 +1385,17 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
// in block recovery.
recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages);
RecoveringBlock rBlock;
if(truncateRecovery) {
Block recoveryBlock = (copyOnTruncateRecovery) ? b :
brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
} else {
brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
return new DatanodeCommand[] { brCommand };
@ -699,7 +699,8 @@ assert getBlockPoolId().equals(bp) :
case DatanodeProtocol.DNA_RECOVERBLOCK:
String who = "NameNode at " + actor.getNNSocketAddress();
dn.recoverBlocks(who, ((BlockRecoveryCommand)cmd).getRecoveringBlocks());
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
@ -0,0 +1,329 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.base.Joiner;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Daemon;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
* This class handles the block recovery work commands.
public class BlockRecoveryWorker {
public static final Log LOG = DataNode.LOG;
private final DataNode datanode;
private final Configuration conf;
private final DNConf dnConf;
BlockRecoveryWorker(DataNode datanode) {
this.datanode = datanode;
conf = datanode.getConf();
dnConf = datanode.getDnConf();
/** A convenient class used in block recovery. */
static class BlockRecord {
private final DatanodeID id;
private final InterDatanodeProtocol datanode;
private final ReplicaRecoveryInfo rInfo;
private String storageID;
BlockRecord(DatanodeID id, InterDatanodeProtocol datanode,
ReplicaRecoveryInfo rInfo) {
this.id = id;
this.datanode = datanode;
this.rInfo = rInfo;
private void updateReplicaUnderRecovery(String bpid, long recoveryId,
long newBlockId, long newLength) throws IOException {
final ExtendedBlock b = new ExtendedBlock(bpid, rInfo);
storageID = datanode.updateReplicaUnderRecovery(b, recoveryId, newBlockId,
public String toString() {
return "block:" + rInfo + " node:" + id;
/** A block recovery task for a contiguous block. */
class RecoveryTaskContiguous {
private final RecoveringBlock rBlock;
private final ExtendedBlock block;
private final String bpid;
private final DatanodeInfo[] locs;
private final long recoveryId;
RecoveryTaskContiguous(RecoveringBlock rBlock) {
this.rBlock = rBlock;
block = rBlock.getBlock();
bpid = block.getBlockPoolId();
locs = rBlock.getLocations();
recoveryId = rBlock.getNewGenerationStamp();
protected void recover() throws IOException {
List<BlockRecord> syncList = new ArrayList<>(locs.length);
int errorCount = 0;
//check generation stamps
for(DatanodeID id : locs) {
try {
DatanodeID bpReg =datanode.getBPOfferService(bpid).bpRegistration;
InterDatanodeProtocol proxyDN = bpReg.equals(id)?
datanode: DataNode.createInterDataNodeProtocolProxy(id, conf,
dnConf.socketTimeout, dnConf.connectToDnViaHostname);
ReplicaRecoveryInfo info = callInitReplicaRecovery(proxyDN, rBlock);
if (info != null &&
info.getGenerationStamp() >= block.getGenerationStamp() &&
info.getNumBytes() > 0) {
syncList.add(new BlockRecord(id, proxyDN, info));
} catch (RecoveryInProgressException ripE) {
"Recovery for replica " + block + " on data-node " + id
+ " is already in progress. Recovery id = "
+ rBlock.getNewGenerationStamp() + " is aborted.", ripE);
} catch (IOException e) {
"Failed to obtain replica info for block (=" + block
+ ") from datanode (=" + id + ")", e);
if (errorCount == locs.length) {
throw new IOException("All datanodes failed: block=" + block
+ ", datanodeids=" + Arrays.asList(locs));
/** Block synchronization. */
void syncBlock(List<BlockRecord> syncList) throws IOException {
DatanodeProtocolClientSideTranslatorPB nn =
boolean isTruncateRecovery = rBlock.getNewBlock() != null;
long blockId = (isTruncateRecovery) ?
rBlock.getNewBlock().getBlockId() : block.getBlockId();
if (LOG.isDebugEnabled()) {
LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
+ "), syncList=" + syncList);
// syncList.isEmpty() means that all data-nodes do not have the block
// or their replicas have 0 length.
// The block can be deleted.
if (syncList.isEmpty()) {
nn.commitBlockSynchronization(block, recoveryId, 0,
true, true, DatanodeID.EMPTY_ARRAY, null);
// Calculate the best available replica state.
ReplicaState bestState = ReplicaState.RWR;
long finalizedLength = -1;
for (BlockRecord r : syncList) {
assert r.rInfo.getNumBytes() > 0 : "zero length replica";
ReplicaState rState = r.rInfo.getOriginalReplicaState();
if (rState.getValue() < bestState.getValue()) {
bestState = rState;
if(rState == ReplicaState.FINALIZED) {
if (finalizedLength > 0 && finalizedLength != r.rInfo.getNumBytes()) {
throw new IOException("Inconsistent size of finalized replicas. " +
"Replica " + r.rInfo + " expected size: " + finalizedLength);
finalizedLength = r.rInfo.getNumBytes();
// Calculate list of nodes that will participate in the recovery
// and the new block size
List<BlockRecord> participatingList = new ArrayList<>();
final ExtendedBlock newBlock = new ExtendedBlock(bpid, blockId,
-1, recoveryId);
switch(bestState) {
assert finalizedLength > 0 : "finalizedLength is not positive";
for(BlockRecord r : syncList) {
ReplicaState rState = r.rInfo.getOriginalReplicaState();
if (rState == ReplicaState.FINALIZED ||
rState == ReplicaState.RBW &&
r.rInfo.getNumBytes() == finalizedLength) {
case RBW:
case RWR:
long minLength = Long.MAX_VALUE;
for(BlockRecord r : syncList) {
ReplicaState rState = r.rInfo.getOriginalReplicaState();
if(rState == bestState) {
minLength = Math.min(minLength, r.rInfo.getNumBytes());
case RUR:
assert false : "bad replica state: " + bestState;
break; // we have 'case' all enum values
if (isTruncateRecovery) {
List<DatanodeID> failedList = new ArrayList<>();
final List<BlockRecord> successList = new ArrayList<>();
for (BlockRecord r : participatingList) {
try {
r.updateReplicaUnderRecovery(bpid, recoveryId, blockId,
} catch (IOException e) {
InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
+ newBlock + ", datanode=" + r.id + ")", e);
// If any of the data-nodes failed, the recovery fails, because
// we never know the actual state of the replica on failed data-nodes.
// The recovery should be started over.
if (!failedList.isEmpty()) {
StringBuilder b = new StringBuilder();
for(DatanodeID id : failedList) {
b.append("\n " + id);
throw new IOException("Cannot recover " + block + ", the following "
+ failedList.size() + " data-nodes failed {" + b + "\n}");
// Notify the name-node about successfully recovered replicas.
final DatanodeID[] datanodes = new DatanodeID[successList.size()];
final String[] storages = new String[datanodes.length];
for (int i = 0; i < datanodes.length; i++) {
final BlockRecord r = successList.get(i);
datanodes[i] = r.id;
storages[i] = r.storageID;
newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false,
datanodes, storages);
private static void logRecoverBlock(String who, RecoveringBlock rb) {
ExtendedBlock block = rb.getBlock();
DatanodeInfo[] targets = rb.getLocations();
LOG.info(who + " calls recoverBlock(" + block
+ ", targets=[" + Joiner.on(", ").join(targets) + "]"
+ ", newGenerationStamp=" + rb.getNewGenerationStamp()
+ ", newBlock=" + rb.getNewBlock()
+ ")");
* Convenience method, which unwraps RemoteException.
* @throws IOException not a RemoteException.
private static ReplicaRecoveryInfo callInitReplicaRecovery(
InterDatanodeProtocol datanode, RecoveringBlock rBlock)
throws IOException {
try {
return datanode.initReplicaRecovery(rBlock);
} catch(RemoteException re) {
throw re.unwrapRemoteException();
* Get the NameNode corresponding to the given block pool.
* @param bpid Block pool Id
* @return Namenode corresponding to the bpid
* @throws IOException if unable to get the corresponding NameNode
DatanodeProtocolClientSideTranslatorPB getActiveNamenodeForBP(
String bpid) throws IOException {
BPOfferService bpos = datanode.getBPOfferService(bpid);
if (bpos == null) {
throw new IOException("No block pool offer service for bpid=" + bpid);
DatanodeProtocolClientSideTranslatorPB activeNN = bpos.getActiveNN();
if (activeNN == null) {
throw new IOException(
"Block pool " + bpid + " has not recognized an active NN");
return activeNN;
public Daemon recoverBlocks(final String who,
final Collection<RecoveringBlock> blocks) {
Daemon d = new Daemon(datanode.threadGroup, new Runnable() {
public void run() {
for(RecoveringBlock b : blocks) {
try {
logRecoverBlock(who, b);
RecoveryTaskContiguous task = new RecoveryTaskContiguous(b);
} catch (IOException e) {
LOG.warn("recoverBlocks FAILED: " + b, e);
return d;
@ -124,7 +124,6 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
@ -174,7 +173,6 @@
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.DNS;
@ -369,7 +367,7 @@ public static InetSocketAddress createSocketAddr(String target) {
private String supergroup;
private boolean isPermissionEnabled;
private String dnUserName = null;
private BlockRecoveryWorker blockRecoveryWorker;
final Tracer tracer;
private final TracerConfigurationManager tracerConfigurationManager;
private static final int NUM_CORES = Runtime.getRuntime()
@ -711,7 +709,7 @@ public IOException call() {
* Remove volumes from DataNode.
* See {@link removeVolumes(final Set<File>, boolean)} for details.
* See {@link #removeVolumes(Set, boolean)} for details.
* @param locations the StorageLocations of the volumes to be removed.
* @throws IOException
@ -735,7 +733,7 @@ private void removeVolumes(final Collection<StorageLocation> locations)
* <li>
* <ul>Remove volumes and block info from FsDataset.</ul>
* <ul>Remove volumes from DataStorage.</ul>
* <ul>Reset configuration DATA_DIR and {@link dataDirs} to represent
* <ul>Reset configuration DATA_DIR and {@link #dataDirs} to represent
* active volumes.</ul>
* </li>
* @param absoluteVolumePaths the absolute path of volumes.
@ -861,7 +859,6 @@ private void startPlugins(Configuration conf) {
private void initIpcServer(Configuration conf) throws IOException {
InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
@ -1109,8 +1106,6 @@ void trySendErrorReport(String bpid, int errCode, String errMsg) {
bpos.trySendErrorReport(errCode, errMsg);
* Return the BPOfferService instance corresponding to the given block.
* @return the BPOS
@ -1127,8 +1122,6 @@ private BPOfferService getBPOSForBlock(ExtendedBlock block)
return bpos;
// used only for testing
void setHeartbeatsDisabledForTests(
@ -1220,7 +1213,9 @@ void startDataNode(Configuration conf,
metrics = DataNodeMetrics.create(conf, getDisplayName());
blockRecoveryWorker = new BlockRecoveryWorker(this);
blockPoolManager = new BlockPoolManager(this);
@ -1449,6 +1444,10 @@ void initBlockPool(BPOfferService bpos) throws IOException {
List<BPOfferService> getAllBpOs() {
return blockPoolManager.getAllNamenodeThreads();
BPOfferService getBPOfferService(String bpid){
return blockPoolManager.get(bpid);
int getBpOsCount() {
return blockPoolManager.getAllNamenodeThreads().size();
@ -2623,49 +2622,13 @@ public static void main(String args[]) {
secureMain(args, null);
public Daemon recoverBlocks(
final String who,
final Collection<RecoveringBlock> blocks) {
Daemon d = new Daemon(threadGroup, new Runnable() {
/** Recover a list of blocks. It is run by the primary datanode. */
public void run() {
for(RecoveringBlock b : blocks) {
try {
logRecoverBlock(who, b);
} catch (IOException e) {
LOG.warn("recoverBlocks FAILED: " + b, e);
return d;
// InterDataNodeProtocol implementation
@Override // InterDatanodeProtocol
public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
throws IOException {
throws IOException {
return data.initReplicaRecovery(rBlock);
* Convenience method, which unwraps RemoteException.
* @throws IOException not a RemoteException.
private static ReplicaRecoveryInfo callInitReplicaRecovery(
InterDatanodeProtocol datanode,
RecoveringBlock rBlock) throws IOException {
try {
return datanode.initReplicaRecovery(rBlock);
} catch(RemoteException re) {
throw re.unwrapRemoteException();
* Update replica with the new generation stamp and length.
@ -2686,231 +2649,6 @@ public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock,
return storageID;
/** A convenient class used in block recovery */
static class BlockRecord {
final DatanodeID id;
final InterDatanodeProtocol datanode;
final ReplicaRecoveryInfo rInfo;
private String storageID;
BlockRecord(DatanodeID id,
InterDatanodeProtocol datanode,
ReplicaRecoveryInfo rInfo) {
this.id = id;
this.datanode = datanode;
this.rInfo = rInfo;
void updateReplicaUnderRecovery(String bpid, long recoveryId,
long newBlockId, long newLength)
throws IOException {
final ExtendedBlock b = new ExtendedBlock(bpid, rInfo);
storageID = datanode.updateReplicaUnderRecovery(b, recoveryId, newBlockId,
public String toString() {
return "block:" + rInfo + " node:" + id;
/** Recover a block */
private void recoverBlock(RecoveringBlock rBlock) throws IOException {
ExtendedBlock block = rBlock.getBlock();
String blookPoolId = block.getBlockPoolId();
DatanodeID[] datanodeids = rBlock.getLocations();
List<BlockRecord> syncList = new ArrayList<BlockRecord>(datanodeids.length);
int errorCount = 0;
//check generation stamps
for(DatanodeID id : datanodeids) {
try {
BPOfferService bpos = blockPoolManager.get(blookPoolId);
DatanodeRegistration bpReg = bpos.bpRegistration;
InterDatanodeProtocol datanode = bpReg.equals(id)?
this: DataNode.createInterDataNodeProtocolProxy(id, getConf(),
dnConf.socketTimeout, dnConf.connectToDnViaHostname);
ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock);
if (info != null &&
info.getGenerationStamp() >= block.getGenerationStamp() &&
info.getNumBytes() > 0) {
syncList.add(new BlockRecord(id, datanode, info));
} catch (RecoveryInProgressException ripE) {
"Recovery for replica " + block + " on data-node " + id
+ " is already in progress. Recovery id = "
+ rBlock.getNewGenerationStamp() + " is aborted.", ripE);
} catch (IOException e) {
"Failed to obtain replica info for block (=" + block
+ ") from datanode (=" + id + ")", e);
if (errorCount == datanodeids.length) {
throw new IOException("All datanodes failed: block=" + block
+ ", datanodeids=" + Arrays.asList(datanodeids));
syncBlock(rBlock, syncList);
* Get the NameNode corresponding to the given block pool.
* @param bpid Block pool Id
* @return Namenode corresponding to the bpid
* @throws IOException if unable to get the corresponding NameNode
public DatanodeProtocolClientSideTranslatorPB getActiveNamenodeForBP(String bpid)
throws IOException {
BPOfferService bpos = blockPoolManager.get(bpid);
if (bpos == null) {
throw new IOException("No block pool offer service for bpid=" + bpid);
DatanodeProtocolClientSideTranslatorPB activeNN = bpos.getActiveNN();
if (activeNN == null) {
throw new IOException(
"Block pool " + bpid + " has not recognized an active NN");
return activeNN;
/** Block synchronization */
void syncBlock(RecoveringBlock rBlock,
List<BlockRecord> syncList) throws IOException {
ExtendedBlock block = rBlock.getBlock();
final String bpid = block.getBlockPoolId();
DatanodeProtocolClientSideTranslatorPB nn =
long recoveryId = rBlock.getNewGenerationStamp();
boolean isTruncateRecovery = rBlock.getNewBlock() != null;
long blockId = (isTruncateRecovery) ?
rBlock.getNewBlock().getBlockId() : block.getBlockId();
if (LOG.isDebugEnabled()) {
LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
+ "), syncList=" + syncList);
// syncList.isEmpty() means that all data-nodes do not have the block
// or their replicas have 0 length.
// The block can be deleted.
if (syncList.isEmpty()) {
nn.commitBlockSynchronization(block, recoveryId, 0,
true, true, DatanodeID.EMPTY_ARRAY, null);
// Calculate the best available replica state.
ReplicaState bestState = ReplicaState.RWR;
long finalizedLength = -1;
for(BlockRecord r : syncList) {
assert r.rInfo.getNumBytes() > 0 : "zero length replica";
ReplicaState rState = r.rInfo.getOriginalReplicaState();
if(rState.getValue() < bestState.getValue())
bestState = rState;
if(rState == ReplicaState.FINALIZED) {
if(finalizedLength > 0 && finalizedLength != r.rInfo.getNumBytes())
throw new IOException("Inconsistent size of finalized replicas. " +
"Replica " + r.rInfo + " expected size: " + finalizedLength);
finalizedLength = r.rInfo.getNumBytes();
// Calculate list of nodes that will participate in the recovery
// and the new block size
List<BlockRecord> participatingList = new ArrayList<BlockRecord>();
final ExtendedBlock newBlock = new ExtendedBlock(bpid, blockId,
-1, recoveryId);
switch(bestState) {
assert finalizedLength > 0 : "finalizedLength is not positive";
for(BlockRecord r : syncList) {
ReplicaState rState = r.rInfo.getOriginalReplicaState();
if(rState == ReplicaState.FINALIZED ||
rState == ReplicaState.RBW &&
r.rInfo.getNumBytes() == finalizedLength)
case RBW:
case RWR:
long minLength = Long.MAX_VALUE;
for(BlockRecord r : syncList) {
ReplicaState rState = r.rInfo.getOriginalReplicaState();
if(rState == bestState) {
minLength = Math.min(minLength, r.rInfo.getNumBytes());
case RUR:
assert false : "bad replica state: " + bestState;
List<DatanodeID> failedList = new ArrayList<DatanodeID>();
final List<BlockRecord> successList = new ArrayList<BlockRecord>();
for(BlockRecord r : participatingList) {
try {
r.updateReplicaUnderRecovery(bpid, recoveryId, blockId,
} catch (IOException e) {
InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
+ newBlock + ", datanode=" + r.id + ")", e);
// If any of the data-nodes failed, the recovery fails, because
// we never know the actual state of the replica on failed data-nodes.
// The recovery should be started over.
if(!failedList.isEmpty()) {
StringBuilder b = new StringBuilder();
for(DatanodeID id : failedList) {
b.append("\n " + id);
throw new IOException("Cannot recover " + block + ", the following "
+ failedList.size() + " data-nodes failed {" + b + "\n}");
// Notify the name-node about successfully recovered replicas.
final DatanodeID[] datanodes = new DatanodeID[successList.size()];
final String[] storages = new String[datanodes.length];
for(int i = 0; i < datanodes.length; i++) {
final BlockRecord r = successList.get(i);
datanodes[i] = r.id;
storages[i] = r.storageID;
newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false,
datanodes, storages);
private static void logRecoverBlock(String who, RecoveringBlock rb) {
ExtendedBlock block = rb.getBlock();
DatanodeInfo[] targets = rb.getLocations();
LOG.info(who + " calls recoverBlock(" + block
+ ", targets=[" + Joiner.on(", ").join(targets) + "]"
+ ((rb.getNewBlock() == null) ? ", newGenerationStamp="
+ rb.getNewGenerationStamp() : ", newBlock=" + rb.getNewBlock())
+ ")");
@Override // ClientDataNodeProtocol
public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException {
@ -3326,6 +3064,10 @@ public void removeSpanReceiver(long id) throws IOException {
public BlockRecoveryWorker getBlockRecoveryWorker() {
return blockRecoveryWorker;
* Get timeout value of each OOB type from configuration
@ -64,7 +64,7 @@
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord;
import org.apache.hadoop.hdfs.server.datanode.BlockRecoveryWorker.BlockRecord;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@ -79,7 +79,6 @@
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.log4j.Level;
import org.junit.After;
@ -98,6 +97,8 @@ public class TestBlockRecovery {
private static final String DATA_DIR =
MiniDFSCluster.getBaseDirectory() + "data";
private DataNode dn;
private DataNode spyDN;
private BlockRecoveryWorker recoveryWorker;
private Configuration conf;
private final static long RECOVERY_ID = 3000L;
private final static String CLUSTER_ID = "testClusterID";
@ -177,6 +178,8 @@ DatanodeProtocolClientSideTranslatorPB connectToNN(
// Trigger a heartbeat so that it acknowledges the NN as active.
spyDN = spy(dn);
recoveryWorker = new BlockRecoveryWorker(spyDN);
@ -222,7 +225,10 @@ private void testSyncReplicas(ReplicaRecoveryInfo replica1,
anyLong(), anyLong())).thenReturn("storage1");
when(dn2.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
anyLong(), anyLong())).thenReturn("storage2");
dn.syncBlock(rBlock, syncList);
BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
recoveryWorker.new RecoveryTaskContiguous(rBlock);
@ -443,13 +449,17 @@ public void testRecoveryInProgressException()
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
DataNode spyDN = spy(dn);
doThrow(new RecoveryInProgressException("Replica recovery is in progress")).
Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
verify(spyDN, never()).syncBlock(
any(RecoveringBlock.class), anyListOf(BlockRecord.class));
for(RecoveringBlock rBlock: initRecoveringBlocks()){
BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
recoveryWorker.new RecoveryTaskContiguous(rBlock);
BlockRecoveryWorker.RecoveryTaskContiguous spyTask
= spy(RecoveryTaskContiguous);
verify(spyTask, never()).syncBlock(anyListOf(BlockRecord.class));
@ -463,13 +473,21 @@ public void testErrorReplicas() throws IOException, InterruptedException {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
DataNode spyDN = spy(dn);
doThrow(new IOException()).
Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
verify(spyDN, never()).syncBlock(
any(RecoveringBlock.class), anyListOf(BlockRecord.class));
for(RecoveringBlock rBlock: initRecoveringBlocks()){
BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
recoveryWorker.new RecoveryTaskContiguous(rBlock);
BlockRecoveryWorker.RecoveryTaskContiguous spyTask = spy(RecoveryTaskContiguous);
try {
} catch(IOException e){
GenericTestUtils.assertExceptionContains("All datanodes failed", e);
verify(spyTask, never()).syncBlock(anyListOf(BlockRecord.class));
@ -482,13 +500,18 @@ public void testZeroLenReplicas() throws IOException, InterruptedException {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
DataNode spyDN = spy(dn);
doReturn(new ReplicaRecoveryInfo(block.getBlockId(), 0,
block.getGenerationStamp(), ReplicaState.FINALIZED)).when(spyDN).
Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
DatanodeProtocol dnP = dn.getActiveNamenodeForBP(POOL_ID);
for(RecoveringBlock rBlock: initRecoveringBlocks()){
BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
recoveryWorker.new RecoveryTaskContiguous(rBlock);
BlockRecoveryWorker.RecoveryTaskContiguous spyTask
= spy(RecoveryTaskContiguous);
DatanodeProtocol dnP = recoveryWorker.getActiveNamenodeForBP(POOL_ID);
block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY, null);
@ -517,11 +540,12 @@ public void testFailedReplicaUpdate() throws IOException {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
DataNode spyDN = spy(dn);
doThrow(new IOException()).when(spyDN).updateReplicaUnderRecovery(
block, RECOVERY_ID, BLOCK_ID, block.getNumBytes());
try {
spyDN.syncBlock(rBlock, initBlockRecords(spyDN));
BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
recoveryWorker.new RecoveryTaskContiguous(rBlock);
fail("Sync should fail");
} catch (IOException e) {
e.getMessage().startsWith("Cannot recover ");
@ -539,13 +563,15 @@ public void testNoReplicaUnderRecovery() throws IOException {
LOG.debug("Running " + GenericTestUtils.getMethodName());
dn.data.createRbw(StorageType.DEFAULT, block, false);
BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
recoveryWorker.new RecoveryTaskContiguous(rBlock);
try {
dn.syncBlock(rBlock, initBlockRecords(dn));
fail("Sync should fail");
} catch (IOException e) {
e.getMessage().startsWith("Cannot recover ");
DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID);
DatanodeProtocol namenode = recoveryWorker.getActiveNamenodeForBP(POOL_ID);
verify(namenode, never()).commitBlockSynchronization(
any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
anyBoolean(), any(DatanodeID[].class), any(String[].class));
@ -569,13 +595,15 @@ public void testNotMatchedReplicaID() throws IOException {
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1));
BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
recoveryWorker.new RecoveryTaskContiguous(rBlock);
try {
dn.syncBlock(rBlock, initBlockRecords(dn));
fail("Sync should fail");
} catch (IOException e) {
e.getMessage().startsWith("Cannot recover ");
DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID);
DatanodeProtocol namenode = recoveryWorker.getActiveNamenodeForBP(POOL_ID);
verify(namenode, never()).commitBlockSynchronization(
any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
anyBoolean(), any(DatanodeID[].class), any(String[].class));
Reference in New Issue
Block a user