HDFS-13660. DistCp job fails when new data is appended in the file while the DistCp copy job is running.

DistCp now uses the length of the file recorded at the start of the copy to determine how much data to copy:

* If a file is appended to during the copy, only the originally listed bytes are copied.
* If a file is truncated during the copy, or the attempt to read the data fails with a truncated stream, distcp now fails; previously these failures went undetected.
Contributed by Mukund Thakur.
Change-Id: I576a49d951fa48d37a45a7e4c82c47488aa8e884
(cherry picked from commit 51c64b357d)
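
As a quick illustration of the approach (a minimal sketch, not the DistCp code itself; the real change follows in the diff below): each read is capped at the file length recorded in the copy listing, so bytes appended to the source after listing are never read, and a short read caused by truncation surfaces as an error rather than a silently short target file. The class and method names in this sketch are illustrative; the helper actually added by the patch is RetriableFileCopyCommand#getNumBytesToRead.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Illustrative sketch only; not part of the patch. */
final class BoundedCopySketch {

  /**
   * Bytes to request on the next read: a full buffer, or only what remains
   * of the length recorded in the copy listing. Mirrors the logic of the
   * getNumBytesToRead helper introduced by this change.
   */
  static long numBytesToRead(long listedLength, long position, long bufLength) {
    if (position + bufLength < listedLength) {
      return bufLength;
    }
    return listedLength - position;
  }

  /** Copy exactly listedLength bytes; fail if the source turns out shorter. */
  static long copyListedBytes(InputStream in, OutputStream out,
      long listedLength, int bufferSize) throws IOException {
    byte[] buf = new byte[bufferSize];
    long position = 0;
    int toRead = (int) numBytesToRead(listedLength, position, bufferSize);
    int read = toRead > 0 ? in.read(buf, 0, toRead) : 0;
    while (read > 0) {
      out.write(buf, 0, read);
      position += read;
      toRead = (int) numBytesToRead(listedLength, position, bufferSize);
      read = toRead > 0 ? in.read(buf, 0, toRead) : 0;
    }
    if (position != listedLength) {
      // The source was truncated or the stream ended early: report it
      // instead of silently producing a short copy.
      throw new IOException("Mismatch in length of source: expected "
          + listedLength + " bytes but copied " + position);
    }
    return position;
  }
}

Because appended bytes are simply never requested, the target's length and checksum line up with the listing entry used by the later verification step.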
@@ -171,4 +171,10 @@ public final class DistCpConstants {
 
   /** Filename of sorted target listing. */
   public static final String TARGET_SORTED_FILE = "target_sorted.seq";
+
+  public static final String LENGTH_MISMATCH_ERROR_MSG =
+      "Mismatch in length of source:";
+
+  public static final String CHECKSUM_MISMATCH_ERROR_MSG =
+      "Checksum mismatch between ";
 }
|
@@ -252,7 +252,7 @@ public class CopyCommitter extends FileOutputCommitter {
           // This is the last chunk of the splits, consolidate allChunkPaths
           try {
             concatFileChunks(conf, srcFileStatus.getPath(), targetFile,
-                allChunkPaths);
+                allChunkPaths, srcFileStatus);
           } catch (IOException e) {
             // If the concat failed because a chunk file doesn't exist,
             // then we assume that the CopyMapper has skipped copying this
|
@@ -609,7 +609,8 @@ public class CopyCommitter extends FileOutputCommitter {
    * Concat the passed chunk files into one and rename it the targetFile.
    */
   private void concatFileChunks(Configuration conf, Path sourceFile,
-      Path targetFile, LinkedList<Path> allChunkPaths)
+      Path targetFile, LinkedList<Path> allChunkPaths,
+      CopyListingFileStatus srcFileStatus)
       throws IOException {
     if (allChunkPaths.size() == 1) {
       return;
|
@@ -637,8 +638,9 @@ public class CopyCommitter extends FileOutputCommitter {
       LOG.debug("concat: result: " + dstfs.getFileStatus(firstChunkFile));
     }
     rename(dstfs, firstChunkFile, targetFile);
-    DistCpUtils.compareFileLengthsAndChecksums(
-        srcfs, sourceFile, null, dstfs, targetFile, skipCrc);
+    DistCpUtils.compareFileLengthsAndChecksums(srcFileStatus.getLen(),
+        srcfs, sourceFile, null, dstfs,
+        targetFile, skipCrc, srcFileStatus.getLen());
   }
 
   /**
|
@@ -139,7 +139,6 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
   public void map(Text relPath, CopyListingFileStatus sourceFileStatus,
           Context context) throws IOException, InterruptedException {
     Path sourcePath = sourceFileStatus.getPath();
-
     if (LOG.isDebugEnabled())
       LOG.debug("DistCpMapper::map(): Received " + sourcePath + ", " + relPath);
 
|
@@ -354,7 +353,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
     if (sameLength && sameBlockSize) {
       return skipCrc ||
           DistCpUtils.checksumsAreEqual(sourceFS, source.getPath(), null,
-              targetFS, target.getPath());
+              targetFS, target.getPath(), source.getLen());
     } else {
       return false;
     }
|
@@ -143,8 +143,9 @@ public class RetriableFileCopyCommand extends RetriableCommand {
           offset, context, fileAttributes, sourceChecksum);
 
       if (!source.isSplit()) {
-        DistCpUtils.compareFileLengthsAndChecksums(sourceFS, sourcePath,
-            sourceChecksum, targetFS, targetPath, skipCrc);
+        DistCpUtils.compareFileLengthsAndChecksums(source.getLen(), sourceFS,
+            sourcePath, sourceChecksum, targetFS,
+            targetPath, skipCrc, source.getLen());
       }
       // it's not append or direct write (preferred for s3a) case, thus we first
       // write to a temporary file, then rename it to the target path.
|
@@ -248,24 +249,27 @@ public class RetriableFileCopyCommand extends RetriableCommand {
     boolean finished = false;
     try {
       inStream = getInputStream(source, context.getConfiguration());
+      long fileLength = source2.getLen();
+      int numBytesToRead = (int) getNumBytesToRead(fileLength, sourceOffset,
+          bufferSize);
       seekIfRequired(inStream, sourceOffset);
-      int bytesRead = readBytes(inStream, buf);
-      while (bytesRead >= 0) {
+      int bytesRead = readBytes(inStream, buf, numBytesToRead);
+      while (bytesRead > 0) {
         if (chunkLength > 0 &&
             (totalBytesRead + bytesRead) >= chunkLength) {
           bytesRead = (int)(chunkLength - totalBytesRead);
           finished = true;
         }
         totalBytesRead += bytesRead;
-        if (action == FileAction.APPEND) {
         sourceOffset += bytesRead;
-        }
         outStream.write(buf, 0, bytesRead);
         updateContextStatus(totalBytesRead, context, source2);
         if (finished) {
           break;
         }
-        bytesRead = readBytes(inStream, buf);
+        numBytesToRead = (int) getNumBytesToRead(fileLength, sourceOffset,
+            bufferSize);
+        bytesRead = readBytes(inStream, buf, numBytesToRead);
       }
       outStream.close();
       outStream = null;
|
@@ -275,6 +279,15 @@ public class RetriableFileCopyCommand extends RetriableCommand {
     return totalBytesRead;
   }
 
+  @VisibleForTesting
+  long getNumBytesToRead(long fileLength, long position, long bufLength) {
+    if (position + bufLength < fileLength) {
+      return bufLength;
+    } else {
+      return fileLength - position;
+    }
+  }
+
   private void updateContextStatus(long totalBytesRead, Mapper.Context context,
       CopyListingFileStatus source2) {
     StringBuilder message = new StringBuilder(DistCpUtils.getFormatter()
|
@@ -288,10 +301,11 @@ public class RetriableFileCopyCommand extends RetriableCommand {
     context.setStatus(message.toString());
   }
 
-  private static int readBytes(ThrottledInputStream inStream, byte buf[])
+  private static int readBytes(ThrottledInputStream inStream, byte[] buf,
+      int numBytes)
       throws IOException {
     try {
-      return inStream.read(buf);
+      return inStream.read(buf, 0, numBytes);
     } catch (IOException e) {
       throw new CopyReadException(e);
     }
|
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.CopyListing.AclsNotSupportedException;
 import org.apache.hadoop.tools.CopyListing.XAttrsNotSupportedException;
 import org.apache.hadoop.tools.CopyListingFileStatus;
|
@@ -565,13 +566,15 @@ public class DistCpUtils {
    * @throws IOException if there's an exception while retrieving checksums.
    */
   public static boolean checksumsAreEqual(FileSystem sourceFS, Path source,
-      FileChecksum sourceChecksum, FileSystem targetFS, Path target)
+      FileChecksum sourceChecksum,
+      FileSystem targetFS,
+      Path target, long sourceLen)
       throws IOException {
     FileChecksum targetChecksum = null;
     try {
       sourceChecksum = sourceChecksum != null
           ? sourceChecksum
-          : sourceFS.getFileChecksum(source);
+          : sourceFS.getFileChecksum(source, sourceLen);
       if (sourceChecksum != null) {
         // iff there's a source checksum, look for one at the destination.
         targetChecksum = targetFS.getFileChecksum(target);
|
@@ -595,23 +598,22 @@ public class DistCpUtils {
    * @param skipCrc The flag to indicate whether to skip checksums.
    * @throws IOException if there's a mismatch in file lengths or checksums.
    */
-  public static void compareFileLengthsAndChecksums(
+  public static void compareFileLengthsAndChecksums(long srcLen,
       FileSystem sourceFS, Path source, FileChecksum sourceChecksum,
-      FileSystem targetFS, Path target, boolean skipCrc) throws IOException {
-    long srcLen = sourceFS.getFileStatus(source).getLen();
-    long tgtLen = targetFS.getFileStatus(target).getLen();
-    if (srcLen != tgtLen) {
+      FileSystem targetFS, Path target, boolean skipCrc,
+      long targetLen) throws IOException {
+    if (srcLen != targetLen) {
       throw new IOException(
-          "Mismatch in length of source:" + source + " (" + srcLen
-              + ") and target:" + target + " (" + tgtLen + ")");
+          DistCpConstants.LENGTH_MISMATCH_ERROR_MSG + source + " (" + srcLen
+              + ") and target:" + target + " (" + targetLen + ")");
     }
 
     //At this point, src & dest lengths are same. if length==0, we skip checksum
     if ((srcLen != 0) && (!skipCrc)) {
       if (!checksumsAreEqual(sourceFS, source, sourceChecksum,
-          targetFS, target)) {
+          targetFS, target, srcLen)) {
         StringBuilder errorMessage =
-            new StringBuilder("Checksum mismatch between ")
+            new StringBuilder(DistCpConstants.CHECKSUM_MISMATCH_ERROR_MSG)
                 .append(source).append(" and ").append(target).append(".");
         boolean addSkipHint = false;
         String srcScheme = sourceFS.getScheme();
|
@@ -473,9 +473,12 @@ public class TestCopyCommitter {
       if (!skipCrc) {
         Assert.fail("Expected commit to fail");
       }
+      Path sourcePath = new Path(sourceBase + srcFilename);
+      CopyListingFileStatus sourceCurrStatus =
+          new CopyListingFileStatus(fs.getFileStatus(sourcePath));
       Assert.assertFalse(DistCpUtils.checksumsAreEqual(
           fs, new Path(sourceBase + srcFilename), null,
-          fs, new Path(targetBase + srcFilename)));
+          fs, new Path(targetBase + srcFilename), sourceCurrStatus.getLen()));
     } catch(IOException exception) {
       if (skipCrc) {
         LOG.error("Unexpected exception is found", exception);
|
@@ -21,11 +21,16 @@ package org.apache.hadoop.tools.mapred;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
|
@@ -53,6 +58,8 @@ import org.apache.hadoop.tools.DistCpOptions;
 import org.apache.hadoop.tools.StubContext;
 import org.apache.hadoop.tools.util.DistCpUtils;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
+
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
|
@@ -444,6 +451,55 @@ public class TestCopyMapper {
     }
   }
 
+  @Test(timeout = 40000)
+  public void testCopyWhileAppend() throws Exception {
+    deleteState();
+    mkdirs(SOURCE_PATH + "/1");
+    touchFile(SOURCE_PATH + "/1/3");
+    CopyMapper copyMapper = new CopyMapper();
+    StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+    Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
+        stubContext.getContext();
+    copyMapper.setup(context);
+    final Path path = new Path(SOURCE_PATH + "/1/3");
+    int manyBytes = 100000000;
+    appendFile(path, manyBytes);
+    ScheduledExecutorService scheduledExecutorService =
+        Executors.newSingleThreadScheduledExecutor();
+    Runnable task = new Runnable() {
+      public void run() {
+        try {
+          int maxAppendAttempts = 20;
+          int appendCount = 0;
+          while (appendCount < maxAppendAttempts) {
+            appendFile(path, 1000);
+            Thread.sleep(200);
+            appendCount++;
+          }
+        } catch (IOException | InterruptedException e) {
+          LOG.error("Exception encountered ", e);
+          Assert.fail("Test failed: " + e.getMessage());
+        }
+      }
+    };
+    scheduledExecutorService.schedule(task, 10, TimeUnit.MILLISECONDS);
+    try {
+      copyMapper.map(new Text(DistCpUtils.getRelativePath(
+          new Path(SOURCE_PATH), path)),
+          new CopyListingFileStatus(cluster.getFileSystem().getFileStatus(
+              path)), context);
+    } catch (Exception ex) {
+      LOG.error("Exception encountered ", ex);
+      String exceptionAsString = StringUtils.stringifyException(ex);
+      if (exceptionAsString.contains(DistCpConstants.LENGTH_MISMATCH_ERROR_MSG) ||
+          exceptionAsString.contains(DistCpConstants.CHECKSUM_MISMATCH_ERROR_MSG)) {
+        Assert.fail("Test failed: " + exceptionAsString);
+      }
+    } finally {
+      scheduledExecutorService.shutdown();
+    }
+  }
+
   @Test(timeout=40000)
   public void testMakeDirFailure() {
     try {
|
@@ -24,6 +24,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
+
+import org.junit.Assert;
 import org.junit.Test;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
|
@@ -58,4 +60,25 @@ public class TestRetriableFileCopyCommand {
     assertNotNull("close didn't fail", actualEx);
     assertEquals(expectedEx, actualEx);
   }
+
+  @Test(timeout = 40000)
+  public void testGetNumBytesToRead() {
+    long pos = 100;
+    long buffLength = 1024;
+    long fileLength = 2058;
+    RetriableFileCopyCommand retriableFileCopyCommand =
+        new RetriableFileCopyCommand("Testing NumBytesToRead ",
+            FileAction.OVERWRITE);
+    long numBytes = retriableFileCopyCommand
+        .getNumBytesToRead(fileLength, pos, buffLength);
+    Assert.assertEquals(1024, numBytes);
+    pos += numBytes;
+    numBytes = retriableFileCopyCommand
+        .getNumBytesToRead(fileLength, pos, buffLength);
+    Assert.assertEquals(934, numBytes);
+    pos += numBytes;
+    numBytes = retriableFileCopyCommand
+        .getNumBytesToRead(fileLength, pos, buffLength);
+    Assert.assertEquals(0, numBytes);
+  }
 }
|
@@ -33,8 +33,8 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.tools.ECAdmin;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.DistCpOptionSwitch;
 import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
 import org.apache.hadoop.util.ToolRunner;
||||||
|
@ -63,6 +63,7 @@ import static org.apache.hadoop.fs.permission.FsAction.READ;
|
||||||
import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
|
import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
|
||||||
import static org.apache.hadoop.fs.permission.FsAction.READ_WRITE;
|
import static org.apache.hadoop.fs.permission.FsAction.READ_WRITE;
|
||||||
import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
|
import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
|
||||||
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
@@ -1208,7 +1209,7 @@ public class TestDistCpUtils {
   }
 
   @Test
-  public void testCompareFileLengthsAndChecksums() throws IOException {
+  public void testCompareFileLengthsAndChecksums() throws Throwable {
 
     String base = "/tmp/verify-checksum/";
     long srcSeed = System.currentTimeMillis();
|
@@ -1224,22 +1225,18 @@ public class TestDistCpUtils {
     Path dstWithLen0 = new Path(base + "dstLen0");
     fs.create(srcWithLen0).close();
     fs.create(dstWithLen0).close();
-    DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithLen0,
-        null, fs, dstWithLen0, false);
+    DistCpUtils.compareFileLengthsAndChecksums(0, fs, srcWithLen0,
+        null, fs, dstWithLen0, false, 0);
 
     // different lengths comparison
     Path srcWithLen1 = new Path(base + "srcLen1");
     Path dstWithLen2 = new Path(base + "dstLen2");
     DFSTestUtil.createFile(fs, srcWithLen1, 1, replFactor, srcSeed);
     DFSTestUtil.createFile(fs, dstWithLen2, 2, replFactor, srcSeed);
-    try {
-      DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithLen1,
-          null, fs, dstWithLen2, false);
-      Assert.fail("Expected different lengths comparison to fail!");
-    } catch (IOException e) {
-      GenericTestUtils.assertExceptionContains(
-          "Mismatch in length", e);
-    }
+    intercept(IOException.class, DistCpConstants.LENGTH_MISMATCH_ERROR_MSG,
+        () -> DistCpUtils.compareFileLengthsAndChecksums(1, fs,
+            srcWithLen1, null, fs, dstWithLen2, false, 2));
 
     // checksums matched
     Path srcWithChecksum1 = new Path(base + "srcChecksum1");
|
@@ -1248,28 +1245,24 @@ public class TestDistCpUtils {
         replFactor, srcSeed);
     DFSTestUtil.createFile(fs, dstWithChecksum1, 1024,
         replFactor, srcSeed);
-    DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1,
-        null, fs, dstWithChecksum1, false);
-    DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1,
+    DistCpUtils.compareFileLengthsAndChecksums(1024, fs, srcWithChecksum1,
+        null, fs, dstWithChecksum1, false, 1024);
+    DistCpUtils.compareFileLengthsAndChecksums(1024, fs, srcWithChecksum1,
         fs.getFileChecksum(srcWithChecksum1), fs, dstWithChecksum1,
-        false);
+        false, 1024);
 
     // checksums mismatched
     Path dstWithChecksum2 = new Path(base + "dstChecksum2");
     DFSTestUtil.createFile(fs, dstWithChecksum2, 1024,
         replFactor, dstSeed);
-    try {
-      DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1,
-          null, fs, dstWithChecksum2, false);
-      Assert.fail("Expected different checksums comparison to fail!");
-    } catch (IOException e) {
-      GenericTestUtils.assertExceptionContains(
-          "Checksum mismatch", e);
-    }
+    intercept(IOException.class, DistCpConstants.CHECKSUM_MISMATCH_ERROR_MSG,
+        () -> DistCpUtils.compareFileLengthsAndChecksums(1024, fs,
+            srcWithChecksum1, null, fs, dstWithChecksum2,
+            false, 1024));
 
     // checksums mismatched but skipped
-    DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1,
-        null, fs, dstWithChecksum2, true);
+    DistCpUtils.compareFileLengthsAndChecksums(1024, fs, srcWithChecksum1,
+        null, fs, dstWithChecksum2, true, 1024);
   }
 
   private static Random rand = new Random();
|
@@ -109,7 +109,7 @@ public class TestDistCpUtilsWithCombineMode {
     DFSTestUtil.createFile(fs, dst, 256, 1024, 1024,
         rf, seed);
     // then compare
-    DistCpUtils.compareFileLengthsAndChecksums(fs, src,
-        null, fs, dst, false);
+    DistCpUtils.compareFileLengthsAndChecksums(1024, fs, src,
+        null, fs, dst, false, 1024);
   }
 }
|
|