HADOOP-18582. skip unnecessary cleanup logic in distcp (#5251)

Co-authored-by: 万康 <mingge@xiaohongshu.com>
Reviewed-by: Steve Loughran <stevel@apache.org>
Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
Signed-off-by: Chris Nauroth <cnauroth@apache.org>
(cherry picked from commit 3b7b79b37a)
This commit is contained in:
kevin wan 2023-01-25 07:49:32 +08:00 committed by Chris Nauroth
parent 63443be5f4
commit 5cd006455d
2 changed files with 82 additions and 2 deletions

View File

@ -149,9 +149,18 @@ public class CopyCommitter extends FileOutputCommitter {
}
private void cleanupTempFiles(JobContext context) {
try {
Configuration conf = context.getConfiguration();
final boolean directWrite = conf.getBoolean(
DistCpOptionSwitch.DIRECT_WRITE.getConfigLabel(), false);
final boolean append = conf.getBoolean(
DistCpOptionSwitch.APPEND.getConfigLabel(), false);
final boolean useTempTarget = !append && !directWrite;
if (!useTempTarget) {
return;
}
try {
Path targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
FileSystem targetFS = targetWorkPath.getFileSystem(conf);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.tools.mapred;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@ -580,6 +581,76 @@ public class TestCopyCommitter {
}
}
@Test
public void testCommitWithCleanupTempFiles() throws IOException {
testCommitWithCleanup(true, false);
testCommitWithCleanup(false, true);
testCommitWithCleanup(true, true);
testCommitWithCleanup(false, false);
}
private void testCommitWithCleanup(boolean append, boolean directWrite)throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
JobID jobID = taskAttemptContext.getTaskAttemptID().getJobID();
JobContext jobContext = new JobContextImpl(
taskAttemptContext.getConfiguration(),
jobID);
Configuration conf = jobContext.getConfiguration();
String sourceBase;
String targetBase;
FileSystem fs = null;
try {
fs = FileSystem.get(conf);
sourceBase = "/tmp1/" + rand.nextLong();
targetBase = "/tmp1/" + rand.nextLong();
DistCpOptions options = new DistCpOptions.Builder(
Collections.singletonList(new Path(sourceBase)),
new Path("/out"))
.withAppend(append)
.withSyncFolder(true)
.withDirectWrite(directWrite)
.build();
options.appendToConf(conf);
DistCpContext context = new DistCpContext(options);
context.setTargetPathExists(false);
conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);
Path tempFilePath = getTempFile(targetBase, taskAttemptContext);
createDirectory(fs, tempFilePath);
OutputCommitter committer = new CopyCommitter(
null, taskAttemptContext);
committer.commitJob(jobContext);
if (append || directWrite) {
ContractTestUtils.assertPathExists(fs, "Temp files should not be cleanup with append or direct option",
tempFilePath);
} else {
ContractTestUtils.assertPathDoesNotExist(
fs,
"Temp files should be clean up without append or direct option",
tempFilePath);
}
} finally {
TestDistCpUtils.delete(fs, "/tmp1");
TestDistCpUtils.delete(fs, "/meta");
}
}
private Path getTempFile(String targetWorkPath, TaskAttemptContext taskAttemptContext) {
Path tempFile = new Path(targetWorkPath, ".distcp.tmp." +
taskAttemptContext.getTaskAttemptID().toString() +
"." + System.currentTimeMillis());
LOG.info("Creating temp file: {}", tempFile);
return tempFile;
}
/**
* Create a source file and its DistCp working files with different checksum
* to test the checksum validation for copying blocks in parallel.