svn merge -c 1547173 merging from trunk to branch-2 to fix: HDFS-5557. Write pipeline recovery for the last packet in the block may cause rejection of valid replicas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1547179 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Kihwal Lee 2013-12-02 20:27:37 +00:00
parent 439a690ad1
commit 2ea9cd4896
5 changed files with 24 additions and 29 deletions

View File

@ -3695,6 +3695,9 @@ Release 0.23.10 - UNRELEASED
HDFS-5526. Datanode cannot roll back to previous layout version (kihwal) HDFS-5526. Datanode cannot roll back to previous layout version (kihwal)
HDFS-5557. Write pipeline recovery for the last packet in the block may
cause rejection of valid replicas. (kihwal)
Release 0.23.9 - 2013-07-08 Release 0.23.9 - 2013-07-08
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -840,7 +840,6 @@ public class DFSOutputStream extends FSOutputSummer
// We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
// a client waiting on close() will be aware that the flush finished. // a client waiting on close() will be aware that the flush finished.
synchronized (dataQueue) { synchronized (dataQueue) {
assert dataQueue.size() == 1;
Packet endOfBlockPacket = dataQueue.remove(); // remove the end of block packet Packet endOfBlockPacket = dataQueue.remove(); // remove the end of block packet
assert endOfBlockPacket.lastPacketInBlock; assert endOfBlockPacket.lastPacketInBlock;
assert lastAckedSeqno == endOfBlockPacket.seqno - 1; assert lastAckedSeqno == endOfBlockPacket.seqno - 1;
@ -1044,7 +1043,7 @@ public class DFSOutputStream extends FSOutputSummer
// set up the pipeline again with the remaining nodes // set up the pipeline again with the remaining nodes
if (failPacket) { // for testing if (failPacket) { // for testing
success = createBlockOutputStream(nodes, newGS-1, isRecovery); success = createBlockOutputStream(nodes, newGS, isRecovery);
failPacket = false; failPacket = false;
try { try {
// Give DNs time to send in bad reports. In real situations, // Give DNs time to send in bad reports. In real situations,

View File

@ -235,6 +235,8 @@ public class BlockInfoUnderConstruction extends BlockInfo {
* @param genStamp The final generation stamp for the block. * @param genStamp The final generation stamp for the block.
*/ */
public void setGenerationStampAndVerifyReplicas(long genStamp) { public void setGenerationStampAndVerifyReplicas(long genStamp) {
// Set the generation stamp for the block.
setGenerationStamp(genStamp);
if (replicas == null) if (replicas == null)
return; return;
@ -244,12 +246,9 @@ public class BlockInfoUnderConstruction extends BlockInfo {
if (genStamp != r.getGenerationStamp()) { if (genStamp != r.getGenerationStamp()) {
r.getExpectedLocation().removeBlock(this); r.getExpectedLocation().removeBlock(this);
NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica " NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica "
+ "from location: " + r); + "from location: " + r.getExpectedLocation());
} }
} }
// Set the generation stamp for the block.
setGenerationStamp(genStamp);
} }
/** /**
@ -264,6 +263,8 @@ public class BlockInfoUnderConstruction extends BlockInfo {
+ block.getBlockId() + ", expected id = " + getBlockId()); + block.getBlockId() + ", expected id = " + getBlockId());
blockUCState = BlockUCState.COMMITTED; blockUCState = BlockUCState.COMMITTED;
this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp()); this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp());
// Sort out invalid replicas.
setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
} }
/** /**

View File

@ -1556,13 +1556,15 @@ public class BlockManager {
* Besides the block in question, it provides the ReplicaState * Besides the block in question, it provides the ReplicaState
* reported by the datanode in the block report. * reported by the datanode in the block report.
*/ */
private static class StatefulBlockInfo { static class StatefulBlockInfo {
final BlockInfoUnderConstruction storedBlock; final BlockInfoUnderConstruction storedBlock;
final Block reportedBlock;
final ReplicaState reportedState; final ReplicaState reportedState;
StatefulBlockInfo(BlockInfoUnderConstruction storedBlock, StatefulBlockInfo(BlockInfoUnderConstruction storedBlock,
ReplicaState reportedState) { Block reportedBlock, ReplicaState reportedState) {
this.storedBlock = storedBlock; this.storedBlock = storedBlock;
this.reportedBlock = reportedBlock;
this.reportedState = reportedState; this.reportedState = reportedState;
} }
} }
@ -1715,7 +1717,7 @@ public class BlockManager {
// Process the blocks on each queue // Process the blocks on each queue
for (StatefulBlockInfo b : toUC) { for (StatefulBlockInfo b : toUC) {
addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState); addStoredBlockUnderConstruction(b, node);
} }
for (Block b : toRemove) { for (Block b : toRemove) {
removeStoredBlock(b, node); removeStoredBlock(b, node);
@ -1939,7 +1941,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) { if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
toUC.add(new StatefulBlockInfo( toUC.add(new StatefulBlockInfo(
(BlockInfoUnderConstruction)storedBlock, reportedState)); (BlockInfoUnderConstruction)storedBlock, block, reportedState));
return storedBlock; return storedBlock;
} }
@ -2110,13 +2112,11 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
} }
} }
void addStoredBlockUnderConstruction( void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
BlockInfoUnderConstruction block, DatanodeDescriptor node) throws IOException {
DatanodeDescriptor node, BlockInfoUnderConstruction block = ucBlock.storedBlock;
ReplicaState reportedState) block.addReplicaIfNotPresent(node, ucBlock.reportedBlock, ucBlock.reportedState);
throws IOException { if (ucBlock.reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
block.addReplicaIfNotPresent(node, block, reportedState);
if (reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
addStoredBlock(block, node, null, true); addStoredBlock(block, node, null, true);
} }
} }
@ -2683,7 +2683,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
: "The block should be only in one of the lists."; : "The block should be only in one of the lists.";
for (StatefulBlockInfo b : toUC) { for (StatefulBlockInfo b : toUC) {
addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState); addStoredBlockUnderConstruction(b, node);
} }
long numBlocksLogged = 0; long numBlocksLogged = 0;
for (BlockInfo b : toAdd) { for (BlockInfo b : toAdd) {

View File

@ -139,16 +139,10 @@ public class TestClientProtocolForPipelineRecovery {
Path file = new Path("dataprotocol1.dat"); Path file = new Path("dataprotocol1.dat");
Mockito.when(faultInjector.failPacket()).thenReturn(true); Mockito.when(faultInjector.failPacket()).thenReturn(true);
try { DFSTestUtil.createFile(fileSys, file, 68000000L, (short)numDataNodes, 0L);
DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
} catch (IOException e) {
// completeFile() should fail.
Assert.assertTrue(e.getMessage().startsWith("Unable to close file"));
return;
}
// At this point, NN let data corruption to happen. // At this point, NN should have accepted only valid replicas.
// Before failing test, try reading the file. It should fail. // Read should succeed.
FSDataInputStream in = fileSys.open(file); FSDataInputStream in = fileSys.open(file);
try { try {
int c = in.read(); int c = in.read();
@ -158,8 +152,6 @@ public class TestClientProtocolForPipelineRecovery {
Assert.fail("Block is missing because the file was closed with" Assert.fail("Block is missing because the file was closed with"
+ " corrupt replicas."); + " corrupt replicas.");
} }
Assert.fail("The file was closed with corrupt replicas, but read still"
+ " works!");
} finally { } finally {
DFSClientFaultInjector.instance = oldInjector; DFSClientFaultInjector.instance = oldInjector;
if (cluster != null) { if (cluster != null) {