diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 92f13d86381..879c4ab99b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -1572,7 +1572,8 @@ void updateBlockGS(final long newGS) {
   }
 
   /** update pipeline at the namenode */
-  private void updatePipeline(long newGS) throws IOException {
+  @VisibleForTesting
+  void updatePipeline(long newGS) throws IOException {
     final ExtendedBlock oldBlock = block.getCurrentBlock();
     // the new GS has been propagated to all DN, it should be ok to update the
     // local block state
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
index c43ace7c83a..7bdd5eb5aca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
@@ -84,7 +84,7 @@ public void setExpectedLocations(Block block, DatanodeStorageInfo[] targets) {
     for(int i = 0; i < targets.length; i++) {
       // Only store non-null DatanodeStorageInfo.
       if (targets[i] != null) {
-        replicas[i] = new ReplicaUnderConstruction(block,
+        replicas[offset++] = new ReplicaUnderConstruction(block,
             targets[i], ReplicaState.RBW);
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 50a9793bc40..fe069365062 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -38,6 +38,9 @@
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
@@ -705,4 +708,51 @@ public void failPipeline(ReplicaInPipelineInterface replicaInfo,
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testUpdatePipeLineAfterDNReg() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+      cluster.waitActive();
+      FileSystem fileSys = cluster.getFileSystem();
+
+      Path file = new Path("/testUpdatePipeLineAfterDNReg");
+      FSDataOutputStream out = fileSys.create(file);
+      out.write(1);
+      out.hflush();
+      // Get the first DN in the pipeline, disable its heartbeats, and then
+      // mark it dead on the NameNode
+      DFSOutputStream dfsOut = (DFSOutputStream) out.getWrappedStream();
+      DatanodeInfo[] pipeline = dfsOut.getPipeline();
+      DataNode dn1 = cluster.getDataNode(pipeline[0].getIpcPort());
+      dn1.setHeartbeatsDisabledForTests(true);
+      DatanodeDescriptor dn1Desc = cluster.getNamesystem(0).getBlockManager()
+          .getDatanodeManager().getDatanode(dn1.getDatanodeId());
+      cluster.setDataNodeDead(dn1Desc);
+      // Re-register the dead DataNode
+      DatanodeProtocolClientSideTranslatorPB dnp =
+          new DatanodeProtocolClientSideTranslatorPB(
+              cluster.getNameNode().getNameNodeAddress(), conf);
+      dnp.registerDatanode(
+          dn1.getDNRegistrationForBP(cluster.getNamesystem().getBlockPoolId()));
+      String clientName = ((DistributedFileSystem) fileSys).getClient()
+          .getClientName();
+      NamenodeProtocols namenode = cluster.getNameNodeRpc();
+      // Bump the generation stamp and update the pipeline at the NameNode
+      LocatedBlock newBlock = namenode
+          .updateBlockForPipeline(dfsOut.getBlock(), clientName);
+      dfsOut.getStreamer()
+          .updatePipeline(newBlock.getBlock().getGenerationStamp());
+      newBlock = namenode.updateBlockForPipeline(dfsOut.getBlock(), clientName);
+      // Updating the pipeline again should succeed without throwing
+      dfsOut.getStreamer()
+          .updatePipeline(newBlock.getBlock().getGenerationStamp());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
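
Note on the BlockUnderConstructionFeature change: the one-line hunk implies that replicas is sized to the number of non-null entries in targets and that an offset counter is already declared above the loop. Under that assumption, a minimal self-contained sketch (hypothetical code, not from the patch; String stands in for DatanodeStorageInfo) of why indexing by the loop variable i was wrong:

  public class PackNonNullSketch {
    public static void main(String[] args) {
      String[] targets = {null, "storage1", "storage2"};
      // Size the destination array to the non-null count, as the
      // surrounding setExpectedLocations() appears to do for replicas.
      int numLocations = 0;
      for (String t : targets) {
        if (t != null) {
          numLocations++;
        }
      }
      String[] replicas = new String[numLocations]; // length 2
      int offset = 0;
      for (int i = 0; i < targets.length; i++) {
        if (targets[i] != null) {
          // The old indexing, replicas[i] = targets[i], would write to
          // indices 1 and 2 here: index 2 is out of bounds, and index 0
          // would stay null. offset++ packs the entries into 0 and 1.
          replicas[offset++] = targets[i];
        }
      }
      System.out.println(java.util.Arrays.toString(replicas)); // [storage1, storage2]
    }
  }

A null target is the situation the new test provokes: after the first DataNode is marked dead and re-registers, its storage may no longer resolve when the expected locations are rebuilt, so with the old indexing the skipped slot could surface as a null element or an out-of-bounds write on the second updatePipeline call.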