diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 41ba87d8616..6cdcb7add8a 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -1175,6 +1175,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12758. Extend CSRF Filter with UserAgent Checks (Larry McCay via
     cnauroth)
 
+    HADOOP-10865. Add a Crc32 chunked verification benchmark for both direct
+    and non-direct buffer cases. (szetszwo)
+
   BUG FIXES
 
     HADOOP-12352. Delay in checkpointing Trash can leave trash for 2 intervals
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
index d9dc7af1fc2..faac5879f1e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
@@ -286,92 +286,117 @@ public class DataChecksum implements Checksum {
    * @throws ChecksumException if the checksums do not match
    */
   public void verifyChunkedSums(ByteBuffer data, ByteBuffer checksums,
-      String fileName, long basePos)
-      throws ChecksumException {
+      String fileName, long basePos) throws ChecksumException {
     if (type.size == 0) return;
 
     if (data.hasArray() && checksums.hasArray()) {
-      verifyChunkedSums(
-          data.array(), data.arrayOffset() + data.position(), data.remaining(),
-          checksums.array(), checksums.arrayOffset() + checksums.position(),
-          fileName, basePos);
+      final int dataOffset = data.arrayOffset() + data.position();
+      final int crcsOffset = checksums.arrayOffset() + checksums.position();
+      verifyChunked(type, summer, data.array(), dataOffset, data.remaining(),
+          bytesPerChecksum, checksums.array(), crcsOffset, fileName, basePos);
       return;
     }
     if (NativeCrc32.isAvailable()) {
       NativeCrc32.verifyChunkedSums(bytesPerChecksum, type.id, checksums,
           data, fileName, basePos);
-      return;
+    } else {
+      verifyChunked(type, summer, data, bytesPerChecksum, checksums, fileName,
+          basePos);
     }
-
-    int startDataPos = data.position();
+  }
+
+  static void verifyChunked(final Type type, final Checksum algorithm,
+      final ByteBuffer data, final int bytesPerCrc, final ByteBuffer crcs,
+      final String filename, final long basePos) throws ChecksumException {
+    final byte[] bytes = new byte[bytesPerCrc];
+    final int dataOffset = data.position();
+    final int dataLength = data.remaining();
     data.mark();
-    checksums.mark();
+    crcs.mark();
+
     try {
-      byte[] buf = new byte[bytesPerChecksum];
-      byte[] sum = new byte[type.size];
-      while (data.remaining() > 0) {
-        int n = Math.min(data.remaining(), bytesPerChecksum);
-        checksums.get(sum);
-        data.get(buf, 0, n);
-        summer.reset();
-        summer.update(buf, 0, n);
-        int calculated = (int)summer.getValue();
-        int stored = (sum[0] << 24 & 0xff000000) |
-          (sum[1] << 16 & 0xff0000) |
-          (sum[2] << 8 & 0xff00) |
-          sum[3] & 0xff;
-        if (calculated != stored) {
-          long errPos = basePos + data.position() - startDataPos - n;
-          throw new ChecksumException(
-              "Checksum error: "+ fileName + " at "+ errPos +
-              " exp: " + stored + " got: " + calculated, errPos);
+      int i = 0;
+      for(final int n = dataLength - bytesPerCrc + 1; i < n; i += bytesPerCrc) {
+        data.get(bytes);
+        algorithm.reset();
+        algorithm.update(bytes, 0, bytesPerCrc);
+        final int computed = (int)algorithm.getValue();
+        final int expected = crcs.getInt();
+
+        if (computed != expected) {
+          long errPos = basePos + data.position() - dataOffset - bytesPerCrc;
+          throwChecksumException(type, algorithm, filename, errPos, expected,
+              computed);
+        }
+      }
+
+      final int remainder = dataLength - i;
+      if (remainder > 0) {
+        data.get(bytes, 0, remainder);
+        algorithm.reset();
+        algorithm.update(bytes, 0, remainder);
+        final int computed = (int)algorithm.getValue();
+        final int expected = crcs.getInt();
+
+        if (computed != expected) {
+          long errPos = basePos + data.position() - dataOffset - remainder;
+          throwChecksumException(type, algorithm, filename, errPos, expected,
+              computed);
         }
       }
     } finally {
       data.reset();
-      checksums.reset();
+      crcs.reset();
     }
   }
-  
+
   /**
    * Implementation of chunked verification specifically on byte arrays. This
    * is to avoid the copy when dealing with ByteBuffers that have array backing.
    */
-  private void verifyChunkedSums(
-      byte[] data, int dataOff, int dataLen,
-      byte[] checksums, int checksumsOff, String fileName,
-      long basePos) throws ChecksumException {
-    if (type.size == 0) return;
+  static void verifyChunked(final Type type, final Checksum algorithm,
+      final byte[] data, final int dataOffset, final int dataLength,
+      final int bytesPerCrc, final byte[] crcs, final int crcsOffset,
+      final String filename, final long basePos) throws ChecksumException {
+    final int dataEnd = dataOffset + dataLength;
+    int i = dataOffset;
+    int j = crcsOffset;
+    for(final int n = dataEnd-bytesPerCrc+1; i < n; i += bytesPerCrc, j += 4) {
+      algorithm.reset();
+      algorithm.update(data, i, bytesPerCrc);
+      final int computed = (int)algorithm.getValue();
+      final int expected = ((crcs[j] << 24) + ((crcs[j + 1] << 24) >>> 8))
+          + (((crcs[j + 2] << 24) >>> 16) + ((crcs[j + 3] << 24) >>> 24));
 
-    if (NativeCrc32.isAvailable()) {
-      NativeCrc32.verifyChunkedSumsByteArray(bytesPerChecksum, type.id,
-          checksums, checksumsOff, data, dataOff, dataLen, fileName, basePos);
-      return;
-    }
-
-    int remaining = dataLen;
-    int dataPos = 0;
-    while (remaining > 0) {
-      int n = Math.min(remaining, bytesPerChecksum);
-
-      summer.reset();
-      summer.update(data, dataOff + dataPos, n);
-      dataPos += n;
-      remaining -= n;
-
-      int calculated = (int)summer.getValue();
-      int stored = (checksums[checksumsOff] << 24 & 0xff000000) |
-        (checksums[checksumsOff + 1] << 16 & 0xff0000) |
-        (checksums[checksumsOff + 2] << 8 & 0xff00) |
-        checksums[checksumsOff + 3] & 0xff;
-      checksumsOff += 4;
-      if (calculated != stored) {
-        long errPos = basePos + dataPos - n;
-        throw new ChecksumException(
-            "Checksum error: "+ fileName + " at "+ errPos +
-            " exp: " + stored + " got: " + calculated, errPos);
+      if (computed != expected) {
+        final long errPos = basePos + i - dataOffset;
+        throwChecksumException(type, algorithm, filename, errPos, expected,
+            computed);
       }
     }
+    final int remainder = dataEnd - i;
+    if (remainder > 0) {
+      algorithm.reset();
+      algorithm.update(data, i, remainder);
+      final int computed = (int)algorithm.getValue();
+      final int expected = ((crcs[j] << 24) + ((crcs[j + 1] << 24) >>> 8))
+          + (((crcs[j + 2] << 24) >>> 16) + ((crcs[j + 3] << 24) >>> 24));
+
+      if (computed != expected) {
+        final long errPos = basePos + i - dataOffset;
+        throwChecksumException(type, algorithm, filename, errPos, expected,
+            computed);
+      }
+    }
+  }
+
+  private static void throwChecksumException(Type type, Checksum algorithm,
+      String filename, long errPos, int expected, int computed)
+      throws ChecksumException {
+    throw new ChecksumException("Checksum " + type
+        + " not matched for file " + filename + " at position " + errPos
+        + String.format(": expected=%X but computed=%X", expected, computed)
+        + ", algorithm=" + algorithm.getClass().getSimpleName(), errPos);
   }
 
   /**
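For illustration only (not part of the patch; the class name is invented): the byte-array path above decodes each stored big-endian CRC with shift arithmetic, where the (b << 24) >>> k form zero-fills bits that a plain widening cast of a byte would sign-extend. A minimal sketch showing that the expression agrees with ByteBuffer.getInt():

    import java.nio.ByteBuffer;
    import java.util.zip.CRC32;

    public class CrcDecodeSketch {
      // Same expression as in DataChecksum.verifyChunked(byte[], ...).
      static int decode(byte[] crcs, int j) {
        return ((crcs[j] << 24) + ((crcs[j + 1] << 24) >>> 8))
            + (((crcs[j + 2] << 24) >>> 16) + ((crcs[j + 3] << 24) >>> 24));
      }

      public static void main(String[] args) {
        final byte[] data = "hello world".getBytes();
        final CRC32 crc = new CRC32();
        crc.update(data, 0, data.length);
        final int computed = (int) crc.getValue();

        // Store the checksum big-endian, then decode it both ways.
        final byte[] stored = ByteBuffer.allocate(4).putInt(computed).array();
        System.out.println(decode(stored, 0) == computed);                // true
        System.out.println(ByteBuffer.wrap(stored).getInt() == computed); // true
      }
    }

The four shifted terms occupy disjoint bit ranges, so combining them with + gives the same result as the | operator used in the code being replaced.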
computed=%X", expected, computed) + + ", algorithm=" + algorithm.getClass().getSimpleName(), errPos); } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/Crc32PerformanceTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/Crc32PerformanceTest.java new file mode 100644 index 00000000000..d8963df9863 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/Crc32PerformanceTest.java @@ -0,0 +1,350 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import java.io.PrintStream; +import java.lang.reflect.Constructor; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.zip.CRC32; +import java.util.zip.Checksum; + +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.fs.ChecksumException; +import org.apache.log4j.Level; + +/** + * Performance tests to compare performance of Crc32 implementations + * This can be run from the command line with: + * + * java -cp path/to/test/classes:path/to/common/classes \ + * 'org.apache.hadoop.util.Crc32PerformanceTest' + * + * or + * + * hadoop org.apache.hadoop.util.Crc32PerformanceTest + * + * The output is in JIRA table format. 
+ */ +public class Crc32PerformanceTest { + static final int MB = 1024 * 1024; + + static interface Crc32 { + + public void verifyChunked(ByteBuffer data, int bytesPerCrc, ByteBuffer crcs, + String filename, long basePos) throws ChecksumException; + + static final class Native implements Crc32 { + @Override + public void verifyChunked(ByteBuffer data, int bytesPerSum, + ByteBuffer sums, String fileName, long basePos) + throws ChecksumException { + NativeCrc32.verifyChunkedSums(bytesPerSum, DataChecksum.Type.CRC32.id, + sums, data, fileName, basePos); + } + } + + + static abstract class AbstractCrc32 implements Crc32 { + abstract T newAlgorithm(); + + @Override + public void verifyChunked(ByteBuffer data, int bytesPerCrc, + ByteBuffer crcs, String filename, long basePos) + throws ChecksumException { + final Checksum algorithm = newAlgorithm(); + if (data.hasArray() && crcs.hasArray()) { + DataChecksum.verifyChunked(DataChecksum.Type.CRC32, algorithm, + data.array(), data.position(), data.remaining(), bytesPerCrc, + crcs.array(), crcs.position(), filename, basePos); + } else { + DataChecksum.verifyChunked(DataChecksum.Type.CRC32, algorithm, + data, bytesPerCrc, crcs, filename, basePos); + } + } + } + + static final class Zip extends AbstractCrc32 { + @Override + public CRC32 newAlgorithm() { + return new CRC32(); + } + } + + static final class PureJava extends AbstractCrc32 { + @Override + public PureJavaCrc32 newAlgorithm() { + return new PureJavaCrc32(); + } + } + } + + final int dataLengthMB; + final int trials; + final boolean direct; + + final PrintStream out = System.out; + + final List> crcs = new ArrayList<>(); + + Crc32PerformanceTest(final int dataLengthMB, final int trials, + final boolean direct) { + this.dataLengthMB = dataLengthMB; + this.trials = trials; + this.direct = direct; + + crcs.add(Crc32.Zip.class); + crcs.add(Crc32.PureJava.class); + + if (direct && NativeCrc32.isAvailable()) { + crcs.add(Crc32.Native.class); + ((Log4JLogger)LogFactory.getLog(NativeCodeLoader.class)) + .getLogger().setLevel(Level.ALL); + } + } + + void run() throws Exception { + final long startTime = System.nanoTime(); + printSystemProperties(out); + out.println("Data Length = " + dataLengthMB + " MB"); + out.println("Trials = " + trials); + doBench(crcs); + out.printf("Elapsed %.1fs\n", secondsElapsed(startTime)); + } + + public static void main(String args[]) throws Exception { + new Crc32PerformanceTest(64, 5, true).run(); + } + + private static void printCell(String s, int width, PrintStream out) { + final int w = s.length() > width? s.length(): width; + out.printf(" %" + w + "s |", s); + } + + private ByteBuffer allocateByteBuffer(int length) { + return direct? 
ByteBuffer.allocateDirect(length) + : ByteBuffer.allocate(length); + } + + private ByteBuffer newData() { + final byte[] bytes = new byte[dataLengthMB << 20]; + new Random().nextBytes(bytes); + final ByteBuffer dataBufs = allocateByteBuffer(bytes.length); + dataBufs.mark(); + dataBufs.put(bytes); + dataBufs.reset(); + return dataBufs; + } + + private ByteBuffer computeCrc(ByteBuffer dataBufs, int bytePerCrc) { + final int size = 4 * (dataBufs.remaining() - 1) / bytePerCrc + 1; + final ByteBuffer crcBufs = allocateByteBuffer(size); + final DataChecksum checksum = DataChecksum.newDataChecksum( + DataChecksum.Type.CRC32, bytePerCrc); + checksum.calculateChunkedSums(dataBufs, crcBufs); + return crcBufs; + } + + private void doBench(final List> crcs) + throws Exception { + final ByteBuffer[] dataBufs = new ByteBuffer[16]; + for(int i = 0; i < dataBufs.length; i++) { + dataBufs[i] = newData(); + } + + // Print header + out.printf("\n%s Buffer Performance Table", direct? "Direct": "Non-direct"); + out.printf(" (bpc: byte-per-crc in MB/sec; #T: #Theads)\n"); + + // Warm up implementations to get jit going. + final ByteBuffer[] crc32 = {computeCrc(dataBufs[0], 32)}; + final ByteBuffer[] crc512 = {computeCrc(dataBufs[0], 512)}; + for (Class c : crcs) { + doBench(c, 1, dataBufs, crc32, 32); + doBench(c, 1, dataBufs, crc512, 512); + } + + // Test on a variety of sizes with different number of threads + for (int i = 5; i <= 16; i++) { + doBench(crcs, dataBufs, 1 << i, out); + } + } + + private void doBench(final List> crcs, + final ByteBuffer[] dataBufs, final int bytePerCrc, final PrintStream out) + throws Exception { + final ByteBuffer[] crcBufs = new ByteBuffer[dataBufs.length]; + for(int i = 0; i < crcBufs.length; i++) { + crcBufs[i] = computeCrc(dataBufs[i], bytePerCrc); + } + + final String numBytesStr = " bpc "; + final String numThreadsStr = "#T"; + final String diffStr = "% diff"; + + out.print('|'); + printCell(numBytesStr, 0, out); + printCell(numThreadsStr, 0, out); + for (int i = 0; i < crcs.size(); i++) { + final Class c = crcs.get(i); + out.print('|'); + printCell(c.getSimpleName(), 8, out); + for(int j = 0; j < i; j++) { + printCell(diffStr, diffStr.length(), out); + } + } + out.printf("\n"); + + for(int numThreads = 1; numThreads <= dataBufs.length; numThreads <<= 1) { + out.printf("|"); + printCell(String.valueOf(bytePerCrc), numBytesStr.length(), out); + printCell(String.valueOf(numThreads), numThreadsStr.length(), out); + + final List previous = new ArrayList(); + for(Class c : crcs) { + System.gc(); + + final BenchResult result = doBench(c, numThreads, dataBufs, crcBufs, + bytePerCrc); + printCell(String.format("%9.1f", result.mbps), + c.getSimpleName().length() + 1, out); + + //compare result with previous + for(BenchResult p : previous) { + final double diff = (result.mbps - p.mbps) / p.mbps * 100; + printCell(String.format("%5.1f%%", diff), diffStr.length(), out); + } + previous.add(result); + } + out.printf("\n"); + } + } + + + private BenchResult doBench(Class clazz, + final int numThreads, final ByteBuffer[] dataBufs, + final ByteBuffer[] crcBufs, final int bytePerCrc) + throws Exception { + + final Thread[] threads = new Thread[numThreads]; + final BenchResult[] results = new BenchResult[threads.length]; + + { + final Constructor ctor = clazz.getConstructor(); + + for(int i = 0; i < threads.length; i++) { + final Crc32 crc = ctor.newInstance(); + final long byteProcessed = dataBufs[i].remaining() * trials; + final int index = i; + threads[i] = new Thread() { + @Override + 
public void run() { + final long startTime = System.nanoTime(); + for (int i = 0; i < trials; i++) { + dataBufs[index].mark(); + crcBufs[index].mark(); + try { + crc.verifyChunked(dataBufs[index], bytePerCrc, crcBufs[index], + crc.getClass().getSimpleName(), dataBufs[index].position()); + } catch (Throwable t) { + results[index] = new BenchResult(t); + return; + } finally { + dataBufs[index].reset(); + crcBufs[index].reset(); + } + } + final double secsElapsed = secondsElapsed(startTime); + results[index] = new BenchResult(byteProcessed/secsElapsed/MB); + } + }; + } + } + + for(Thread t : threads) { + t.start(); + } + for(Thread t : threads) { + t.join(); + } + + double sum = 0; + for(int i = 0; i < results.length; i++) { + sum += results[i].getMbps(); + } + return new BenchResult(sum/results.length); + } + + private static class BenchResult { + /** Speed (MB per second) */ + final double mbps; + final Throwable thrown; + + BenchResult(double mbps) { + this.mbps = mbps; + this.thrown = null; + } + + BenchResult(Throwable e) { + this.mbps = 0; + this.thrown = e; + } + + double getMbps() { + if (thrown != null) { + throw new AssertionError(thrown); + } + return mbps; + } + } + + static double secondsElapsed(final long startTime) { + return (System.nanoTime() - startTime) / 1000000000.0d; + } + + static void printSystemProperties(PrintStream out) { + final String[] names = { + "java.version", + "java.runtime.name", + "java.runtime.version", + "java.vm.version", + "java.vm.vendor", + "java.vm.name", + "java.vm.specification.version", + "java.specification.version", + "os.arch", + "os.name", + "os.version" + }; + int max = 0; + for(String n : names) { + if (n.length() > max) { + max = n.length(); + } + } + + final Properties p = System.getProperties(); + for(String n : names) { + out.printf("%" + max + "s = %s\n", n, p.getProperty(n)); + } + } +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDataChecksum.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDataChecksum.java index 73fd25a7fa2..8841809202f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDataChecksum.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDataChecksum.java @@ -197,4 +197,10 @@ public class TestDataChecksum { newBuf.limit(dataBuf.limit()); return newBuf; } + + @Test + public void testCrc32() throws Exception { + new Crc32PerformanceTest(8, 3, true).run(); + new Crc32PerformanceTest(8, 3, false).run(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index 1f783f6ab60..62dfe790f8a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -52,6 +52,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.CryptoProtocolVersion; +import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; @@ -62,8 +63,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import 
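For illustration only (not part of the patch; the class name is invented): each benchmark thread verifies the same pair of buffers once per trial, marking both before a pass and resetting them in a finally block so every pass consumes identical contents without reallocating. A minimal sketch of that mark()/reset() rewind pattern:

    import java.nio.ByteBuffer;

    public class MarkResetSketch {
      public static void main(String[] args) {
        final ByteBuffer buf = ByteBuffer.allocate(16);
        buf.put("same bytes again".getBytes());
        buf.flip();                    // position 0, limit 16

        for (int trial = 0; trial < 3; trial++) {
          buf.mark();                  // remember the current position
          try {
            final byte[] chunk = new byte[buf.remaining()];
            buf.get(chunk);            // consume the buffer, as verifyChunked does
            System.out.println(new String(chunk));
          } finally {
            buf.reset();               // rewind to the mark for the next trial
          }
        }
      }
    }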
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDataChecksum.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDataChecksum.java
index 73fd25a7fa2..8841809202f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDataChecksum.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDataChecksum.java
@@ -197,4 +197,10 @@ public class TestDataChecksum {
     newBuf.limit(dataBuf.limit());
     return newBuf;
   }
+
+  @Test
+  public void testCrc32() throws Exception {
+    new Crc32PerformanceTest(8, 3, true).run();
+    new Crc32PerformanceTest(8, 3, false).run();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
index 1f783f6ab60..62dfe790f8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
@@ -52,6 +52,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -62,8 +63,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.client.HdfsUtils;
+import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -929,11 +930,12 @@ public class TestDFSClientRetries {
         try {
           dis.read(arr, 0, (int)FILE_LENGTH);
           fail("Expected ChecksumException not thrown");
-        } catch (Exception ex) {
+        } catch (ChecksumException ex) {
           GenericTestUtils.assertExceptionContains(
-              "Checksum error", ex);
+              "Checksum", ex);
         }
       }
+      client.close();
     } finally {
       cluster.shutdown();
     }
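For illustration only (not part of the patch; the class name is invented, and only the public ChecksumException(String, long) constructor is assumed): the last hunk narrows catch (Exception) to catch (ChecksumException) and asserts on the substring "Checksum", which both the old message ("Checksum error: ...") and the new one ("Checksum CRC32 not matched for file ...") contain, so any other failure now propagates instead of silently passing the test. A standalone sketch of the tightened pattern:

    import org.apache.hadoop.fs.ChecksumException;

    public class CatchNarrowingSketch {
      // Simulates a read that fails with the new message format.
      static void read() throws ChecksumException {
        throw new ChecksumException(
            "Checksum CRC32 not matched for file foo at position 0", 0);
      }

      public static void main(String[] args) {
        try {
          read();
          throw new AssertionError("Expected ChecksumException not thrown");
        } catch (ChecksumException ex) {
          // Only the expected failure mode is accepted here; any other
          // exception type would propagate out of main instead.
          if (!ex.getMessage().contains("Checksum")) {
            throw new AssertionError("unexpected message: " + ex.getMessage());
          }
        }
      }
    }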