HDFS-7933. fsck should also report decommissioning replicas. Contributed by Xiaoyu Yao.

This commit is contained in:
cnauroth 2015-04-11 13:23:18 -07:00
parent 2c17da4208
commit f8f5887209
8 changed files with 193 additions and 40 deletions

View File

@ -425,6 +425,9 @@ Release 2.8.0 - UNRELEASED
HdfsClientConfigKeys.Failover and fix typos in the dfs.http.client.* HdfsClientConfigKeys.Failover and fix typos in the dfs.http.client.*
configuration keys. (szetszwo) configuration keys. (szetszwo)
HDFS-7933. fsck should also report decommissioning replicas.
(Xiaoyu Yao via cnauroth)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -539,7 +539,7 @@ public class BlockManager {
// not included in the numReplicas.liveReplicas() count // not included in the numReplicas.liveReplicas() count
assert containingLiveReplicasNodes.size() >= numReplicas.liveReplicas(); assert containingLiveReplicasNodes.size() >= numReplicas.liveReplicas();
int usableReplicas = numReplicas.liveReplicas() + int usableReplicas = numReplicas.liveReplicas() +
numReplicas.decommissionedReplicas(); numReplicas.decommissionedAndDecommissioning();
if (block instanceof BlockInfoContiguous) { if (block instanceof BlockInfoContiguous) {
BlockCollection bc = ((BlockInfoContiguous) block).getBlockCollection(); BlockCollection bc = ((BlockInfoContiguous) block).getBlockCollection();
@ -550,7 +550,7 @@ public class BlockManager {
out.print(block + ((usableReplicas > 0)? "" : " MISSING") + out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
" (replicas:" + " (replicas:" +
" l: " + numReplicas.liveReplicas() + " l: " + numReplicas.liveReplicas() +
" d: " + numReplicas.decommissionedReplicas() + " d: " + numReplicas.decommissionedAndDecommissioning() +
" c: " + numReplicas.corruptReplicas() + " c: " + numReplicas.corruptReplicas() +
" e: " + numReplicas.excessReplicas() + ") "); " e: " + numReplicas.excessReplicas() + ") ");
@ -730,7 +730,7 @@ public class BlockManager {
// Remove block from replication queue. // Remove block from replication queue.
NumberReplicas replicas = countNodes(ucBlock); NumberReplicas replicas = countNodes(ucBlock);
neededReplications.remove(ucBlock, replicas.liveReplicas(), neededReplications.remove(ucBlock, replicas.liveReplicas(),
replicas.decommissionedReplicas(), getReplication(ucBlock)); replicas.decommissionedAndDecommissioning(), getReplication(ucBlock));
pendingReplications.remove(ucBlock); pendingReplications.remove(ucBlock);
// remove this block from the list of pending blocks to be deleted. // remove this block from the list of pending blocks to be deleted.
@ -1614,6 +1614,7 @@ public class BlockManager {
DatanodeDescriptor srcNode = null; DatanodeDescriptor srcNode = null;
int live = 0; int live = 0;
int decommissioned = 0; int decommissioned = 0;
int decommissioning = 0;
int corrupt = 0; int corrupt = 0;
int excess = 0; int excess = 0;
@ -1625,9 +1626,11 @@ public class BlockManager {
int countableReplica = storage.getState() == State.NORMAL ? 1 : 0; int countableReplica = storage.getState() == State.NORMAL ? 1 : 0;
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
corrupt += countableReplica; corrupt += countableReplica;
else if (node.isDecommissionInProgress() || node.isDecommissioned()) else if (node.isDecommissionInProgress()) {
decommissioning += countableReplica;
} else if (node.isDecommissioned()) {
decommissioned += countableReplica; decommissioned += countableReplica;
else if (excessBlocks != null && excessBlocks.contains(block)) { } else if (excessBlocks != null && excessBlocks.contains(block)) {
excess += countableReplica; excess += countableReplica;
} else { } else {
nodesContainingLiveReplicas.add(storage); nodesContainingLiveReplicas.add(storage);
@ -1667,7 +1670,8 @@ public class BlockManager {
srcNode = node; srcNode = node;
} }
if(numReplicas != null) if(numReplicas != null)
numReplicas.initialize(live, decommissioned, corrupt, excess, 0); numReplicas.initialize(live, decommissioned, decommissioning, corrupt,
excess, 0);
return srcNode; return srcNode;
} }
@ -1686,7 +1690,7 @@ public class BlockManager {
num.liveReplicas())) { num.liveReplicas())) {
neededReplications.add(timedOutItems[i], neededReplications.add(timedOutItems[i],
num.liveReplicas(), num.liveReplicas(),
num.decommissionedReplicas(), num.decommissionedAndDecommissioning(),
getReplication(timedOutItems[i])); getReplication(timedOutItems[i]));
} }
} }
@ -2573,7 +2577,7 @@ public class BlockManager {
short fileReplication = bc.getBlockReplication(); short fileReplication = bc.getBlockReplication();
if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) { if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
neededReplications.remove(storedBlock, numCurrentReplica, neededReplications.remove(storedBlock, numCurrentReplica,
num.decommissionedReplicas(), fileReplication); num.decommissionedAndDecommissioning(), fileReplication);
} else { } else {
updateNeededReplications(storedBlock, curReplicaDelta, 0); updateNeededReplications(storedBlock, curReplicaDelta, 0);
} }
@ -2807,7 +2811,7 @@ public class BlockManager {
// add to under-replicated queue if need to be // add to under-replicated queue if need to be
if (isNeededReplication(block, expectedReplication, numCurrentReplica)) { if (isNeededReplication(block, expectedReplication, numCurrentReplica)) {
if (neededReplications.add(block, numCurrentReplica, num if (neededReplications.add(block, numCurrentReplica, num
.decommissionedReplicas(), expectedReplication)) { .decommissionedAndDecommissioning(), expectedReplication)) {
return MisReplicationResult.UNDER_REPLICATED; return MisReplicationResult.UNDER_REPLICATED;
} }
} }
@ -3221,6 +3225,7 @@ public class BlockManager {
*/ */
public NumberReplicas countNodes(Block b) { public NumberReplicas countNodes(Block b) {
int decommissioned = 0; int decommissioned = 0;
int decommissioning = 0;
int live = 0; int live = 0;
int corrupt = 0; int corrupt = 0;
int excess = 0; int excess = 0;
@ -3230,7 +3235,9 @@ public class BlockManager {
final DatanodeDescriptor node = storage.getDatanodeDescriptor(); final DatanodeDescriptor node = storage.getDatanodeDescriptor();
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) { if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
corrupt++; corrupt++;
} else if (node.isDecommissionInProgress() || node.isDecommissioned()) { } else if (node.isDecommissionInProgress()) {
decommissioning++;
} else if (node.isDecommissioned()) {
decommissioned++; decommissioned++;
} else { } else {
LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
@ -3245,7 +3252,7 @@ public class BlockManager {
stale++; stale++;
} }
} }
return new NumberReplicas(live, decommissioned, corrupt, excess, stale); return new NumberReplicas(live, decommissioned, decommissioning, corrupt, excess, stale);
} }
/** /**
@ -3382,13 +3389,13 @@ public class BlockManager {
int curExpectedReplicas = getReplication(block); int curExpectedReplicas = getReplication(block);
if (isNeededReplication(block, curExpectedReplicas, repl.liveReplicas())) { if (isNeededReplication(block, curExpectedReplicas, repl.liveReplicas())) {
neededReplications.update(block, repl.liveReplicas(), repl neededReplications.update(block, repl.liveReplicas(), repl
.decommissionedReplicas(), curExpectedReplicas, curReplicasDelta, .decommissionedAndDecommissioning(), curExpectedReplicas,
expectedReplicasDelta); curReplicasDelta, expectedReplicasDelta);
} else { } else {
int oldReplicas = repl.liveReplicas()-curReplicasDelta; int oldReplicas = repl.liveReplicas()-curReplicasDelta;
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta; int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
neededReplications.remove(block, oldReplicas, repl.decommissionedReplicas(), neededReplications.remove(block, oldReplicas,
oldExpectedReplicas); repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
} }
} finally { } finally {
namesystem.writeUnlock(); namesystem.writeUnlock();
@ -3407,7 +3414,7 @@ public class BlockManager {
final NumberReplicas n = countNodes(block); final NumberReplicas n = countNodes(block);
if (isNeededReplication(block, expected, n.liveReplicas())) { if (isNeededReplication(block, expected, n.liveReplicas())) {
neededReplications.add(block, n.liveReplicas(), neededReplications.add(block, n.liveReplicas(),
n.decommissionedReplicas(), expected); n.decommissionedAndDecommissioning(), expected);
} else if (n.liveReplicas() > expected) { } else if (n.liveReplicas() > expected) {
processOverReplicatedBlock(block, expected, null, null); processOverReplicatedBlock(block, expected, null, null);
} }

View File

@ -298,7 +298,8 @@ public class DecommissionManager {
LOG.info("Block: " + block + ", Expected Replicas: " LOG.info("Block: " + block + ", Expected Replicas: "
+ curExpectedReplicas + ", live replicas: " + curReplicas + curExpectedReplicas + ", live replicas: " + curReplicas
+ ", corrupt replicas: " + num.corruptReplicas() + ", corrupt replicas: " + num.corruptReplicas()
+ ", decommissioned replicas: " + num.decommissionedReplicas() + ", decommissioned replicas: " + num.decommissioned()
+ ", decommissioning replicas: " + num.decommissioning()
+ ", excess replicas: " + num.excessReplicas() + ", excess replicas: " + num.excessReplicas()
+ ", Is Open File: " + bc.isUnderConstruction() + ", Is Open File: " + bc.isUnderConstruction()
+ ", Datanodes having this block: " + nodeList + ", Current Datanode: " + ", Datanodes having this block: " + nodeList + ", Current Datanode: "
@ -571,7 +572,7 @@ public class DecommissionManager {
// Process these blocks only when active NN is out of safe mode. // Process these blocks only when active NN is out of safe mode.
blockManager.neededReplications.add(block, blockManager.neededReplications.add(block,
curReplicas, curReplicas,
num.decommissionedReplicas(), num.decommissionedAndDecommissioning(),
bc.getBlockReplication()); bc.getBlockReplication());
} }
} }
@ -600,7 +601,7 @@ public class DecommissionManager {
if (bc.isUnderConstruction()) { if (bc.isUnderConstruction()) {
underReplicatedInOpenFiles++; underReplicatedInOpenFiles++;
} }
if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) { if ((curReplicas == 0) && (num.decommissionedAndDecommissioning() > 0)) {
decommissionOnlyReplicas++; decommissionOnlyReplicas++;
} }
} }

View File

@ -19,26 +19,33 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
/** /**
* A immutable object that stores the number of live replicas and * A immutable object that stores the number of live replicas and
* the number of decommissined Replicas. * the number of decommissioned Replicas.
*/ */
public class NumberReplicas { public class NumberReplicas {
private int liveReplicas; private int liveReplicas;
private int decommissionedReplicas;
// Tracks only the decommissioning replicas
private int decommissioning;
// Tracks only the decommissioned replicas
private int decommissioned;
private int corruptReplicas; private int corruptReplicas;
private int excessReplicas; private int excessReplicas;
private int replicasOnStaleNodes; private int replicasOnStaleNodes;
NumberReplicas() { NumberReplicas() {
initialize(0, 0, 0, 0, 0); initialize(0, 0, 0, 0, 0, 0);
} }
NumberReplicas(int live, int decommissioned, int corrupt, int excess, int stale) { NumberReplicas(int live, int decommissioned, int decommissioning, int corrupt,
initialize(live, decommissioned, corrupt, excess, stale); int excess, int stale) {
initialize(live, decommissioned, decommissioning, corrupt, excess, stale);
} }
void initialize(int live, int decommissioned, int corrupt, int excess, int stale) { void initialize(int live, int decommissioned, int decommissioning,
int corrupt, int excess, int stale) {
liveReplicas = live; liveReplicas = live;
decommissionedReplicas = decommissioned; this.decommissioning = decommissioning;
this.decommissioned = decommissioned;
corruptReplicas = corrupt; corruptReplicas = corrupt;
excessReplicas = excess; excessReplicas = excess;
replicasOnStaleNodes = stale; replicasOnStaleNodes = stale;
@ -47,12 +54,46 @@ public class NumberReplicas {
public int liveReplicas() { public int liveReplicas() {
return liveReplicas; return liveReplicas;
} }
/**
*
* @return decommissioned replicas + decommissioning replicas
* It is deprecated by decommissionedAndDecommissioning
* due to its misleading name.
*/
@Deprecated
public int decommissionedReplicas() { public int decommissionedReplicas() {
return decommissionedReplicas; return decommissionedAndDecommissioning();
} }
/**
*
* @return decommissioned and decommissioning replicas
*/
public int decommissionedAndDecommissioning() {
return decommissioned + decommissioning;
}
/**
*
* @return decommissioned replicas only
*/
public int decommissioned() {
return decommissioned;
}
/**
*
* @return decommissioning replicas only
*/
public int decommissioning() {
return decommissioning;
}
public int corruptReplicas() { public int corruptReplicas() {
return corruptReplicas; return corruptReplicas;
} }
public int excessReplicas() { public int excessReplicas() {
return excessReplicas; return excessReplicas;
} }

View File

@ -250,8 +250,10 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
out.println("No. of live Replica: " + numberReplicas.liveReplicas()); out.println("No. of live Replica: " + numberReplicas.liveReplicas());
out.println("No. of excess Replica: " + numberReplicas.excessReplicas()); out.println("No. of excess Replica: " + numberReplicas.excessReplicas());
out.println("No. of stale Replica: " + numberReplicas.replicasOnStaleNodes()); out.println("No. of stale Replica: " + numberReplicas.replicasOnStaleNodes());
out.println("No. of decommission Replica: " out.println("No. of decommissioned Replica: "
+ numberReplicas.decommissionedReplicas()); + numberReplicas.decommissioned());
out.println("No. of decommissioning Replica: "
+ numberReplicas.decommissioning());
out.println("No. of corrupted Replica: " + numberReplicas.corruptReplicas()); out.println("No. of corrupted Replica: " + numberReplicas.corruptReplicas());
//record datanodes that have corrupted block replica //record datanodes that have corrupted block replica
Collection<DatanodeDescriptor> corruptionRecord = null; Collection<DatanodeDescriptor> corruptionRecord = null;
@ -509,10 +511,16 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
NumberReplicas numberReplicas = NumberReplicas numberReplicas =
namenode.getNamesystem().getBlockManager().countNodes(block.getLocalBlock()); namenode.getNamesystem().getBlockManager().countNodes(block.getLocalBlock());
int liveReplicas = numberReplicas.liveReplicas(); int liveReplicas = numberReplicas.liveReplicas();
res.totalReplicas += liveReplicas; int decommissionedReplicas = numberReplicas.decommissioned();
int decommissioningReplicas = numberReplicas.decommissioning();
res.decommissionedReplicas += decommissionedReplicas;
res.decommissioningReplicas += decommissioningReplicas;
int totalReplicas = liveReplicas + decommissionedReplicas +
decommissioningReplicas;
res.totalReplicas += totalReplicas;
short targetFileReplication = file.getReplication(); short targetFileReplication = file.getReplication();
res.numExpectedReplicas += targetFileReplication; res.numExpectedReplicas += targetFileReplication;
if(liveReplicas<minReplication){ if(totalReplicas < minReplication){
res.numUnderMinReplicatedBlocks++; res.numUnderMinReplicatedBlocks++;
} }
if (liveReplicas > targetFileReplication) { if (liveReplicas > targetFileReplication) {
@ -532,10 +540,10 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
out.print("\n" + path + ": CORRUPT blockpool " + block.getBlockPoolId() + out.print("\n" + path + ": CORRUPT blockpool " + block.getBlockPoolId() +
" block " + block.getBlockName()+"\n"); " block " + block.getBlockName()+"\n");
} }
if (liveReplicas >= minReplication) if (totalReplicas >= minReplication)
res.numMinReplicatedBlocks++; res.numMinReplicatedBlocks++;
if (liveReplicas < targetFileReplication && liveReplicas > 0) { if (totalReplicas < targetFileReplication && totalReplicas > 0) {
res.missingReplicas += (targetFileReplication - liveReplicas); res.missingReplicas += (targetFileReplication - totalReplicas);
res.numUnderReplicatedBlocks += 1; res.numUnderReplicatedBlocks += 1;
underReplicatedPerFile++; underReplicatedPerFile++;
if (!showFiles) { if (!showFiles) {
@ -544,7 +552,9 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
out.println(" Under replicated " + block + out.println(" Under replicated " + block +
". Target Replicas is " + ". Target Replicas is " +
targetFileReplication + " but found " + targetFileReplication + " but found " +
liveReplicas + " replica(s)."); liveReplicas + " live replica(s), " +
decommissionedReplicas + " decommissioned replica(s) and " +
decommissioningReplicas + " decommissioning replica(s).");
} }
// verify block placement policy // verify block placement policy
BlockPlacementStatus blockPlacementStatus = bpPolicy BlockPlacementStatus blockPlacementStatus = bpPolicy
@ -561,7 +571,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
block + ". " + blockPlacementStatus.getErrorDescription()); block + ". " + blockPlacementStatus.getErrorDescription());
} }
report.append(i + ". " + blkName + " len=" + block.getNumBytes()); report.append(i + ". " + blkName + " len=" + block.getNumBytes());
if (liveReplicas == 0) { if (totalReplicas == 0) {
report.append(" MISSING!"); report.append(" MISSING!");
res.addMissing(block.toString(), block.getNumBytes()); res.addMissing(block.toString(), block.getNumBytes());
missing++; missing++;
@ -861,6 +871,8 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
long corruptBlocks = 0L; long corruptBlocks = 0L;
long excessiveReplicas = 0L; long excessiveReplicas = 0L;
long missingReplicas = 0L; long missingReplicas = 0L;
long decommissionedReplicas = 0L;
long decommissioningReplicas = 0L;
long numUnderMinReplicatedBlocks=0L; long numUnderMinReplicatedBlocks=0L;
long numOverReplicatedBlocks = 0L; long numOverReplicatedBlocks = 0L;
long numUnderReplicatedBlocks = 0L; long numUnderReplicatedBlocks = 0L;
@ -932,7 +944,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
res.append(" (Total open file blocks (not validated): ").append( res.append(" (Total open file blocks (not validated): ").append(
totalOpenFilesBlocks).append(")"); totalOpenFilesBlocks).append(")");
} }
if (corruptFiles > 0 || numUnderMinReplicatedBlocks>0) { if (corruptFiles > 0 || numUnderMinReplicatedBlocks > 0) {
res.append("\n ********************************"); res.append("\n ********************************");
if(numUnderMinReplicatedBlocks>0){ if(numUnderMinReplicatedBlocks>0){
res.append("\n UNDER MIN REPL'D BLOCKS:\t").append(numUnderMinReplicatedBlocks); res.append("\n UNDER MIN REPL'D BLOCKS:\t").append(numUnderMinReplicatedBlocks);
@ -995,6 +1007,14 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
((float) (missingReplicas * 100) / (float) numExpectedReplicas)).append( ((float) (missingReplicas * 100) / (float) numExpectedReplicas)).append(
" %)"); " %)");
} }
if (decommissionedReplicas > 0) {
res.append("\n DecommissionedReplicas:\t").append(
decommissionedReplicas);
}
if (decommissioningReplicas > 0) {
res.append("\n DecommissioningReplicas:\t").append(
decommissioningReplicas);
}
return res.toString(); return res.toString();
} }
} }

