HADOOP-10838. Byte array native checksumming. Contributed by James Thomas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1617876 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2014-08-14 04:29:39 +00:00
parent 193d2ad731
commit bba67ff23e
4 changed files with 106 additions and 0 deletions

View File

@ -80,6 +80,8 @@ Release 2.6.0 - UNRELEASED
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-10838. Byte array native checksumming. (James Thomas via todd)
BUG FIXES BUG FIXES
HADOOP-10781. Unportable getgrouplist() usage breaks FreeBSD (Dmitry HADOOP-10781. Unportable getgrouplist() usage breaks FreeBSD (Dmitry

View File

@ -340,6 +340,12 @@ private void verifyChunkedSums(
byte[] checksums, int checksumsOff, String fileName, byte[] checksums, int checksumsOff, String fileName,
long basePos) throws ChecksumException { long basePos) throws ChecksumException {
if (NativeCrc32.isAvailable()) {
NativeCrc32.verifyChunkedSumsByteArray(bytesPerChecksum, type.id,
checksums, checksumsOff, data, dataOff, dataLen, fileName, basePos);
return;
}
int remaining = dataLen; int remaining = dataLen;
int dataPos = 0; int dataPos = 0;
while (remaining > 0) { while (remaining > 0) {

View File

@ -60,12 +60,28 @@ public static void verifyChunkedSums(int bytesPerSum, int checksumType,
fileName, basePos); fileName, basePos);
} }
public static void verifyChunkedSumsByteArray(int bytesPerSum,
int checksumType, byte[] sums, int sumsOffset, byte[] data,
int dataOffset, int dataLength, String fileName, long basePos)
throws ChecksumException {
nativeVerifyChunkedSumsByteArray(bytesPerSum, checksumType,
sums, sumsOffset,
data, dataOffset, dataLength,
fileName, basePos);
}
private static native void nativeVerifyChunkedSums( private static native void nativeVerifyChunkedSums(
int bytesPerSum, int checksumType, int bytesPerSum, int checksumType,
ByteBuffer sums, int sumsOffset, ByteBuffer sums, int sumsOffset,
ByteBuffer data, int dataOffset, int dataLength, ByteBuffer data, int dataOffset, int dataLength,
String fileName, long basePos); String fileName, long basePos);
private static native void nativeVerifyChunkedSumsByteArray(
int bytesPerSum, int checksumType,
byte[] sums, int sumsOffset,
byte[] data, int dataOffset, int dataLength,
String fileName, long basePos);
// Copy the constants over from DataChecksum so that javah will pick them up // Copy the constants over from DataChecksum so that javah will pick them up
// and make them available in the native code header. // and make them available in the native code header.
public static final int CHECKSUM_CRC32 = DataChecksum.CHECKSUM_CRC32; public static final int CHECKSUM_CRC32 = DataChecksum.CHECKSUM_CRC32;

View File

@ -34,6 +34,10 @@
#include "bulk_crc32.h" #include "bulk_crc32.h"
#define MBYTE 1048576
#define MIN(X,Y) ((X) < (Y) ? (X) : (Y))
#define MAX(X,Y) ((X) > (Y) ? (X) : (Y))
static void throw_checksum_exception(JNIEnv *env, static void throw_checksum_exception(JNIEnv *env,
uint32_t got_crc, uint32_t expected_crc, uint32_t got_crc, uint32_t expected_crc,
jstring j_filename, jlong pos) { jstring j_filename, jlong pos) {
@ -177,6 +181,84 @@ JNIEXPORT void JNICALL Java_org_apache_hadoop_util_NativeCrc32_nativeVerifyChunk
} }
} }
JNIEXPORT void JNICALL Java_org_apache_hadoop_util_NativeCrc32_nativeVerifyChunkedSumsByteArray
(JNIEnv *env, jclass clazz,
jint bytes_per_checksum, jint j_crc_type,
jarray j_sums, jint sums_offset,
jarray j_data, jint data_offset, jint data_len,
jstring j_filename, jlong base_pos)
{
uint8_t *sums_addr;
uint8_t *data_addr;
uint32_t *sums;
uint8_t *data;
int crc_type;
crc32_error_t error_data;
int ret;
int numChecksumsPerIter;
int checksumNum;
if (unlikely(!j_sums || !j_data)) {
THROW(env, "java/lang/NullPointerException",
"input byte arrays must not be null");
return;
}
if (unlikely(sums_offset < 0 || data_offset < 0 || data_len < 0)) {
THROW(env, "java/lang/IllegalArgumentException",
"bad offsets or lengths");
return;
}
if (unlikely(bytes_per_checksum) <= 0) {
THROW(env, "java/lang/IllegalArgumentException",
"invalid bytes_per_checksum");
return;
}
// Convert to correct internal C constant for CRC type
crc_type = convert_java_crc_type(env, j_crc_type);
if (crc_type == -1) return; // exception already thrown
numChecksumsPerIter = MAX(1, MBYTE / bytes_per_checksum);
checksumNum = 0;
while (checksumNum * bytes_per_checksum < data_len) {
// Convert byte arrays to C pointers
sums_addr = (*env)->GetPrimitiveArrayCritical(env, j_sums, NULL);
data_addr = (*env)->GetPrimitiveArrayCritical(env, j_data, NULL);
if (unlikely(!sums_addr || !data_addr)) {
if (data_addr) (*env)->ReleasePrimitiveArrayCritical(env, j_data, data_addr, 0);
if (sums_addr) (*env)->ReleasePrimitiveArrayCritical(env, j_sums, sums_addr, 0);
THROW(env, "java/lang/OutOfMemoryError",
"not enough memory for byte arrays in JNI code");
return;
}
sums = (uint32_t *)(sums_addr + sums_offset) + checksumNum;
data = data_addr + data_offset + checksumNum * bytes_per_checksum;
// Setup complete. Actually verify checksums.
ret = bulk_verify_crc(data, MIN(numChecksumsPerIter * bytes_per_checksum,
data_len - checksumNum * bytes_per_checksum),
sums, crc_type, bytes_per_checksum, &error_data);
(*env)->ReleasePrimitiveArrayCritical(env, j_data, data_addr, 0);
(*env)->ReleasePrimitiveArrayCritical(env, j_sums, sums_addr, 0);
if (unlikely(ret == INVALID_CHECKSUM_DETECTED)) {
long pos = base_pos + (error_data.bad_data - data) + checksumNum *
bytes_per_checksum;
throw_checksum_exception(
env, error_data.got_crc, error_data.expected_crc,
j_filename, pos);
return;
} else if (unlikely(ret != CHECKSUMS_VALID)) {
THROW(env, "java/lang/AssertionError",
"Bad response code from native bulk_verify_crc");
return;
}
checksumNum += numChecksumsPerIter;
}
}
/** /**
* vim: sw=2: ts=2: et: * vim: sw=2: ts=2: et:
*/ */