MAPREDUCE-7287. Distcp will delete exists file , If we use "-delete and -update" options and distcp file. (#2852)

Contributed by zhengchenyu

Change-Id: I61edf9a443c0c6cd5b5dd911901708530cf131ed
This commit is contained in:
zhengchenyu 2021-05-29 03:21:37 +08:00 committed by Steve Loughran
parent c5535caf6e
commit 7feb41b73d
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
4 changed files with 111 additions and 5 deletions

View File

@ -553,10 +553,6 @@ public class CopyCommitter extends FileOutputCommitter {
conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH)); conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
List<Path> targets = new ArrayList<>(1); List<Path> targets = new ArrayList<>(1);
targets.add(targetFinalPath); targets.add(targetFinalPath);
Path resultNonePath = Path.getPathWithoutSchemeAndAuthority(targetFinalPath)
.toString().startsWith(DistCpConstants.HDFS_RESERVED_RAW_DIRECTORY_NAME)
? DistCpConstants.RAW_NONE_PATH
: DistCpConstants.NONE_PATH;
// //
// Set up options to be the same from the CopyListing.buildListing's // Set up options to be the same from the CopyListing.buildListing's
// perspective, so to collect similar listings as when doing the copy // perspective, so to collect similar listings as when doing the copy
@ -568,7 +564,7 @@ public class CopyCommitter extends FileOutputCommitter {
conf.getBoolean(DistCpConstants.CONF_LABEL_USE_ITERATOR, false); conf.getBoolean(DistCpConstants.CONF_LABEL_USE_ITERATOR, false);
LOG.info("Scanning destination directory {} with thread count: {}", LOG.info("Scanning destination directory {} with thread count: {}",
targetFinalPath, threads); targetFinalPath, threads);
DistCpOptions options = new DistCpOptions.Builder(targets, resultNonePath) DistCpOptions options = new DistCpOptions.Builder(targets, targetFinalPath)
.withOverwrite(overwrite) .withOverwrite(overwrite)
.withSyncFolder(syncFolder) .withSyncFolder(syncFolder)
.withNumListstatusThreads(threads) .withNumListstatusThreads(threads)

View File

@ -709,4 +709,60 @@ public abstract class AbstractContractDistCpTest
Collections.singletonList(srcDir), destDir) Collections.singletonList(srcDir), destDir)
.withDirectWrite(true))); .withDirectWrite(true)));
} }
@Test
public void testDistCpWithFile() throws Exception {
describe("Distcp only file");
Path source = new Path(remoteDir, "file");
Path dest = new Path(localDir, "file");
dest = localFS.makeQualified(dest);
mkdirs(remoteFS, remoteDir);
mkdirs(localFS, localDir);
int len = 4;
int base = 0x40;
byte[] block = dataset(len, base, base + len);
ContractTestUtils.createFile(remoteFS, source, true, block);
verifyPathExists(remoteFS, "", source);
verifyPathExists(localFS, "", localDir);
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
dest.toString(), null, conf);
Assertions
.assertThat(RemoteIterators.toList(localFS.listFiles(dest, true)))
.describedAs("files").hasSize(1);
verifyFileContents(localFS, dest, block);
}
@Test
public void testDistCpWithUpdateExistFile() throws Exception {
describe("Now update an exist file.");
Path source = new Path(remoteDir, "file");
Path dest = new Path(localDir, "file");
dest = localFS.makeQualified(dest);
mkdirs(remoteFS, remoteDir);
mkdirs(localFS, localDir);
int len = 4;
int base = 0x40;
byte[] block = dataset(len, base, base + len);
byte[] destBlock = dataset(len, base, base + len + 1);
ContractTestUtils.createFile(remoteFS, source, true, block);
ContractTestUtils.createFile(localFS, dest, true, destBlock);
verifyPathExists(remoteFS, "", source);
verifyPathExists(localFS, "", dest);
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
dest.toString(), "-delete -update", conf);
Assertions.assertThat(RemoteIterators.toList(localFS.listFiles(dest, true)))
.hasSize(1);
verifyFileContents(localFS, dest, block);
}
} }

View File

@ -265,6 +265,51 @@ public class TestCopyCommitter {
} }
} }
@Test
public void testDeleteMissingWithOnlyFile() throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
JobContext jobContext = new JobContextImpl(taskAttemptContext
.getConfiguration(), taskAttemptContext.getTaskAttemptID().getJobID());
Configuration conf = jobContext.getConfiguration();
String sourceBase;
String targetBase;
FileSystem fs = null;
try {
OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
fs = FileSystem.get(conf);
sourceBase = TestDistCpUtils.createTestSetupWithOnlyFile(fs,
FsPermission.getDefault());
targetBase = TestDistCpUtils.createTestSetupWithOnlyFile(fs,
FsPermission.getDefault());
final DistCpOptions options = new DistCpOptions.Builder(
Collections.singletonList(new Path(sourceBase)), new Path(targetBase))
.withSyncFolder(true).withDeleteMissing(true).build();
options.appendToConf(conf);
final DistCpContext context = new DistCpContext(options);
CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
Path listingFile = new Path(sourceBase);
listing.buildListing(listingFile, context);
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
committer.commitJob(jobContext);
verifyFoldersAreInSync(fs, targetBase, sourceBase);
verifyFoldersAreInSync(fs, sourceBase, targetBase);
//Test for idempotent commit
committer.commitJob(jobContext);
verifyFoldersAreInSync(fs, targetBase, sourceBase);
verifyFoldersAreInSync(fs, sourceBase, targetBase);
} finally {
TestDistCpUtils.delete(fs, "/tmp1");
conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
}
}
// for HDFS-14621, should preserve times after -delete // for HDFS-14621, should preserve times after -delete
@Test @Test
public void testPreserveTimeWithDeleteMiss() throws IOException { public void testPreserveTimeWithDeleteMiss() throws IOException {

View File

@ -1360,6 +1360,15 @@ public class TestDistCpUtils {
return base + "/" + location; return base + "/" + location;
} }
public static String createTestSetupWithOnlyFile(FileSystem fs,
FsPermission perm) throws IOException {
String location = String.valueOf(rand.nextLong());
fs.mkdirs(new Path("/tmp1/" + location));
fs.setPermission(new Path("/tmp1/" + location), perm);
createFile(fs, new Path("/tmp1/" + location + "/file"));
return "/tmp1/" + location + "/file";
}
public static void delete(FileSystem fs, String path) { public static void delete(FileSystem fs, String path) {
try { try {
if (fs != null) { if (fs != null) {