From 2ea9cd4896724cf7dd14945ece34a66501069d6f Mon Sep 17 00:00:00 2001
From: Kihwal Lee
Date: Mon, 2 Dec 2013 20:27:37 +0000
Subject: [PATCH] svn merge -c 1547173 merging from trunk to branch-2 to fix:
 HDFS-5557. Write pipeline recovery for the last packet in the block may
 cause rejection of valid replicas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1547179 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |  3 +++
 .../apache/hadoop/hdfs/DFSOutputStream.java   |  3 +--
 .../BlockInfoUnderConstruction.java           |  9 +++----
 .../server/blockmanagement/BlockManager.java  | 24 +++++++++----------
 ...TestClientProtocolForPipelineRecovery.java | 14 +++--------
 5 files changed, 24 insertions(+), 29 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 8d1f2434843..d83b2b6d03c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -3695,6 +3695,9 @@ Release 0.23.10 - UNRELEASED
 
     HDFS-5526. Datanode cannot roll back to previous layout version (kihwal)
 
+    HDFS-5557. Write pipeline recovery for the last packet in the block may
+    cause rejection of valid replicas. (kihwal)
+
 Release 0.23.9 - 2013-07-08
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index c51da3050ed..635dff190ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -840,7 +840,6 @@ private boolean processDatanodeError() throws IOException {
         // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
         // a client waiting on close() will be aware that the flush finished.
         synchronized (dataQueue) {
-          assert dataQueue.size() == 1;
           Packet endOfBlockPacket = dataQueue.remove(); // remove the end of block packet
           assert endOfBlockPacket.lastPacketInBlock;
           assert lastAckedSeqno == endOfBlockPacket.seqno - 1;
@@ -1044,7 +1043,7 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {
 
       // set up the pipeline again with the remaining nodes
       if (failPacket) { // for testing
-        success = createBlockOutputStream(nodes, newGS-1, isRecovery);
+        success = createBlockOutputStream(nodes, newGS, isRecovery);
         failPacket = false;
         try {
           // Give DNs time to send in bad reports. In real situations,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
index b05cfce1e65..1b343747ecc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
@@ -235,6 +235,8 @@ public long getBlockRecoveryId() {
    * @param genStamp The final generation stamp for the block.
    */
   public void setGenerationStampAndVerifyReplicas(long genStamp) {
+    // Set the generation stamp for the block.
+    setGenerationStamp(genStamp);
     if (replicas == null)
       return;
 
@@ -244,12 +246,9 @@ public void setGenerationStampAndVerifyReplicas(long genStamp) {
       if (genStamp != r.getGenerationStamp()) {
         r.getExpectedLocation().removeBlock(this);
         NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica "
-            + "from location: " + r);
+            + "from location: " + r.getExpectedLocation());
       }
     }
-
-    // Set the generation stamp for the block.
-    setGenerationStamp(genStamp);
   }
 
   /**
@@ -264,6 +263,8 @@ void commitBlock(Block block) throws IOException {
         + block.getBlockId() + ", expected id = " + getBlockId());
     blockUCState = BlockUCState.COMMITTED;
     this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp());
+    // Sort out invalid replicas.
+    setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 10beae97232..7376276cc29 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1556,13 +1556,15 @@ private void processPendingReplications() {
    * Besides the block in question, it provides the ReplicaState
    * reported by the datanode in the block report.
    */
-  private static class StatefulBlockInfo {
+  static class StatefulBlockInfo {
     final BlockInfoUnderConstruction storedBlock;
+    final Block reportedBlock;
     final ReplicaState reportedState;
 
     StatefulBlockInfo(BlockInfoUnderConstruction storedBlock,
-        ReplicaState reportedState) {
+        Block reportedBlock, ReplicaState reportedState) {
       this.storedBlock = storedBlock;
+      this.reportedBlock = reportedBlock;
       this.reportedState = reportedState;
     }
   }
@@ -1715,7 +1717,7 @@ private void processReport(final DatanodeDescriptor node,
 
     // Process the blocks on each queue
     for (StatefulBlockInfo b : toUC) {
-      addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
+      addStoredBlockUnderConstruction(b, node);
     }
     for (Block b : toRemove) {
       removeStoredBlock(b, node);
@@ -1939,7 +1941,7 @@ private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
 
     if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
       toUC.add(new StatefulBlockInfo(
-          (BlockInfoUnderConstruction)storedBlock, reportedState));
+          (BlockInfoUnderConstruction)storedBlock, block, reportedState));
       return storedBlock;
     }
 
@@ -2110,13 +2112,11 @@ private boolean isBlockUnderConstruction(BlockInfo storedBlock,
     }
   }
 
-  void addStoredBlockUnderConstruction(
-      BlockInfoUnderConstruction block,
-      DatanodeDescriptor node,
-      ReplicaState reportedState)
-  throws IOException {
-    block.addReplicaIfNotPresent(node, block, reportedState);
-    if (reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
+  void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
+      DatanodeDescriptor node) throws IOException {
+    BlockInfoUnderConstruction block = ucBlock.storedBlock;
+    block.addReplicaIfNotPresent(node, ucBlock.reportedBlock, ucBlock.reportedState);
+    if (ucBlock.reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
       addStoredBlock(block, node, null, true);
     }
   }
@@ -2683,7 +2683,7 @@ private void processAndHandleReportedBlock(DatanodeDescriptor node, Block block,
       : "The block should be only in one of the lists.";
 
     for (StatefulBlockInfo b : toUC) {
-      addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
+      addStoredBlockUnderConstruction(b, node);
     }
     long numBlocksLogged = 0;
     for (BlockInfo b : toAdd) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 0a8ed3eb7f5..86a9fec48b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -139,16 +139,10 @@ public void testPipelineRecoveryForLastBlock() throws IOException {
 
       Path file = new Path("dataprotocol1.dat");
       Mockito.when(faultInjector.failPacket()).thenReturn(true);
-      try {
-        DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
-      } catch (IOException e) {
-        // completeFile() should fail.
-        Assert.assertTrue(e.getMessage().startsWith("Unable to close file"));
-        return;
-      }
+      DFSTestUtil.createFile(fileSys, file, 68000000L, (short)numDataNodes, 0L);
 
-      // At this point, NN let data corruption to happen.
-      // Before failing test, try reading the file. It should fail.
+      // At this point, NN should have accepted only valid replicas.
+      // Read should succeed.
       FSDataInputStream in = fileSys.open(file);
       try {
         int c = in.read();
@@ -158,8 +152,6 @@ public void testPipelineRecoveryForLastBlock() throws IOException {
         Assert.fail("Block is missing because the file was closed with"
             + " corrupt replicas.");
       }
-      Assert.fail("The file was closed with corrupt replicas, but read still"
-          + " works!");
     } finally {
       DFSClientFaultInjector.instance = oldInjector;
       if (cluster != null) {
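
Reviewer note: the core of this change is that commitBlock() now calls
setGenerationStampAndVerifyReplicas(), so replicas are checked against the
final generation stamp at commit time, and the stamp is recorded before the
stale-replica pruning loop runs. Below is a minimal, self-contained sketch of
that pruning logic, not the actual HDFS code: UnderConstructionBlockSketch and
its Replica class are simplified stand-ins for BlockInfoUnderConstruction and
ReplicaUnderConstruction, and the datanode locations are plain strings.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified stand-in for BlockInfoUnderConstruction; not the real HDFS class.
class UnderConstructionBlockSketch {

  // Stand-in for ReplicaUnderConstruction: where a replica lives and the
  // generation stamp it was written with.
  static class Replica {
    final String location;
    final long genStamp;

    Replica(String location, long genStamp) {
      this.location = location;
      this.genStamp = genStamp;
    }
  }

  private long generationStamp;
  private final List<Replica> replicas = new ArrayList<>();

  void addReplica(String location, long genStamp) {
    replicas.add(new Replica(location, genStamp));
  }

  // Mirrors setGenerationStampAndVerifyReplicas(): record the final stamp
  // first (the patch moves this call to the top of the method), then drop
  // any replica written with a stale stamp, e.g. one left behind by the
  // pipeline that failed before recovery bumped the generation stamp.
  void setGenerationStampAndVerifyReplicas(long genStamp) {
    this.generationStamp = genStamp;
    for (Iterator<Replica> it = replicas.iterator(); it.hasNext();) {
      Replica r = it.next();
      if (r.genStamp != genStamp) {
        it.remove();
        System.out.println("Removing stale replica from location: " + r.location);
      }
    }
  }

  // Mirrors commitBlock(): the patch adds the verification call here, so
  // stale replicas are sorted out when the client commits the block.
  void commitBlock(long finalGenStamp) {
    setGenerationStampAndVerifyReplicas(finalGenStamp);
  }

  public static void main(String[] args) {
    UnderConstructionBlockSketch blk = new UnderConstructionBlockSketch();
    blk.addReplica("dn1", 1001L); // reported before pipeline recovery: stale
    blk.addReplica("dn2", 1002L); // reported with the recovered, final stamp
    blk.commitBlock(1002L);
    // Only dn2's replica survives: the stale copy is rejected while the
    // valid replica is kept, which is the behavior HDFS-5557 restores.
    System.out.println("Replicas after commit: " + blk.replicas.size());
  }
}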