View File

@ -192,7 +192,7 @@ public class TestClientReportBadBlock {
verifyFirstBlockCorrupted(filePath, false); verifyFirstBlockCorrupted(filePath, false);
int expectedReplicaCount = repl-corruptBlocReplicas; int expectedReplicaCount = repl-corruptBlocReplicas;
verifyCorruptedBlockCount(filePath, expectedReplicaCount); verifyCorruptedBlockCount(filePath, expectedReplicaCount);
verifyFsckHealth("Target Replicas is 3 but found 1 replica"); verifyFsckHealth("Target Replicas is 3 but found 1 live replica");
testFsckListCorruptFilesBlocks(filePath, 0); testFsckListCorruptFilesBlocks(filePath, 0);
} }
} }

View File

@ -192,7 +192,7 @@ public class TestReadOnlySharedStorage {
assertThat(numberReplicas.liveReplicas(), is(expectedReplicas)); assertThat(numberReplicas.liveReplicas(), is(expectedReplicas));
assertThat(numberReplicas.excessReplicas(), is(0)); assertThat(numberReplicas.excessReplicas(), is(0));
assertThat(numberReplicas.corruptReplicas(), is(0)); assertThat(numberReplicas.corruptReplicas(), is(0));
assertThat(numberReplicas.decommissionedReplicas(), is(0)); assertThat(numberReplicas.decommissionedAndDecommissioning(), is(0));
assertThat(numberReplicas.replicasOnStaleNodes(), is(0)); assertThat(numberReplicas.replicasOnStaleNodes(), is(0));
BlockManagerTestUtil.updateState(blockManager); BlockManagerTestUtil.updateState(blockManager);

View File

@ -1464,4 +1464,85 @@ public class TestFsck {
} }
} }
} }
/**
 * Test that fsck does not report blocks on a decommissioning host as
 * missing or corrupt: the file must remain HEALTHY both while its only
 * replica's datanode is decommissioning and after decommission completes.
 */
