HDFS-9913. DistCp to add -useTrash to move deleted files to Trash.

Contributed by Shen Yinjie.

Change-Id: I03ac7d22ab1054f8e5de4aa7552909c734438f4a
Shen Yinjie 2019-07-17 11:49:45 +01:00 committed by Steve Loughran
parent 85d9111a88
commit ee3115f488
11 changed files with 262 additions and 9 deletions
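For orientation, here is a minimal sketch of the new switch in use, built from the DistCpOptions.Builder API including the withDeleteUseTrash method this change adds; the namenode URI and the source/target paths are placeholders, not part of the change.

import java.util.Collections;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCpOptions;

// Roughly the programmatic equivalent of:
//   hadoop distcp -update -delete -useTrash hdfs://nn:8020/src hdfs://nn:8020/dst
public class DistCpUseTrashSketch {
  public static DistCpOptions options() {
    return new DistCpOptions.Builder(
        Collections.singletonList(new Path("hdfs://nn:8020/src")), // placeholder source
        new Path("hdfs://nn:8020/dst"))                            // placeholder target
        .withSyncFolder(true)      // -update; -delete needs update or overwrite
        .withDeleteMissing(true)   // -delete: remove target files missing at the source
        .withDeleteUseTrash(true)  // -useTrash: move those files to trash instead
        .build();
  }
}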

View File

@ -61,6 +61,8 @@ private DistCpConstants() {
public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source";
public static final String CONF_LABEL_TRACK_MISSING =
"distcp.track.missing.source";
public static final String CONF_LABEL_DELETE_MISSING_USETRASH =
"distcp.delete.missing.usetrash";
public static final String CONF_LABEL_LISTSTATUS_THREADS = "distcp.liststatus.threads";
public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps";
public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing";
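The new label is the handle through which the choice travels from the parsed options into the job configuration and back out in the commit phase. A brief sketch of that round trip, assuming the option has already been parsed (the appendToConf and deletePath hunks further down show the real call sites):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptionSwitch;

public class UseTrashConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Written by DistCpOptions#appendToConf when -useTrash is set.
    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DELETE_USETRASH,
        String.valueOf(true));
    // Read back by CopyCommitter#deletePath to choose trash over a hard delete.
    boolean useTrash = conf.getBoolean(
        DistCpConstants.CONF_LABEL_DELETE_MISSING_USETRASH, false);
    System.out.println("useTrash = " + useTrash); // prints: useTrash = true
  }
}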

View File

@ -76,6 +76,15 @@ public enum DistCpOptionSwitch {
new Option("delete", false, "Delete from target, " +
"files missing in source. Delete is applicable only with update or overwrite options")),
/**
* When the -delete option is on, files in the target that are missing
* from the source are deleted permanently by default. This switch
* moves those files to the trash instead.
*/
DELETE_USETRASH(DistCpConstants.CONF_LABEL_DELETE_MISSING_USETRASH,
new Option("useTrash", false, "Move deleted files into " +
"the user's trash directory in the destination filesystem")),
/**
* Track missing files in target that are missing from source
* This allows for other applications to complete the synchronization,

View File

@ -95,6 +95,8 @@ public final class DistCpOptions {
/** Whether to run blocking or non-blocking. */
private final boolean blocking;
private boolean deleteUseTrash;
// When "-diff s1 s2 src tgt" is passed, apply forward snapshot diff (from s1
// to s2) of source cluster to the target cluster to sync target cluster with
// the source cluster. Referred to as "Fdiff" in the code.
@ -221,6 +223,7 @@ private DistCpOptions(Builder builder) {
this.trackPath = builder.trackPath;
this.directWrite = builder.directWrite;
this.deleteUseTrash = builder.deleteUseTrash;
}
public Path getSourceFileListing() {
@ -284,6 +287,10 @@ public boolean shouldUseSnapshotDiff() {
return shouldUseDiff() || shouldUseRdiff();
}
public boolean shouldDeleteUseTrash() {
return deleteUseTrash;
}
public String getFromSnapshot() {
return this.fromSnapshot;
}
@ -374,6 +381,8 @@ public void appendToConf(Configuration conf) {
String.valueOf(useDiff));
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.RDIFF,
String.valueOf(useRdiff));
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DELETE_USETRASH,
String.valueOf(deleteUseTrash));
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC,
String.valueOf(skipCRC));
if (mapBandwidth > 0) {
@ -415,6 +424,7 @@ public String toString() {
"atomicCommit=" + atomicCommit +
", syncFolder=" + syncFolder +
", deleteMissing=" + deleteMissing +
", deleteUseTrash=" + deleteUseTrash +
", ignoreFailures=" + ignoreFailures +
", overwrite=" + overwrite +
", append=" + append +
@ -467,6 +477,8 @@ public static class Builder {
private boolean useDiff = false;
private boolean useRdiff = false;
private boolean deleteUseTrash = false;
private String fromSnapshot;
private String toSnapshot;
@ -564,6 +576,11 @@ private void validate() {
+ "only with update or overwrite options");
}
if (deleteUseTrash && !deleteMissing) {
throw new IllegalArgumentException("Delete useTrash is applicable "
+ "only with delete option");
}
if (overwrite && syncFolder) {
throw new IllegalArgumentException("Overwrite and update options are "
+ "mutually exclusive");
@ -627,6 +644,11 @@ public Builder withDeleteMissing(boolean newDeleteMissing) {
return this;
}
public Builder withDeleteUseTrash(boolean newDeleteUseTrash) {
this.deleteUseTrash = newDeleteUseTrash;
return this;
}
public Builder withIgnoreFailures(boolean newIgnoreFailures) {
this.ignoreFailures = newIgnoreFailures;
return this;

View File

@ -102,6 +102,8 @@ public static DistCpOptions parse(String[] args)
command.hasOption(DistCpOptionSwitch.SYNC_FOLDERS.getSwitch()))
.withDeleteMissing(
command.hasOption(DistCpOptionSwitch.DELETE_MISSING.getSwitch()))
.withDeleteUseTrash(
command.hasOption(DistCpOptionSwitch.DELETE_USETRASH.getSwitch()))
.withIgnoreFailures(
command.hasOption(DistCpOptionSwitch.IGNORE_FAILURES.getSwitch()))
.withOverwrite(
@ -153,6 +155,9 @@ public static DistCpOptions parse(String[] args)
command,
DistCpOptionSwitch.TRACK_MISSING.getSwitch())));
}
if (command.hasOption(DistCpOptionSwitch.DELETE_USETRASH.getSwitch())) {
builder.withDeleteUseTrash(true);
}
if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
try {

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
@ -453,7 +454,8 @@ private void deleteMissing(Configuration conf) throws IOException {
if (tracker.shouldDelete(trgtFileStatus)) {
showProgress = true;
try {
if (targetFS.delete(targetEntry, true)) {
boolean result = deletePath(targetFS, targetEntry, conf);
if (result) {
// the delete worked. Unless the file is actually missing, this is the
LOG.info("Deleted " + targetEntry + " - missing at source");
deletedEntries++;
@ -467,7 +469,8 @@ private void deleteMissing(Configuration conf) throws IOException {
// For all the filestores which implement the FS spec properly,
// this means "the file wasn't there".
// so track but don't worry about it.
LOG.info("delete({}) returned false ({})",
LOG.info("delete({}) returned false ({}). Consider using " +
"-useTrash option if trash is enabled.",
targetEntry, trgtFileStatus);
missingDeletes++;
}
@ -515,6 +518,17 @@ private void deleteMissing(Configuration conf) throws IOException {
formatDuration(deletionEnd - listingStart));
}
private boolean deletePath(FileSystem targetFS, Path targetEntry,
Configuration conf) throws IOException {
if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING_USETRASH,
false)) {
return Trash.moveToAppropriateTrash(
targetFS, targetEntry, conf);
} else {
return targetFS.delete(targetEntry, true);
}
}
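// Standalone illustration (not part of this change) of the decision made by
// deletePath() above, reusing the imports already present in this file
// (Configuration, FileSystem, Path, Trash). Trash.moveToAppropriateTrash()
// moves the path under the owner's trash root on the filesystem that owns it
// (the tests below expect it under "Current" + the original path); if trash
// is disabled on that filesystem (fs.trash.interval is 0) it is expected to
// return false and leave the path in place, which is why the test setups in
// this change set FS_TRASH_INTERVAL_KEY explicitly.
private static boolean trashOrDelete(FileSystem fs, Path path,
Configuration conf, boolean useTrash) throws IOException {
if (useTrash) {
// Move into the trash of the filesystem that owns the path.
return Trash.moveToAppropriateTrash(fs, path, conf);
}
// Permanent, recursive delete.
return fs.delete(path, true);
}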
/**
* Take a duration and return a human-readable duration of
* hours:minutes:seconds.millis.

View File

@ -349,7 +349,7 @@ Command Line Options
| `-filters` | The path to a file containing a list of pattern strings, one string per line, such that paths matching the pattern will be excluded from the copy. | Support regular expressions specified by java.util.regex.Pattern. |
| `-filelimit <n>` | Limit the total number of files to be <= n | **Deprecated!** Ignored in the new DistCp. |
| `-sizelimit <n>` | Limit the total size to be <= n bytes | **Deprecated!** Ignored in the new DistCp. |
| `-delete` | Delete the files existing in the dst but not in src | The deletion is done by FS Shell. So the trash will be used, if it is enable. Delete is applicable only with update or overwrite options. |
| `-delete [-useTrash]` | Delete files existing in the destination but not in the source. | When `-useTrash` is specified, deleted files are moved into the user's trash directory instead of being removed permanently. Note that on some object stores `-useTrash` performs a copy followed by a delete and can be slow. Delete is applicable only with update or overwrite options. (A programmatic equivalent is sketched after this table.) |
| `-strategy {dynamic|uniformsize}` | Choose the copy-strategy to be used in DistCp. | By default, uniformsize is used. (i.e. Maps are balanced on the total size of files copied by each map. Similar to legacy.) If "dynamic" is specified, `DynamicInputFormat` is used instead. (This is described in the Architecture section, under InputFormats.) |
| `-bandwidth` | Specify bandwidth per map, in MB/second. | Each map will be restricted to consume only the specified bandwidth. This is not always exact. The map throttles back its bandwidth consumption during a copy, such that the **net** bandwidth used tends towards the specified value. |
| `-atomic {-tmp <tmp_dir>}` | Specify atomic commit, with optional tmp directory. | `-atomic` instructs DistCp to copy the source data to a temporary target location, and then move the temporary target to the final-location atomically. Data will either be available at final target in a complete and consistent form, or not at all. Optionally, `-tmp` may be used to specify the location of the tmp-target. If not specified, a default is chosen. **Note:** tmp_dir must be on the final target cluster. |
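For reference, the same flags can be driven programmatically through `OptionsParser`; a sketch follows, with placeholder URIs. Note that the destination filesystem needs trash enabled (`fs.trash.interval` greater than zero) for the move to trash to take effect, which the tests in this change arrange explicitly.

import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.OptionsParser;

public class ParseUseTrashSketch {
  public static void main(String[] args) {
    DistCpOptions options = OptionsParser.parse(new String[] {
        "-update",
        "-delete",
        "-useTrash",
        "hdfs://nn:8020/src",     // placeholder source
        "hdfs://nn:8020/dst"});   // placeholder target
    System.out.println(options.shouldDeleteMissing());   // true
    System.out.println(options.shouldDeleteUseTrash());  // true
  }
}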

View File

@ -21,10 +21,12 @@
import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
@ -225,6 +227,42 @@ public void testSetDeleteMissing() {
}
}
@Test
public void testDeleteMissingUseTrash() throws Exception {
final DistCpOptions.Builder builder = new DistCpOptions.Builder(
Collections.singletonList(new Path("hdfs://localhost:8020/source")),
new Path("hdfs://localhost:8020/target/"));
Assert.assertFalse("Delete does not use trash by default.",
builder.build().shouldDeleteUseTrash());
DistCpOptions options = builder.withSyncFolder(true)
.withDeleteMissing(true)
.withDeleteUseTrash(true)
.build();
Assert.assertTrue(options.shouldSyncFolder());
Assert.assertTrue(options.shouldDeleteMissing());
Assert.assertTrue(options.shouldDeleteUseTrash());
options = new DistCpOptions.Builder(
Collections.singletonList(new Path("hdfs://localhost:8020/source")),
new Path("hdfs://localhost:8020/target/"))
.withOverwrite(true)
.withDeleteMissing(true)
.withDeleteUseTrash(true)
.build();
Assert.assertTrue(options.shouldDeleteUseTrash());
Assert.assertTrue(options.shouldOverwrite());
Assert.assertTrue(options.shouldDeleteMissing());
LambdaTestUtils.intercept(IllegalArgumentException.class,
() -> new DistCpOptions.Builder(Collections.singletonList(
new Path("hdfs://localhost:8020/source")),
new Path("hdfs://localhost:8020/target/"))
.withDeleteUseTrash(true)
.build());
}
@Test
public void testSetMaps() {
final DistCpOptions.Builder builder = new DistCpOptions.Builder(
@ -281,8 +319,8 @@ public void testToString() {
DistCpOptions option = new DistCpOptions.Builder(new Path("abc"),
new Path("xyz")).build();
String val = "DistCpOptions{atomicCommit=false, syncFolder=false, " +
"deleteMissing=false, ignoreFailures=false, overwrite=false, " +
"append=false, useDiff=false, useRdiff=false, " +
"deleteMissing=false, deleteUseTrash=false, ignoreFailures=false, " +
"overwrite=false, append=false, useDiff=false, useRdiff=false, " +
"fromSnapshot=null, toSnapshot=null, " +
"skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, " +
"mapBandwidth=0.0, copyStrategy='uniformsize', preserveStatus=[], " +

View File

@ -348,7 +348,7 @@ public void testDeleteMissingInDestination() {
createFiles("srcdir/file1", "dstdir/file1", "dstdir/file2");
Path target = new Path(root + "/dstdir");
runTest(listFile, target, false, true, true, false);
runTest(listFile, target, false, true, true, false, false);
checkResult(target, 1, "file1");
} catch (IOException e) {
@ -372,7 +372,7 @@ public void testOverwrite() {
createWithContents("dstdir/file1", contents2);
Path target = new Path(root + "/dstdir");
runTest(listFile, target, false, false, false, true);
runTest(listFile, target, false, false, false, true, false);
checkResult(target, 1, "file1");
@ -553,15 +553,16 @@ private void mkdirs(String... entries) throws IOException {
private void runTest(Path listFile, Path target, boolean targetExists,
boolean sync) throws IOException {
runTest(listFile, target, targetExists, sync, false, false);
runTest(listFile, target, targetExists, sync, false, false, false);
}
private void runTest(Path listFile, Path target, boolean targetExists,
boolean sync, boolean delete,
boolean overwrite) throws IOException {
boolean overwrite, boolean useTrash) throws IOException {
final DistCpOptions options = new DistCpOptions.Builder(listFile, target)
.withSyncFolder(sync)
.withDeleteMissing(delete)
.withDeleteUseTrash(useTrash)
.withOverwrite(overwrite)
.withNumListstatusThreads(numListstatusThreads)
.build();

View File

@ -802,4 +802,23 @@ public void testExclusionsOption() {
"hdfs://localhost:8020/target/"});
Assert.assertEquals(options.getFiltersFile(), "/tmp/filters.txt");
}
@Test
public void testParseDeleteUseTrash() {
DistCpOptions options = OptionsParser.parse(new String[] {
"-overwrite",
"-delete",
"-useTrash",
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"});
Assert.assertTrue("Delete with useTrash.",
options.shouldDeleteUseTrash());
options = OptionsParser.parse(new String[] {
"-overwrite",
"-delete",
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"});
Assert.assertFalse("Delete does not use trash.",
options.shouldDeleteUseTrash());
}
}

View File

@ -26,6 +26,7 @@
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@ -137,6 +138,7 @@ protected Configuration createConfiguration() {
public void setup() throws Exception {
super.setup();
conf = getContract().getConf();
conf.setLong(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 10);
localFS = FileSystem.getLocal(conf);
remoteFS = getFileSystem();
// Test paths are isolated by concrete subclass name and test method name.
@ -224,6 +226,13 @@ public void testUpdateDeepDirectoryStructureToRemote() throws Exception {
distCpUpdateDeepDirectoryStructure(inputDirUnderOutputDir);
}
@Test
public void testUpdateUseTrashDeepDirectoryStructureToRemote() throws Exception {
describe("update a deep directory structure from local to remote");
distCpDeepDirectoryStructure(localFS, localDir, remoteFS, remoteDir);
distCpUpdateUseTrashDeepDirectoryStructure(remoteDir);
}
@Test
public void testUpdateDeepDirectoryStructureNoChange() throws Exception {
describe("update an unchanged directory structure"
@ -284,6 +293,10 @@ protected Job distCpUpdateDeepDirectoryStructure(final Path destDir)
modifySourceDirectories();
ContractTestUtils.assertPathsDoNotExist(localFS,
"Paths for test are wrong",
inputFile1, inputFile3, inputSubDir4);
Job job = distCpUpdate(srcDir, destDir);
Path outputFileNew1 = new Path(outputSubDir2, "newfile1");
@ -322,6 +335,73 @@ private Job distCpUpdate(final Path srcDir, final Path destDir)
.withOverwrite(false)));
}
/**
* Do a distcp -update -delete -useTrash.
* @param destDir output directory used by the initial distcp
* @return the distcp job
*/
protected Job distCpUpdateUseTrashDeepDirectoryStructure(final Path destDir)
throws Exception {
describe("Incremental update with deletion-use-trash of missing files");
Path srcDir = inputDir;
LOG.info("Source directory = {}, dest={}", srcDir, destDir);
ContractTestUtils.assertPathsExist(localFS,
"Paths for test are wrong",
inputFile1, inputFile2, inputFile3, inputFile4, inputFile5);
modifySourceDirectories();
ContractTestUtils.assertPathsDoNotExist(localFS, "deleted right now",
inputFile1, inputFile3, inputSubDir4);
Path trashRootDir = remoteFS.getTrashRoot(null);
ContractTestUtils.assertDeleted(remoteFS, trashRootDir, true);
Job job = distCpUpdateDeleteUseTrash(inputDir, inputDirUnderOutputDir);
lsR("Updated Remote", remoteFS, destDir);
ContractTestUtils.assertPathsDoNotExist(remoteFS,
"DistCP should have deleted",
outputFile1, outputFile3, outputFile4, outputSubDir4);
ContractTestUtils.assertPathExists(remoteFS,
"Path delete does not use trash", trashRootDir);
Path trashFile1 = new Path(trashRootDir,
"Current" + outputFile1.toUri().getPath());
Path trashFile3 = new Path(trashRootDir,
"Current" + outputFile3.toUri().getPath());
Path trashFile4 = new Path(trashRootDir,
"Current" + outputFile4.toUri().getPath());
Path trashFile5 = new Path(trashRootDir,
"Current" + outputFile5.toUri().getPath());
ContractTestUtils.assertPathsExist(remoteFS,
"Path delete does not use trash",
trashFile1, trashFile3, trashFile4, trashFile5);
return job;
}
/**
* Run distcp -update -delete -useTrash.
* @param srcDir local source directory
* @param destDir remote destination directory.
* @return the completed job
* @throws Exception any failure.
*/
private Job distCpUpdateDeleteUseTrash(final Path srcDir, final Path destDir)
throws Exception {
describe("\nDistcp -update from " + srcDir + " to " + destDir);
lsR("Local to update", localFS, srcDir);
lsR("Remote before update", remoteFS, destDir);
return runDistCp(buildWithStandardOptions(
new DistCpOptions.Builder(
Collections.singletonList(srcDir), destDir)
.withDeleteMissing(true)
.withDeleteUseTrash(true)
.withSyncFolder(true)
.withCRC(true)
.withOverwrite(false)));
}
/**
* Update the source directories as various tests expect,
* including adding a new file.
@ -336,6 +416,11 @@ private Path modifySourceDirectories() throws IOException {
// add one new file
Path inputFileNew1 = new Path(inputSubDir2, "newfile1");
ContractTestUtils.touch(localFS, inputFileNew1);
ContractTestUtils.assertPathsDoNotExist(localFS, "deleted right now",
inputFile1, inputFile3, inputSubDir4);
ContractTestUtils.assertPathsExist(localFS, "touched right now",
inputFileNew1);
return inputFileNew1;
}

View File

@ -21,6 +21,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -71,6 +73,7 @@ private static Job getJobForClient() throws IOException {
public static void create() throws IOException {
config = getJobForClient().getConfiguration();
config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, 0);
config.setLong(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 10);
cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).format(true)
.build();
}
@ -204,6 +207,61 @@ public void testDeleteMissing() throws IOException {
}
}
@Test
public void testDeleteUseTrash() throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
JobContext jobContext = new JobContextImpl(
taskAttemptContext.getConfiguration(),
taskAttemptContext.getTaskAttemptID().getJobID());
Configuration conf = jobContext.getConfiguration();
String sourceBase;
String targetBase;
FileSystem fs = null;
try {
OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
fs = FileSystem.get(conf);
sourceBase = TestDistCpUtils.createTestSetup(fs);
targetBase = TestDistCpUtils.createTestSetup(fs);
String targetBaseAdd = TestDistCpUtils.createTestSetup(fs);
ContractTestUtils.assertRenameOutcome(fs, new Path(targetBaseAdd),
new Path(targetBase), true);
DistCpOptions.Builder builder = new DistCpOptions.Builder(
Arrays.asList(new Path(sourceBase)), new Path("/out"));
builder.withSyncFolder(true);
builder.withDeleteMissing(true);
builder.withDeleteUseTrash(true);
builder.build().appendToConf(conf);
DistCpContext cpContext = new DistCpContext(builder.build());
CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
listing.buildListing(listingFile, cpContext);
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
Path trashRootDir = fs.getTrashRoot(null);
if (fs.exists(trashRootDir)) {
fs.delete(trashRootDir, true);
}
committer.commitJob(jobContext);
verifyFoldersAreInSync(fs, targetBase, sourceBase);
verifyFoldersAreInSync(fs, sourceBase, targetBase);
Assert.assertTrue("Path delete does not use trash",
fs.exists(trashRootDir));
Path trashDir = new Path(trashRootDir, "Current" + targetBaseAdd);
verifyFoldersAreInSync(fs, trashDir.toString(), sourceBase);
} finally {
TestDistCpUtils.delete(fs, "/tmp1");
conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING_USETRASH, "false");
}
}
@Test
public void testDeleteMissingFlatInterleavedFiles() throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);