HDFS-6561. org.apache.hadoop.util.DataChecksum should support native checksumming (James Thomas via Colin Patrick McCabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1618680 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Colin McCabe 2014-08-18 18:02:37 +00:00
parent f8e871d01b
commit e446497cd1
8 changed files with 197 additions and 152 deletions

View File

@ -527,6 +527,9 @@ Release 2.6.0 - UNRELEASED
HADOOP-10335. An ip whilelist based implementation to resolve Sasl
properties per connection. (Benoy Antony via Arpit Agarwal)
HDFS-6561. org.apache.hadoop.util.DataChecksum should support native
checksumming (James Thomas via Colin Patrick McCabe)
OPTIMIZATIONS
HADOOP-10838. Byte array native checksumming. (James Thomas via todd)

View File

@ -391,6 +391,12 @@ public class DataChecksum implements Checksum {
return;
}
if (NativeCrc32.isAvailable()) {
NativeCrc32.calculateChunkedSums(bytesPerChecksum, type.id,
checksums, data);
return;
}
data.mark();
checksums.mark();
try {
@ -412,10 +418,16 @@ public class DataChecksum implements Checksum {
* Implementation of chunked calculation specifically on byte arrays. This
* is to avoid the copy when dealing with ByteBuffers that have array backing.
*/
private void calculateChunkedSums(
public void calculateChunkedSums(
byte[] data, int dataOffset, int dataLength,
byte[] sums, int sumsOffset) {
if (NativeCrc32.isAvailable()) {
NativeCrc32.calculateChunkedSumsByteArray(bytesPerChecksum, type.id,
sums, sumsOffset, data, dataOffset, dataLength);
return;
}
int remaining = dataLength;
while (remaining > 0) {
int n = Math.min(remaining, bytesPerChecksum);

View File

@ -54,33 +54,50 @@ class NativeCrc32 {
public static void verifyChunkedSums(int bytesPerSum, int checksumType,
ByteBuffer sums, ByteBuffer data, String fileName, long basePos)
throws ChecksumException {
nativeVerifyChunkedSums(bytesPerSum, checksumType,
nativeComputeChunkedSums(bytesPerSum, checksumType,
sums, sums.position(),
data, data.position(), data.remaining(),
fileName, basePos);
fileName, basePos, true);
}
public static void verifyChunkedSumsByteArray(int bytesPerSum,
int checksumType, byte[] sums, int sumsOffset, byte[] data,
int dataOffset, int dataLength, String fileName, long basePos)
throws ChecksumException {
nativeVerifyChunkedSumsByteArray(bytesPerSum, checksumType,
nativeComputeChunkedSumsByteArray(bytesPerSum, checksumType,
sums, sumsOffset,
data, dataOffset, dataLength,
fileName, basePos);
fileName, basePos, true);
}
private static native void nativeVerifyChunkedSums(
public static void calculateChunkedSums(int bytesPerSum, int checksumType,
ByteBuffer sums, ByteBuffer data) {
nativeComputeChunkedSums(bytesPerSum, checksumType,
sums, sums.position(),
data, data.position(), data.remaining(),
"", 0, false);
}
public static void calculateChunkedSumsByteArray(int bytesPerSum,
int checksumType, byte[] sums, int sumsOffset, byte[] data,
int dataOffset, int dataLength) {
nativeComputeChunkedSumsByteArray(bytesPerSum, checksumType,
sums, sumsOffset,
data, dataOffset, dataLength,
"", 0, false);
}
private static native void nativeComputeChunkedSums(
int bytesPerSum, int checksumType,
ByteBuffer sums, int sumsOffset,
ByteBuffer data, int dataOffset, int dataLength,
String fileName, long basePos);
String fileName, long basePos, boolean verify);
private static native void nativeVerifyChunkedSumsByteArray(
private static native void nativeComputeChunkedSumsByteArray(
int bytesPerSum, int checksumType,
byte[] sums, int sumsOffset,
byte[] data, int dataOffset, int dataLength,
String fileName, long basePos);
String fileName, long basePos, boolean verify);
// Copy the constants over from DataChecksum so that javah will pick them up
// and make them available in the native code header.

View File

@ -117,12 +117,12 @@ static int convert_java_crc_type(JNIEnv *env, jint crc_type) {
}
}
JNIEXPORT void JNICALL Java_org_apache_hadoop_util_NativeCrc32_nativeVerifyChunkedSums
JNIEXPORT void JNICALL Java_org_apache_hadoop_util_NativeCrc32_nativeComputeChunkedSums
(JNIEnv *env, jclass clazz,
jint bytes_per_checksum, jint j_crc_type,
jobject j_sums, jint sums_offset,
jobject j_data, jint data_offset, jint data_len,
jstring j_filename, jlong base_pos)
jstring j_filename, jlong base_pos, jboolean verify)
{
uint8_t *sums_addr;
uint8_t *data_addr;
@ -166,27 +166,27 @@ JNIEXPORT void JNICALL Java_org_apache_hadoop_util_NativeCrc32_nativeVerifyChunk
if (crc_type == -1) return; // exception already thrown
// Setup complete. Actually verify checksums.
ret = bulk_verify_crc(data, data_len, sums, crc_type,
bytes_per_checksum, &error_data);
if (likely(ret == CHECKSUMS_VALID)) {
ret = bulk_crc(data, data_len, sums, crc_type,
bytes_per_checksum, verify ? &error_data : NULL);
if (likely(verify && ret == CHECKSUMS_VALID || !verify && ret == 0)) {
return;
} else if (unlikely(ret == INVALID_CHECKSUM_DETECTED)) {
} else if (unlikely(verify && ret == INVALID_CHECKSUM_DETECTED)) {
long pos = base_pos + (error_data.bad_data - data);
throw_checksum_exception(
env, error_data.got_crc, error_data.expected_crc,
j_filename, pos);
} else {
THROW(env, "java/lang/AssertionError",
"Bad response code from native bulk_verify_crc");
"Bad response code from native bulk_crc");
}
}
JNIEXPORT void JNICALL Java_org_apache_hadoop_util_NativeCrc32_nativeVerifyChunkedSumsByteArray
JNIEXPORT void JNICALL Java_org_apache_hadoop_util_NativeCrc32_nativeComputeChunkedSumsByteArray
(JNIEnv *env, jclass clazz,
jint bytes_per_checksum, jint j_crc_type,
jarray j_sums, jint sums_offset,
jarray j_data, jint data_offset, jint data_len,
jstring j_filename, jlong base_pos)
jstring j_filename, jlong base_pos, jboolean verify)
{
uint8_t *sums_addr;
uint8_t *data_addr;
@ -237,21 +237,21 @@ JNIEXPORT void JNICALL Java_org_apache_hadoop_util_NativeCrc32_nativeVerifyChunk
data = data_addr + data_offset + checksumNum * bytes_per_checksum;
// Setup complete. Actually verify checksums.
ret = bulk_verify_crc(data, MIN(numChecksumsPerIter * bytes_per_checksum,
data_len - checksumNum * bytes_per_checksum),
sums, crc_type, bytes_per_checksum, &error_data);
ret = bulk_crc(data, MIN(numChecksumsPerIter * bytes_per_checksum,
data_len - checksumNum * bytes_per_checksum),
sums, crc_type, bytes_per_checksum, verify ? &error_data : NULL);
(*env)->ReleasePrimitiveArrayCritical(env, j_data, data_addr, 0);
(*env)->ReleasePrimitiveArrayCritical(env, j_sums, sums_addr, 0);
if (unlikely(ret == INVALID_CHECKSUM_DETECTED)) {
if (unlikely(verify && ret == INVALID_CHECKSUM_DETECTED)) {
long pos = base_pos + (error_data.bad_data - data) + checksumNum *
bytes_per_checksum;
throw_checksum_exception(
env, error_data.got_crc, error_data.expected_crc,
j_filename, pos);
return;
} else if (unlikely(ret != CHECKSUMS_VALID)) {
} else if (unlikely(verify && ret != CHECKSUMS_VALID || !verify && ret != 0)) {
THROW(env, "java/lang/AssertionError",
"Bad response code from native bulk_verify_crc");
"Bad response code from native bulk_crc");
return;
}
checksumNum += numChecksumsPerIter;

View File

@ -55,40 +55,23 @@ static void pipelined_crc32c(uint32_t *crc1, uint32_t *crc2, uint32_t *crc3, con
static int cached_cpu_supports_crc32; // initialized by constructor below
static uint32_t crc32c_hardware(uint32_t crc, const uint8_t* data, size_t length);
int bulk_calculate_crc(const uint8_t *data, size_t data_len,
uint32_t *sums, int checksum_type,
int bytes_per_checksum) {
uint32_t crc;
crc_update_func_t crc_update_func;
switch (checksum_type) {
case CRC32_ZLIB_POLYNOMIAL:
crc_update_func = crc32_zlib_sb8;
break;
case CRC32C_POLYNOMIAL:
crc_update_func = crc32c_sb8;
break;
default:
return -EINVAL;
break;
static inline int store_or_verify(uint32_t *sums, uint32_t crc,
int is_verify) {
if (!is_verify) {
*sums = crc;
return 1;
} else {
return crc == *sums;
}
while (likely(data_len > 0)) {
int len = likely(data_len >= bytes_per_checksum) ? bytes_per_checksum : data_len;
crc = CRC_INITIAL_VAL;
crc = crc_update_func(crc, data, len);
*sums = ntohl(crc_val(crc));
data += len;
data_len -= len;
sums++;
}
return 0;
}
int bulk_verify_crc(const uint8_t *data, size_t data_len,
const uint32_t *sums, int checksum_type,
int bulk_crc(const uint8_t *data, size_t data_len,
uint32_t *sums, int checksum_type,
int bytes_per_checksum,
crc32_error_t *error_info) {
int is_verify = error_info != NULL;
#ifdef USE_PIPELINED
uint32_t crc1, crc2, crc3;
int n_blocks = data_len / bytes_per_checksum;
@ -112,7 +95,7 @@ int bulk_verify_crc(const uint8_t *data, size_t data_len,
}
break;
default:
return INVALID_CHECKSUM_TYPE;
return is_verify ? INVALID_CHECKSUM_TYPE : -EINVAL;
}
#ifdef USE_PIPELINED
@ -122,16 +105,15 @@ int bulk_verify_crc(const uint8_t *data, size_t data_len,
crc1 = crc2 = crc3 = CRC_INITIAL_VAL;
pipelined_crc32c(&crc1, &crc2, &crc3, data, bytes_per_checksum, 3);
crc = ntohl(crc_val(crc1));
if ((crc = ntohl(crc_val(crc1))) != *sums)
if (unlikely(!store_or_verify(sums, (crc = ntohl(crc_val(crc1))), is_verify)))
goto return_crc_error;
sums++;
data += bytes_per_checksum;
if ((crc = ntohl(crc_val(crc2))) != *sums)
if (unlikely(!store_or_verify(sums, (crc = ntohl(crc_val(crc2))), is_verify)))
goto return_crc_error;
sums++;
data += bytes_per_checksum;
if ((crc = ntohl(crc_val(crc3))) != *sums)
if (unlikely(!store_or_verify(sums, (crc = ntohl(crc_val(crc3))), is_verify)))
goto return_crc_error;
sums++;
data += bytes_per_checksum;
@ -143,12 +125,12 @@ int bulk_verify_crc(const uint8_t *data, size_t data_len,
crc1 = crc2 = crc3 = CRC_INITIAL_VAL;
pipelined_crc32c(&crc1, &crc2, &crc3, data, bytes_per_checksum, n_blocks);
if ((crc = ntohl(crc_val(crc1))) != *sums)
if (unlikely(!store_or_verify(sums, (crc = ntohl(crc_val(crc1))), is_verify)))
goto return_crc_error;
data += bytes_per_checksum;
sums++;
if (n_blocks == 2) {
if ((crc = ntohl(crc_val(crc2))) != *sums)
if (unlikely(!store_or_verify(sums, (crc = ntohl(crc_val(crc2))), is_verify)))
goto return_crc_error;
sums++;
data += bytes_per_checksum;
@ -160,10 +142,10 @@ int bulk_verify_crc(const uint8_t *data, size_t data_len,
crc1 = crc2 = crc3 = CRC_INITIAL_VAL;
pipelined_crc32c(&crc1, &crc2, &crc3, data, remainder, 1);
if ((crc = ntohl(crc_val(crc1))) != *sums)
if (unlikely(!store_or_verify(sums, (crc = ntohl(crc_val(crc1))), is_verify)))
goto return_crc_error;
}
return CHECKSUMS_VALID;
return is_verify ? CHECKSUMS_VALID : 0;
}
#endif
@ -172,14 +154,14 @@ int bulk_verify_crc(const uint8_t *data, size_t data_len,
crc = CRC_INITIAL_VAL;
crc = crc_update_func(crc, data, len);
crc = ntohl(crc_val(crc));
if (unlikely(crc != *sums)) {
if (unlikely(!store_or_verify(sums, crc, is_verify))) {
goto return_crc_error;
}
data += len;
data_len -= len;
sums++;
}
return CHECKSUMS_VALID;
return is_verify ? CHECKSUMS_VALID : 0;
return_crc_error:
if (error_info != NULL) {

View File

@ -42,49 +42,32 @@ typedef struct crc32_error {
/**
* Verify a buffer of data which is checksummed in chunks
* of bytes_per_checksum bytes. The checksums are each 32 bits
* and are stored in sequential indexes of the 'sums' array.
* Either calculates checksums for or verifies a buffer of data.
* Checksums performed in chunks of bytes_per_checksum bytes. The checksums
* are each 32 bits and are stored in sequential indexes of the 'sums' array.
* Verification is done (sums is assumed to already contain the checksums)
* if error_info is non-null; otherwise calculation is done and checksums
* are stored into sums.
*
* @param data The data to checksum
* @param dataLen Length of the data buffer
* @param sums (out param) buffer to write checksums into.
* It must contain at least dataLen * 4 bytes.
* @param sums (out param) buffer to write checksums into or
* where checksums are already stored.
* It must contain at least
* ((dataLen - 1) / bytes_per_checksum + 1) * 4 bytes.
* @param checksum_type One of the CRC32 algorithm constants defined
* above
* @param bytes_per_checksum How many bytes of data to process per checksum.
* @param error_info If non-NULL, will be filled in if an error
* is detected
* @param error_info If non-NULL, verification will be performed and
* it will be filled in if an error
* is detected. Otherwise calculation is performed.
*
* @return 0 for success, non-zero for an error, result codes
* for which are defined above
* for verification are defined above
*/
extern int bulk_verify_crc(const uint8_t *data, size_t data_len,
const uint32_t *sums, int checksum_type,
extern int bulk_crc(const uint8_t *data, size_t data_len,
uint32_t *sums, int checksum_type,
int bytes_per_checksum,
crc32_error_t *error_info);
/**
* Calculate checksums for some data.
*
* The checksums are each 32 bits and are stored in sequential indexes of the
* 'sums' array.
*
* This function is not (yet) optimized. It is provided for testing purposes
* only.
*
* @param data The data to checksum
* @param dataLen Length of the data buffer
* @param sums (out param) buffer to write checksums into.
* It must contain at least dataLen * 4 bytes.
* @param checksum_type One of the CRC32 algorithm constants defined
* above
* @param bytesPerChecksum How many bytes of data to process per checksum.
*
* @return 0 for success, non-zero for an error
*/
int bulk_calculate_crc(const uint8_t *data, size_t data_len,
uint32_t *sums, int checksum_type,
int bytes_per_checksum);
#endif

View File

@ -48,9 +48,9 @@ static int testBulkVerifyCrc(int dataLen, int crcType, int bytesPerChecksum)
sums = calloc(sizeof(uint32_t),
(dataLen + bytesPerChecksum - 1) / bytesPerChecksum);
EXPECT_ZERO(bulk_calculate_crc(data, dataLen, sums, crcType,
bytesPerChecksum));
EXPECT_ZERO(bulk_verify_crc(data, dataLen, sums, crcType,
EXPECT_ZERO(bulk_crc(data, dataLen, sums, crcType,
bytesPerChecksum, NULL));
EXPECT_ZERO(bulk_crc(data, dataLen, sums, crcType,
bytesPerChecksum, &errorData));
free(data);
free(sums);

View File

@ -19,6 +19,9 @@ package org.apache.hadoop.util;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
import org.apache.hadoop.fs.ChecksumException;
import org.junit.Test;
@ -54,64 +57,109 @@ public class TestDataChecksum {
}
}
private static class Harness {
final DataChecksum checksum;
final int dataLength, sumsLength, numSums;
ByteBuffer dataBuf, checksumBuf;
Harness(DataChecksum checksum, int dataLength, boolean useDirect) {
this.checksum = checksum;
this.dataLength = dataLength;
numSums = (dataLength - 1)/checksum.getBytesPerChecksum() + 1;
sumsLength = numSums * checksum.getChecksumSize();
byte data[] = new byte[dataLength +
DATA_OFFSET_IN_BUFFER +
DATA_TRAILER_IN_BUFFER];
new Random().nextBytes(data);
dataBuf = ByteBuffer.wrap(
data, DATA_OFFSET_IN_BUFFER, dataLength);
byte checksums[] = new byte[SUMS_OFFSET_IN_BUFFER + sumsLength];
checksumBuf = ByteBuffer.wrap(
checksums, SUMS_OFFSET_IN_BUFFER, sumsLength);
// Swap out for direct buffers if requested.
if (useDirect) {
dataBuf = directify(dataBuf);
checksumBuf = directify(checksumBuf);
}
}
void testCorrectness() throws ChecksumException {
// calculate real checksum, make sure it passes
checksum.calculateChunkedSums(dataBuf, checksumBuf);
checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0);
// Change a byte in the header and in the trailer, make sure
// it doesn't affect checksum result
corruptBufferOffset(checksumBuf, 0);
checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0);
corruptBufferOffset(dataBuf, 0);
dataBuf.limit(dataBuf.limit() + 1);
corruptBufferOffset(dataBuf, dataLength + DATA_OFFSET_IN_BUFFER);
dataBuf.limit(dataBuf.limit() - 1);
checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0);
// Make sure bad checksums fail - error at beginning of array
corruptBufferOffset(checksumBuf, SUMS_OFFSET_IN_BUFFER);
try {
checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0);
fail("Did not throw on bad checksums");
} catch (ChecksumException ce) {
assertEquals(0, ce.getPos());
}
// Make sure bad checksums fail - error at end of array
uncorruptBufferOffset(checksumBuf, SUMS_OFFSET_IN_BUFFER);
corruptBufferOffset(checksumBuf, SUMS_OFFSET_IN_BUFFER + sumsLength - 1);
try {
checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0);
fail("Did not throw on bad checksums");
} catch (ChecksumException ce) {
int expectedPos = checksum.getBytesPerChecksum() * (numSums - 1);
assertEquals(expectedPos, ce.getPos());
assertTrue(ce.getMessage().contains("fake file"));
}
}
}
private void doBulkTest(DataChecksum checksum, int dataLength,
boolean useDirect) throws Exception {
System.err.println("Testing bulk checksums of length " +
dataLength + " with " +
(useDirect ? "direct" : "array-backed") + " buffers");
int numSums = (dataLength - 1)/checksum.getBytesPerChecksum() + 1;
int sumsLength = numSums * checksum.getChecksumSize();
byte data[] = new byte[dataLength +
DATA_OFFSET_IN_BUFFER +
DATA_TRAILER_IN_BUFFER];
new Random().nextBytes(data);
ByteBuffer dataBuf = ByteBuffer.wrap(
data, DATA_OFFSET_IN_BUFFER, dataLength);
new Harness(checksum, dataLength, useDirect).testCorrectness();
}
byte checksums[] = new byte[SUMS_OFFSET_IN_BUFFER + sumsLength];
ByteBuffer checksumBuf = ByteBuffer.wrap(
checksums, SUMS_OFFSET_IN_BUFFER, sumsLength);
/**
* Simple performance test for the "common case" checksum usage in HDFS:
* computing and verifying CRC32C with 512 byte chunking on native
* buffers.
*/
@Test
public void commonUsagePerfTest() throws Exception {
final int NUM_RUNS = 5;
final DataChecksum checksum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
final int dataLength = 512 * 1024 * 1024;
Harness h = new Harness(checksum, dataLength, true);
// Swap out for direct buffers if requested.
if (useDirect) {
dataBuf = directify(dataBuf);
checksumBuf = directify(checksumBuf);
}
for (int i = 0; i < NUM_RUNS; i++) {
Stopwatch s = new Stopwatch().start();
// calculate real checksum, make sure it passes
checksum.calculateChunkedSums(h.dataBuf, h.checksumBuf);
s.stop();
System.err.println("Calculate run #" + i + ": " +
s.elapsedTime(TimeUnit.MICROSECONDS) + "us");
// calculate real checksum, make sure it passes
checksum.calculateChunkedSums(dataBuf, checksumBuf);
checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0);
// Change a byte in the header and in the trailer, make sure
// it doesn't affect checksum result
corruptBufferOffset(checksumBuf, 0);
checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0);
corruptBufferOffset(dataBuf, 0);
dataBuf.limit(dataBuf.limit() + 1);
corruptBufferOffset(dataBuf, dataLength + DATA_OFFSET_IN_BUFFER);
dataBuf.limit(dataBuf.limit() - 1);
checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0);
// Make sure bad checksums fail - error at beginning of array
corruptBufferOffset(checksumBuf, SUMS_OFFSET_IN_BUFFER);
try {
checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0);
fail("Did not throw on bad checksums");
} catch (ChecksumException ce) {
assertEquals(0, ce.getPos());
}
// Make sure bad checksums fail - error at end of array
uncorruptBufferOffset(checksumBuf, SUMS_OFFSET_IN_BUFFER);
corruptBufferOffset(checksumBuf, SUMS_OFFSET_IN_BUFFER + sumsLength - 1);
try {
checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0);
fail("Did not throw on bad checksums");
} catch (ChecksumException ce) {
int expectedPos = checksum.getBytesPerChecksum() * (numSums - 1);
assertEquals(expectedPos, ce.getPos());
assertTrue(ce.getMessage().contains("fake file"));
s = new Stopwatch().start();
// calculate real checksum, make sure it passes
checksum.verifyChunkedSums(h.dataBuf, h.checksumBuf, "fake file", 0);
s.stop();
System.err.println("Verify run #" + i + ": " +
s.elapsedTime(TimeUnit.MICROSECONDS) + "us");
}
}