HDFS-9342. Erasure coding: client should update and commit block based on acknowledged size. Contributed by SammiChen.
parent 8065129d87
commit a9a3d219fe
@@ -772,9 +772,37 @@ public class DFSStripedOutputStream extends DFSOutputStream {
        newStorageIDs[i] = "";
      }
    }

    // should update the block group length based on the acked length
    final long sentBytes = currentBlockGroup.getNumBytes();
    final long ackedBytes = getNumAckedStripes() * cellSize * numDataBlocks;
    Preconditions.checkState(ackedBytes <= sentBytes);
    currentBlockGroup.setNumBytes(ackedBytes);
    newBG.setNumBytes(ackedBytes);
    dfsClient.namenode.updatePipeline(dfsClient.clientName, currentBlockGroup,
        newBG, newNodes, newStorageIDs);
    currentBlockGroup = newBG;
    // keep tracking the full sent length locally; only the NameNode update
    // above uses the acked length
    currentBlockGroup.setNumBytes(sentBytes);
  }
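To make the new arithmetic concrete, here is a small, self-contained sketch of how the acked block-group length is derived from fully acked stripes. The names mirror fields used in the patch (cellSize, numDataBlocks, sentBytes, ackedBytes), but the concrete values are illustrative assumptions, not taken from this change.

// Illustration only: derive the acked block-group length from fully acked
// stripes. The 6-data-block, 1 MiB-cell layout and the byte counts are assumed
// example values; real values come from the file's erasure-coding policy.
public class AckedLengthExample {
  public static void main(String[] args) {
    final int numDataBlocks = 6;               // assumed data blocks per stripe
    final long cellSize = 1024L * 1024;        // assumed cell size: 1 MiB
    final long numAckedStripes = 3;            // e.g. what getNumAckedStripes() would return

    final long sentBytes = 20L * 1024 * 1024;  // bytes the client has written so far
    final long ackedBytes = numAckedStripes * cellSize * numDataBlocks;

    // Mirrors the Preconditions.checkState above: the acked length reported to
    // the NameNode can never exceed the length actually sent.
    if (ackedBytes > sentBytes) {
      throw new IllegalStateException("acked length exceeds sent length");
    }
    System.out.println("sent=" + sentBytes + ", acked=" + ackedBytes);
  }
}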

  /**
   * Get the number of acked stripes. A stripe is acked when at least
   * numDataBlocks cells of the stripe have been acked.
   */
  private long getNumAckedStripes() {
    int minStripeNum = Integer.MAX_VALUE;
    for (int i = 0; i < numAllBlocks; i++) {
      final StripedDataStreamer streamer = getStripedDataStreamer(i);
      if (streamer.isHealthy()) {
        // a streamer that has no block yet has acked zero stripes
        int curStripeNum = 0;
        if (streamer.getBlock() != null) {
          curStripeNum = (int) (streamer.getBlock().getNumBytes() / cellSize);
        }
        minStripeNum = Math.min(curStripeNum, minStripeNum);
      }
    }
    assert minStripeNum != Integer.MAX_VALUE;
    return minStripeNum;
  }
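The min-over-streamers rule in getNumAckedStripes() can be illustrated with a standalone sketch; the per-streamer acked byte counts and the 1 MiB cell size below are made-up example values, not anything taken from this diff.

// Toy model of the "minimum over healthy streamers" rule: a stripe counts as
// acked only once every healthy streamer has acked all of its full cells for
// that stripe. All numbers here are illustrative assumptions.
public class AckedStripesExample {
  public static void main(String[] args) {
    final long cellSize = 1024L * 1024;
    final long[] ackedBytesPerStreamer = {
        3 * cellSize, 3 * cellSize, 2 * cellSize + 512,  // one streamer lags mid-cell
        3 * cellSize, 3 * cellSize, 3 * cellSize
    };

    long minStripes = Long.MAX_VALUE;
    for (long acked : ackedBytesPerStreamer) {
      // full cells acked by this streamer == stripes it has completed
      minStripes = Math.min(minStripes, acked / cellSize);
    }
    System.out.println("acked stripes = " + minStripes);  // prints 2
  }
}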

  private int stripeDataSize() {