svn merge -c 1356086 from trunk for HDFS-3157. Fix a bug in the case that the generation stamps of the stored block in a namenode and the reported block from a datanode do not match.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1356095 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2b938bea99
commit
2de013e0a8
|
@ -238,6 +238,13 @@ Release 2.0.1-alpha - UNRELEASED
|
||||||
HDFS-3559. DFSTestUtil: use Builder class to construct DFSTestUtil
|
HDFS-3559. DFSTestUtil: use Builder class to construct DFSTestUtil
|
||||||
instances. (Colin Patrick McCabe via atm)
|
instances. (Colin Patrick McCabe via atm)
|
||||||
|
|
||||||
|
HDFS-3551. WebHDFS CREATE should use client location for HTTP redirection.
|
||||||
|
(szetszwo)
|
||||||
|
|
||||||
|
HDFS-3157. Fix a bug in the case that the generation stamps of the stored
|
||||||
|
block in a namenode and the reported block from a datanode do not match.
|
||||||
|
(Ashish Singhi via szetszwo)
|
||||||
|
|
||||||
BREAKDOWN OF HDFS-3042 SUBTASKS
|
BREAKDOWN OF HDFS-3042 SUBTASKS
|
||||||
|
|
||||||
HDFS-2185. HDFS portion of ZK-based FailoverController (todd)
|
HDFS-2185. HDFS portion of ZK-based FailoverController (todd)
|
||||||
|
@ -256,9 +263,6 @@ Release 2.0.1-alpha - UNRELEASED
|
||||||
|
|
||||||
HDFS-3428. Move DelegationTokenRenewer to common (tucu)
|
HDFS-3428. Move DelegationTokenRenewer to common (tucu)
|
||||||
|
|
||||||
HDFS-3551. WebHDFS CREATE should use client location for HTTP redirection.
|
|
||||||
(szetszwo)
|
|
||||||
|
|
||||||
HDFS-3491. HttpFs does not set permissions correctly (tucu)
|
HDFS-3491. HttpFs does not set permissions correctly (tucu)
|
||||||
|
|
||||||
HDFS-3580. incompatible types; no instance(s) of type variable(s) V exist
|
HDFS-3580. incompatible types; no instance(s) of type variable(s) V exist
|
||||||
|
|
|
@ -930,78 +930,71 @@ public class BlockManager {
|
||||||
+ blk + " not found.");
|
+ blk + " not found.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
markBlockAsCorrupt(storedBlock, dn, reason);
|
markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason), dn);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void markBlockAsCorrupt(BlockInfo storedBlock,
|
private void markBlockAsCorrupt(BlockToMarkCorrupt b,
|
||||||
DatanodeInfo dn,
|
DatanodeInfo dn) throws IOException {
|
||||||
String reason) throws IOException {
|
|
||||||
assert storedBlock != null : "storedBlock should not be null";
|
|
||||||
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
|
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
|
||||||
if (node == null) {
|
if (node == null) {
|
||||||
throw new IOException("Cannot mark block " +
|
throw new IOException("Cannot mark " + b
|
||||||
storedBlock.getBlockName() +
|
+ " as corrupt because datanode " + dn + " does not exist");
|
||||||
" as corrupt because datanode " + dn +
|
|
||||||
" does not exist. ");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
BlockCollection bc = storedBlock.getBlockCollection();
|
BlockCollection bc = b.corrupted.getBlockCollection();
|
||||||
if (bc == null) {
|
if (bc == null) {
|
||||||
NameNode.stateChangeLog.info("BLOCK markBlockAsCorrupt: " +
|
NameNode.stateChangeLog.info("BLOCK markBlockAsCorrupt: " + b
|
||||||
"block " + storedBlock +
|
+ " cannot be marked as corrupt as it does not belong to any file");
|
||||||
" could not be marked as corrupt as it" +
|
addToInvalidates(b.corrupted, node);
|
||||||
" does not belong to any file");
|
|
||||||
addToInvalidates(storedBlock, node);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add replica to the data-node if it is not already there
|
// Add replica to the data-node if it is not already there
|
||||||
node.addBlock(storedBlock);
|
node.addBlock(b.stored);
|
||||||
|
|
||||||
// Add this replica to corruptReplicas Map
|
// Add this replica to corruptReplicas Map
|
||||||
corruptReplicas.addToCorruptReplicasMap(storedBlock, node, reason);
|
corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason);
|
||||||
if (countNodes(storedBlock).liveReplicas() >= bc.getReplication()) {
|
if (countNodes(b.stored).liveReplicas() >= bc.getReplication()) {
|
||||||
// the block is over-replicated so invalidate the replicas immediately
|
// the block is over-replicated so invalidate the replicas immediately
|
||||||
invalidateBlock(storedBlock, node);
|
invalidateBlock(b, node);
|
||||||
} else if (namesystem.isPopulatingReplQueues()) {
|
} else if (namesystem.isPopulatingReplQueues()) {
|
||||||
// add the block to neededReplication
|
// add the block to neededReplication
|
||||||
updateNeededReplications(storedBlock, -1, 0);
|
updateNeededReplications(b.stored, -1, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Invalidates the given block on the given datanode.
|
* Invalidates the given block on the given datanode.
|
||||||
*/
|
*/
|
||||||
private void invalidateBlock(Block blk, DatanodeInfo dn)
|
private void invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn
|
||||||
throws IOException {
|
) throws IOException {
|
||||||
NameNode.stateChangeLog.info("BLOCK* invalidateBlock: "
|
NameNode.stateChangeLog.info("BLOCK* invalidateBlock: " + b + " on " + dn);
|
||||||
+ blk + " on " + dn);
|
|
||||||
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
|
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
|
||||||
if (node == null) {
|
if (node == null) {
|
||||||
throw new IOException("Cannot invalidate block " + blk
|
throw new IOException("Cannot invalidate " + b
|
||||||
+ " because datanode " + dn + " does not exist.");
|
+ " because datanode " + dn + " does not exist.");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check how many copies we have of the block
|
// Check how many copies we have of the block
|
||||||
NumberReplicas nr = countNodes(blk);
|
NumberReplicas nr = countNodes(b.stored);
|
||||||
if (nr.replicasOnStaleNodes() > 0) {
|
if (nr.replicasOnStaleNodes() > 0) {
|
||||||
NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: postponing " +
|
NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: postponing " +
|
||||||
"invalidation of block " + blk + " on " + dn + " because " +
|
"invalidation of " + b + " on " + dn + " because " +
|
||||||
nr.replicasOnStaleNodes() + " replica(s) are located on nodes " +
|
nr.replicasOnStaleNodes() + " replica(s) are located on nodes " +
|
||||||
"with potentially out-of-date block reports.");
|
"with potentially out-of-date block reports.");
|
||||||
postponeBlock(blk);
|
postponeBlock(b.corrupted);
|
||||||
|
|
||||||
} else if (nr.liveReplicas() >= 1) {
|
} else if (nr.liveReplicas() >= 1) {
|
||||||
// If we have at least one copy on a live node, then we can delete it.
|
// If we have at least one copy on a live node, then we can delete it.
|
||||||
addToInvalidates(blk, dn);
|
addToInvalidates(b.corrupted, dn);
|
||||||
removeStoredBlock(blk, node);
|
removeStoredBlock(b.stored, node);
|
||||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
NameNode.stateChangeLog.debug("BLOCK* invalidateBlocks: "
|
NameNode.stateChangeLog.debug("BLOCK* invalidateBlocks: "
|
||||||
+ blk + " on " + dn + " listed for deletion.");
|
+ b + " on " + dn + " listed for deletion.");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: " + blk + " on "
|
NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: " + b
|
||||||
+ dn + " is the only copy and was not deleted.");
|
+ " on " + dn + " is the only copy and was not deleted.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1408,14 +1401,37 @@ public class BlockManager {
|
||||||
* list of blocks that should be considered corrupt due to a block report.
|
* list of blocks that should be considered corrupt due to a block report.
|
||||||
*/
|
*/
|
||||||
private static class BlockToMarkCorrupt {
|
private static class BlockToMarkCorrupt {
|
||||||
final BlockInfo blockInfo;
|
/** The corrupted block in a datanode. */
|
||||||
|
final BlockInfo corrupted;
|
||||||
|
/** The corresponding block stored in the BlockManager. */
|
||||||
|
final BlockInfo stored;
|
||||||
|
/** The reason to mark corrupt. */
|
||||||
final String reason;
|
final String reason;
|
||||||
|
|
||||||
BlockToMarkCorrupt(BlockInfo blockInfo, String reason) {
|
BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason) {
|
||||||
super();
|
Preconditions.checkNotNull(corrupted, "corrupted is null");
|
||||||
this.blockInfo = blockInfo;
|
Preconditions.checkNotNull(stored, "stored is null");
|
||||||
|
|
||||||
|
this.corrupted = corrupted;
|
||||||
|
this.stored = stored;
|
||||||
this.reason = reason;
|
this.reason = reason;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
BlockToMarkCorrupt(BlockInfo stored, String reason) {
|
||||||
|
this(stored, stored, reason);
|
||||||
|
}
|
||||||
|
|
||||||
|
BlockToMarkCorrupt(BlockInfo stored, long gs, String reason) {
|
||||||
|
this(new BlockInfo(stored), stored, reason);
|
||||||
|
//the corrupted block in datanode has a different generation stamp
|
||||||
|
corrupted.setGenerationStamp(gs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return corrupted + "("
|
||||||
|
+ (corrupted == stored? "same as stored": "stored=" + stored) + ")";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1536,7 +1552,7 @@ public class BlockManager {
|
||||||
addToInvalidates(b, node);
|
addToInvalidates(b, node);
|
||||||
}
|
}
|
||||||
for (BlockToMarkCorrupt b : toCorrupt) {
|
for (BlockToMarkCorrupt b : toCorrupt) {
|
||||||
markBlockAsCorrupt(b.blockInfo, node, b.reason);
|
markBlockAsCorrupt(b, node);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1586,7 +1602,7 @@ public class BlockManager {
|
||||||
queueReportedBlock(node, iblk, reportedState,
|
queueReportedBlock(node, iblk, reportedState,
|
||||||
QUEUE_REASON_CORRUPT_STATE);
|
QUEUE_REASON_CORRUPT_STATE);
|
||||||
} else {
|
} else {
|
||||||
markBlockAsCorrupt(c.blockInfo, node, c.reason);
|
markBlockAsCorrupt(c, node);
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1807,7 +1823,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
||||||
assert pendingDNMessages.count() == 0;
|
assert pendingDNMessages.count() == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/**
|
||||||
* The next two methods test the various cases under which we must conclude
|
* The next two methods test the various cases under which we must conclude
|
||||||
* the replica is corrupt, or under construction. These are laid out
|
* the replica is corrupt, or under construction. These are laid out
|
||||||
* as switch statements, on the theory that it is easier to understand
|
* as switch statements, on the theory that it is easier to understand
|
||||||
|
@ -1817,7 +1833,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
||||||
* @return a BlockToMarkCorrupt object, or null if the replica is not corrupt
|
* @return a BlockToMarkCorrupt object, or null if the replica is not corrupt
|
||||||
*/
|
*/
|
||||||
private BlockToMarkCorrupt checkReplicaCorrupt(
|
private BlockToMarkCorrupt checkReplicaCorrupt(
|
||||||
Block iblk, ReplicaState reportedState,
|
Block reported, ReplicaState reportedState,
|
||||||
BlockInfo storedBlock, BlockUCState ucState,
|
BlockInfo storedBlock, BlockUCState ucState,
|
||||||
DatanodeDescriptor dn) {
|
DatanodeDescriptor dn) {
|
||||||
switch(reportedState) {
|
switch(reportedState) {
|
||||||
|
@ -1825,15 +1841,16 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
||||||
switch(ucState) {
|
switch(ucState) {
|
||||||
case COMPLETE:
|
case COMPLETE:
|
||||||
case COMMITTED:
|
case COMMITTED:
|
||||||
if (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()) {
|
if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
|
||||||
return new BlockToMarkCorrupt(storedBlock,
|
final long reportedGS = reported.getGenerationStamp();
|
||||||
"block is " + ucState + " and reported genstamp " +
|
return new BlockToMarkCorrupt(storedBlock, reportedGS,
|
||||||
iblk.getGenerationStamp() + " does not match " +
|
"block is " + ucState + " and reported genstamp " + reportedGS
|
||||||
"genstamp in block map " + storedBlock.getGenerationStamp());
|
+ " does not match genstamp in block map "
|
||||||
} else if (storedBlock.getNumBytes() != iblk.getNumBytes()) {
|
+ storedBlock.getGenerationStamp());
|
||||||
|
} else if (storedBlock.getNumBytes() != reported.getNumBytes()) {
|
||||||
return new BlockToMarkCorrupt(storedBlock,
|
return new BlockToMarkCorrupt(storedBlock,
|
||||||
"block is " + ucState + " and reported length " +
|
"block is " + ucState + " and reported length " +
|
||||||
iblk.getNumBytes() + " does not match " +
|
reported.getNumBytes() + " does not match " +
|
||||||
"length in block map " + storedBlock.getNumBytes());
|
"length in block map " + storedBlock.getNumBytes());
|
||||||
} else {
|
} else {
|
||||||
return null; // not corrupt
|
return null; // not corrupt
|
||||||
|
@ -1845,11 +1862,12 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
||||||
case RWR:
|
case RWR:
|
||||||
if (!storedBlock.isComplete()) {
|
if (!storedBlock.isComplete()) {
|
||||||
return null; // not corrupt
|
return null; // not corrupt
|
||||||
} else if (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()) {
|
} else if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
|
||||||
return new BlockToMarkCorrupt(storedBlock,
|
final long reportedGS = reported.getGenerationStamp();
|
||||||
"reported " + reportedState + " replica with genstamp " +
|
return new BlockToMarkCorrupt(storedBlock, reportedGS,
|
||||||
iblk.getGenerationStamp() + " does not match COMPLETE block's " +
|
"reported " + reportedState + " replica with genstamp " + reportedGS
|
||||||
"genstamp in block map " + storedBlock.getGenerationStamp());
|
+ " does not match COMPLETE block's genstamp in block map "
|
||||||
|
+ storedBlock.getGenerationStamp());
|
||||||
} else { // COMPLETE block, same genstamp
|
} else { // COMPLETE block, same genstamp
|
||||||
if (reportedState == ReplicaState.RBW) {
|
if (reportedState == ReplicaState.RBW) {
|
||||||
// If it's a RBW report for a COMPLETE block, it may just be that
|
// If it's a RBW report for a COMPLETE block, it may just be that
|
||||||
|
@ -1871,8 +1889,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
||||||
String msg = "Unexpected replica state " + reportedState
|
String msg = "Unexpected replica state " + reportedState
|
||||||
+ " for block: " + storedBlock +
|
+ " for block: " + storedBlock +
|
||||||
" on " + dn + " size " + storedBlock.getNumBytes();
|
" on " + dn + " size " + storedBlock.getNumBytes();
|
||||||
// log here at WARN level since this is really a broken HDFS
|
// log here at WARN level since this is really a broken HDFS invariant
|
||||||
// invariant
|
|
||||||
LOG.warn(msg);
|
LOG.warn(msg);
|
||||||
return new BlockToMarkCorrupt(storedBlock, msg);
|
return new BlockToMarkCorrupt(storedBlock, msg);
|
||||||
}
|
}
|
||||||
|
@ -2075,7 +2092,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
||||||
*
|
*
|
||||||
* @param blk Block whose corrupt replicas need to be invalidated
|
* @param blk Block whose corrupt replicas need to be invalidated
|
||||||
*/
|
*/
|
||||||
private void invalidateCorruptReplicas(Block blk) {
|
private void invalidateCorruptReplicas(BlockInfo blk) {
|
||||||
Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
|
Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
|
||||||
boolean gotException = false;
|
boolean gotException = false;
|
||||||
if (nodes == null)
|
if (nodes == null)
|
||||||
|
@ -2085,7 +2102,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
||||||
DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
|
DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
|
||||||
for (DatanodeDescriptor node : nodesCopy) {
|
for (DatanodeDescriptor node : nodesCopy) {
|
||||||
try {
|
try {
|
||||||
invalidateBlock(blk, node);
|
invalidateBlock(new BlockToMarkCorrupt(blk, null), node);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
NameNode.stateChangeLog.info("NameNode.invalidateCorruptReplicas " +
|
NameNode.stateChangeLog.info("NameNode.invalidateCorruptReplicas " +
|
||||||
"error in deleting bad block " + blk +
|
"error in deleting bad block " + blk +
|
||||||
|
@ -2501,7 +2518,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
||||||
addToInvalidates(b, node);
|
addToInvalidates(b, node);
|
||||||
}
|
}
|
||||||
for (BlockToMarkCorrupt b : toCorrupt) {
|
for (BlockToMarkCorrupt b : toCorrupt) {
|
||||||
markBlockAsCorrupt(b.blockInfo, node, b.reason);
|
markBlockAsCorrupt(b, node);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,127 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
|
import org.junit.Test;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test when RBW block is removed. Invalidation of the corrupted block happens
|
||||||
|
* and then the under-replicated block gets replicated to the datanode.
|
||||||
|
*/
|
||||||
|
public class TestRBWBlockInvalidation {
|
||||||
|
private static NumberReplicas countReplicas(final FSNamesystem namesystem,
|
||||||
|
ExtendedBlock block) {
|
||||||
|
return namesystem.getBlockManager().countNodes(block.getLocalBlock());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test when a block's replica is removed from RBW folder in one of the
|
||||||
|
* datanodes, the namenode should ask to invalidate that corrupted block and
|
||||||
|
* schedule replication for one more replica for that under-replicated block.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testBlockInvalidationWhenRBWReplicaMissedInDN()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 300);
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
|
||||||
|
.build();
|
||||||
|
FSDataOutputStream out = null;
|
||||||
|
try {
|
||||||
|
final FSNamesystem namesystem = cluster.getNamesystem();
|
||||||
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
Path testPath = new Path(MiniDFSCluster.getBaseDirectory(), "foo1");
|
||||||
|
out = fs.create(testPath, (short) 2);
|
||||||
|
out.writeBytes("HDFS-3157: " + testPath);
|
||||||
|
out.hsync();
|
||||||
|
cluster.startDataNodes(conf, 1, true, null, null, null);
|
||||||
|
String bpid = namesystem.getBlockPoolId();
|
||||||
|
ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, testPath);
|
||||||
|
Block block = blk.getLocalBlock();
|
||||||
|
DataNode dn = cluster.getDataNodes().get(0);
|
||||||
|
|
||||||
|
// Delete partial block and its meta information from the RBW folder
|
||||||
|
// of first datanode.
|
||||||
|
File blockFile = DataNodeTestUtils.getBlockFile(dn, bpid, block);
|
||||||
|
File metaFile = DataNodeTestUtils.getMetaFile(dn, bpid, block);
|
||||||
|
assertTrue("Could not delete the block file from the RBW folder",
|
||||||
|
blockFile.delete());
|
||||||
|
assertTrue("Could not delete the block meta file from the RBW folder",
|
||||||
|
metaFile.delete());
|
||||||
|
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
// Check datanode has reported the corrupt block.
|
||||||
|
boolean isCorruptReported = false;
|
||||||
|
while (!isCorruptReported) {
|
||||||
|
if (countReplicas(namesystem, blk).corruptReplicas() > 0) {
|
||||||
|
isCorruptReported = true;
|
||||||
|
}
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
assertEquals("There should be 1 replica in the corruptReplicasMap", 1,
|
||||||
|
countReplicas(namesystem, blk).corruptReplicas());
|
||||||
|
|
||||||
|
// Check the block has got replicated to another datanode.
|
||||||
|
blk = DFSTestUtil.getFirstBlock(fs, testPath);
|
||||||
|
boolean isReplicated = false;
|
||||||
|
while (!isReplicated) {
|
||||||
|
if (countReplicas(namesystem, blk).liveReplicas() > 1) {
|
||||||
|
isReplicated = true;
|
||||||
|
}
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
assertEquals("There should be two live replicas", 2, countReplicas(
|
||||||
|
namesystem, blk).liveReplicas());
|
||||||
|
|
||||||
|
// sleep for 1 second, so that by this time datanode reports the corrupt
|
||||||
|
// block after a live replica of the block has been replicated.
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
// Check that there is no corrupt block in the corruptReplicasMap.
|
||||||
|
assertEquals("There should not be any replica in the corruptReplicasMap",
|
||||||
|
0, countReplicas(namesystem, blk).corruptReplicas());
|
||||||
|
} finally {
|
||||||
|
if (out != null) {
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -137,6 +137,11 @@ public class DataNodeTestUtils {
|
||||||
return FsDatasetTestUtil.getBlockFile(dn.getFSDataset(), bpid, b);
|
return FsDatasetTestUtil.getBlockFile(dn.getFSDataset(), bpid, b);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static File getMetaFile(DataNode dn, String bpid, Block b)
|
||||||
|
throws IOException {
|
||||||
|
return FsDatasetTestUtil.getMetaFile(dn.getFSDataset(), bpid, b);
|
||||||
|
}
|
||||||
|
|
||||||
public static boolean unlinkBlock(DataNode dn, ExtendedBlock bk, int numLinks
|
public static boolean unlinkBlock(DataNode dn, ExtendedBlock bk, int numLinks
|
||||||
) throws IOException {
|
) throws IOException {
|
||||||
return FsDatasetTestUtil.unlinkBlock(dn.getFSDataset(), bk, numLinks);
|
return FsDatasetTestUtil.unlinkBlock(dn.getFSDataset(), bk, numLinks);
|
||||||
|
|
|
@ -37,6 +37,12 @@ public class FsDatasetTestUtil {
|
||||||
return ((FsDatasetImpl)fsd).getBlockFile(bpid, b);
|
return ((FsDatasetImpl)fsd).getBlockFile(bpid, b);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static File getMetaFile(FsDatasetSpi<?> fsd, String bpid, Block b)
|
||||||
|
throws IOException {
|
||||||
|
return FsDatasetUtil.getMetaFile(getBlockFile(fsd, bpid, b), b
|
||||||
|
.getGenerationStamp());
|
||||||
|
}
|
||||||
|
|
||||||
public static boolean unlinkBlock(FsDatasetSpi<?> fsd,
|
public static boolean unlinkBlock(FsDatasetSpi<?> fsd,
|
||||||
ExtendedBlock block, int numLinks) throws IOException {
|
ExtendedBlock block, int numLinks) throws IOException {
|
||||||
final ReplicaInfo info = ((FsDatasetImpl)fsd).getReplicaInfo(block);
|
final ReplicaInfo info = ((FsDatasetImpl)fsd).getReplicaInfo(block);
|
||||||
|
|
Loading…
Reference in New Issue