diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 35e37e9540a..d38967986d0 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -857,6 +857,9 @@ Release 0.23.8 - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-5065. DistCp should skip checksum comparisons if block-sizes
+    are different on source/target (Mithun Radhakrishnan via kihwal)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
index ccd2eab34ae..87fb2d4511e 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
@@ -140,10 +140,17 @@ public class RetriableFileCopyCommand extends RetriableCommand {
   private void compareCheckSums(FileSystem sourceFS, Path source,
                                 FileSystem targetFS, Path target)
                                 throws IOException {
-    if (!DistCpUtils.checksumsAreEqual(sourceFS, source, targetFS, target))
-      throw new IOException("Check-sum mismatch between "
-          + source + " and " + target);
-
+    if (!DistCpUtils.checksumsAreEqual(sourceFS, source, targetFS, target)) {
+      StringBuilder errorMessage = new StringBuilder("Check-sum mismatch between ")
+          .append(source).append(" and ").append(target).append(".");
+      if (sourceFS.getFileStatus(source).getBlockSize() != targetFS.getFileStatus(target).getBlockSize()) {
+        errorMessage.append(" Source and target differ in block-size.")
+            .append(" Use -pb to preserve block-sizes during copy.")
+            .append(" Alternatively, skip checksum-checks altogether, using -skipCrc.")
+            .append(" (NOTE: By skipping checksums, one runs the risk of masking data-corruption during file-transfer.)");
+      }
+      throw new IOException(errorMessage.toString());
+    }
   }
 
   //If target file exists and unable to delete target - fail
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
index f9c2371ef23..b4e2d4f5b2e 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
@@ -53,7 +53,7 @@ public class TestCopyMapper {
 
   private static final Log LOG = LogFactory.getLog(TestCopyMapper.class);
   private static List<Path> pathList = new ArrayList<Path>();
   private static int nFiles = 0;
-  private static final int FILE_SIZE = 1024;
+  private static final int DEFAULT_FILE_SIZE = 1024;
 
   private static MiniDFSCluster cluster;
@@ -92,7 +92,7 @@ public class TestCopyMapper {
     configuration.setBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(),
             false);
     configuration.setBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(),
-            true);
+            false);
     configuration.setBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(),
             true);
     configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
@@ -112,6 +112,18 @@ public class TestCopyMapper {
     touchFile(SOURCE_PATH + "/7/8/9");
   }
 
+  private static void createSourceDataWithDifferentBlockSize() throws Exception {
+    mkdirs(SOURCE_PATH + "/1");
+    mkdirs(SOURCE_PATH + "/2");
+    mkdirs(SOURCE_PATH + "/2/3/4");
+    mkdirs(SOURCE_PATH + "/2/3");
+    mkdirs(SOURCE_PATH + "/5");
+    touchFile(SOURCE_PATH + "/5/6", true);
+    mkdirs(SOURCE_PATH + "/7");
+    mkdirs(SOURCE_PATH + "/7/8");
+    touchFile(SOURCE_PATH + "/7/8/9");
+  }
+
   private static void mkdirs(String path) throws Exception {
     FileSystem fileSystem = cluster.getFileSystem();
     final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(),
@@ -121,17 +133,31 @@ public class TestCopyMapper {
   }
 
   private static void touchFile(String path) throws Exception {
+    touchFile(path, false);
+  }
+
+  private static void touchFile(String path, boolean createMultipleBlocks) throws Exception {
+    final long NON_DEFAULT_BLOCK_SIZE = 4096;
     FileSystem fs;
     DataOutputStream outputStream = null;
     try {
       fs = cluster.getFileSystem();
       final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
               fs.getWorkingDirectory());
-      final long blockSize = fs.getDefaultBlockSize(qualifiedPath) * 2;
+      final long blockSize = createMultipleBlocks? NON_DEFAULT_BLOCK_SIZE : fs.getDefaultBlockSize(qualifiedPath) * 2;
       outputStream = fs.create(qualifiedPath, true, 0,
               (short)(fs.getDefaultReplication(qualifiedPath)*2),
               blockSize);
-      outputStream.write(new byte[FILE_SIZE]);
+      byte[] bytes = new byte[DEFAULT_FILE_SIZE];
+      outputStream.write(bytes);
+      long fileSize = DEFAULT_FILE_SIZE;
+      if (createMultipleBlocks) {
+        while (fileSize < 2*blockSize) {
+          outputStream.write(bytes);
+          outputStream.flush();
+          fileSize += DEFAULT_FILE_SIZE;
+        }
+      }
       pathList.add(qualifiedPath);
       ++nFiles;
 
@@ -144,7 +170,7 @@ public class TestCopyMapper {
     }
   }
 
-  @Test
+  @Test(timeout=40000)
   public void testRun() {
     try {
       deleteState();
@@ -179,7 +205,7 @@ public class TestCopyMapper {
 
       Assert.assertEquals(pathList.size(),
           stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
-      Assert.assertEquals(nFiles * FILE_SIZE,
+      Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE,
          stubContext.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
 
       testCopyingExistingFiles(fs, copyMapper, context);
@@ -211,7 +237,7 @@ public class TestCopyMapper {
     }
   }
 
-  @Test
+  @Test(timeout=40000)
   public void testMakeDirFailure() {
     try {
       deleteState();
@@ -239,13 +265,13 @@ public class TestCopyMapper {
     }
   }
 
-  @Test
+  @Test(timeout=40000)
   public void testIgnoreFailures() {
     doTestIgnoreFailures(true);
     doTestIgnoreFailures(false);
   }
 
-  @Test
+  @Test(timeout=40000)
   public void testDirToFile() {
     try {
       deleteState();
@@ -273,7 +299,7 @@ public class TestCopyMapper {
     }
   }
 
-  @Test
+  @Test(timeout=40000)
   public void testPreserve() {
     try {
       deleteState();
@@ -343,7 +369,7 @@ public class TestCopyMapper {
     }
   }
 
-  @Test
+  @Test(timeout=40000)
   public void testCopyReadableFiles() {
     try {
       deleteState();
@@ -406,7 +432,7 @@ public class TestCopyMapper {
     }
   }
 
-  @Test
+  @Test(timeout=40000)
   public void testSkipCopyNoPerms() {
     try {
       deleteState();
@@ -480,7 +506,7 @@ public class TestCopyMapper {
     }
  }
 
-  @Test
+  @Test(timeout=40000)
   public void testFailCopyWithAccessControlException() {
     try {
       deleteState();
@@ -563,7 +589,7 @@ public class TestCopyMapper {
     }
   }
 
-  @Test
+  @Test(timeout=40000)
   public void testFileToDir() {
     try {
       deleteState();
@@ -640,12 +666,48 @@ public class TestCopyMapper {
     cluster.getFileSystem().delete(new Path(TARGET_PATH), true);
   }
 
-  @Test
+  @Test(timeout=40000)
   public void testPreserveBlockSizeAndReplication() {
     testPreserveBlockSizeAndReplicationImpl(true);
     testPreserveBlockSizeAndReplicationImpl(false);
   }
 
+  @Test(timeout=40000)
+  public void testCopyFailOnBlockSizeDifference() {
+    try {
+
+      deleteState();
+      createSourceDataWithDifferentBlockSize();
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+              = stubContext.getContext();
+
+      Configuration configuration = context.getConfiguration();
+      EnumSet<DistCpOptions.FileAttribute> fileAttributes
+              = EnumSet.noneOf(DistCpOptions.FileAttribute.class);
+      configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+              DistCpUtils.packAttributes(fileAttributes));
+
+      copyMapper.setup(context);
+
+      for (Path path : pathList) {
+        final FileStatus fileStatus = fs.getFileStatus(path);
+        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+                fileStatus, context);
+      }
+
+      Assert.fail("Copy should have failed because of block-size difference.");
+    }
+    catch (Exception exception) {
+      // Check that the exception suggests the use of -pb/-skipCrc.
+      Assert.assertTrue("Failure exception should have suggested the use of -pb.", exception.getCause().getCause().getMessage().contains("pb"));
+      Assert.assertTrue("Failure exception should have suggested the use of -skipCrc.", exception.getCause().getCause().getMessage().contains("skipCrc"));
+    }
+  }
+
   private void testPreserveBlockSizeAndReplicationImpl(boolean preserve){
 
     try {
@@ -717,7 +779,7 @@ public class TestCopyMapper {
    * If a single file is being copied to a location where the file (of the same
    * name) already exists, then the file shouldn't be skipped.
    */
-  @Test
+  @Test(timeout=40000)
   public void testSingleFileCopy() {
     try {
       deleteState();
@@ -766,7 +828,7 @@ public class TestCopyMapper {
     }
   }
 
-  @Test
+  @Test(timeout=40000)
   public void testPreserveUserGroup() {
     testPreserveUserGroupImpl(true);
     testPreserveUserGroupImpl(false);
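
Note (not part of the patch): the new error message steers users toward -pb, i.e. preserving block-sizes, so that source and target checksums stay comparable. Below is a minimal sketch of how the same behaviour could be requested through DistCp's Java API; the DistCpOptions/DistCp/FileAttribute names are believed to match the API of this Hadoop line, but treat the exact signatures, and the HDFS paths, as assumptions for illustration only.

// Illustrative sketch only. Copies a hypothetical source tree to a
// hypothetical target while preserving block-sizes (the programmatic
// analogue of -pb), so the post-copy checksum comparison can succeed.
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;

public class BlockSizePreservingCopyExample {
  public static void main(String[] args) throws Exception {
    DistCpOptions options = new DistCpOptions(
        Arrays.asList(new Path("hdfs://source-nn/data")),   // hypothetical source
        new Path("hdfs://target-nn/data"));                  // hypothetical target
    options.preserve(FileAttribute.BLOCKSIZE);  // preserve block-sizes (as with -pb)
    // options.setSkipCRC(true);                // alternative: skip checksum checks entirely
    new DistCp(new Configuration(), options).execute();      // submit and wait for the copy job
  }
}

Preserving the block-size keeps checksums comparable across clusters; the commented-out setSkipCRC line corresponds to the message's "skip checksum-checks" alternative and, as the new error text itself warns, trades away protection against data corruption during transfer.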