HADOOP-10674. Improve PureJavaCrc32 performance and use java.util.zip.CRC32 for Java 7 and above.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1605239 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2014-06-25 02:22:15 +00:00
parent 2eb913b6ec
commit 988bc17bc3
8 changed files with 187 additions and 111 deletions

hadoop-common-project/hadoop-common/CHANGES.txt

@@ -472,6 +472,9 @@ Release 2.5.0 - UNRELEASED
     HADOOP-10747. Support configurable retries on SASL connection failures in
     RPC client. (cnauroth)
 
+    HADOOP-10674. Improve PureJavaCrc32 performance and use java.util.zip.CRC32
+    for Java 7 and above. (szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES
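
The changelog entry above captures the whole strategy of the commit: stop hard-coding the pure-Java CRC and pick an implementation at runtime. On Java 7 and above, the JDK's java.util.zip.CRC32 (backed by native code) is preferred; older JVMs keep the table-driven PureJavaCrc32. A minimal sketch of that selection idea, using a boolean parameter in place of the Shell.isJava7OrAbove() check that the DataChecksum diff below actually uses:

import java.util.zip.CRC32;
import java.util.zip.Checksum;
import org.apache.hadoop.util.PureJavaCrc32;

// Sketch of the runtime selection idea; the real factory is
// DataChecksum.newCrc32() (see the DataChecksum.java diff below).
public class Crc32Selection {
  public static Checksum newCrc32(boolean isJava7OrAbove) {
    return isJava7OrAbove ? new CRC32() : new PureJavaCrc32();
  }
}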

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java

@@ -18,7 +18,9 @@
 package org.apache.hadoop.fs;
 
-import java.io.*;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
 import java.nio.channels.ClosedChannelException;
 import java.util.Arrays;
@@ -26,8 +28,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.PureJavaCrc32;
 
 /****************************************************************
  * Abstract Checksumed FileSystem.
@@ -147,7 +149,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
       if (!Arrays.equals(version, CHECKSUM_VERSION))
         throw new IOException("Not a checksum file: "+sumFile);
       this.bytesPerSum = sums.readInt();
-      set(fs.verifyChecksum, new PureJavaCrc32(), bytesPerSum, 4);
+      set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, 4);
     } catch (FileNotFoundException e) { // quietly ignore
       set(fs.verifyChecksum, null, 1, 0);
     } catch (IOException e) { // loudly ignore
@@ -259,8 +261,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
     private Path file;
     private long fileLen = -1L;
 
-    FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in)
-        throws IOException {
+    FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in) {
       super(in);
       this.fs = fs;
       this.file = file;
@@ -379,7 +380,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
                           long blockSize,
                           Progressable progress)
       throws IOException {
-      super(new PureJavaCrc32(), fs.getBytesPerSum(), 4);
+      super(DataChecksum.newCrc32(), fs.getBytesPerSum(), 4);
       int bytesPerSum = fs.getBytesPerSum();
       this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize,
           replication, blockSize, progress);
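
The constructor above reads bytesPerSum from the checksum file header and passes 4 as the width of each stored CRC. A hedged sketch of what that implies for sidecar ".crc" file sizes, assuming (beyond what the diff itself shows) an 8-byte header: the 4-byte CHECKSUM_VERSION magic checked above plus the 4-byte bytesPerSum int:

// Hedged sketch: expected length of a ChecksumFileSystem ".crc" sidecar,
// assuming an 8-byte header (4-byte version magic + 4-byte bytesPerSum)
// followed by one 4-byte CRC per bytesPerSum data bytes.
public class CrcFileSizeSketch {
  static long checksumFileLength(long dataLength, int bytesPerSum) {
    final long chunks = (dataLength + bytesPerSum - 1) / bytesPerSum; // ceiling
    return 8 + 4 * chunks;
  }

  public static void main(String[] args) {
    // A 1 MB file with 512 bytes per sum needs 2048 CRCs: 8 + 4*2048 = 8200.
    System.out.println(checksumFileLength(1 << 20, 512));
  }
}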

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java

@@ -18,7 +18,9 @@
 package org.apache.hadoop.fs;
 
-import java.io.*;
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
@@ -31,8 +33,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.PureJavaCrc32;
 
 /**
  * Abstract Checksumed Fs.
@@ -139,7 +141,7 @@ public abstract class ChecksumFs extends FilterFs {
         throw new IOException("Not a checksum file: "+sumFile);
       }
       this.bytesPerSum = sums.readInt();
-      set(fs.verifyChecksum, new PureJavaCrc32(), bytesPerSum, 4);
+      set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, 4);
     } catch (FileNotFoundException e) { // quietly ignore
       set(fs.verifyChecksum, null, 1, 0);
     } catch (IOException e) { // loudly ignore
@@ -335,7 +337,7 @@ public abstract class ChecksumFs extends FilterFs {
         final short replication, final long blockSize,
         final Progressable progress, final ChecksumOpt checksumOpt,
         final boolean createParent) throws IOException {
-      super(new PureJavaCrc32(), fs.getBytesPerSum(), 4);
+      super(DataChecksum.newCrc32(), fs.getBytesPerSum(), 4);
 
       // checksumOpt is passed down to the raw fs. Unless it implements
       // checksum internally, checksumOpt will be ignored.

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java

@@ -19,12 +19,13 @@
 package org.apache.hadoop.io.compress.zlib;
 
 import java.io.IOException;
+import java.util.zip.Checksum;
 import java.util.zip.DataFormatException;
 import java.util.zip.Inflater;
 
-import org.apache.hadoop.util.PureJavaCrc32;
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.io.compress.DoNotPool;
+import org.apache.hadoop.util.DataChecksum;
 
 /**
  * A {@link Decompressor} based on the popular gzip compressed file format.
@@ -54,7 +55,7 @@ public class BuiltInGzipDecompressor implements Decompressor {
   private int headerBytesRead = 0;
   private int trailerBytesRead = 0;
   private int numExtraFieldBytesRemaining = -1;
-  private PureJavaCrc32 crc = new PureJavaCrc32();
+  private Checksum crc = DataChecksum.newCrc32();
   private boolean hasExtraField = false;
   private boolean hasFilename = false;
   private boolean hasComment = false;
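
The substantive change here is the field's declared type: by holding a java.util.zip.Checksum instead of the concrete PureJavaCrc32, the decompressor accepts whichever implementation DataChecksum.newCrc32() returns. A hedged sketch of the pattern (verifyTrailer is illustrative, not a real method of BuiltInGzipDecompressor; per RFC 1952 the gzip trailer stores the CRC-32 of the uncompressed data):

import java.io.IOException;
import java.util.zip.Checksum;
import org.apache.hadoop.util.DataChecksum;

// Hedged sketch of the pattern this diff enables: the field is typed as the
// java.util.zip.Checksum interface, so either implementation can back it.
class GzipCrcSketch {
  private final Checksum crc = DataChecksum.newCrc32();

  void consume(byte[] uncompressed, int off, int len) {
    crc.update(uncompressed, off, len);
  }

  void verifyTrailer(long crcFromTrailer) throws IOException {
    if (crc.getValue() != crcFromTrailer) {
      throw new IOException("gzip CRC mismatch");
    }
  }
}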

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java

@@ -22,6 +22,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -72,6 +73,13 @@ public class DataChecksum implements Checksum {
     }
   }
 
+  /**
+   * Create a Crc32 Checksum object. The implementation of the Crc32 algorithm
+   * is chosen depending on the platform.
+   */
+  public static Checksum newCrc32() {
+    return Shell.isJava7OrAbove()? new CRC32(): new PureJavaCrc32();
+  }
+
   public static DataChecksum newDataChecksum(Type type, int bytesPerChecksum ) {
     if ( bytesPerChecksum <= 0 ) {
@@ -82,7 +90,7 @@ public class DataChecksum implements Checksum {
     case NULL :
       return new DataChecksum(type, new ChecksumNull(), bytesPerChecksum );
     case CRC32 :
-      return new DataChecksum(type, new PureJavaCrc32(), bytesPerChecksum );
+      return new DataChecksum(type, newCrc32(), bytesPerChecksum );
     case CRC32C:
      return new DataChecksum(type, new PureJavaCrc32C(), bytesPerChecksum);
    default:
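
From a caller's point of view, the new factory is a drop-in replacement for new PureJavaCrc32(), because both implementations expose the java.util.zip.Checksum interface (update/getValue/reset), so callers never need to know which one was chosen. A minimal usage sketch:

import java.util.zip.Checksum;
import org.apache.hadoop.util.DataChecksum;

// Minimal caller-side sketch: compute the CRC-32 of a buffer via the
// factory introduced by this commit. CRC32 on Java 7+, PureJavaCrc32 otherwise.
public class Crc32Example {
  public static void main(String[] args) {
    byte[] data = "hello, checksum".getBytes();
    Checksum crc = DataChecksum.newCrc32();
    crc.update(data, 0, data.length);
    System.out.printf("crc32 = 0x%08x%n", crc.getValue());
  }
}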

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PureJavaCrc32.java

@@ -57,38 +57,31 @@ public class PureJavaCrc32 implements Checksum {
   }
 
   @Override
-  public void update(byte[] b, int off, int len) {
+  public void update(final byte[] b, final int offset, final int len) {
     int localCrc = crc;
 
-    while(len > 7) {
-      final int c0 =(b[off+0] ^ localCrc) & 0xff;
-      final int c1 =(b[off+1] ^ (localCrc >>>= 8)) & 0xff;
-      final int c2 =(b[off+2] ^ (localCrc >>>= 8)) & 0xff;
-      final int c3 =(b[off+3] ^ (localCrc >>>= 8)) & 0xff;
-      localCrc = (T[T8_7_start + c0] ^ T[T8_6_start + c1])
-          ^ (T[T8_5_start + c2] ^ T[T8_4_start + c3]);
-
-      final int c4 = b[off+4] & 0xff;
-      final int c5 = b[off+5] & 0xff;
-      final int c6 = b[off+6] & 0xff;
-      final int c7 = b[off+7] & 0xff;
-      localCrc ^= (T[T8_3_start + c4] ^ T[T8_2_start + c5])
-          ^ (T[T8_1_start + c6] ^ T[T8_0_start + c7]);
-
-      off += 8;
-      len -= 8;
+    final int remainder = len & 0x7;
+    int i = offset;
+    for(final int end = offset + len - remainder; i < end; i += 8) {
+      final int x = localCrc
+          ^ ((((b[i  ] << 24) >>> 24) + ((b[i+1] << 24) >>> 16))
+          + (((b[i+2] << 24) >>> 8 ) + (b[i+3] << 24)));
+
+      localCrc = ((T[((x << 24) >>> 24) + 0x700] ^ T[((x << 16) >>> 24) + 0x600])
+          ^ (T[((x << 8) >>> 24) + 0x500] ^ T[ (x >>> 24) + 0x400]))
+          ^ ((T[((b[i+4] << 24) >>> 24) + 0x300] ^ T[((b[i+5] << 24) >>> 24) + 0x200])
+          ^ (T[((b[i+6] << 24) >>> 24) + 0x100] ^ T[((b[i+7] << 24) >>> 24)]));
     }
 
     /* loop unroll - duff's device style */
-    switch(len) {
-      case 7: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
-      case 6: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
-      case 5: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
-      case 4: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
-      case 3: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
-      case 2: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
-      case 1: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
+    switch(remainder) {
+      case 7: localCrc = (localCrc >>> 8) ^ T[((localCrc ^ b[i++]) << 24) >>> 24];
+      case 6: localCrc = (localCrc >>> 8) ^ T[((localCrc ^ b[i++]) << 24) >>> 24];
+      case 5: localCrc = (localCrc >>> 8) ^ T[((localCrc ^ b[i++]) << 24) >>> 24];
+      case 4: localCrc = (localCrc >>> 8) ^ T[((localCrc ^ b[i++]) << 24) >>> 24];
+      case 3: localCrc = (localCrc >>> 8) ^ T[((localCrc ^ b[i++]) << 24) >>> 24];
+      case 2: localCrc = (localCrc >>> 8) ^ T[((localCrc ^ b[i++]) << 24) >>> 24];
+      case 1: localCrc = (localCrc >>> 8) ^ T[((localCrc ^ b[i++]) << 24) >>> 24];
       default:
        /* nothing */
     }
@@ -99,24 +92,15 @@ public class PureJavaCrc32 implements Checksum {
   @Override
   final public void update(int b) {
-    crc = (crc >>> 8) ^ T[T8_0_start + ((crc ^ b) & 0xff)];
+    crc = (crc >>> 8) ^ T[(((crc ^ b) << 24) >>> 24)];
   }
 
   /*
    * CRC-32 lookup tables generated by the polynomial 0xEDB88320.
    * See also TestPureJavaCrc32.Table.
    */
-  private static final int T8_0_start = 0*256;
-  private static final int T8_1_start = 1*256;
-  private static final int T8_2_start = 2*256;
-  private static final int T8_3_start = 3*256;
-  private static final int T8_4_start = 4*256;
-  private static final int T8_5_start = 5*256;
-  private static final int T8_6_start = 6*256;
-  private static final int T8_7_start = 7*256;
   private static final int[] T = new int[] {
     /* T8_0 */
     0x00000000, 0x77073096, 0xEE0E612C, 0x990951BA,
     0x076DC419, 0x706AF48F, 0xE963A535, 0x9E6495A3,
     0x0EDB8832, 0x79DCB8A4, 0xE0D5E91E, 0x97D2D988,
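
Two mechanical identities drive the rewrite above. First, the T8_k_start constants were simply k*256, so an index T8_k_start + c can be written c + k*0x100 directly against the single 2048-entry table T (hence the literal 0x700, 0x600, ... offsets in the new code). Second, ((x << 24) >>> 24) equals x & 0xff for any int: the left shift discards the upper 24 bits and the unsigned right shift brings the byte back without sign extension, which also makes (b[i+1] << 24) >>> 16 a way to place an unsigned byte at bit position 8 when assembling the 32-bit word x. A small sketch checking both identities (run with -ea to enable asserts):

// Demonstrates the two identities the rewritten update() relies on:
//   T8_k_start + c == c + k*0x100   (each sub-table spans 256 ints)
//   ((x << 24) >>> 24) == x & 0xff  (branchless low-byte extraction)
public class Crc32IndexIdentities {
  public static void main(String[] args) {
    final int T8_5_start = 5 * 256;
    int c = 0xAB;
    assert T8_5_start + c == c + 0x500;

    int x = 0xDEADBEEF;
    assert ((x << 24) >>> 24) == (x & 0xff);
    assert ((x << 16) >>> 24) == ((x >>> 8) & 0xff);
    System.out.println("identities hold");
  }
}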

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestPureJavaCrc32.java

@@ -20,12 +20,14 @@ package org.apache.hadoop.util;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
+import java.lang.reflect.Constructor;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -187,12 +189,18 @@ public class TestPureJavaCrc32 {
       long polynomial = Long.parseLong(args[0], 16);
 
       int i = 8;
-      final PrintStream out = new PrintStream(
-          new FileOutputStream("table" + i + ".txt"), true);
       final Table t = new Table(i, 16, polynomial);
       final String s = t.toString();
       System.out.println(s);
-      out.println(s);
+
+      //print to a file
+      final PrintStream out = new PrintStream(
+          new FileOutputStream("table" + i + ".txt"), true);
+      try {
+        out.println(s);
+      } finally {
+        out.close();
+      }
     }
   }
@@ -210,10 +218,15 @@ public class TestPureJavaCrc32 {
     public static final int MAX_LEN = 32*1024*1024; // up to 32MB chunks
     public static final int BYTES_PER_SIZE = MAX_LEN * 4;
 
-    static final Checksum zip = new CRC32();
-    static final Checksum[] CRCS = {new PureJavaCrc32()};
+    static final Class<? extends Checksum> zip = CRC32.class;
+    static final List<Class<? extends Checksum>> CRCS = new ArrayList<Class<? extends Checksum>>();
+    static {
+      CRCS.add(zip);
+      CRCS.add(PureJavaCrc32.class);
+    }
 
-    public static void main(String args[]) {
+    public static void main(String args[]) throws Exception {
       printSystemProperties(System.out);
       doBench(CRCS, System.out);
     }
@@ -223,76 +236,140 @@ public class TestPureJavaCrc32 {
       out.printf(" %" + w + "s |", s);
     }
 
-    private static void doBench(final Checksum[] crcs, final PrintStream out) {
-      final ArrayList<Checksum> a = new ArrayList<Checksum>();
-      a.add(zip);
-      for (Checksum c : crcs)
-        if(c.getClass() != zip.getClass())
-          a.add(c);
-      doBench(a, out);
-    }
-
-    private static void doBench(final List<Checksum> crcs, final PrintStream out
-        ) {
+    private static void doBench(final List<Class<? extends Checksum>> crcs,
+        final PrintStream out) throws Exception {
       final byte[] bytes = new byte[MAX_LEN];
       new Random().nextBytes(bytes);
 
       // Print header
-      out.printf("\nPerformance Table (The unit is MB/sec)\n||");
-      final String title = "Num Bytes";
-      printCell("Num Bytes", 0, out);
-      for (Checksum c : crcs) {
-        out.printf("|");
-        printCell(c.getClass().getSimpleName(), 8, out);
-      }
-      out.printf("|\n");
+      out.printf("\nPerformance Table (The unit is MB/sec; #T = #Threads)\n");
 
       // Warm up implementations to get jit going.
-      for (Checksum c : crcs) {
-        doBench(c, bytes, 2, null);
-        doBench(c, bytes, 2101, null);
+      for (Class<? extends Checksum> c : crcs) {
+        doBench(c, 1, bytes, 2);
+        doBench(c, 1, bytes, 2101);
       }
 
-      // Test on a variety of sizes
-      for (int size = 1; size < MAX_LEN; size *= 2) {
-        out.printf("|");
-        printCell(String.valueOf(size), title.length()+1, out);
-
-        Long expected = null;
-        for(Checksum c : crcs) {
-          System.gc();
-          final long result = doBench(c, bytes, size, out);
-          if(c.getClass() == zip.getClass()) {
-            expected = result;
-          } else if (result != expected) {
-            throw new RuntimeException(c.getClass() + " has bugs!");
-          }
-        }
-        out.printf("\n");
+      // Test on a variety of sizes with different number of threads
+      for (int size = 32; size <= MAX_LEN; size <<= 1) {
+        doBench(crcs, bytes, size, out);
       }
     }
 
+    private static void doBench(final List<Class<? extends Checksum>> crcs,
+        final byte[] bytes, final int size, final PrintStream out) throws Exception {
+      final String numBytesStr = " #Bytes ";
+      final String numThreadsStr = "#T";
+      final String diffStr = "% diff";
+
+      out.print('|');
+      printCell(numBytesStr, 0, out);
+      printCell(numThreadsStr, 0, out);
+      for (int i = 0; i < crcs.size(); i++) {
+        final Class<? extends Checksum> c = crcs.get(i);
+        out.print('|');
+        printCell(c.getSimpleName(), 8, out);
+        for(int j = 0; j < i; j++) {
+          printCell(diffStr, diffStr.length(), out);
+        }
+      }
+      out.printf("\n");
+
+      for(int numThreads = 1; numThreads <= 16; numThreads <<= 1) {
+        out.printf("|");
+        printCell(String.valueOf(size), numBytesStr.length(), out);
+        printCell(String.valueOf(numThreads), numThreadsStr.length(), out);
+
+        BenchResult expected = null;
+        final List<BenchResult> previous = new ArrayList<BenchResult>();
+        for(Class<? extends Checksum> c : crcs) {
+          System.gc();
+
+          final BenchResult result = doBench(c, numThreads, bytes, size);
+          printCell(String.format("%9.1f", result.mbps),
+              c.getSimpleName().length()+1, out);
+
+          //check result
+          if(c == zip) {
+            expected = result;
+          } else if (expected == null) {
+            throw new RuntimeException("The first class is "
+                + c.getName() + " but not " + zip.getName());
+          } else if (result.value != expected.value) {
+            throw new RuntimeException(c + " has bugs!");
+          }
+
+          //compare result with previous
+          for(BenchResult p : previous) {
+            final double diff = (result.mbps - p.mbps) / p.mbps * 100;
+            printCell(String.format("%5.1f%%", diff), diffStr.length(), out);
+          }
+          previous.add(result);
+        }
+        out.printf("\n");
+      }
+    }
 
-    private static long doBench(Checksum crc, byte[] bytes, int size,
-        PrintStream out) {
-      final String name = crc.getClass().getSimpleName();
-      final int trials = BYTES_PER_SIZE / size;
-
-      final long st = System.nanoTime();
-      crc.reset();
-      for (int i = 0; i < trials; i++) {
-        crc.update(bytes, 0, size);
-      }
-      final long result = crc.getValue();
-      final long et = System.nanoTime();
-
-      double mbProcessed = trials * size / 1024.0 / 1024.0;
-      double secsElapsed = (et - st) / 1000000000.0d;
-      if (out != null) {
-        final String s = String.format("%9.3f", mbProcessed/secsElapsed);
-        printCell(s, name.length()+1, out);
-      }
-      return result;
-    }
+    private static BenchResult doBench(Class<? extends Checksum> clazz,
+        final int numThreads, final byte[] bytes, final int size)
+        throws Exception {
+      final Thread[] threads = new Thread[numThreads];
+      final BenchResult[] results = new BenchResult[threads.length];
+
+      {
+        final int trials = BYTES_PER_SIZE / size;
+        final double mbProcessed = trials * size / 1024.0 / 1024.0;
+        final Constructor<? extends Checksum> ctor = clazz.getConstructor();
+
+        for(int i = 0; i < threads.length; i++) {
+          final int index = i;
+          threads[i] = new Thread() {
+            final Checksum crc = ctor.newInstance();
+
+            @Override
+            public void run() {
+              final long st = System.nanoTime();
+              crc.reset();
+              for (int i = 0; i < trials; i++) {
+                crc.update(bytes, 0, size);
+              }
+              final long et = System.nanoTime();
+              double secsElapsed = (et - st) / 1000000000.0d;
+              results[index] = new BenchResult(crc.getValue(), mbProcessed/secsElapsed);
+            }
+          };
+        }
+      }
+
+      for(int i = 0; i < threads.length; i++) {
+        threads[i].start();
+      }
+      for(int i = 0; i < threads.length; i++) {
+        threads[i].join();
+      }
+
+      final long expected = results[0].value;
+      double sum = results[0].mbps;
+      for(int i = 1; i < results.length; i++) {
+        if (results[i].value != expected) {
+          throw new AssertionError(clazz.getSimpleName() + " results not matched.");
+        }
+        sum += results[i].mbps;
+      }
+      return new BenchResult(expected, sum/results.length);
+    }
+
+    private static class BenchResult {
+      /** CRC value */
+      final long value;
+      /** Speed (MB per second) */
+      final double mbps;
+
+      BenchResult(long value, double mbps) {
+        this.value = value;
+        this.mbps = mbps;
+      }
+    }
 
     private static void printSystemProperties(PrintStream out) {
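
The reworked benchmark instantiates each implementation reflectively, runs 1, 2, 4, 8, and 16 threads over the same buffer for sizes from 32 bytes to 32 MB, asserts that every thread and every implementation produce the identical CRC value, and prints mean throughput plus a % diff column against the implementations to its left. A condensed, hedged sketch of the per-thread timing pattern (the class name and constants below are illustrative, simplified from the diff):

import java.util.Random;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

// Condensed sketch of the benchmark's timing loop: each thread owns its
// own Checksum instance; MB/s = bytes processed / seconds elapsed.
public class CrcBenchSketch {
  public static void main(String[] args) throws InterruptedException {
    final byte[] bytes = new byte[1 << 20];
    new Random().nextBytes(bytes);
    final int size = bytes.length;
    final int trials = 128;

    Thread t = new Thread() {
      @Override
      public void run() {
        final Checksum crc = new CRC32();   // or new PureJavaCrc32()
        final long st = System.nanoTime();
        crc.reset();
        for (int i = 0; i < trials; i++) {
          crc.update(bytes, 0, size);       // cumulative update, as in the diff
        }
        final double secs = (System.nanoTime() - st) / 1e9;
        final double mb = trials * (double) size / (1 << 20);
        System.out.printf("%.1f MB/s (crc=0x%08x)%n", mb / secs, crc.getValue());
      }
    };
    t.start();
    t.join();
  }
}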

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java

@@ -40,6 +40,7 @@ import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_DIRECTIVE;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_POOL;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_XATTR;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT;
@@ -55,7 +56,6 @@ import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_PER
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_QUOTA;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_REPLICATION;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_XATTR;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_XATTR;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_START_LOG_SEGMENT;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SYMLINK;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TIMES;
@@ -81,6 +81,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrCodec;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclEntryScope;
@@ -88,7 +89,6 @@ import org.apache.hadoop.fs.permission.AclEntryType;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -116,7 +116,7 @@ import org.apache.hadoop.io.WritableFactory;
 import org.apache.hadoop.ipc.ClientId;
 import org.apache.hadoop.ipc.RpcConstants;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.util.DataChecksum;
 import org.xml.sax.ContentHandler;
 import org.xml.sax.SAXException;
 import org.xml.sax.helpers.AttributesImpl;
@@ -3784,7 +3784,7 @@ public abstract class FSEditLogOp {
     public Writer(DataOutputBuffer out) {
       this.buf = out;
-      this.checksum = new PureJavaCrc32();
+      this.checksum = DataChecksum.newCrc32();
     }
 
     /**
@@ -3835,7 +3835,7 @@ public abstract class FSEditLogOp {
       this.logVersion = logVersion;
       if (NameNodeLayoutVersion.supports(
           LayoutVersion.Feature.EDITS_CHESKUM, logVersion)) {
-        this.checksum = new PureJavaCrc32();
+        this.checksum = DataChecksum.newCrc32();
       } else {
        this.checksum = null;
      }