HDFS-6865. Byte array native checksumming on client side. Contributed by James Thomas.

(cherry picked from commit ab638e77b8)
This commit is contained in:
Todd Lipcon 2014-08-28 16:44:09 -07:00
parent 4f95017602
commit ed2a997f49
11 changed files with 108 additions and 72 deletions

View File

@ -381,7 +381,8 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
long blockSize, long blockSize,
Progressable progress) Progressable progress)
throws IOException { throws IOException {
super(DataChecksum.newCrc32(), fs.getBytesPerSum(), 4); super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
fs.getBytesPerSum()));
int bytesPerSum = fs.getBytesPerSum(); int bytesPerSum = fs.getBytesPerSum();
this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize, this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize,
replication, blockSize, progress); replication, blockSize, progress);
@ -405,10 +406,11 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
} }
@Override @Override
protected void writeChunk(byte[] b, int offset, int len, byte[] checksum) protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
int ckoff, int cklen)
throws IOException { throws IOException {
datas.write(b, offset, len); datas.write(b, offset, len);
sums.write(checksum); sums.write(checksum, ckoff, cklen);
} }
@Override @Override

View File

@ -337,7 +337,8 @@ public abstract class ChecksumFs extends FilterFs {
final short replication, final long blockSize, final short replication, final long blockSize,
final Progressable progress, final ChecksumOpt checksumOpt, final Progressable progress, final ChecksumOpt checksumOpt,
final boolean createParent) throws IOException { final boolean createParent) throws IOException {
super(DataChecksum.newCrc32(), fs.getBytesPerSum(), 4); super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
fs.getBytesPerSum()));
// checksumOpt is passed down to the raw fs. Unless it implements // checksumOpt is passed down to the raw fs. Unless it implements
// checksum impelemts internally, checksumOpt will be ignored. // checksum impelemts internally, checksumOpt will be ignored.
@ -370,10 +371,11 @@ public abstract class ChecksumFs extends FilterFs {
} }
@Override @Override
protected void writeChunk(byte[] b, int offset, int len, byte[] checksum) protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
int ckoff, int cklen)
throws IOException { throws IOException {
datas.write(b, offset, len); datas.write(b, offset, len);
sums.write(checksum); sums.write(checksum, ckoff, cklen);
} }
@Override @Override

View File

