From 295e8685dd8a34b2c6c09b601ceaf66587695e34 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Sat, 25 Feb 2017 21:13:51 -0800 Subject: [PATCH] HDFS-8498. Blocks can be committed with wrong size. Contributed by Jing Zhao. (cherry picked from commit f3cdf29af4c67a1963f51f02bf88075bf6dce679) --- .../apache/hadoop/hdfs/DFSOutputStream.java | 6 +- .../org/apache/hadoop/hdfs/DataStreamer.java | 119 ++++++++++++------ .../hadoop/hdfs/TestDFSOutputStream.java | 3 +- 3 files changed, 83 insertions(+), 45 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 1e549d963f8..404a644c736 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -606,9 +606,9 @@ public class DFSOutputStream extends FSOutputSummer // update the block length first time irrespective of flag if (updateLength || getStreamer().getPersistBlocks().get()) { synchronized (this) { - if (!getStreamer().streamerClosed() - && getStreamer().getBlock() != null) { - lastBlockLength = getStreamer().getBlock().getNumBytes(); + final ExtendedBlock block = getStreamer().getBlock(); + if (!getStreamer().streamerClosed() && block != null) { + lastBlockLength = block.getNumBytes(); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 2a7f8c0133f..47728d3e469 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -150,8 +150,6 @@ class DataStreamer extends Daemon { /** * Record a connection exception. - * @param e - * @throws InvalidEncryptionKeyException */ void recordFailure(final InvalidEncryptionKeyException e) throws InvalidEncryptionKeyException { @@ -186,9 +184,8 @@ class DataStreamer extends Daemon { final StorageType[] targetStorageTypes, final Token blockToken) throws IOException { //send the TRANSFER_BLOCK request - new Sender(out) - .transferBlock(block, blockToken, dfsClient.clientName, targets, - targetStorageTypes); + new Sender(out).transferBlock(block.getCurrentBlock(), blockToken, + dfsClient.clientName, targets, targetStorageTypes); out.flush(); //ack BlockOpResponseProto transferResponse = BlockOpResponseProto @@ -207,6 +204,42 @@ class DataStreamer extends Daemon { } } + static class BlockToWrite { + private ExtendedBlock currentBlock; + + BlockToWrite(ExtendedBlock block) { + setCurrentBlock(block); + } + + synchronized ExtendedBlock getCurrentBlock() { + return currentBlock == null ? null : new ExtendedBlock(currentBlock); + } + + synchronized long getNumBytes() { + return currentBlock == null ? 0 : currentBlock.getNumBytes(); + } + + synchronized void setCurrentBlock(ExtendedBlock block) { + currentBlock = (block == null || block.getLocalBlock() == null) ? + null : new ExtendedBlock(block); + } + + synchronized void setNumBytes(long numBytes) { + assert currentBlock != null; + currentBlock.setNumBytes(numBytes); + } + + synchronized void setGenerationStamp(long generationStamp) { + assert currentBlock != null; + currentBlock.setGenerationStamp(generationStamp); + } + + @Override + public synchronized String toString() { + return currentBlock == null ? "null" : currentBlock.toString(); + } + } + /** * Create a socket for a write pipeline * @@ -420,7 +453,7 @@ class DataStreamer extends Daemon { } private volatile boolean streamerClosed = false; - private volatile ExtendedBlock block; // its length is number of bytes acked + private final BlockToWrite block; // its length is number of bytes acked private Token accessToken; private DataOutputStream blockStream; private DataInputStream blockReplyStream; @@ -481,12 +514,14 @@ class DataStreamer extends Daemon { private final String[] favoredNodes; private final EnumSet addBlockFlags; - private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src, + private DataStreamer(HdfsFileStatus stat, ExtendedBlock block, + DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, AtomicReference cachingStrategy, ByteArrayManager byteArrayManage, boolean isAppend, String[] favoredNodes, EnumSet flags) { + this.block = new BlockToWrite(block); this.dfsClient = dfsClient; this.src = src; this.progress = progress; @@ -512,9 +547,8 @@ class DataStreamer extends Daemon { AtomicReference cachingStrategy, ByteArrayManager byteArrayManage, String[] favoredNodes, EnumSet flags) { - this(stat, dfsClient, src, progress, checksum, cachingStrategy, + this(stat, block, dfsClient, src, progress, checksum, cachingStrategy, byteArrayManage, false, favoredNodes, flags); - this.block = block; stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; } @@ -527,10 +561,9 @@ class DataStreamer extends Daemon { String src, Progressable progress, DataChecksum checksum, AtomicReference cachingStrategy, ByteArrayManager byteArrayManage) throws IOException { - this(stat, dfsClient, src, progress, checksum, cachingStrategy, - byteArrayManage, true, null, null); + this(stat, lastBlock.getBlock(), dfsClient, src, progress, checksum, + cachingStrategy, byteArrayManage, true, null, null); stage = BlockConstructionStage.PIPELINE_SETUP_APPEND; - block = lastBlock.getBlock(); bytesSent = block.getNumBytes(); accessToken = lastBlock.getBlockToken(); } @@ -1288,7 +1321,7 @@ class DataStreamer extends Daemon { LocatedBlock lb; //get a new datanode lb = dfsClient.namenode.getAdditionalDatanode( - src, stat.getFileId(), block, nodes, storageIDs, + src, stat.getFileId(), block.getCurrentBlock(), nodes, storageIDs, exclude.toArray(new DatanodeInfo[exclude.size()]), 1, dfsClient.clientName); // a new node was allocated by the namenode. Update nodes. @@ -1400,7 +1433,7 @@ class DataStreamer extends Daemon { } // while if (success) { - block = updatePipeline(newGS); + updatePipeline(newGS); } return false; // do not sleep, continue processing } @@ -1497,17 +1530,27 @@ class DataStreamer extends Daemon { } private LocatedBlock updateBlockForPipeline() throws IOException { - return dfsClient.namenode.updateBlockForPipeline(block, + return dfsClient.namenode.updateBlockForPipeline(block.getCurrentBlock(), dfsClient.clientName); } + void updateBlockGS(final long newGS) { + block.setGenerationStamp(newGS); + } + /** update pipeline at the namenode */ - ExtendedBlock updatePipeline(long newGS) throws IOException { - final ExtendedBlock newBlock = new ExtendedBlock( - block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS); - dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock, - nodes, storageIDs); - return newBlock; + private void updatePipeline(long newGS) throws IOException { + final ExtendedBlock oldBlock = block.getCurrentBlock(); + // the new GS has been propagated to all DN, it should be ok to update the + // local block state + updateBlockGS(newGS); + dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock, + block.getCurrentBlock(), nodes, storageIDs); + } + + DatanodeInfo[] getExcludedNodes() { + return excludedNodes.getAllPresent(excludedNodes.asMap().keySet()) + .keySet().toArray(new DatanodeInfo[0]); } /** @@ -1522,35 +1565,30 @@ class DataStreamer extends Daemon { StorageType[] storageTypes; int count = dfsClient.getConf().getNumBlockWriteRetry(); boolean success; - ExtendedBlock oldBlock = block; + final ExtendedBlock oldBlock = block.getCurrentBlock(); do { errorState.reset(); lastException.clear(); success = false; - DatanodeInfo[] excluded = - excludedNodes.getAllPresent(excludedNodes.asMap().keySet()) - .keySet() - .toArray(new DatanodeInfo[0]); - block = oldBlock; - lb = locateFollowingBlock(excluded.length > 0 ? excluded : null); - block = lb.getBlock(); + DatanodeInfo[] excluded = getExcludedNodes(); + lb = locateFollowingBlock( + excluded.length > 0 ? excluded : null, oldBlock); + block.setCurrentBlock(lb.getBlock()); block.setNumBytes(0); bytesSent = 0; accessToken = lb.getBlockToken(); nodes = lb.getLocations(); storageTypes = lb.getStorageTypes(); - // // Connect to first DataNode in the list. - // success = createBlockOutputStream(nodes, storageTypes, 0L, false); if (!success) { LOG.warn("Abandoning " + block); - dfsClient.namenode.abandonBlock(block, stat.getFileId(), src, - dfsClient.clientName); - block = null; + dfsClient.namenode.abandonBlock(block.getCurrentBlock(), + stat.getFileId(), src, dfsClient.clientName); + block.setCurrentBlock(null); final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()]; LOG.warn("Excluding datanode " + badNode); excludedNodes.put(badNode, badNode); @@ -1611,7 +1649,7 @@ class DataStreamer extends Daemon { // We cannot change the block length in 'block' as it counts the number // of bytes ack'ed. - ExtendedBlock blockCopy = new ExtendedBlock(block); + ExtendedBlock blockCopy = block.getCurrentBlock(); blockCopy.setNumBytes(stat.getBlockSize()); boolean[] targetPinnings = getPinnings(nodes); @@ -1721,8 +1759,8 @@ class DataStreamer extends Daemon { } } - protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) - throws IOException { + protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded, + ExtendedBlock oldBlock) throws IOException { final DfsClientConf conf = dfsClient.getConf(); int retries = conf.getNumBlockWriteLocateFollowingRetry(); long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs(); @@ -1731,7 +1769,7 @@ class DataStreamer extends Daemon { while (true) { try { return dfsClient.namenode.addBlock(src, dfsClient.clientName, - block, excludedNodes, stat.getFileId(), favoredNodes, + oldBlock, excluded, stat.getFileId(), favoredNodes, addBlockFlags); } catch (RemoteException e) { IOException ue = @@ -1816,7 +1854,7 @@ class DataStreamer extends Daemon { * @return the block this streamer is writing to */ ExtendedBlock getBlock() { - return block; + return block.getCurrentBlock(); } /** @@ -2009,7 +2047,8 @@ class DataStreamer extends Daemon { @Override public String toString() { - return (block == null? null: block.getLocalBlock()) + final ExtendedBlock extendedBlock = block.getCurrentBlock(); + return (extendedBlock == null ? null : extendedBlock.getLocalBlock()) + "@" + Arrays.toString(getNodes()); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java index 750103d19dd..9ec01b6dbc2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java @@ -110,8 +110,7 @@ public class TestDFSOutputStream { * packet size < 64kB. See HDFS-7308 for details. */ @Test - public void testComputePacketChunkSize() - throws Exception { + public void testComputePacketChunkSize() throws Exception { DistributedFileSystem fs = cluster.getFileSystem(); FSDataOutputStream os = fs.create(new Path("/test")); DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os,