@Test(timeout = 90000)
public void testFsckWithDecommissionedReplicas() throws Exception {

  final short REPL_FACTOR = 1;
  short NUM_DN = 2;
  final long blockSize = 512;
  final long fileSize = 1024;
  boolean checkDecommissionInProgress = false;
  String [] racks = {"/rack1", "/rack2"};
  String [] hosts = {"host1", "host2"};

  Configuration conf = new Configuration();
  conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
  conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);

  MiniDFSCluster cluster;
  DistributedFileSystem dfs;
  cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts)
          .racks(racks).build();
  assertNotNull("Failed Cluster Creation", cluster);
  cluster.waitClusterUp();
  dfs = cluster.getFileSystem();
  assertNotNull("Failed to get FileSystem", dfs);

  DFSTestUtil util = new DFSTestUtil.Builder().
      setName(getClass().getSimpleName()).setNumFiles(1).build();

  // Create a single-replica file so that decommissioning its only
  // datanode exercises the decommissioning-replica accounting in fsck.
  final String testFile = "/testfile";
  final Path path = new Path(testFile);
  util.createFile(dfs, path, fileSize, REPL_FACTOR, 1000L);
  util.waitReplication(dfs, path, REPL_FACTOR);
  try {
    // Make sure the datanode that has the replica is fine before
    // decommission starts.
    String outStr = runFsck(conf, 0, true, testFile);
    System.out.println(outStr);
    assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));

    // Decommission the datanode holding the only replica.
    ExtendedBlock eb = util.getFirstBlock(dfs, path);
    DatanodeDescriptor dn = cluster.getNameNode().getNamesystem()
        .getBlockManager().getBlockCollection(eb.getLocalBlock())
        .getBlocks()[0].getDatanode(0);
    cluster.getNameNode().getNamesystem().getBlockManager()
        .getDatanodeManager().getDecomManager().startDecommission(dn);
    String dnName = dn.getXferAddr();

    // Poll until decommission completes; the @Test timeout bounds the
    // wait so a stuck decommission fails the test instead of hanging it.
    DatanodeInfo datanodeInfo = null;
    do {
      Thread.sleep(2000);
      for (DatanodeInfo info : dfs.getDataNodeStats()) {
        if (dnName.equals(info.getXferAddr())) {
          datanodeInfo = info;
        }
      }
      // While the node is still decommissioning, the replica must be
      // reported as healthy (0) instead of corrupt (1).
      if (!checkDecommissionInProgress && datanodeInfo != null
          && datanodeInfo.isDecommissionInProgress()) {
        String fsckOut = runFsck(conf, 0, true, testFile);
        assertTrue(fsckOut.contains(NamenodeFsck.HEALTHY_STATUS));
        checkDecommissionInProgress = true;
      }
    } while (datanodeInfo != null && !datanodeInfo.isDecommissioned());

    // After decommission is done the replica must still be healthy (0).
    String fsckOut = runFsck(conf, 0, true, testFile);
    assertTrue(fsckOut.contains(NamenodeFsck.HEALTHY_STATUS));
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
} }