From b3c14d4132ed6aa871bb88c4f84f3e3d90da6f93 Mon Sep 17 00:00:00 2001 From: KAI XIE Date: Mon, 19 Aug 2019 09:46:31 +0800 Subject: [PATCH] HADOOP-16158. DistCp to support checksum validation when copy blocks in parallel (#919) * DistCp to support checksum validation when copy blocks in parallel * address review comments * add checksums comparison test for combine mode (cherry picked from commit c765584eb231f8482f5b90b7e8f61f9f7a931d09) --- .../hadoop/tools/mapred/CopyCommitter.java | 15 +- .../mapred/RetriableFileCopyCommand.java | 56 +----- .../apache/hadoop/tools/util/DistCpUtils.java | 60 +++++++ .../tools/mapred/TestCopyCommitter.java | 161 +++++++++++++++++- .../hadoop/tools/util/TestDistCpUtils.java | 67 ++++++++ .../util/TestDistCpUtilsWithCombineMode.java | 115 +++++++++++++ 6 files changed, 412 insertions(+), 62 deletions(-) create mode 100644 hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtilsWithCombineMode.java diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java index d7a730dfca9..546062ff8ed 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java @@ -73,6 +73,7 @@ public class CopyCommitter extends FileOutputCommitter { private boolean overwrite = false; private boolean targetPathExists = true; private boolean ignoreFailures = false; + private boolean skipCrc = false; private int blocksPerChunk = 0; /** @@ -87,6 +88,9 @@ public class CopyCommitter extends FileOutputCommitter { blocksPerChunk = context.getConfiguration().getInt( DistCpOptionSwitch.BLOCKS_PER_CHUNK.getConfigLabel(), 0); LOG.debug("blocks per chunk {}", blocksPerChunk); + skipCrc = context.getConfiguration().getBoolean( + DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false); + LOG.debug("skip CRC is {}", skipCrc); this.taskAttemptContext = context; } @@ -247,7 +251,8 @@ public class CopyCommitter extends FileOutputCommitter { == srcFileStatus.getLen()) { // This is the last chunk of the splits, consolidate allChunkPaths try { - concatFileChunks(conf, targetFile, allChunkPaths); + concatFileChunks(conf, srcFileStatus.getPath(), targetFile, + allChunkPaths); } catch (IOException e) { // If the concat failed because a chunk file doesn't exist, // then we assume that the CopyMapper has skipped copying this @@ -603,8 +608,9 @@ public class CopyCommitter extends FileOutputCommitter { /** * Concat the passed chunk files into one and rename it the targetFile. 
*/ - private void concatFileChunks(Configuration conf, Path targetFile, - LinkedList allChunkPaths) throws IOException { + private void concatFileChunks(Configuration conf, Path sourceFile, + Path targetFile, LinkedList allChunkPaths) + throws IOException { if (allChunkPaths.size() == 1) { return; } @@ -613,6 +619,7 @@ public class CopyCommitter extends FileOutputCommitter { + allChunkPaths.size()); } FileSystem dstfs = targetFile.getFileSystem(conf); + FileSystem srcfs = sourceFile.getFileSystem(conf); Path firstChunkFile = allChunkPaths.removeFirst(); Path[] restChunkFiles = new Path[allChunkPaths.size()]; @@ -630,6 +637,8 @@ public class CopyCommitter extends FileOutputCommitter { LOG.debug("concat: result: " + dstfs.getFileStatus(firstChunkFile)); } rename(dstfs, firstChunkFile, targetFile); + DistCpUtils.compareFileLengthsAndChecksums( + srcfs, sourceFile, null, dstfs, targetFile, skipCrc); } /** diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java index db21f64d72a..fa9193077ac 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java @@ -143,15 +143,8 @@ public class RetriableFileCopyCommand extends RetriableCommand { offset, context, fileAttributes, sourceChecksum); if (!source.isSplit()) { - compareFileLengths(source, targetPath, configuration, bytesRead - + offset); - } - //At this point, src&dest lengths are same. if length==0, we skip checksum - if ((bytesRead != 0) && (!skipCrc)) { - if (!source.isSplit()) { - compareCheckSums(sourceFS, source.getPath(), sourceChecksum, - targetFS, targetPath); - } + DistCpUtils.compareFileLengthsAndChecksums(sourceFS, sourcePath, + sourceChecksum, targetFS, targetPath, skipCrc); } // it's not append or direct write (preferred for s3a) case, thus we first // write to a temporary file, then rename it to the target path. 
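For readers tracing the refactor: the per-file copy path above now delegates length and checksum validation to a single shared utility, and (later in this patch) the commit path does the same after concatenating chunks. A minimal, hypothetical caller sketch follows; only the DistCpUtils.compareFileLengthsAndChecksums signature comes from this patch, while the paths and configuration are made up for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.util.DistCpUtils;

public class ValidateCopySketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path source = new Path("hdfs://nn1/data/part-00000");   // illustrative path
    Path target = new Path("hdfs://nn2/backup/part-00000"); // illustrative path
    FileSystem sourceFS = source.getFileSystem(conf);
    FileSystem targetFS = target.getFileSystem(conf);

    // A null source checksum tells the utility to fetch it via sourceFS,
    // which is what the committer's concat path does; the last argument
    // mirrors -skipcrccheck.
    FileChecksum sourceChecksum = null;
    DistCpUtils.compareFileLengthsAndChecksums(
        sourceFS, source, sourceChecksum, targetFS, target, false);
  }
}

Note that split sources (isSplit() == true) skip this call in RetriableFileCopyCommand; their reassembled target is validated instead by CopyCommitter after concatFileChunks.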
@@ -216,51 +209,6 @@ public class RetriableFileCopyCommand extends RetriableCommand { context); } - private void compareFileLengths(CopyListingFileStatus source, Path target, - Configuration configuration, long targetLen) - throws IOException { - final Path sourcePath = source.getPath(); - FileSystem fs = sourcePath.getFileSystem(configuration); - long srcLen = fs.getFileStatus(sourcePath).getLen(); - if (srcLen != targetLen) - throw new IOException("Mismatch in length of source:" + sourcePath + " (" + srcLen + - ") and target:" + target + " (" + targetLen + ")"); - } - - private void compareCheckSums(FileSystem sourceFS, Path source, - FileChecksum sourceChecksum, FileSystem targetFS, Path target) - throws IOException { - if (!DistCpUtils.checksumsAreEqual(sourceFS, source, sourceChecksum, - targetFS, target)) { - StringBuilder errorMessage = - new StringBuilder("Checksum mismatch between ") - .append(source).append(" and ").append(target).append("."); - boolean addSkipHint = false; - String srcScheme = sourceFS.getScheme(); - String targetScheme = targetFS.getScheme(); - if (!srcScheme.equals(targetScheme) - && !(srcScheme.contains("hdfs") && targetScheme.contains("hdfs"))) { - // the filesystems are different and they aren't both hdfs connectors - errorMessage.append("Source and destination filesystems are of" - + " different types\n") - .append("Their checksum algorithms may be incompatible"); - addSkipHint = true; - } else if (sourceFS.getFileStatus(source).getBlockSize() != - targetFS.getFileStatus(target).getBlockSize()) { - errorMessage.append(" Source and target differ in block-size.\n") - .append(" Use -pb to preserve block-sizes during copy."); - addSkipHint = true; - } - if (addSkipHint) { - errorMessage.append(" You can skip checksum-checks altogether " - + " with -skipcrccheck.\n") - .append(" (NOTE: By skipping checksums, one runs the risk of " + - "masking data-corruption during file-transfer.)\n"); - } - throw new IOException(errorMessage.toString()); - } - } - //If target file exists and unable to delete target - fail //If target doesn't exist and unable to create parent folder - fail //If target is successfully deleted and parent exists, if rename fails - fail diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java index 96a7c5de5bc..3ba9802f888 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java @@ -583,6 +583,66 @@ public class DistCpUtils { sourceChecksum.equals(targetChecksum)); } + /** + * Utility to compare file lengths and checksums for source and target. + * + * @param sourceFS FileSystem for the source path. + * @param source The source path. + * @param sourceChecksum The checksum of the source file. If it is null we + * still need to retrieve it through sourceFS. + * @param targetFS FileSystem for the target path. + * @param target The target path. + * @param skipCrc The flag to indicate whether to skip checksums. + * @throws IOException if there's a mismatch in file lengths or checksums. 
+ */ + public static void compareFileLengthsAndChecksums( + FileSystem sourceFS, Path source, FileChecksum sourceChecksum, + FileSystem targetFS, Path target, boolean skipCrc) throws IOException { + long srcLen = sourceFS.getFileStatus(source).getLen(); + long tgtLen = targetFS.getFileStatus(target).getLen(); + if (srcLen != tgtLen) { + throw new IOException( + "Mismatch in length of source:" + source + " (" + srcLen + + ") and target:" + target + " (" + tgtLen + ")"); + } + + //At this point, src & dest lengths are same. if length==0, we skip checksum + if ((srcLen != 0) && (!skipCrc)) { + if (!checksumsAreEqual(sourceFS, source, sourceChecksum, + targetFS, target)) { + StringBuilder errorMessage = + new StringBuilder("Checksum mismatch between ") + .append(source).append(" and ").append(target).append("."); + boolean addSkipHint = false; + String srcScheme = sourceFS.getScheme(); + String targetScheme = targetFS.getScheme(); + if (!srcScheme.equals(targetScheme)) { + // the filesystems are different and they aren't both hdfs connectors + errorMessage.append("Source and destination filesystems are of" + + " different types\n") + .append("Their checksum algorithms may be incompatible"); + addSkipHint = true; + } else if (sourceFS.getFileStatus(source).getBlockSize() != + targetFS.getFileStatus(target).getBlockSize()) { + errorMessage.append(" Source and target differ in block-size.\n") + .append(" Use -pb to preserve block-sizes during copy."); + addSkipHint = true; + } + if (addSkipHint) { + errorMessage + .append(" You can choose file-level checksum validation via " + + "-Ddfs.checksum.combine.mode=COMPOSITE_CRC when block-sizes" + + " or filesystems are different.") + .append(" Or you can skip checksum-checks altogether " + + " with -skipcrccheck.\n") + .append(" (NOTE: By skipping checksums, one runs the risk of " + + "masking data-corruption during file-transfer.)\n"); + } + throw new IOException(errorMessage.toString()); + } + } + } + /* * Return the Path for a given chunk. * Used when splitting large file into chunks to copy in parallel. 
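The utility above compares lengths first and only then checksums, skipping the checksum step for empty files or when -skipcrccheck is set; on a mismatch, the hint now also points at file-level CRCs. A hedged sketch of opting in to that mode follows; the dfs.checksum.combine.mode value is the one quoted in the error hint and exercised by the new combine-mode test, while the paths and default FileSystem are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.util.DistCpUtils;

public class CompositeCrcSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // COMPOSITE_CRC yields block-size-independent HDFS checksums, so source
    // and target can be compared even when their block sizes differ.
    conf.set("dfs.checksum.combine.mode", "COMPOSITE_CRC");

    FileSystem fs = FileSystem.get(conf);
    Path source = new Path("/data/file");    // illustrative
    Path target = new Path("/backup/file");  // illustrative

    // Throws IOException if lengths differ, or if lengths match but the
    // checksums do not; passing true for skipCrc bypasses the checksum step.
    DistCpUtils.compareFileLengthsAndChecksums(fs, source, null, fs, target, false);
  }
}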
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java index 2ef89e5804b..f4566a6e554 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java @@ -25,6 +25,9 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile; @@ -33,6 +36,7 @@ import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.tools.CopyListing; import org.apache.hadoop.tools.CopyListingFileStatus; import org.apache.hadoop.tools.DistCpConstants; @@ -40,6 +44,7 @@ import org.apache.hadoop.tools.DistCpContext; import org.apache.hadoop.tools.DistCpOptions; import org.apache.hadoop.tools.DistCpOptions.FileAttribute; import org.apache.hadoop.tools.GlobbedCopyListing; +import org.apache.hadoop.tools.util.DistCpUtils; import org.apache.hadoop.tools.util.TestDistCpUtils; import org.apache.hadoop.security.Credentials; import org.junit.*; @@ -55,13 +60,16 @@ public class TestCopyCommitter { private static final Random rand = new Random(); + private static final long BLOCK_SIZE = 1024; private static final Credentials CREDENTIALS = new Credentials(); public static final int PORT = 39737; - private static Configuration config; + private static Configuration clusterConfig; private static MiniDFSCluster cluster; + private Configuration config; + private static Job getJobForClient() throws IOException { Job job = Job.getInstance(new Configuration()); job.getConfiguration().set("mapred.job.tracker", "localhost:" + PORT); @@ -73,10 +81,17 @@ public class TestCopyCommitter { @BeforeClass public static void create() throws IOException { - config = getJobForClient().getConfiguration(); - config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, 0); - cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).format(true) - .build(); + clusterConfig = getJobForClient().getConfiguration(); + clusterConfig.setLong( + DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, 0); + clusterConfig.setLong( + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE); + clusterConfig.setLong( + DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + cluster = new MiniDFSCluster.Builder(clusterConfig) + .numDataNodes(1) + .format(true) + .build(); } @AfterClass @@ -88,6 +103,7 @@ public class TestCopyCommitter { @Before public void createMetaFolder() throws IOException { + config = new Configuration(clusterConfig); config.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta"); Path meta = new Path("/meta"); cluster.getFileSystem().mkdirs(meta); @@ -397,6 +413,141 @@ public class TestCopyCommitter { } } + @Test + public void testCommitWithChecksumMismatchAndSkipCrc() throws IOException { + testCommitWithChecksumMismatch(true); + } + + @Test + public void 
testCommitWithChecksumMismatchWithoutSkipCrc() + throws IOException { + testCommitWithChecksumMismatch(false); + } + + private void testCommitWithChecksumMismatch(boolean skipCrc) + throws IOException { + + TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); + JobContext jobContext = new JobContextImpl( + taskAttemptContext.getConfiguration(), + taskAttemptContext.getTaskAttemptID().getJobID()); + Configuration conf = jobContext.getConfiguration(); + + String sourceBase; + String targetBase; + FileSystem fs = null; + try { + fs = FileSystem.get(conf); + sourceBase = "/tmp1/" + String.valueOf(rand.nextLong()); + targetBase = "/tmp1/" + String.valueOf(rand.nextLong()); + + int blocksPerChunk = 5; + String srcFilename = "/srcdata"; + createSrcAndWorkFilesWithDifferentChecksum(fs, targetBase, sourceBase, + srcFilename, blocksPerChunk); + + DistCpOptions options = new DistCpOptions.Builder( + Collections.singletonList(new Path(sourceBase)), + new Path("/out")) + .withBlocksPerChunk(blocksPerChunk) + .withCRC(skipCrc) + .build(); + options.appendToConf(conf); + conf.setBoolean( + DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false); + DistCpContext context = new DistCpContext(options); + context.setTargetPathExists(false); + + CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS); + Path listingFile = new Path("/tmp1/" + + String.valueOf(rand.nextLong())); + listing.buildListing(listingFile, context); + + conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase); + conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase); + + OutputCommitter committer = new CopyCommitter( + null, taskAttemptContext); + try { + committer.commitJob(jobContext); + if (!skipCrc) { + Assert.fail("Expected commit to fail"); + } + Assert.assertFalse(DistCpUtils.checksumsAreEqual( + fs, new Path(sourceBase + srcFilename), null, + fs, new Path(targetBase + srcFilename))); + } catch(IOException exception) { + if (skipCrc) { + LOG.error("Unexpected exception is found", exception); + throw exception; + } + Throwable cause = exception.getCause(); + GenericTestUtils.assertExceptionContains( + "Checksum mismatch", cause); + } + } finally { + TestDistCpUtils.delete(fs, "/tmp1"); + TestDistCpUtils.delete(fs, "/meta"); + } + } + + /** + * Create a source file and its DistCp working files with different checksum + * to test the checksum validation for copying blocks in parallel. + * + * For the ease of construction, it assumes a source file can be broken down + * into 2 working files (or 2 chunks). + * + * So for a source file with length = + * BLOCK_SIZE * blocksPerChunk + BLOCK_SIZE / 2, + * its 1st working file will have length = + * BLOCK_SIZE * blocksPerChunk, + * then the 2nd working file will have length = + * BLOCK_SIZE / 2. + * And the working files are generated with a different seed to mimic + * same length but different checksum scenario. 
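+ * For example (illustrative numbers taken from this test's constants),
+ * with BLOCK_SIZE = 1024 and blocksPerChunk = 5 the source file is
+ * 5 * 1024 + 512 = 5632 bytes, so the 1st working file is 5120 bytes and
+ * the 2nd is 512 bytes; the differing seed then produces a checksum
+ * mismatch despite the equal lengths.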
+ * + * @param fs the FileSystem + * @param targetBase the path to the working files + * @param sourceBase the path to a source file + * @param filename the filename to copy and work on + * @param blocksPerChunk the blocks per chunk config that enables copying + * blocks in parallel + * @throws IOException when it fails to create files + */ + private void createSrcAndWorkFilesWithDifferentChecksum(FileSystem fs, + String targetBase, + String sourceBase, + String filename, + int blocksPerChunk) + throws IOException { + + long srcSeed = System.currentTimeMillis(); + long dstSeed = srcSeed + rand.nextLong(); + int bufferLen = 128; + short replFactor = 2; + Path srcData = new Path(sourceBase + filename); + + // create data with 2 chunks: the 2nd chunk has half of the block size + long firstChunkLength = BLOCK_SIZE * blocksPerChunk; + long secondChunkLength = BLOCK_SIZE / 2; + + DFSTestUtil.createFile(fs, srcData, + bufferLen, firstChunkLength, BLOCK_SIZE, replFactor, + srcSeed); + DFSTestUtil.appendFileNewBlock((DistributedFileSystem) fs, srcData, + (int) secondChunkLength); + + DFSTestUtil.createFile(fs, new Path(targetBase + + filename + ".____distcpSplit____0." + + firstChunkLength), bufferLen, + firstChunkLength, BLOCK_SIZE, replFactor, dstSeed); + DFSTestUtil.createFile(fs, new Path(targetBase + + filename + ".____distcpSplit____" + + firstChunkLength + "." + secondChunkLength), bufferLen, + secondChunkLength, BLOCK_SIZE, replFactor, dstSeed); + } + private TaskAttemptContext getTaskAttemptContext(Configuration conf) { return new TaskAttemptContextImpl(conf, new TaskAttemptID("200707121733", 1, TaskType.MAP, 1, 1)); diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java index 304e41c2819..5cf184052b7 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java @@ -28,10 +28,12 @@ import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.tools.ECAdmin; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.tools.CopyListingFileStatus; import org.apache.hadoop.tools.DistCpOptionSwitch; import org.apache.hadoop.tools.DistCpOptions.FileAttribute; @@ -1205,6 +1207,71 @@ public class TestDistCpUtils { Assert.assertFalse(srcStatus.getReplication() == f2Status.getReplication()); } + @Test + public void testCompareFileLengthsAndChecksums() throws IOException { + + String base = "/tmp/verify-checksum/"; + long srcSeed = System.currentTimeMillis(); + long dstSeed = srcSeed + rand.nextLong(); + short replFactor = 2; + + FileSystem fs = FileSystem.get(config); + Path basePath = new Path(base); + fs.mkdirs(basePath); + + // empty lengths comparison + Path srcWithLen0 = new Path(base + "srcLen0"); + Path dstWithLen0 = new Path(base + "dstLen0"); + fs.create(srcWithLen0).close(); + fs.create(dstWithLen0).close(); + DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithLen0, + null, fs, dstWithLen0, false); + + // different lengths comparison + Path 
srcWithLen1 = new Path(base + "srcLen1"); + Path dstWithLen2 = new Path(base + "dstLen2"); + DFSTestUtil.createFile(fs, srcWithLen1, 1, replFactor, srcSeed); + DFSTestUtil.createFile(fs, dstWithLen2, 2, replFactor, srcSeed); + try { + DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithLen1, + null, fs, dstWithLen2, false); + Assert.fail("Expected different lengths comparison to fail!"); + } catch (IOException e) { + GenericTestUtils.assertExceptionContains( + "Mismatch in length", e); + } + + // checksums matched + Path srcWithChecksum1 = new Path(base + "srcChecksum1"); + Path dstWithChecksum1 = new Path(base + "dstChecksum1"); + DFSTestUtil.createFile(fs, srcWithChecksum1, 1024, + replFactor, srcSeed); + DFSTestUtil.createFile(fs, dstWithChecksum1, 1024, + replFactor, srcSeed); + DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1, + null, fs, dstWithChecksum1, false); + DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1, + fs.getFileChecksum(srcWithChecksum1), fs, dstWithChecksum1, + false); + + // checksums mismatched + Path dstWithChecksum2 = new Path(base + "dstChecksum2"); + DFSTestUtil.createFile(fs, dstWithChecksum2, 1024, + replFactor, dstSeed); + try { + DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1, + null, fs, dstWithChecksum2, false); + Assert.fail("Expected different checksums comparison to fail!"); + } catch (IOException e) { + GenericTestUtils.assertExceptionContains( + "Checksum mismatch", e); + } + + // checksums mismatched but skipped + DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1, + null, fs, dstWithChecksum2, true); + } + private static Random rand = new Random(); public static String createTestSetup(FileSystem fs) throws IOException { diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtilsWithCombineMode.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtilsWithCombineMode.java new file mode 100644 index 00000000000..5d44ab0a321 --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtilsWithCombineMode.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.tools.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Test length and checksums comparison with checksum combine mode. + * When the combine mode is COMPOSITE_CRC, it should tolerate different file + * systems and different block sizes. + */ +public class TestDistCpUtilsWithCombineMode { + private static final Logger LOG = LoggerFactory.getLogger(TestDistCpUtils.class); + + private Configuration config; + private MiniDFSCluster cluster; + + @Rule + public TestName testName = new TestName(); + + @Before + public void create() throws IOException { + config = new Configuration(); + if (testName.getMethodName().contains("WithCombineMode")) { + config.set("dfs.checksum.combine.mode", "COMPOSITE_CRC"); + } + config.setLong( + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 512); + cluster = new MiniDFSCluster.Builder(config) + .numDataNodes(2) + .format(true) + .build(); + } + + @After + public void destroy() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testChecksumsComparisonWithCombineMode() throws IOException { + try { + compareSameContentButDiffBlockSizes(); + } catch (IOException e) { + LOG.error("Unexpected exception is found", e); + throw e; + } + } + + @Test + public void testChecksumsComparisonWithoutCombineMode() { + try { + compareSameContentButDiffBlockSizes(); + Assert.fail("Expected comparison to fail"); + } catch (IOException e) { + GenericTestUtils.assertExceptionContains( + "Checksum mismatch", e); + } + } + + private void compareSameContentButDiffBlockSizes() throws IOException { + String base = "/tmp/verify-checksum-" + testName.getMethodName() + "/"; + long seed = System.currentTimeMillis(); + short rf = 2; + + FileSystem fs = FileSystem.get(config); + Path basePath = new Path(base); + fs.mkdirs(basePath); + + // create 2 files of same content but different block-sizes + Path src = new Path(base + "src"); + Path dst = new Path(base + "dst"); + DFSTestUtil.createFile(fs, src, 256, 1024, 512, + rf, seed); + DFSTestUtil.createFile(fs, dst, 256, 1024, 1024, + rf, seed); + // then compare + DistCpUtils.compareFileLengthsAndChecksums(fs, src, + null, fs, dst, false); + } +}
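Taken together, a chunked copy now gets its reassembled files length- and checksum-validated at commit time. A hypothetical way to drive this from the Java API, mirroring the options built in the new TestCopyCommitter test: the paths are made up, withBlocksPerChunk and withCRC are the builder methods that test uses, and the command-line equivalents are -blocksperchunk and -skipcrccheck.

import java.util.Collections;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCpOptions;

public class ChunkedCopyOptionsSketch {
  public static void main(String[] args) {
    // Split each source file into chunks of 5 blocks; withCRC(false) keeps
    // skipCRC off, so the committer verifies the concatenated result.
    DistCpOptions options = new DistCpOptions.Builder(
        Collections.singletonList(new Path("/src")),  // illustrative source
        new Path("/dst"))                             // illustrative target
        .withBlocksPerChunk(5)
        .withCRC(false)
        .build();
    System.out.println(options);
  }
}

If source and target block sizes differ, the new error hint suggests running with -Ddfs.checksum.combine.mode=COMPOSITE_CRC rather than disabling the check outright.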