From a9a3d219fed2dd9d7bb84c228f6b8d97eadbe1f6 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Sun, 7 May 2017 14:45:26 -0700 Subject: [PATCH] HDFS-9342. Erasure coding: client should update and commit block based on acknowledged size. Contributed by SammiChen. --- .../hadoop/hdfs/DFSStripedOutputStream.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 3dd07f72e02..0fdae8c0f8f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -772,9 +772,37 @@ public class DFSStripedOutputStream extends DFSOutputStream { newStorageIDs[i] = ""; } } + + // should update the block group length based on the acked length + final long sentBytes = currentBlockGroup.getNumBytes(); + final long ackedBytes = getNumAckedStripes() * cellSize * numDataBlocks; + Preconditions.checkState(ackedBytes <= sentBytes); + currentBlockGroup.setNumBytes(ackedBytes); + newBG.setNumBytes(ackedBytes); dfsClient.namenode.updatePipeline(dfsClient.clientName, currentBlockGroup, newBG, newNodes, newStorageIDs); currentBlockGroup = newBG; + currentBlockGroup.setNumBytes(sentBytes); + } + + /** + * Get the number of acked stripes. An acked stripe means at least data block + * number size cells of the stripe were acked. + */ + private long getNumAckedStripes() { + int minStripeNum = Integer.MAX_VALUE; + for (int i = 0; i < numAllBlocks; i++) { + final StripedDataStreamer streamer = getStripedDataStreamer(i); + if (streamer.isHealthy()) { + int curStripeNum = 0; + if (streamer.getBlock() != null) { + curStripeNum = (int) (streamer.getBlock().getNumBytes() / cellSize); + } + minStripeNum = Math.min(curStripeNum, minStripeNum); + } + } + assert minStripeNum != Integer.MAX_VALUE; + return minStripeNum; } private int stripeDataSize() {