diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt index 968b33ab8ec..f5ede31804c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt @@ -71,5 +71,8 @@ IMPROVEMENTS: HDFS-5455. NN should update storageMap on first heartbeat. (Arpit Agarwal) HDFS-5457. Fix TestDatanodeRegistration, TestFsck and TestAddBlockRetry. - (Contributed bt szetszwo) + (Contributed by szetszwo) + + HDFS-5466. Update storage IDs when the pipeline is updated. (Contributed + by szetszwo) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 9e91abad6e7..7f5b6ed5323 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -402,8 +402,7 @@ public class DFSOutputStream extends FSOutputSummer } // setup pipeline to append to the last block XXX retries?? - nodes = lastBlock.getLocations(); - storageIDs = lastBlock.getStorageIDs(); + setPipeline(lastBlock); errorIndex = -1; // no errors yet. if (nodes.length < 1) { throw new IOException("Unable to retrieve blocks locations " + @@ -412,6 +411,14 @@ public class DFSOutputStream extends FSOutputSummer } } + + private void setPipeline(LocatedBlock lb) { + setPipeline(lb.getLocations(), lb.getStorageIDs()); + } + private void setPipeline(DatanodeInfo[] nodes, String[] storageIDs) { + this.nodes = nodes; + this.storageIDs = storageIDs; + } private void setFavoredNodes(String[] favoredNodes) { this.favoredNodes = favoredNodes; @@ -435,7 +442,7 @@ public class DFSOutputStream extends FSOutputSummer this.setName("DataStreamer for file " + src); closeResponder(); closeStream(); - nodes = null; + setPipeline(null, null); stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; } @@ -504,7 +511,7 @@ public class DFSOutputStream extends FSOutputSummer if(DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Allocating new block"); } - nodes = nextBlockOutputStream(); + setPipeline(nextBlockOutputStream()); initDataStreaming(); } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) { if(DFSClient.LOG.isDebugEnabled()) { @@ -912,7 +919,7 @@ public class DFSOutputStream extends FSOutputSummer src, block, nodes, storageIDs, failed.toArray(new DatanodeInfo[failed.size()]), 1, dfsClient.clientName); - nodes = lb.getLocations(); + setPipeline(lb); //find the new datanode final int d = findNewDatanode(original); @@ -1012,7 +1019,14 @@ public class DFSOutputStream extends FSOutputSummer System.arraycopy(nodes, 0, newnodes, 0, errorIndex); System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex, newnodes.length-errorIndex); - nodes = newnodes; + + final String[] newStorageIDs = new String[newnodes.length]; + System.arraycopy(storageIDs, 0, newStorageIDs, 0, errorIndex); + System.arraycopy(storageIDs, errorIndex+1, newStorageIDs, errorIndex, + newStorageIDs.length-errorIndex); + + setPipeline(newnodes, newStorageIDs); + hasError = false; lastException.set(null); errorIndex = -1; @@ -1051,7 +1065,7 @@ public class DFSOutputStream extends FSOutputSummer * Must get block ID and the IDs of the destinations from the namenode. * Returns the list of target datanodes. */ - private DatanodeInfo[] nextBlockOutputStream() throws IOException { + private LocatedBlock nextBlockOutputStream() throws IOException { LocatedBlock lb = null; DatanodeInfo[] nodes = null; int count = dfsClient.getConf().nBlockWriteRetry; @@ -1093,7 +1107,7 @@ public class DFSOutputStream extends FSOutputSummer if (!success) { throw new IOException("Unable to create new block."); } - return nodes; + return lb; } // connects to the first datanode in the pipeline diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 180c919a0d6..867f4e1802d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1620,7 +1620,6 @@ public class BlockManager { // To minimize startup time, we discard any second (or later) block reports // that we receive while still in startup phase. final DatanodeStorageInfo storageInfo = node.updateStorage(storage); - LOG.info("XXX storageInfo=" + storageInfo + ", storage=" + storage); if (namesystem.isInStartupSafeMode() && storageInfo.getBlockReportCount() > 0) { blockLog.info("BLOCK* processReport: "