HADOOP-16158. DistCp to support checksum validation when copy blocks in parallel (#919)

* DistCp to support checksum validation when copy blocks in parallel

* address review comments

* add checksums comparison test for combine mode

(cherry picked from commit c765584eb2)
KAI XIE 2019-08-19 09:46:31 +08:00 committed by Wei-Chiu Chuang
parent 1b30b3fbe7
commit b3c14d4132
6 changed files with 412 additions and 62 deletions
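For orientation before the per-file diffs: the change moves length and checksum validation into a single DistCpUtils helper, which is called from RetriableFileCopyCommand for whole-file copies and from CopyCommitter after the chunk files produced by a -blocksperchunk copy are concatenated back together. Below is a minimal, illustrative sketch of the new helper's contract; the helper name and signature come from the diff, while the driver class, paths, and configuration handling are assumptions made for the example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCpOptionSwitch;
import org.apache.hadoop.tools.util.DistCpUtils;

// Hypothetical caller, not part of the commit.
public class ChecksumValidationSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path source = new Path(args[0]);
    Path target = new Path(args[1]);
    FileSystem sourceFS = source.getFileSystem(conf);
    FileSystem targetFS = target.getFileSystem(conf);
    boolean skipCrc = conf.getBoolean(
        DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false);
    // Passing null for the source checksum makes the helper fetch it via
    // sourceFS; an IOException is thrown on any length or checksum mismatch.
    DistCpUtils.compareFileLengthsAndChecksums(
        sourceFS, source, null, targetFS, target, skipCrc);
    System.out.println("lengths and checksums match");
  }
}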

CopyCommitter.java

@@ -73,6 +73,7 @@ public class CopyCommitter extends FileOutputCommitter {
private boolean overwrite = false;
private boolean targetPathExists = true;
private boolean ignoreFailures = false;
private boolean skipCrc = false;
private int blocksPerChunk = 0;
/**
@@ -87,6 +88,9 @@ public class CopyCommitter extends FileOutputCommitter {
blocksPerChunk = context.getConfiguration().getInt(
DistCpOptionSwitch.BLOCKS_PER_CHUNK.getConfigLabel(), 0);
LOG.debug("blocks per chunk {}", blocksPerChunk);
skipCrc = context.getConfiguration().getBoolean(
DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false);
LOG.debug("skip CRC is {}", skipCrc);
this.taskAttemptContext = context;
}
@@ -247,7 +251,8 @@ public class CopyCommitter extends FileOutputCommitter {
== srcFileStatus.getLen()) {
// This is the last chunk of the splits, consolidate allChunkPaths
try {
concatFileChunks(conf, targetFile, allChunkPaths);
concatFileChunks(conf, srcFileStatus.getPath(), targetFile,
allChunkPaths);
} catch (IOException e) {
// If the concat failed because a chunk file doesn't exist,
// then we assume that the CopyMapper has skipped copying this
@@ -603,8 +608,9 @@ public class CopyCommitter extends FileOutputCommitter {
/**
* Concat the passed chunk files into one and rename it to the targetFile.
*/
private void concatFileChunks(Configuration conf, Path targetFile,
LinkedList<Path> allChunkPaths) throws IOException {
private void concatFileChunks(Configuration conf, Path sourceFile,
Path targetFile, LinkedList<Path> allChunkPaths)
throws IOException {
if (allChunkPaths.size() == 1) {
return;
}
@@ -613,6 +619,7 @@ public class CopyCommitter extends FileOutputCommitter {
+ allChunkPaths.size());
}
FileSystem dstfs = targetFile.getFileSystem(conf);
FileSystem srcfs = sourceFile.getFileSystem(conf);
Path firstChunkFile = allChunkPaths.removeFirst();
Path[] restChunkFiles = new Path[allChunkPaths.size()];
@@ -630,6 +637,8 @@ public class CopyCommitter extends FileOutputCommitter {
LOG.debug("concat: result: " + dstfs.getFileStatus(firstChunkFile));
}
rename(dstfs, firstChunkFile, targetFile);
DistCpUtils.compareFileLengthsAndChecksums(
srcfs, sourceFile, null, dstfs, targetFile, skipCrc);
}
/**

RetriableFileCopyCommand.java

@@ -143,15 +143,8 @@ public class RetriableFileCopyCommand extends RetriableCommand {
offset, context, fileAttributes, sourceChecksum);
if (!source.isSplit()) {
compareFileLengths(source, targetPath, configuration, bytesRead
+ offset);
}
//At this point, src&dest lengths are same. if length==0, we skip checksum
if ((bytesRead != 0) && (!skipCrc)) {
if (!source.isSplit()) {
compareCheckSums(sourceFS, source.getPath(), sourceChecksum,
targetFS, targetPath);
}
DistCpUtils.compareFileLengthsAndChecksums(sourceFS, sourcePath,
sourceChecksum, targetFS, targetPath, skipCrc);
}
// it's not append or direct write (preferred for s3a) case, thus we first
// write to a temporary file, then rename it to the target path.
@@ -216,51 +209,6 @@ public class RetriableFileCopyCommand extends RetriableCommand {
context);
}
private void compareFileLengths(CopyListingFileStatus source, Path target,
Configuration configuration, long targetLen)
throws IOException {
final Path sourcePath = source.getPath();
FileSystem fs = sourcePath.getFileSystem(configuration);
long srcLen = fs.getFileStatus(sourcePath).getLen();
if (srcLen != targetLen)
throw new IOException("Mismatch in length of source:" + sourcePath + " (" + srcLen +
") and target:" + target + " (" + targetLen + ")");
}
private void compareCheckSums(FileSystem sourceFS, Path source,
FileChecksum sourceChecksum, FileSystem targetFS, Path target)
throws IOException {
if (!DistCpUtils.checksumsAreEqual(sourceFS, source, sourceChecksum,
targetFS, target)) {
StringBuilder errorMessage =
new StringBuilder("Checksum mismatch between ")
.append(source).append(" and ").append(target).append(".");
boolean addSkipHint = false;
String srcScheme = sourceFS.getScheme();
String targetScheme = targetFS.getScheme();
if (!srcScheme.equals(targetScheme)
&& !(srcScheme.contains("hdfs") && targetScheme.contains("hdfs"))) {
// the filesystems are different and they aren't both hdfs connectors
errorMessage.append("Source and destination filesystems are of"
+ " different types\n")
.append("Their checksum algorithms may be incompatible");
addSkipHint = true;
} else if (sourceFS.getFileStatus(source).getBlockSize() !=
targetFS.getFileStatus(target).getBlockSize()) {
errorMessage.append(" Source and target differ in block-size.\n")
.append(" Use -pb to preserve block-sizes during copy.");
addSkipHint = true;
}
if (addSkipHint) {
errorMessage.append(" You can skip checksum-checks altogether "
+ " with -skipcrccheck.\n")
.append(" (NOTE: By skipping checksums, one runs the risk of " +
"masking data-corruption during file-transfer.)\n");
}
throw new IOException(errorMessage.toString());
}
}
//If target file exists and unable to delete target - fail
//If target doesn't exist and unable to create parent folder - fail
//If target is successfully deleted and parent exists, if rename fails - fail

DistCpUtils.java

@@ -583,6 +583,66 @@ public class DistCpUtils {
sourceChecksum.equals(targetChecksum));
}
/**
* Utility to compare file lengths and checksums for source and target.
*
* @param sourceFS FileSystem for the source path.
* @param source The source path.
* @param sourceChecksum The checksum of the source file. If it is null we
* still need to retrieve it through sourceFS.
* @param targetFS FileSystem for the target path.
* @param target The target path.
* @param skipCrc The flag to indicate whether to skip checksums.
* @throws IOException if there's a mismatch in file lengths or checksums.
*/
public static void compareFileLengthsAndChecksums(
FileSystem sourceFS, Path source, FileChecksum sourceChecksum,
FileSystem targetFS, Path target, boolean skipCrc) throws IOException {
long srcLen = sourceFS.getFileStatus(source).getLen();
long tgtLen = targetFS.getFileStatus(target).getLen();
if (srcLen != tgtLen) {
throw new IOException(
"Mismatch in length of source:" + source + " (" + srcLen
+ ") and target:" + target + " (" + tgtLen + ")");
}
//At this point, src & dest lengths are same. if length==0, we skip checksum
if ((srcLen != 0) && (!skipCrc)) {
if (!checksumsAreEqual(sourceFS, source, sourceChecksum,
targetFS, target)) {
StringBuilder errorMessage =
new StringBuilder("Checksum mismatch between ")
.append(source).append(" and ").append(target).append(".");
boolean addSkipHint = false;
String srcScheme = sourceFS.getScheme();
String targetScheme = targetFS.getScheme();
if (!srcScheme.equals(targetScheme)) {
// the filesystems are different and they aren't both hdfs connectors
errorMessage.append("Source and destination filesystems are of"
+ " different types\n")
.append("Their checksum algorithms may be incompatible");
addSkipHint = true;
} else if (sourceFS.getFileStatus(source).getBlockSize() !=
targetFS.getFileStatus(target).getBlockSize()) {
errorMessage.append(" Source and target differ in block-size.\n")
.append(" Use -pb to preserve block-sizes during copy.");
addSkipHint = true;
}
if (addSkipHint) {
errorMessage
.append(" You can choose file-level checksum validation via "
+ "-Ddfs.checksum.combine.mode=COMPOSITE_CRC when block-sizes"
+ " or filesystems are different.")
.append(" Or you can skip checksum-checks altogether "
+ " with -skipcrccheck.\n")
.append(" (NOTE: By skipping checksums, one runs the risk of " +
"masking data-corruption during file-transfer.)\n");
}
throw new IOException(errorMessage.toString());
}
}
}
/*
* Return the Path for a given chunk.
* Used when splitting large file into chunks to copy in parallel.

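A note on the hint added to the mismatch message above (not part of the diff): HDFS's default file checksum (MD5-of-MD5-of-CRC32) depends on block size, so two byte-identical files written with different block sizes will usually report different checksums; dfs.checksum.combine.mode=COMPOSITE_CRC yields a block-size-independent checksum, which is what the new TestDistCpUtilsWithCombineMode at the end of this commit exercises. A small illustrative sketch, with hypothetical paths:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical demo class, not part of the commit.
public class CombineModeSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Uncomment to make checksums comparable across block sizes:
    // conf.set("dfs.checksum.combine.mode", "COMPOSITE_CRC");
    FileSystem fs = FileSystem.get(conf);
    // Assume these two files hold identical bytes but different block sizes.
    FileChecksum a = fs.getFileChecksum(new Path("/data/file-blocksize-512"));
    FileChecksum b = fs.getFileChecksum(new Path("/data/file-blocksize-1024"));
    // With the default combine mode this typically prints false; with
    // COMPOSITE_CRC it prints true.
    System.out.println("checksums equal: " + a.equals(b));
  }
}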
TestCopyCommitter.java

@@ -25,6 +25,9 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
@@ -33,6 +36,7 @@ import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.tools.CopyListing;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpConstants;
@@ -40,6 +44,7 @@ import org.apache.hadoop.tools.DistCpContext;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.GlobbedCopyListing;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.tools.util.TestDistCpUtils;
import org.apache.hadoop.security.Credentials;
import org.junit.*;
@@ -55,13 +60,16 @@ public class TestCopyCommitter {
private static final Random rand = new Random();
private static final long BLOCK_SIZE = 1024;
private static final Credentials CREDENTIALS = new Credentials();
public static final int PORT = 39737;
private static Configuration config;
private static Configuration clusterConfig;
private static MiniDFSCluster cluster;
private Configuration config;
private static Job getJobForClient() throws IOException {
Job job = Job.getInstance(new Configuration());
job.getConfiguration().set("mapred.job.tracker", "localhost:" + PORT);
@@ -73,10 +81,17 @@ public class TestCopyCommitter {
@BeforeClass
public static void create() throws IOException {
config = getJobForClient().getConfiguration();
config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, 0);
cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).format(true)
.build();
clusterConfig = getJobForClient().getConfiguration();
clusterConfig.setLong(
DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, 0);
clusterConfig.setLong(
DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE);
clusterConfig.setLong(
DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
cluster = new MiniDFSCluster.Builder(clusterConfig)
.numDataNodes(1)
.format(true)
.build();
}
@AfterClass
@@ -88,6 +103,7 @@ public class TestCopyCommitter {
@Before
public void createMetaFolder() throws IOException {
config = new Configuration(clusterConfig);
config.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta");
Path meta = new Path("/meta");
cluster.getFileSystem().mkdirs(meta);
@@ -397,6 +413,141 @@
}
}
@Test
public void testCommitWithChecksumMismatchAndSkipCrc() throws IOException {
testCommitWithChecksumMismatch(true);
}
@Test
public void testCommitWithChecksumMismatchWithoutSkipCrc()
throws IOException {
testCommitWithChecksumMismatch(false);
}
private void testCommitWithChecksumMismatch(boolean skipCrc)
throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
JobContext jobContext = new JobContextImpl(
taskAttemptContext.getConfiguration(),
taskAttemptContext.getTaskAttemptID().getJobID());
Configuration conf = jobContext.getConfiguration();
String sourceBase;
String targetBase;
FileSystem fs = null;
try {
fs = FileSystem.get(conf);
sourceBase = "/tmp1/" + String.valueOf(rand.nextLong());
targetBase = "/tmp1/" + String.valueOf(rand.nextLong());
int blocksPerChunk = 5;
String srcFilename = "/srcdata";
createSrcAndWorkFilesWithDifferentChecksum(fs, targetBase, sourceBase,
srcFilename, blocksPerChunk);
DistCpOptions options = new DistCpOptions.Builder(
Collections.singletonList(new Path(sourceBase)),
new Path("/out"))
.withBlocksPerChunk(blocksPerChunk)
.withCRC(skipCrc)
.build();
options.appendToConf(conf);
conf.setBoolean(
DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false);
DistCpContext context = new DistCpContext(options);
context.setTargetPathExists(false);
CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
Path listingFile = new Path("/tmp1/"
+ String.valueOf(rand.nextLong()));
listing.buildListing(listingFile, context);
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
OutputCommitter committer = new CopyCommitter(
null, taskAttemptContext);
try {
committer.commitJob(jobContext);
if (!skipCrc) {
Assert.fail("Expected commit to fail");
}
Assert.assertFalse(DistCpUtils.checksumsAreEqual(
fs, new Path(sourceBase + srcFilename), null,
fs, new Path(targetBase + srcFilename)));
} catch(IOException exception) {
if (skipCrc) {
LOG.error("Unexpected exception is found", exception);
throw exception;
}
Throwable cause = exception.getCause();
GenericTestUtils.assertExceptionContains(
"Checksum mismatch", cause);
}
} finally {
TestDistCpUtils.delete(fs, "/tmp1");
TestDistCpUtils.delete(fs, "/meta");
}
}
/**
* Create a source file and its DistCp working files with different checksum
* to test the checksum validation for copying blocks in parallel.
*
* For the ease of construction, it assumes a source file can be broken down
* into 2 working files (or 2 chunks).
*
* So for a source file with length =
* BLOCK_SIZE * blocksPerChunk + BLOCK_SIZE / 2,
* its 1st working file will have length =
* BLOCK_SIZE * blocksPerChunk,
* then the 2nd working file will have length =
* BLOCK_SIZE / 2.
* And the working files are generated with a different seed to mimic
* same length but different checksum scenario.
*
* @param fs the FileSystem
* @param targetBase the path to the working files
* @param sourceBase the path to a source file
* @param filename the filename to copy and work on
* @param blocksPerChunk the blocks per chunk config that enables copying
* blocks in parallel
* @throws IOException when it fails to create files
*/
private void createSrcAndWorkFilesWithDifferentChecksum(FileSystem fs,
String targetBase,
String sourceBase,
String filename,
int blocksPerChunk)
throws IOException {
long srcSeed = System.currentTimeMillis();
long dstSeed = srcSeed + rand.nextLong();
int bufferLen = 128;
short replFactor = 2;
Path srcData = new Path(sourceBase + filename);
// create data with 2 chunks: the 2nd chunk has half of the block size
long firstChunkLength = BLOCK_SIZE * blocksPerChunk;
long secondChunkLength = BLOCK_SIZE / 2;
DFSTestUtil.createFile(fs, srcData,
bufferLen, firstChunkLength, BLOCK_SIZE, replFactor,
srcSeed);
DFSTestUtil.appendFileNewBlock((DistributedFileSystem) fs, srcData,
(int) secondChunkLength);
DFSTestUtil.createFile(fs, new Path(targetBase
+ filename + ".____distcpSplit____0."
+ firstChunkLength), bufferLen,
firstChunkLength, BLOCK_SIZE, replFactor, dstSeed);
DFSTestUtil.createFile(fs, new Path(targetBase
+ filename + ".____distcpSplit____"
+ firstChunkLength + "." + secondChunkLength), bufferLen,
secondChunkLength, BLOCK_SIZE, replFactor, dstSeed);
}
private TaskAttemptContext getTaskAttemptContext(Configuration conf) {
return new TaskAttemptContextImpl(conf,
new TaskAttemptID("200707121733", 1, TaskType.MAP, 1, 1));

TestDistCpUtils.java

@@ -28,10 +28,12 @@ import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.tools.ECAdmin;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpOptionSwitch;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
@@ -1205,6 +1207,71 @@ public class TestDistCpUtils {
Assert.assertFalse(srcStatus.getReplication() == f2Status.getReplication());
}
@Test
public void testCompareFileLengthsAndChecksums() throws IOException {
String base = "/tmp/verify-checksum/";
long srcSeed = System.currentTimeMillis();
long dstSeed = srcSeed + rand.nextLong();
short replFactor = 2;
FileSystem fs = FileSystem.get(config);
Path basePath = new Path(base);
fs.mkdirs(basePath);
// empty lengths comparison
Path srcWithLen0 = new Path(base + "srcLen0");
Path dstWithLen0 = new Path(base + "dstLen0");
fs.create(srcWithLen0).close();
fs.create(dstWithLen0).close();
DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithLen0,
null, fs, dstWithLen0, false);
// different lengths comparison
Path srcWithLen1 = new Path(base + "srcLen1");
Path dstWithLen2 = new Path(base + "dstLen2");
DFSTestUtil.createFile(fs, srcWithLen1, 1, replFactor, srcSeed);
DFSTestUtil.createFile(fs, dstWithLen2, 2, replFactor, srcSeed);
try {
DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithLen1,
null, fs, dstWithLen2, false);
Assert.fail("Expected different lengths comparison to fail!");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains(
"Mismatch in length", e);
}
// checksums matched
Path srcWithChecksum1 = new Path(base + "srcChecksum1");
Path dstWithChecksum1 = new Path(base + "dstChecksum1");
DFSTestUtil.createFile(fs, srcWithChecksum1, 1024,
replFactor, srcSeed);
DFSTestUtil.createFile(fs, dstWithChecksum1, 1024,
replFactor, srcSeed);
DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1,
null, fs, dstWithChecksum1, false);
DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1,
fs.getFileChecksum(srcWithChecksum1), fs, dstWithChecksum1,
false);
// checksums mismatched
Path dstWithChecksum2 = new Path(base + "dstChecksum2");
DFSTestUtil.createFile(fs, dstWithChecksum2, 1024,
replFactor, dstSeed);
try {
DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1,
null, fs, dstWithChecksum2, false);
Assert.fail("Expected different checksums comparison to fail!");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains(
"Checksum mismatch", e);
}
// checksums mismatched but skipped
DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1,
null, fs, dstWithChecksum2, true);
}
private static Random rand = new Random();
public static String createTestSetup(FileSystem fs) throws IOException {

TestDistCpUtilsWithCombineMode.java (new file)

@@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Test length and checksums comparison with checksum combine mode.
* When the combine mode is COMPOSITE_CRC, it should tolerate different file
* systems and different block sizes.
*/
public class TestDistCpUtilsWithCombineMode {
private static final Logger LOG = LoggerFactory.getLogger(TestDistCpUtils.class);
private Configuration config;
private MiniDFSCluster cluster;
@Rule
public TestName testName = new TestName();
@Before
public void create() throws IOException {
config = new Configuration();
if (testName.getMethodName().contains("WithCombineMode")) {
config.set("dfs.checksum.combine.mode", "COMPOSITE_CRC");
}
config.setLong(
DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 512);
cluster = new MiniDFSCluster.Builder(config)
.numDataNodes(2)
.format(true)
.build();
}
@After
public void destroy() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testChecksumsComparisonWithCombineMode() throws IOException {
try {
compareSameContentButDiffBlockSizes();
} catch (IOException e) {
LOG.error("Unexpected exception is found", e);
throw e;
}
}
@Test
public void testChecksumsComparisonWithoutCombineMode() {
try {
compareSameContentButDiffBlockSizes();
Assert.fail("Expected comparison to fail");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains(
"Checksum mismatch", e);
}
}
private void compareSameContentButDiffBlockSizes() throws IOException {
String base = "/tmp/verify-checksum-" + testName.getMethodName() + "/";
long seed = System.currentTimeMillis();
short rf = 2;
FileSystem fs = FileSystem.get(config);
Path basePath = new Path(base);
fs.mkdirs(basePath);
// create 2 files of same content but different block-sizes
Path src = new Path(base + "src");
Path dst = new Path(base + "dst");
DFSTestUtil.createFile(fs, src, 256, 1024, 512,
rf, seed);
DFSTestUtil.createFile(fs, dst, 256, 1024, 1024,
rf, seed);
// then compare
DistCpUtils.compareFileLengthsAndChecksums(fs, src,
null, fs, dst, false);
}
}