@ -18,13 +18,14 @@
package org.apache.hadoop.fs; package org.apache.hadoop.fs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.DataChecksum;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.zip.Checksum; import java.util.zip.Checksum;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/** /**
* This is a generic output stream for generating checksums for * This is a generic output stream for generating checksums for
* data before it is written to the underlying stream * data before it is written to the underlying stream
@ -33,7 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Unstable @InterfaceStability.Unstable
abstract public class FSOutputSummer extends OutputStream { abstract public class FSOutputSummer extends OutputStream {
// data checksum // data checksum
private Checksum sum; private final DataChecksum sum;
// internal buffer for storing data before it is checksumed // internal buffer for storing data before it is checksumed
private byte buf[]; private byte buf[];
// internal buffer for storing checksum // internal buffer for storing checksum
@ -41,18 +42,24 @@ abstract public class FSOutputSummer extends OutputStream {
// The number of valid bytes in the buffer. // The number of valid bytes in the buffer.
private int count; private int count;
protected FSOutputSummer(Checksum sum, int maxChunkSize, int checksumSize) { // We want this value to be a multiple of 3 because the native code checksums
// 3 chunks simultaneously. The chosen value of 9 strikes a balance between
// limiting the number of JNI calls and flushing to the underlying stream
// relatively frequently.
private static final int BUFFER_NUM_CHUNKS = 9;
protected FSOutputSummer(DataChecksum sum) {
this.sum = sum; this.sum = sum;
this.buf = new byte[maxChunkSize]; this.buf = new byte[sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS];
this.checksum = new byte[checksumSize]; this.checksum = new byte[sum.getChecksumSize() * BUFFER_NUM_CHUNKS];
this.count = 0; this.count = 0;
} }
/* write the data chunk in <code>b</code> staring at <code>offset</code> with /* write the data chunk in <code>b</code> staring at <code>offset</code> with
* a length of <code>len</code>, and its checksum * a length of <code>len > 0</code>, and its checksum
*/ */
protected abstract void writeChunk(byte[] b, int offset, int len, byte[] checksum) protected abstract void writeChunk(byte[] b, int bOffset, int bLen,
throws IOException; byte[] checksum, int checksumOffset, int checksumLen) throws IOException;
/** /**
* Check if the implementing OutputStream is closed and should no longer * Check if the implementing OutputStream is closed and should no longer
@ -66,7 +73,6 @@ abstract public class FSOutputSummer extends OutputStream {
/** Write one byte */ /** Write one byte */
@Override @Override
public synchronized void write(int b) throws IOException { public synchronized void write(int b) throws IOException {
sum.update(b);
buf[count++] = (byte)b; buf[count++] = (byte)b;
if(count == buf.length) { if(count == buf.length) {
flushBuffer(); flushBuffer();
@ -111,18 +117,17 @@ abstract public class FSOutputSummer extends OutputStream {
*/ */
private int write1(byte b[], int off, int len) throws IOException { private int write1(byte b[], int off, int len) throws IOException {
if(count==0 && len>=buf.length) { if(count==0 && len>=buf.length) {
// local buffer is empty and user data has one chunk // local buffer is empty and user buffer size >= local buffer size, so
// checksum and output data // simply checksum the user buffer and send it directly to the underlying
// stream
final int length = buf.length; final int length = buf.length;
sum.update(b, off, length); writeChecksumChunks(b, off, length);
writeChecksumChunk(b, off, length, false);
return length; return length;
} }
// copy user data to local buffer // copy user data to local buffer
int bytesToCopy = buf.length-count; int bytesToCopy = buf.length-count;
bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy; bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
sum.update(b, off, bytesToCopy);
System.arraycopy(b, off, buf, count, bytesToCopy); System.arraycopy(b, off, buf, count, bytesToCopy);
count += bytesToCopy; count += bytesToCopy;
if (count == buf.length) { if (count == buf.length) {
@ -136,22 +141,45 @@ abstract public class FSOutputSummer extends OutputStream {
* the underlying output stream. * the underlying output stream.
*/ */
protected synchronized void flushBuffer() throws IOException { protected synchronized void flushBuffer() throws IOException {
flushBuffer(false); flushBuffer(false, true);
} }
/* Forces any buffered output bytes to be checksumed and written out to /* Forces buffered output bytes to be checksummed and written out to
* the underlying output stream. If keep is true, then the state of * the underlying output stream. If there is a trailing partial chunk in the
* this object remains intact. * buffer,
* 1) flushPartial tells us whether to flush that chunk
* 2) if flushPartial is true, keep tells us whether to keep that chunk in the
* buffer (if flushPartial is false, it is always kept in the buffer)
*
* Returns the number of bytes that were flushed but are still left in the
* buffer (can only be non-zero if keep is true).
*/ */
protected synchronized void flushBuffer(boolean keep) throws IOException { protected synchronized int flushBuffer(boolean keep,
if (count != 0) { boolean flushPartial) throws IOException {
int chunkLen = count; int bufLen = count;
int partialLen = bufLen % sum.getBytesPerChecksum();
int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;
if (lenToFlush != 0) {
writeChecksumChunks(buf, 0, lenToFlush);
if (!flushPartial || keep) {
count = partialLen;
System.arraycopy(buf, bufLen - count, buf, 0, count);
} else {
count = 0; count = 0;
writeChecksumChunk(buf, 0, chunkLen, keep);
if (keep) {
count = chunkLen;
} }
} }
// total bytes left minus unflushed bytes left
return count - (bufLen - lenToFlush);
}
/**
* Checksums all complete data chunks and flushes them to the underlying
* stream. If there is a trailing partial chunk, it is not flushed and is
* maintained in the buffer.
*/
public void flush() throws IOException {
flushBuffer(false, false);
} }
/** /**
@ -161,18 +189,18 @@ abstract public class FSOutputSummer extends OutputStream {
return count; return count;
} }
/** Generate checksum for the data chunk and output data chunk & checksum /** Generate checksums for the given data chunks and output chunks & checksums
* to the underlying output stream. If keep is true then keep the * to the underlying output stream.
* current checksum intact, do not reset it.
*/ */
private void writeChecksumChunk(byte b[], int off, int len, boolean keep) private void writeChecksumChunks(byte b[], int off, int len)
throws IOException { throws IOException {
int tempChecksum = (int)sum.getValue(); sum.calculateChunkedSums(b, off, len, checksum, 0);
if (!keep) { for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
sum.reset(); int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
int ckOffset = i / sum.getBytesPerChecksum() * sum.getChecksumSize();
writeChunk(b, off + i, chunkLen, checksum, ckOffset,
sum.getChecksumSize());
} }
int2byte(tempChecksum, checksum);
writeChunk(b, off, len, checksum);
} }
/** /**
@ -196,9 +224,14 @@ abstract public class FSOutputSummer extends OutputStream {
/** /**
* Resets existing buffer with a new one of the specified size. * Resets existing buffer with a new one of the specified size.
*/ */
protected synchronized void resetChecksumChunk(int size) { protected synchronized void setChecksumBufSize(int size) {
sum.reset();
this.buf = new byte[size]; this.buf = new byte[size];
this.checksum = new byte[((size - 1) / sum.getBytesPerChecksum() + 1) *
sum.getChecksumSize()];
this.count = 0; this.count = 0;
} }
protected synchronized void resetChecksumBufSize() {
setChecksumBufSize(sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS);
}
} }

View File

@ -339,6 +339,7 @@ public class DataChecksum implements Checksum {
byte[] data, int dataOff, int dataLen, byte[] data, int dataOff, int dataLen,
byte[] checksums, int checksumsOff, String fileName, byte[] checksums, int checksumsOff, String fileName,
long basePos) throws ChecksumException { long basePos) throws ChecksumException {
if (type.size == 0) return;
if (NativeCrc32.isAvailable()) { if (NativeCrc32.isAvailable()) {
NativeCrc32.verifyChunkedSumsByteArray(bytesPerChecksum, type.id, NativeCrc32.verifyChunkedSumsByteArray(bytesPerChecksum, type.id,
@ -421,6 +422,7 @@ public class DataChecksum implements Checksum {
public void calculateChunkedSums( public void calculateChunkedSums(
byte[] data, int dataOffset, int dataLength, byte[] data, int dataOffset, int dataLength,
byte[] sums, int sumsOffset) { byte[] sums, int sumsOffset) {
if (type.size == 0) return;
if (NativeCrc32.isAvailable()) { if (NativeCrc32.isAvailable()) {
NativeCrc32.calculateChunkedSumsByteArray(bytesPerChecksum, type.id, NativeCrc32.calculateChunkedSumsByteArray(bytesPerChecksum, type.id,

View File

@ -42,7 +42,7 @@ class NativeCrc32 {
* modified. * modified.
* *
* @param bytesPerSum the chunk size (eg 512 bytes) * @param bytesPerSum the chunk size (eg 512 bytes)
* @param checksumType the DataChecksum type constant * @param checksumType the DataChecksum type constant (NULL is not supported)
* @param sums the DirectByteBuffer pointing at the beginning of the * @param sums the DirectByteBuffer pointing at the beginning of the
* stored checksums * stored checksums
* @param data the DirectByteBuffer pointing at the beginning of the * @param data the DirectByteBuffer pointing at the beginning of the

View File

@ -175,6 +175,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6773. MiniDFSCluster should skip edit log fsync by default (Stephen HDFS-6773. MiniDFSCluster should skip edit log fsync by default (Stephen
Chu via Colin Patrick McCabe) Chu via Colin Patrick McCabe)
HDFS-6865. Byte array native checksumming on client side
(James Thomas via todd)
BUG FIXES BUG FIXES
HDFS-6823. dfs.web.authentication.kerberos.principal shows up in logs for HDFS-6823. dfs.web.authentication.kerberos.principal shows up in logs for

View File

@ -398,7 +398,7 @@ public class DFSOutputStream extends FSOutputSummer
// one chunk that fills up the partial chunk. // one chunk that fills up the partial chunk.
// //
computePacketChunkSize(0, freeInCksum); computePacketChunkSize(0, freeInCksum);
resetChecksumChunk(freeInCksum); setChecksumBufSize(freeInCksum);
appendChunk = true; appendChunk = true;
} else { } else {
// if the remaining space in the block is smaller than // if the remaining space in the block is smaller than
@ -1563,7 +1563,7 @@ public class DFSOutputStream extends FSOutputSummer
private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress, private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
HdfsFileStatus stat, DataChecksum checksum) throws IOException { HdfsFileStatus stat, DataChecksum checksum) throws IOException {
super(checksum, checksum.getBytesPerChecksum(), checksum.getChecksumSize()); super(checksum);
this.dfsClient = dfsClient; this.dfsClient = dfsClient;
this.src = src; this.src = src;
this.fileId = stat.getFileId(); this.fileId = stat.getFileId();
@ -1717,22 +1717,21 @@ public class DFSOutputStream extends FSOutputSummer
// @see FSOutputSummer#writeChunk() // @see FSOutputSummer#writeChunk()
@Override @Override
protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum) protected synchronized void writeChunk(byte[] b, int offset, int len,
throws IOException { byte[] checksum, int ckoff, int cklen) throws IOException {
dfsClient.checkOpen(); dfsClient.checkOpen();
checkClosed(); checkClosed();
int cklen = checksum.length;
int bytesPerChecksum = this.checksum.getBytesPerChecksum(); int bytesPerChecksum = this.checksum.getBytesPerChecksum();
if (len > bytesPerChecksum) { if (len > bytesPerChecksum) {
throw new IOException("writeChunk() buffer size is " + len + throw new IOException("writeChunk() buffer size is " + len +
" is larger than supported bytesPerChecksum " + " is larger than supported bytesPerChecksum " +
bytesPerChecksum); bytesPerChecksum);
} }
if (checksum.length != this.checksum.getChecksumSize()) { if (cklen != this.checksum.getChecksumSize()) {
throw new IOException("writeChunk() checksum size is supposed to be " + throw new IOException("writeChunk() checksum size is supposed to be " +
this.checksum.getChecksumSize() + this.checksum.getChecksumSize() +
" but found to be " + checksum.length); " but found to be " + cklen);
} }
if (currentPacket == null) { if (currentPacket == null) {
@ -1748,7 +1747,7 @@ public class DFSOutputStream extends FSOutputSummer
} }
} }
currentPacket.writeChecksum(checksum, 0, cklen); currentPacket.writeChecksum(checksum, ckoff, cklen);
currentPacket.writeData(b, offset, len); currentPacket.writeData(b, offset, len);
currentPacket.numChunks++; currentPacket.numChunks++;
bytesCurBlock += len; bytesCurBlock += len;
@ -1772,7 +1771,7 @@ public class DFSOutputStream extends FSOutputSummer
// crc chunks from now on. // crc chunks from now on.
if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) { if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
appendChunk = false; appendChunk = false;
resetChecksumChunk(bytesPerChecksum); resetChecksumBufSize();
} }
if (!appendChunk) { if (!appendChunk) {
@ -1859,20 +1858,13 @@ public class DFSOutputStream extends FSOutputSummer
long lastBlockLength = -1L; long lastBlockLength = -1L;
boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH); boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH);
synchronized (this) { synchronized (this) {
/* Record current blockOffset. This might be changed inside
* flushBuffer() where a partial checksum chunk might be flushed.
* After the flush, reset the bytesCurBlock back to its previous value,
* any partial checksum chunk will be sent now and in next packet.
*/
long saveOffset = bytesCurBlock;
Packet oldCurrentPacket = currentPacket;
// flush checksum buffer, but keep checksum buffer intact // flush checksum buffer, but keep checksum buffer intact
flushBuffer(true); int numKept = flushBuffer(true, true);
// bytesCurBlock potentially incremented if there was buffered data // bytesCurBlock potentially incremented if there was buffered data
if (DFSClient.LOG.isDebugEnabled()) { if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug( DFSClient.LOG.debug(
"DFSClient flush() : saveOffset " + saveOffset + "DFSClient flush() :" +
" bytesCurBlock " + bytesCurBlock + " bytesCurBlock " + bytesCurBlock +
" lastFlushOffset " + lastFlushOffset); " lastFlushOffset " + lastFlushOffset);
} }
@ -1889,14 +1881,6 @@ public class DFSOutputStream extends FSOutputSummer
bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize()); bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
} }
} else { } else {
// We already flushed up to this offset.
// This means that we haven't written anything since the last flush
// (or the beginning of the file). Hence, we should not have any
// packet queued prior to this call, since the last flush set
// currentPacket = null.
assert oldCurrentPacket == null :
"Empty flush should not occur with a currentPacket";
if (isSync && bytesCurBlock > 0) { if (isSync && bytesCurBlock > 0) {
// Nothing to send right now, // Nothing to send right now,
// and the block was partially written, // and the block was partially written,
@ -1916,7 +1900,7 @@ public class DFSOutputStream extends FSOutputSummer
// Restore state of stream. Record the last flush offset // Restore state of stream. Record the last flush offset
// of the last full chunk that was flushed. // of the last full chunk that was flushed.
// //
bytesCurBlock = saveOffset; bytesCurBlock -= numKept;
toWaitFor = lastQueuedSeqno; toWaitFor = lastQueuedSeqno;
} // end synchronized } // end synchronized

View File

@ -261,7 +261,9 @@ public class TestFileAppend{
start += 29; start += 29;
} }
stm.write(fileContents, start, AppendTestUtil.FILE_SIZE -start); stm.write(fileContents, start, AppendTestUtil.FILE_SIZE -start);
// need to make sure we completely write out all full blocks before
// the checkFile() call (see FSOutputSummer#flush)
stm.flush();
// verify that full blocks are sane // verify that full blocks are sane
checkFile(fs, file1, 1); checkFile(fs, file1, 1);
stm.close(); stm.close();

View File

@ -394,6 +394,8 @@ public class TestBlockToken {
Path filePath = new Path(fileName); Path filePath = new Path(fileName);
FSDataOutputStream out = fs.create(filePath, (short) 1); FSDataOutputStream out = fs.create(filePath, (short) 1);
out.write(new byte[1000]); out.write(new byte[1000]);
// ensure that the first block is written out (see FSOutputSummer#flush)
out.flush();
LocatedBlocks locatedBlocks = cluster.getNameNodeRpc().getBlockLocations( LocatedBlocks locatedBlocks = cluster.getNameNodeRpc().getBlockLocations(
fileName, 0, 1000); fileName, 0, 1000);
while (locatedBlocks.getLastLocatedBlock() == null) { while (locatedBlocks.getLastLocatedBlock() == null) {

View File

@ -70,6 +70,9 @@ public class TestBlockUnderConstruction {
long blocksBefore = stm.getPos() / BLOCK_SIZE; long blocksBefore = stm.getPos() / BLOCK_SIZE;
TestFileCreation.writeFile(stm, BLOCK_SIZE); TestFileCreation.writeFile(stm, BLOCK_SIZE);
// need to make sure the full block is completely flushed to the DataNodes
// (see FSOutputSummer#flush)
stm.flush();
int blocksAfter = 0; int blocksAfter = 0;
// wait until the block is allocated by DataStreamer // wait until the block is allocated by DataStreamer
BlockLocation[] locatedBlocks; BlockLocation[] locatedBlocks;

View File

@ -154,6 +154,9 @@ public class TestDecommissioningStatus {
Random rand = new Random(seed); Random rand = new Random(seed);
rand.nextBytes(buffer); rand.nextBytes(buffer);
stm.write(buffer); stm.write(buffer);
// need to make sure that we actually write out both file blocks
// (see FSOutputSummer#flush)
stm.flush();
// Do not close stream, return it // Do not close stream, return it
// so that it is not garbage collected // so that it is not garbage collected
return stm; return stm;