HDFS-8223. Should calculate checksum for parity blocks in DFSStripedOutputStream. Contributed by Yi Liu.

This commit is contained in:
Jing Zhao 2015-04-23 15:48:21 -07:00 committed by Zhe Zhang
parent eb612b0b70
commit 3f2c6938f1
3 changed files with 17 additions and 0 deletions

View File

@ -196,6 +196,10 @@ protected int getChecksumSize() {
return sum.getChecksumSize(); return sum.getChecksumSize();
} }
protected DataChecksum getDataChecksum() {
return sum;
}
protected TraceScope createWriteTraceScope() { protected TraceScope createWriteTraceScope() {
return NullScope.INSTANCE; return NullScope.INSTANCE;
} }

View File

@ -125,3 +125,6 @@
HDFS-8233. Fix DFSStripedOutputStream#getCurrentBlockGroupBytes when the last HDFS-8233. Fix DFSStripedOutputStream#getCurrentBlockGroupBytes when the last
stripe is at the block group boundary. (jing9) stripe is at the block group boundary. (jing9)
HDFS-8223. Should calculate checksum for parity blocks in DFSStripedOutputStream.
(Yi Liu via jing9)

View File

@ -62,6 +62,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
*/ */
private final ECInfo ecInfo; private final ECInfo ecInfo;
private final int cellSize; private final int cellSize;
// checksum buffer, we only need to calculate checksum for parity blocks
private byte[] checksumBuf;
private ByteBuffer[] cellBuffers; private ByteBuffer[] cellBuffers;
private final short numAllBlocks; private final short numAllBlocks;
@ -99,6 +101,7 @@ private long getBlockGroupSize() {
checkConfiguration(); checkConfiguration();
checksumBuf = new byte[getChecksumSize() * (cellSize / bytesPerChecksum)];
cellBuffers = new ByteBuffer[numAllBlocks]; cellBuffers = new ByteBuffer[numAllBlocks];
List<BlockingQueue<LocatedBlock>> stripeBlocks = new ArrayList<>(); List<BlockingQueue<LocatedBlock>> stripeBlocks = new ArrayList<>();
@ -179,6 +182,10 @@ private void encode(ByteBuffer[] buffers) {
private List<DFSPacket> generatePackets(ByteBuffer byteBuffer) private List<DFSPacket> generatePackets(ByteBuffer byteBuffer)
throws IOException{ throws IOException{
List<DFSPacket> packets = new ArrayList<>(); List<DFSPacket> packets = new ArrayList<>();
assert byteBuffer.hasArray();
getDataChecksum().calculateChunkedSums(byteBuffer.array(), 0,
byteBuffer.remaining(), checksumBuf, 0);
int ckOff = 0;
while (byteBuffer.remaining() > 0) { while (byteBuffer.remaining() > 0) {
DFSPacket p = createPacket(packetSize, chunksPerPacket, DFSPacket p = createPacket(packetSize, chunksPerPacket,
streamer.getBytesCurBlock(), streamer.getBytesCurBlock(),
@ -186,6 +193,9 @@ private List<DFSPacket> generatePackets(ByteBuffer byteBuffer)
int maxBytesToPacket = p.getMaxChunks() * bytesPerChecksum; int maxBytesToPacket = p.getMaxChunks() * bytesPerChecksum;
int toWrite = byteBuffer.remaining() > maxBytesToPacket ? int toWrite = byteBuffer.remaining() > maxBytesToPacket ?
maxBytesToPacket: byteBuffer.remaining(); maxBytesToPacket: byteBuffer.remaining();
int ckLen = ((toWrite - 1) / bytesPerChecksum + 1) * getChecksumSize();
p.writeChecksum(checksumBuf, ckOff, ckLen);
ckOff += ckLen;
p.writeData(byteBuffer, toWrite); p.writeData(byteBuffer, toWrite);
streamer.incBytesCurBlock(toWrite); streamer.incBytesCurBlock(toWrite);
packets.add(p); packets.add(p);