svn merge -c 1468629 Merging from trunk to branch-2 to fix MR-5065.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1468631 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fd6956c3bd
commit
88086534a8
|
@ -708,6 +708,9 @@ Release 0.23.8 - UNRELEASED
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
|
MAPREDUCE-5065. DistCp should skip checksum comparisons if block-sizes
|
||||||
|
are different on source/target (Mithun Radhakrishnan via kihwal)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -140,10 +140,17 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
||||||
private void compareCheckSums(FileSystem sourceFS, Path source,
|
private void compareCheckSums(FileSystem sourceFS, Path source,
|
||||||
FileSystem targetFS, Path target)
|
FileSystem targetFS, Path target)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (!DistCpUtils.checksumsAreEqual(sourceFS, source, targetFS, target))
|
if (!DistCpUtils.checksumsAreEqual(sourceFS, source, targetFS, target)) {
|
||||||
throw new IOException("Check-sum mismatch between "
|
StringBuilder errorMessage = new StringBuilder("Check-sum mismatch between ")
|
||||||
+ source + " and " + target);
|
.append(source).append(" and ").append(target).append(".");
|
||||||
|
if (sourceFS.getFileStatus(source).getBlockSize() != targetFS.getFileStatus(target).getBlockSize()) {
|
||||||
|
errorMessage.append(" Source and target differ in block-size.")
|
||||||
|
.append(" Use -pb to preserve block-sizes during copy.")
|
||||||
|
.append(" Alternatively, skip checksum-checks altogether, using -skipCrc.")
|
||||||
|
.append(" (NOTE: By skipping checksums, one runs the risk of masking data-corruption during file-transfer.)");
|
||||||
|
}
|
||||||
|
throw new IOException(errorMessage.toString());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//If target file exists and unable to delete target - fail
|
//If target file exists and unable to delete target - fail
|
||||||
|
|
|
@ -53,7 +53,7 @@ public class TestCopyMapper {
|
||||||
private static final Log LOG = LogFactory.getLog(TestCopyMapper.class);
|
private static final Log LOG = LogFactory.getLog(TestCopyMapper.class);
|
||||||
private static List<Path> pathList = new ArrayList<Path>();
|
private static List<Path> pathList = new ArrayList<Path>();
|
||||||
private static int nFiles = 0;
|
private static int nFiles = 0;
|
||||||
private static final int FILE_SIZE = 1024;
|
private static final int DEFAULT_FILE_SIZE = 1024;
|
||||||
|
|
||||||
private static MiniDFSCluster cluster;
|
private static MiniDFSCluster cluster;
|
||||||
|
|
||||||
|
@ -92,7 +92,7 @@ public class TestCopyMapper {
|
||||||
configuration.setBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(),
|
configuration.setBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(),
|
||||||
false);
|
false);
|
||||||
configuration.setBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(),
|
configuration.setBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(),
|
||||||
true);
|
false);
|
||||||
configuration.setBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(),
|
configuration.setBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(),
|
||||||
true);
|
true);
|
||||||
configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
|
configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
|
||||||
|
@ -112,6 +112,18 @@ public class TestCopyMapper {
|
||||||
touchFile(SOURCE_PATH + "/7/8/9");
|
touchFile(SOURCE_PATH + "/7/8/9");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void createSourceDataWithDifferentBlockSize() throws Exception {
|
||||||
|
mkdirs(SOURCE_PATH + "/1");
|
||||||
|
mkdirs(SOURCE_PATH + "/2");
|
||||||
|
mkdirs(SOURCE_PATH + "/2/3/4");
|
||||||
|
mkdirs(SOURCE_PATH + "/2/3");
|
||||||
|
mkdirs(SOURCE_PATH + "/5");
|
||||||
|
touchFile(SOURCE_PATH + "/5/6", true);
|
||||||
|
mkdirs(SOURCE_PATH + "/7");
|
||||||
|
mkdirs(SOURCE_PATH + "/7/8");
|
||||||
|
touchFile(SOURCE_PATH + "/7/8/9");
|
||||||
|
}
|
||||||
|
|
||||||
private static void mkdirs(String path) throws Exception {
|
private static void mkdirs(String path) throws Exception {
|
||||||
FileSystem fileSystem = cluster.getFileSystem();
|
FileSystem fileSystem = cluster.getFileSystem();
|
||||||
final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(),
|
final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(),
|
||||||
|
@ -121,17 +133,31 @@ public class TestCopyMapper {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void touchFile(String path) throws Exception {
|
private static void touchFile(String path) throws Exception {
|
||||||
|
touchFile(path, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void touchFile(String path, boolean createMultipleBlocks) throws Exception {
|
||||||
|
final long NON_DEFAULT_BLOCK_SIZE = 4096;
|
||||||
FileSystem fs;
|
FileSystem fs;
|
||||||
DataOutputStream outputStream = null;
|
DataOutputStream outputStream = null;
|
||||||
try {
|
try {
|
||||||
fs = cluster.getFileSystem();
|
fs = cluster.getFileSystem();
|
||||||
final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
|
final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
|
||||||
fs.getWorkingDirectory());
|
fs.getWorkingDirectory());
|
||||||
final long blockSize = fs.getDefaultBlockSize(qualifiedPath) * 2;
|
final long blockSize = createMultipleBlocks? NON_DEFAULT_BLOCK_SIZE : fs.getDefaultBlockSize(qualifiedPath) * 2;
|
||||||
outputStream = fs.create(qualifiedPath, true, 0,
|
outputStream = fs.create(qualifiedPath, true, 0,
|
||||||
(short)(fs.getDefaultReplication(qualifiedPath)*2),
|
(short)(fs.getDefaultReplication(qualifiedPath)*2),
|
||||||
blockSize);
|
blockSize);
|
||||||
outputStream.write(new byte[FILE_SIZE]);
|
byte[] bytes = new byte[DEFAULT_FILE_SIZE];
|
||||||
|
outputStream.write(bytes);
|
||||||
|
long fileSize = DEFAULT_FILE_SIZE;
|
||||||
|
if (createMultipleBlocks) {
|
||||||
|
while (fileSize < 2*blockSize) {
|
||||||
|
outputStream.write(bytes);
|
||||||
|
outputStream.flush();
|
||||||
|
fileSize += DEFAULT_FILE_SIZE;
|
||||||
|
}
|
||||||
|
}
|
||||||
pathList.add(qualifiedPath);
|
pathList.add(qualifiedPath);
|
||||||
++nFiles;
|
++nFiles;
|
||||||
|
|
||||||
|
@ -144,7 +170,7 @@ public class TestCopyMapper {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=40000)
|
||||||
public void testRun() {
|
public void testRun() {
|
||||||
try {
|
try {
|
||||||
deleteState();
|
deleteState();
|
||||||
|
@ -179,7 +205,7 @@ public class TestCopyMapper {
|
||||||
|
|
||||||
Assert.assertEquals(pathList.size(),
|
Assert.assertEquals(pathList.size(),
|
||||||
stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
|
stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
|
||||||
Assert.assertEquals(nFiles * FILE_SIZE,
|
Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE,
|
||||||
stubContext.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
|
stubContext.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
|
||||||
|
|
||||||
testCopyingExistingFiles(fs, copyMapper, context);
|
testCopyingExistingFiles(fs, copyMapper, context);
|
||||||
|
@ -211,7 +237,7 @@ public class TestCopyMapper {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=40000)
|
||||||
public void testMakeDirFailure() {
|
public void testMakeDirFailure() {
|
||||||
try {
|
try {
|
||||||
deleteState();
|
deleteState();
|
||||||
|
@ -239,13 +265,13 @@ public class TestCopyMapper {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=40000)
|
||||||
public void testIgnoreFailures() {
|
public void testIgnoreFailures() {
|
||||||
doTestIgnoreFailures(true);
|
doTestIgnoreFailures(true);
|
||||||
doTestIgnoreFailures(false);
|
doTestIgnoreFailures(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=40000)
|
||||||
public void testDirToFile() {
|
public void testDirToFile() {
|
||||||
try {
|
try {
|
||||||
deleteState();
|
deleteState();
|
||||||
|
@ -273,7 +299,7 @@ public class TestCopyMapper {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=40000)
|
||||||
public void testPreserve() {
|
public void testPreserve() {
|
||||||
try {
|
try {
|
||||||
deleteState();
|
deleteState();
|
||||||
|
@ -343,7 +369,7 @@ public class TestCopyMapper {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=40000)
|
||||||
public void testCopyReadableFiles() {
|
public void testCopyReadableFiles() {
|
||||||
try {
|
try {
|
||||||
deleteState();
|
deleteState();
|
||||||
|
@ -406,7 +432,7 @@ public class TestCopyMapper {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=40000)
|
||||||
public void testSkipCopyNoPerms() {
|
public void testSkipCopyNoPerms() {
|
||||||
try {
|
try {
|
||||||
deleteState();
|
deleteState();
|
||||||
|
@ -480,7 +506,7 @@ public class TestCopyMapper {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=40000)
|
||||||
public void testFailCopyWithAccessControlException() {
|
public void testFailCopyWithAccessControlException() {
|
||||||
try {
|
try {
|
||||||
deleteState();
|
deleteState();
|
||||||
|
@ -563,7 +589,7 @@ public class TestCopyMapper {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=40000)
|
||||||
public void testFileToDir() {
|
public void testFileToDir() {
|
||||||
try {
|
try {
|
||||||
deleteState();
|
deleteState();
|
||||||
|
@ -640,12 +666,48 @@ public class TestCopyMapper {
|
||||||
cluster.getFileSystem().delete(new Path(TARGET_PATH), true);
|
cluster.getFileSystem().delete(new Path(TARGET_PATH), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=40000)
|
||||||
public void testPreserveBlockSizeAndReplication() {
|
public void testPreserveBlockSizeAndReplication() {
|
||||||
testPreserveBlockSizeAndReplicationImpl(true);
|
testPreserveBlockSizeAndReplicationImpl(true);
|
||||||
testPreserveBlockSizeAndReplicationImpl(false);
|
testPreserveBlockSizeAndReplicationImpl(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=40000)
|
||||||
|
public void testCopyFailOnBlockSizeDifference() {
|
||||||
|
try {
|
||||||
|
|
||||||
|
deleteState();
|
||||||
|
createSourceDataWithDifferentBlockSize();
|
||||||
|
|
||||||
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
CopyMapper copyMapper = new CopyMapper();
|
||||||
|
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
|
||||||
|
Mapper<Text, FileStatus, Text, Text>.Context context
|
||||||
|
= stubContext.getContext();
|
||||||
|
|
||||||
|
Configuration configuration = context.getConfiguration();
|
||||||
|
EnumSet<DistCpOptions.FileAttribute> fileAttributes
|
||||||
|
= EnumSet.noneOf(DistCpOptions.FileAttribute.class);
|
||||||
|
configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
|
||||||
|
DistCpUtils.packAttributes(fileAttributes));
|
||||||
|
|
||||||
|
copyMapper.setup(context);
|
||||||
|
|
||||||
|
for (Path path : pathList) {
|
||||||
|
final FileStatus fileStatus = fs.getFileStatus(path);
|
||||||
|
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
|
||||||
|
fileStatus, context);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.fail("Copy should have failed because of block-size difference.");
|
||||||
|
}
|
||||||
|
catch (Exception exception) {
|
||||||
|
// Check that the exception suggests the use of -pb/-skipCrc.
|
||||||
|
Assert.assertTrue("Failure exception should have suggested the use of -pb.", exception.getCause().getCause().getMessage().contains("pb"));
|
||||||
|
Assert.assertTrue("Failure exception should have suggested the use of -skipCrc.", exception.getCause().getCause().getMessage().contains("skipCrc"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void testPreserveBlockSizeAndReplicationImpl(boolean preserve){
|
private void testPreserveBlockSizeAndReplicationImpl(boolean preserve){
|
||||||
try {
|
try {
|
||||||
|
|
||||||
|
@ -717,7 +779,7 @@ public class TestCopyMapper {
|
||||||
* If a single file is being copied to a location where the file (of the same
|
* If a single file is being copied to a location where the file (of the same
|
||||||
* name) already exists, then the file shouldn't be skipped.
|
* name) already exists, then the file shouldn't be skipped.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test(timeout=40000)
|
||||||
public void testSingleFileCopy() {
|
public void testSingleFileCopy() {
|
||||||
try {
|
try {
|
||||||
deleteState();
|
deleteState();
|
||||||
|
@ -766,7 +828,7 @@ public class TestCopyMapper {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=40000)
|
||||||
public void testPreserveUserGroup() {
|
public void testPreserveUserGroup() {
|
||||||
testPreserveUserGroupImpl(true);
|
testPreserveUserGroupImpl(true);
|
||||||
testPreserveUserGroupImpl(false);
|
testPreserveUserGroupImpl(false);
|
||||||
|
|
Loading…
Reference in New Issue