diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java index bdc55853d8b..a8a74945e5b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java @@ -196,6 +196,10 @@ abstract public class FSOutputSummer extends OutputStream { return sum.getChecksumSize(); } + protected DataChecksum getDataChecksum() { + return sum; + } + protected TraceScope createWriteTraceScope() { return NullScope.INSTANCE; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index 48791b134a6..9357e23fddd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -125,3 +125,6 @@ HDFS-8233. Fix DFSStripedOutputStream#getCurrentBlockGroupBytes when the last stripe is at the block group boundary. (jing9) + + HDFS-8223. Should calculate checksum for parity blocks in DFSStripedOutputStream. + (Yi Liu via jing9) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 245dfc10b6e..68422675a1e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -62,6 +62,8 @@ public class DFSStripedOutputStream extends DFSOutputStream { */ private final ECInfo ecInfo; private final int cellSize; + // checksum buffer, we only need to calculate checksum for parity blocks + private byte[] checksumBuf; private ByteBuffer[] cellBuffers; private final short numAllBlocks; @@ -99,6 +101,7 @@ public class DFSStripedOutputStream extends DFSOutputStream { checkConfiguration(); + checksumBuf = new byte[getChecksumSize() * (cellSize / bytesPerChecksum)]; cellBuffers = new ByteBuffer[numAllBlocks]; List> stripeBlocks = new ArrayList<>(); @@ -179,6 +182,10 @@ public class DFSStripedOutputStream extends DFSOutputStream { private List generatePackets(ByteBuffer byteBuffer) throws IOException{ List packets = new ArrayList<>(); + assert byteBuffer.hasArray(); + getDataChecksum().calculateChunkedSums(byteBuffer.array(), 0, + byteBuffer.remaining(), checksumBuf, 0); + int ckOff = 0; while (byteBuffer.remaining() > 0) { DFSPacket p = createPacket(packetSize, chunksPerPacket, streamer.getBytesCurBlock(), @@ -186,6 +193,9 @@ public class DFSStripedOutputStream extends DFSOutputStream { int maxBytesToPacket = p.getMaxChunks() * bytesPerChecksum; int toWrite = byteBuffer.remaining() > maxBytesToPacket ? maxBytesToPacket: byteBuffer.remaining(); + int ckLen = ((toWrite - 1) / bytesPerChecksum + 1) * getChecksumSize(); + p.writeChecksum(checksumBuf, ckOff, ckLen); + ckOff += ckLen; p.writeData(byteBuffer, toWrite); streamer.incBytesCurBlock(toWrite); packets.add(p);