HADOOP-10838. Byte array native checksumming. Contributed by James Thomas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1617875 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2014-08-14 04:29:35 +00:00
parent 44864c68b5
commit bd79a4b926
4 changed files with 106 additions and 0 deletions

View File

@ -513,6 +513,8 @@ Release 2.6.0 - UNRELEASED
OPTIMIZATIONS
HADOOP-10838. Byte array native checksumming. (James Thomas via todd)
BUG FIXES
HADOOP-10781. Unportable getgrouplist() usage breaks FreeBSD (Dmitry

View File

@ -339,6 +339,12 @@ public class DataChecksum implements Checksum {
byte[] data, int dataOff, int dataLen,
byte[] checksums, int checksumsOff, String fileName,
long basePos) throws ChecksumException {
if (NativeCrc32.isAvailable()) {
NativeCrc32.verifyChunkedSumsByteArray(bytesPerChecksum, type.id,
checksums, checksumsOff, data, dataOff, dataLen, fileName, basePos);
return;
}
int remaining = dataLen;
int dataPos = 0;

View File

@ -59,6 +59,16 @@ class NativeCrc32 {
data, data.position(), data.remaining(),
fileName, basePos);
}
public static void verifyChunkedSumsByteArray(int bytesPerSum,
int checksumType, byte[] sums, int sumsOffset, byte[] data,
int dataOffset, int dataLength, String fileName, long basePos)
throws ChecksumException {
nativeVerifyChunkedSumsByteArray(bytesPerSum, checksumType,
sums, sumsOffset,
data, dataOffset, dataLength,
fileName, basePos);
}
private static native void nativeVerifyChunkedSums(
int bytesPerSum, int checksumType,
@ -66,6 +76,12 @@ class NativeCrc32 {
ByteBuffer data, int dataOffset, int dataLength,
String fileName, long basePos);
private static native void nativeVerifyChunkedSumsByteArray(
int bytesPerSum, int checksumType,
byte[] sums, int sumsOffset,
byte[] data, int dataOffset, int dataLength,
String fileName, long basePos);
// Copy the constants over from DataChecksum so that javah will pick them up
// and make them available in the native code header.
public static final int CHECKSUM_CRC32 = DataChecksum.CHECKSUM_CRC32;

View File

@ -34,6 +34,10 @@
#include "bulk_crc32.h"
#define MBYTE 1048576
#define MIN(X,Y) ((X) < (Y) ? (X) : (Y))
#define MAX(X,Y) ((X) > (Y) ? (X) : (Y))
static void throw_checksum_exception(JNIEnv *env,
uint32_t got_crc, uint32_t expected_crc,
jstring j_filename, jlong pos) {
@ -177,6 +181,84 @@ JNIEXPORT void JNICALL Java_org_apache_hadoop_util_NativeCrc32_nativeVerifyChunk
}
}
JNIEXPORT void JNICALL Java_org_apache_hadoop_util_NativeCrc32_nativeVerifyChunkedSumsByteArray
(JNIEnv *env, jclass clazz,
jint bytes_per_checksum, jint j_crc_type,
jarray j_sums, jint sums_offset,
jarray j_data, jint data_offset, jint data_len,
jstring j_filename, jlong base_pos)
{
uint8_t *sums_addr;
uint8_t *data_addr;
uint32_t *sums;
uint8_t *data;
int crc_type;
crc32_error_t error_data;
int ret;
int numChecksumsPerIter;
int checksumNum;
if (unlikely(!j_sums || !j_data)) {
THROW(env, "java/lang/NullPointerException",
"input byte arrays must not be null");
return;
}
if (unlikely(sums_offset < 0 || data_offset < 0 || data_len < 0)) {
THROW(env, "java/lang/IllegalArgumentException",
"bad offsets or lengths");
return;
}
if (unlikely(bytes_per_checksum) <= 0) {
THROW(env, "java/lang/IllegalArgumentException",
"invalid bytes_per_checksum");
return;
}
// Convert to correct internal C constant for CRC type
crc_type = convert_java_crc_type(env, j_crc_type);
if (crc_type == -1) return; // exception already thrown
numChecksumsPerIter = MAX(1, MBYTE / bytes_per_checksum);
checksumNum = 0;
while (checksumNum * bytes_per_checksum < data_len) {
// Convert byte arrays to C pointers
sums_addr = (*env)->GetPrimitiveArrayCritical(env, j_sums, NULL);
data_addr = (*env)->GetPrimitiveArrayCritical(env, j_data, NULL);
if (unlikely(!sums_addr || !data_addr)) {
if (data_addr) (*env)->ReleasePrimitiveArrayCritical(env, j_data, data_addr, 0);
if (sums_addr) (*env)->ReleasePrimitiveArrayCritical(env, j_sums, sums_addr, 0);
THROW(env, "java/lang/OutOfMemoryError",
"not enough memory for byte arrays in JNI code");
return;
}
sums = (uint32_t *)(sums_addr + sums_offset) + checksumNum;
data = data_addr + data_offset + checksumNum * bytes_per_checksum;
// Setup complete. Actually verify checksums.
ret = bulk_verify_crc(data, MIN(numChecksumsPerIter * bytes_per_checksum,
data_len - checksumNum * bytes_per_checksum),
sums, crc_type, bytes_per_checksum, &error_data);
(*env)->ReleasePrimitiveArrayCritical(env, j_data, data_addr, 0);
(*env)->ReleasePrimitiveArrayCritical(env, j_sums, sums_addr, 0);
if (unlikely(ret == INVALID_CHECKSUM_DETECTED)) {
long pos = base_pos + (error_data.bad_data - data) + checksumNum *
bytes_per_checksum;
throw_checksum_exception(
env, error_data.got_crc, error_data.expected_crc,
j_filename, pos);
return;
} else if (unlikely(ret != CHECKSUMS_VALID)) {
THROW(env, "java/lang/AssertionError",
"Bad response code from native bulk_verify_crc");
return;
}
checksumNum += numChecksumsPerIter;
}
}
/**
* vim: sw=2: ts=2: et:
*/