HADOOP-16440. Distcp can not preserve timestamp with -delete option. Contributed by ludun.

This commit is contained in:
Ayush Saxena 2019-07-20 13:11:14 +05:30
parent 5f2d07af1b
commit 35ff1ce42c
2 changed files with 87 additions and 7 deletions

View File

@ -109,13 +109,6 @@ public class CopyCommitter extends FileOutputCommitter {
cleanupTempFiles(jobContext); cleanupTempFiles(jobContext);
String attributes = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
final boolean preserveRawXattrs =
conf.getBoolean(DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, false);
if ((attributes != null && !attributes.isEmpty()) || preserveRawXattrs) {
preserveFileAttributesForDirectories(conf);
}
try { try {
if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING, false)) { if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING, false)) {
deleteMissing(conf); deleteMissing(conf);
@ -125,6 +118,13 @@ public class CopyCommitter extends FileOutputCommitter {
// save missing information to a directory // save missing information to a directory
trackMissing(conf); trackMissing(conf);
} }
// for HDFS-14621, should preserve status after -delete
String attributes = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
final boolean preserveRawXattrs = conf.getBoolean(
DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, false);
if ((attributes != null && !attributes.isEmpty()) || preserveRawXattrs) {
preserveFileAttributesForDirectories(conf);
}
taskAttemptContext.setStatus("Commit Successful"); taskAttemptContext.setStatus("Commit Successful");
} }
finally { finally {

View File

@ -26,11 +26,15 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.tools.CopyListing; import org.apache.hadoop.tools.CopyListing;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpConstants; import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpContext; import org.apache.hadoop.tools.DistCpContext;
import org.apache.hadoop.tools.DistCpOptions; import org.apache.hadoop.tools.DistCpOptions;
@ -204,6 +208,61 @@ public class TestCopyCommitter {
} }
} }
// for HDFS-14621, should preserve times after -delete
@Test
public void testPreserveTimeWithDeleteMiss() throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
JobContext jobContext = new JobContextImpl(
taskAttemptContext.getConfiguration(),
taskAttemptContext.getTaskAttemptID().getJobID());
Configuration conf = jobContext.getConfiguration();
FileSystem fs = null;
try {
OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
fs = FileSystem.get(conf);
String sourceBase = TestDistCpUtils.createTestSetup(
fs, FsPermission.getDefault());
String targetBase = TestDistCpUtils.createTestSetup(
fs, FsPermission.getDefault());
String targetBaseAdd = TestDistCpUtils.createTestSetup(
fs, FsPermission.getDefault());
fs.rename(new Path(targetBaseAdd), new Path(targetBase));
final DistCpOptions options = new DistCpOptions.Builder(
Collections.singletonList(new Path(sourceBase)), new Path("/out"))
.withSyncFolder(true).withDeleteMissing(true)
.preserve(FileAttribute.TIMES).build();
options.appendToConf(conf);
final DistCpContext context = new DistCpContext(options);
CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
listing.buildListing(listingFile, context);
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
Path sourceListing = new Path(
conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf,
SequenceFile.Reader.file(sourceListing));
Path targetRoot = new Path(targetBase);
committer.commitJob(jobContext);
checkDirectoryTimes(fs, sourceReader, targetRoot);
//Test for idempotent commit
committer.commitJob(jobContext);
checkDirectoryTimes(fs, sourceReader, targetRoot);
} finally {
TestDistCpUtils.delete(fs, "/tmp1");
conf.unset(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
}
}
@Test @Test
public void testDeleteMissingFlatInterleavedFiles() throws IOException { public void testDeleteMissingFlatInterleavedFiles() throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
@ -364,6 +423,27 @@ public class TestCopyCommitter {
} }
} }
private void checkDirectoryTimes(
FileSystem fs, SequenceFile.Reader sourceReader, Path targetRoot)
throws IOException {
try {
CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
Text srcRelPath = new Text();
// Iterate over every source path that was copied.
while (sourceReader.next(srcRelPath, srcFileStatus)) {
Path targetFile = new Path(targetRoot.toString() + "/" + srcRelPath);
FileStatus targetStatus = fs.getFileStatus(targetFile);
Assert.assertEquals(srcFileStatus.getModificationTime(),
targetStatus.getModificationTime());
Assert.assertEquals(srcFileStatus.getAccessTime(),
targetStatus.getAccessTime());
}
} finally {
IOUtils.closeStream(sourceReader);
}
}
private static class NullInputFormat extends InputFormat { private static class NullInputFormat extends InputFormat {
@Override @Override
public List getSplits(JobContext context) public List getSplits(JobContext context)