Revert "HDFS-9913. DistCp to add -useTrash to move deleted files to Trash."
Reverting because tests fail if ~/.Trash is not present during test setup.
This reverts commit ee3115f488.
Change-Id: Icbeeb261570b9131ff99d765ac0945c335b26658
This commit is contained in:
parent
ee3115f488
commit
19a001826f
|
@ -61,8 +61,6 @@ public final class DistCpConstants {
|
||||||
public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source";
|
public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source";
|
||||||
public static final String CONF_LABEL_TRACK_MISSING =
|
public static final String CONF_LABEL_TRACK_MISSING =
|
||||||
"distcp.track.missing.source";
|
"distcp.track.missing.source";
|
||||||
public static final String CONF_LABEL_DELETE_MISSING_USETRASH =
|
|
||||||
"distcp.delete.missing.usetrash";
|
|
||||||
public static final String CONF_LABEL_LISTSTATUS_THREADS = "distcp.liststatus.threads";
|
public static final String CONF_LABEL_LISTSTATUS_THREADS = "distcp.liststatus.threads";
|
||||||
public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps";
|
public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps";
|
||||||
public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing";
|
public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing";
|
||||||
|
|
|
@ -76,15 +76,6 @@ public enum DistCpOptionSwitch {
|
||||||
new Option("delete", false, "Delete from target, " +
|
new Option("delete", false, "Delete from target, " +
|
||||||
"files missing in source. Delete is applicable only with update or overwrite options")),
|
"files missing in source. Delete is applicable only with update or overwrite options")),
|
||||||
|
|
||||||
/**
|
|
||||||
* When -delete option on, files in target that are missing from source
|
|
||||||
* will be delete by default. This allows the files to be
|
|
||||||
* moved to the trash
|
|
||||||
*/
|
|
||||||
DELETE_USETRASH(DistCpConstants.CONF_LABEL_DELETE_MISSING_USETRASH,
|
|
||||||
new Option("useTrash", false, "Move deleted files into " +
|
|
||||||
"the user's trash directory in the destination filesystem")),
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Track missing files in target that are missing from source
|
* Track missing files in target that are missing from source
|
||||||
* This allows for other applications to complete the synchronization,
|
* This allows for other applications to complete the synchronization,
|
||||||
|
|
|
@ -95,8 +95,6 @@ public final class DistCpOptions {
|
||||||
/** Whether to run blocking or non-blocking. */
|
/** Whether to run blocking or non-blocking. */
|
||||||
private final boolean blocking;
|
private final boolean blocking;
|
||||||
|
|
||||||
private boolean deleteUseTrash;
|
|
||||||
|
|
||||||
// When "-diff s1 s2 src tgt" is passed, apply forward snapshot diff (from s1
|
// When "-diff s1 s2 src tgt" is passed, apply forward snapshot diff (from s1
|
||||||
// to s2) of source cluster to the target cluster to sync target cluster with
|
// to s2) of source cluster to the target cluster to sync target cluster with
|
||||||
// the source cluster. Referred to as "Fdiff" in the code.
|
// the source cluster. Referred to as "Fdiff" in the code.
|
||||||
|
@ -223,7 +221,6 @@ public final class DistCpOptions {
|
||||||
this.trackPath = builder.trackPath;
|
this.trackPath = builder.trackPath;
|
||||||
|
|
||||||
this.directWrite = builder.directWrite;
|
this.directWrite = builder.directWrite;
|
||||||
this.deleteUseTrash = builder.deleteUseTrash;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Path getSourceFileListing() {
|
public Path getSourceFileListing() {
|
||||||
|
@ -287,10 +284,6 @@ public final class DistCpOptions {
|
||||||
return shouldUseDiff() || shouldUseRdiff();
|
return shouldUseDiff() || shouldUseRdiff();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean shouldDeleteUseTrash() {
|
|
||||||
return deleteUseTrash;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getFromSnapshot() {
|
public String getFromSnapshot() {
|
||||||
return this.fromSnapshot;
|
return this.fromSnapshot;
|
||||||
}
|
}
|
||||||
|
@ -381,8 +374,6 @@ public final class DistCpOptions {
|
||||||
String.valueOf(useDiff));
|
String.valueOf(useDiff));
|
||||||
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.RDIFF,
|
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.RDIFF,
|
||||||
String.valueOf(useRdiff));
|
String.valueOf(useRdiff));
|
||||||
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DELETE_USETRASH,
|
|
||||||
String.valueOf(deleteUseTrash));
|
|
||||||
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC,
|
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC,
|
||||||
String.valueOf(skipCRC));
|
String.valueOf(skipCRC));
|
||||||
if (mapBandwidth > 0) {
|
if (mapBandwidth > 0) {
|
||||||
|
@ -424,7 +415,6 @@ public final class DistCpOptions {
|
||||||
"atomicCommit=" + atomicCommit +
|
"atomicCommit=" + atomicCommit +
|
||||||
", syncFolder=" + syncFolder +
|
", syncFolder=" + syncFolder +
|
||||||
", deleteMissing=" + deleteMissing +
|
", deleteMissing=" + deleteMissing +
|
||||||
", deleteUseTrash=" + deleteUseTrash +
|
|
||||||
", ignoreFailures=" + ignoreFailures +
|
", ignoreFailures=" + ignoreFailures +
|
||||||
", overwrite=" + overwrite +
|
", overwrite=" + overwrite +
|
||||||
", append=" + append +
|
", append=" + append +
|
||||||
|
@ -477,8 +467,6 @@ public final class DistCpOptions {
|
||||||
|
|
||||||
private boolean useDiff = false;
|
private boolean useDiff = false;
|
||||||
private boolean useRdiff = false;
|
private boolean useRdiff = false;
|
||||||
private boolean deleteUseTrash = false;
|
|
||||||
|
|
||||||
private String fromSnapshot;
|
private String fromSnapshot;
|
||||||
private String toSnapshot;
|
private String toSnapshot;
|
||||||
|
|
||||||
|
@ -576,11 +564,6 @@ public final class DistCpOptions {
|
||||||
+ "only with update or overwrite options");
|
+ "only with update or overwrite options");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (deleteUseTrash && !deleteMissing) {
|
|
||||||
throw new IllegalArgumentException("Delete useTrash is applicable "
|
|
||||||
+ "only with delete option");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (overwrite && syncFolder) {
|
if (overwrite && syncFolder) {
|
||||||
throw new IllegalArgumentException("Overwrite and update options are "
|
throw new IllegalArgumentException("Overwrite and update options are "
|
||||||
+ "mutually exclusive");
|
+ "mutually exclusive");
|
||||||
|
@ -644,11 +627,6 @@ public final class DistCpOptions {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder withDeleteUseTrash(boolean newDeleteUseTrash) {
|
|
||||||
this.deleteUseTrash = newDeleteUseTrash;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Builder withIgnoreFailures(boolean newIgnoreFailures) {
|
public Builder withIgnoreFailures(boolean newIgnoreFailures) {
|
||||||
this.ignoreFailures = newIgnoreFailures;
|
this.ignoreFailures = newIgnoreFailures;
|
||||||
return this;
|
return this;
|
||||||
|
|
|
@ -102,8 +102,6 @@ public class OptionsParser {
|
||||||
command.hasOption(DistCpOptionSwitch.SYNC_FOLDERS.getSwitch()))
|
command.hasOption(DistCpOptionSwitch.SYNC_FOLDERS.getSwitch()))
|
||||||
.withDeleteMissing(
|
.withDeleteMissing(
|
||||||
command.hasOption(DistCpOptionSwitch.DELETE_MISSING.getSwitch()))
|
command.hasOption(DistCpOptionSwitch.DELETE_MISSING.getSwitch()))
|
||||||
.withDeleteUseTrash(
|
|
||||||
command.hasOption(DistCpOptionSwitch.DELETE_USETRASH.getSwitch()))
|
|
||||||
.withIgnoreFailures(
|
.withIgnoreFailures(
|
||||||
command.hasOption(DistCpOptionSwitch.IGNORE_FAILURES.getSwitch()))
|
command.hasOption(DistCpOptionSwitch.IGNORE_FAILURES.getSwitch()))
|
||||||
.withOverwrite(
|
.withOverwrite(
|
||||||
|
@ -155,9 +153,6 @@ public class OptionsParser {
|
||||||
command,
|
command,
|
||||||
DistCpOptionSwitch.TRACK_MISSING.getSwitch())));
|
DistCpOptionSwitch.TRACK_MISSING.getSwitch())));
|
||||||
}
|
}
|
||||||
if (command.hasOption(DistCpOptionSwitch.DELETE_USETRASH.getSwitch())) {
|
|
||||||
builder.withDeleteUseTrash(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
|
if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.Trash;
|
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.SequenceFile;
|
import org.apache.hadoop.io.SequenceFile;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
|
@ -454,8 +453,7 @@ public class CopyCommitter extends FileOutputCommitter {
|
||||||
if (tracker.shouldDelete(trgtFileStatus)) {
|
if (tracker.shouldDelete(trgtFileStatus)) {
|
||||||
showProgress = true;
|
showProgress = true;
|
||||||
try {
|
try {
|
||||||
boolean result = deletePath(targetFS, targetEntry, conf);
|
if (targetFS.delete(targetEntry, true)) {
|
||||||
if (result) {
|
|
||||||
// the delete worked. Unless the file is actually missing, this is the
|
// the delete worked. Unless the file is actually missing, this is the
|
||||||
LOG.info("Deleted " + targetEntry + " - missing at source");
|
LOG.info("Deleted " + targetEntry + " - missing at source");
|
||||||
deletedEntries++;
|
deletedEntries++;
|
||||||
|
@ -469,8 +467,7 @@ public class CopyCommitter extends FileOutputCommitter {
|
||||||
// For all the filestores which implement the FS spec properly,
|
// For all the filestores which implement the FS spec properly,
|
||||||
// this means "the file wasn't there".
|
// this means "the file wasn't there".
|
||||||
// so track but don't worry about it.
|
// so track but don't worry about it.
|
||||||
LOG.info("delete({}) returned false ({}). Consider using " +
|
LOG.info("delete({}) returned false ({})",
|
||||||
"-useTrash option if trash is enabled.",
|
|
||||||
targetEntry, trgtFileStatus);
|
targetEntry, trgtFileStatus);
|
||||||
missingDeletes++;
|
missingDeletes++;
|
||||||
}
|
}
|
||||||
|
@ -518,17 +515,6 @@ public class CopyCommitter extends FileOutputCommitter {
|
||||||
formatDuration(deletionEnd - listingStart));
|
formatDuration(deletionEnd - listingStart));
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean deletePath(FileSystem targetFS, Path targetEntry,
|
|
||||||
Configuration conf) throws IOException {
|
|
||||||
if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING_USETRASH,
|
|
||||||
false)) {
|
|
||||||
return Trash.moveToAppropriateTrash(
|
|
||||||
targetFS, targetEntry, conf);
|
|
||||||
} else {
|
|
||||||
return targetFS.delete(targetEntry, true);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Take a duration and return a human-readable duration of
|
* Take a duration and return a human-readable duration of
|
||||||
* hours:minutes:seconds.millis.
|
* hours:minutes:seconds.millis.
|
||||||
|
|
|
@ -349,7 +349,7 @@ Command Line Options
|
||||||
| `-filters` | The path to a file containing a list of pattern strings, one string per line, such that paths matching the pattern will be excluded from the copy. | Support regular expressions specified by java.util.regex.Pattern. |
|
| `-filters` | The path to a file containing a list of pattern strings, one string per line, such that paths matching the pattern will be excluded from the copy. | Support regular expressions specified by java.util.regex.Pattern. |
|
||||||
| `-filelimit <n>` | Limit the total number of files to be <= n | **Deprecated!** Ignored in the new DistCp. |
|
| `-filelimit <n>` | Limit the total number of files to be <= n | **Deprecated!** Ignored in the new DistCp. |
|
||||||
| `-sizelimit <n>` | Limit the total size to be <= n bytes | **Deprecated!** Ignored in the new DistCp. |
|
| `-sizelimit <n>` | Limit the total size to be <= n bytes | **Deprecated!** Ignored in the new DistCp. |
|
||||||
| `-delete [-useTrash]` | Delete the files existing in the `/dst/` but not in `/src/` . when `[-useTrash]` is enabled, the files will be moved into the user's trash directory. Notice that `[-useTrash]` option on some object store does a copy and delete ops and can be slow. Delete is applicable only with update or overwrite options. |
|
| `-delete` | Delete the files existing in the dst but not in src | The deletion is done by FS Shell. So the trash will be used if it is enabled. Delete is applicable only with update or overwrite options. |
|
||||||
| `-strategy {dynamic|uniformsize}` | Choose the copy-strategy to be used in DistCp. | By default, uniformsize is used. (i.e. Maps are balanced on the total size of files copied by each map. Similar to legacy.) If "dynamic" is specified, `DynamicInputFormat` is used instead. (This is described in the Architecture section, under InputFormats.) |
|
| `-strategy {dynamic|uniformsize}` | Choose the copy-strategy to be used in DistCp. | By default, uniformsize is used. (i.e. Maps are balanced on the total size of files copied by each map. Similar to legacy.) If "dynamic" is specified, `DynamicInputFormat` is used instead. (This is described in the Architecture section, under InputFormats.) |
|
||||||
| `-bandwidth` | Specify bandwidth per map, in MB/second. | Each map will be restricted to consume only the specified bandwidth. This is not always exact. The map throttles back its bandwidth consumption during a copy, such that the **net** bandwidth used tends towards the specified value. |
|
| `-bandwidth` | Specify bandwidth per map, in MB/second. | Each map will be restricted to consume only the specified bandwidth. This is not always exact. The map throttles back its bandwidth consumption during a copy, such that the **net** bandwidth used tends towards the specified value. |
|
||||||
| `-atomic {-tmp <tmp_dir>}` | Specify atomic commit, with optional tmp directory. | `-atomic` instructs DistCp to copy the source data to a temporary target location, and then move the temporary target to the final-location atomically. Data will either be available at final target in a complete and consistent form, or not at all. Optionally, `-tmp` may be used to specify the location of the tmp-target. If not specified, a default is chosen. **Note:** tmp_dir must be on the final target cluster. |
|
| `-atomic {-tmp <tmp_dir>}` | Specify atomic commit, with optional tmp directory. | `-atomic` instructs DistCp to copy the source data to a temporary target location, and then move the temporary target to the final-location atomically. Data will either be available at final target in a complete and consistent form, or not at all. Optionally, `-tmp` may be used to specify the location of the tmp-target. If not specified, a default is chosen. **Note:** tmp_dir must be on the final target cluster. |
|
||||||
|
|
|
@ -21,12 +21,10 @@ package org.apache.hadoop.tools;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.test.LambdaTestUtils;
|
|
||||||
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
|
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
|
||||||
|
|
||||||
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
||||||
|
@ -227,42 +225,6 @@ public class TestDistCpOptions {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testDeleteMissingUseTrash() throws Exception {
|
|
||||||
final DistCpOptions.Builder builder = new DistCpOptions.Builder(
|
|
||||||
Collections.singletonList(new Path("hdfs://localhost:8020/source")),
|
|
||||||
new Path("hdfs://localhost:8020/target/"));
|
|
||||||
Assert.assertFalse("Delete does not use trash by default.",
|
|
||||||
builder.build().shouldDeleteUseTrash());
|
|
||||||
|
|
||||||
DistCpOptions options = builder.withSyncFolder(true)
|
|
||||||
.withDeleteMissing(true)
|
|
||||||
.withDeleteUseTrash(true)
|
|
||||||
.build();
|
|
||||||
Assert.assertTrue(options.shouldSyncFolder());
|
|
||||||
Assert.assertTrue(options.shouldDeleteMissing());
|
|
||||||
Assert.assertTrue(options.shouldDeleteUseTrash());
|
|
||||||
|
|
||||||
options = new DistCpOptions.Builder(
|
|
||||||
Collections.singletonList(new Path("hdfs://localhost:8020/source")),
|
|
||||||
new Path("hdfs://localhost:8020/target/"))
|
|
||||||
.withOverwrite(true)
|
|
||||||
.withDeleteMissing(true)
|
|
||||||
.withDeleteUseTrash(true)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
Assert.assertTrue(options.shouldDeleteUseTrash());
|
|
||||||
Assert.assertTrue(options.shouldOverwrite());
|
|
||||||
Assert.assertTrue(options.shouldDeleteMissing());
|
|
||||||
|
|
||||||
LambdaTestUtils.intercept(IllegalArgumentException.class,
|
|
||||||
() -> new DistCpOptions.Builder(Collections.singletonList(
|
|
||||||
new Path("hdfs://localhost:8020/source")),
|
|
||||||
new Path("hdfs://localhost:8020/target/"))
|
|
||||||
.withDeleteUseTrash(true)
|
|
||||||
.build());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSetMaps() {
|
public void testSetMaps() {
|
||||||
final DistCpOptions.Builder builder = new DistCpOptions.Builder(
|
final DistCpOptions.Builder builder = new DistCpOptions.Builder(
|
||||||
|
@ -319,8 +281,8 @@ public class TestDistCpOptions {
|
||||||
DistCpOptions option = new DistCpOptions.Builder(new Path("abc"),
|
DistCpOptions option = new DistCpOptions.Builder(new Path("abc"),
|
||||||
new Path("xyz")).build();
|
new Path("xyz")).build();
|
||||||
String val = "DistCpOptions{atomicCommit=false, syncFolder=false, " +
|
String val = "DistCpOptions{atomicCommit=false, syncFolder=false, " +
|
||||||
"deleteMissing=false, deleteUseTrash=false, ignoreFailures=false, " +
|
"deleteMissing=false, ignoreFailures=false, overwrite=false, " +
|
||||||
"overwrite=false, append=false, useDiff=false, useRdiff=false, " +
|
"append=false, useDiff=false, useRdiff=false, " +
|
||||||
"fromSnapshot=null, toSnapshot=null, " +
|
"fromSnapshot=null, toSnapshot=null, " +
|
||||||
"skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, " +
|
"skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, " +
|
||||||
"mapBandwidth=0.0, copyStrategy='uniformsize', preserveStatus=[], " +
|
"mapBandwidth=0.0, copyStrategy='uniformsize', preserveStatus=[], " +
|
||||||
|
|
|
@ -348,7 +348,7 @@ public class TestIntegration {
|
||||||
createFiles("srcdir/file1", "dstdir/file1", "dstdir/file2");
|
createFiles("srcdir/file1", "dstdir/file1", "dstdir/file2");
|
||||||
|
|
||||||
Path target = new Path(root + "/dstdir");
|
Path target = new Path(root + "/dstdir");
|
||||||
runTest(listFile, target, false, true, true, false, false);
|
runTest(listFile, target, false, true, true, false);
|
||||||
|
|
||||||
checkResult(target, 1, "file1");
|
checkResult(target, 1, "file1");
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -372,7 +372,7 @@ public class TestIntegration {
|
||||||
createWithContents("dstdir/file1", contents2);
|
createWithContents("dstdir/file1", contents2);
|
||||||
|
|
||||||
Path target = new Path(root + "/dstdir");
|
Path target = new Path(root + "/dstdir");
|
||||||
runTest(listFile, target, false, false, false, true, false);
|
runTest(listFile, target, false, false, false, true);
|
||||||
|
|
||||||
checkResult(target, 1, "file1");
|
checkResult(target, 1, "file1");
|
||||||
|
|
||||||
|
@ -553,16 +553,15 @@ public class TestIntegration {
|
||||||
|
|
||||||
private void runTest(Path listFile, Path target, boolean targetExists,
|
private void runTest(Path listFile, Path target, boolean targetExists,
|
||||||
boolean sync) throws IOException {
|
boolean sync) throws IOException {
|
||||||
runTest(listFile, target, targetExists, sync, false, false, false);
|
runTest(listFile, target, targetExists, sync, false, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void runTest(Path listFile, Path target, boolean targetExists,
|
private void runTest(Path listFile, Path target, boolean targetExists,
|
||||||
boolean sync, boolean delete,
|
boolean sync, boolean delete,
|
||||||
boolean overwrite, boolean useTrash) throws IOException {
|
boolean overwrite) throws IOException {
|
||||||
final DistCpOptions options = new DistCpOptions.Builder(listFile, target)
|
final DistCpOptions options = new DistCpOptions.Builder(listFile, target)
|
||||||
.withSyncFolder(sync)
|
.withSyncFolder(sync)
|
||||||
.withDeleteMissing(delete)
|
.withDeleteMissing(delete)
|
||||||
.withDeleteUseTrash(useTrash)
|
|
||||||
.withOverwrite(overwrite)
|
.withOverwrite(overwrite)
|
||||||
.withNumListstatusThreads(numListstatusThreads)
|
.withNumListstatusThreads(numListstatusThreads)
|
||||||
.build();
|
.build();
|
||||||
|
|
|
@ -802,23 +802,4 @@ public class TestOptionsParser {
|
||||||
"hdfs://localhost:8020/target/"});
|
"hdfs://localhost:8020/target/"});
|
||||||
Assert.assertEquals(options.getFiltersFile(), "/tmp/filters.txt");
|
Assert.assertEquals(options.getFiltersFile(), "/tmp/filters.txt");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testParseDeleteSkipTrash() {
|
|
||||||
DistCpOptions options = OptionsParser.parse(new String[] {
|
|
||||||
"-overwrite",
|
|
||||||
"-delete",
|
|
||||||
"-useTrash",
|
|
||||||
"hdfs://localhost:8020/source/first",
|
|
||||||
"hdfs://localhost:8020/target/"});
|
|
||||||
Assert.assertTrue("Delete with useTrash.",
|
|
||||||
options.shouldDeleteUseTrash());
|
|
||||||
options = OptionsParser.parse(new String[] {
|
|
||||||
"-overwrite",
|
|
||||||
"-delete",
|
|
||||||
"hdfs://localhost:8020/source/first",
|
|
||||||
"hdfs://localhost:8020/target/"});
|
|
||||||
Assert.assertFalse("Delete does not use trash.",
|
|
||||||
options.shouldDeleteUseTrash());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,6 @@ import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -138,7 +137,6 @@ public abstract class AbstractContractDistCpTest
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
super.setup();
|
super.setup();
|
||||||
conf = getContract().getConf();
|
conf = getContract().getConf();
|
||||||
conf.setLong(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 10);
|
|
||||||
localFS = FileSystem.getLocal(conf);
|
localFS = FileSystem.getLocal(conf);
|
||||||
remoteFS = getFileSystem();
|
remoteFS = getFileSystem();
|
||||||
// Test paths are isolated by concrete subclass name and test method name.
|
// Test paths are isolated by concrete subclass name and test method name.
|
||||||
|
@ -226,13 +224,6 @@ public abstract class AbstractContractDistCpTest
|
||||||
distCpUpdateDeepDirectoryStructure(inputDirUnderOutputDir);
|
distCpUpdateDeepDirectoryStructure(inputDirUnderOutputDir);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testUpdateUseTrashDeepDirectoryStructureToRemote() throws Exception {
|
|
||||||
describe("update a deep directory structure from local to remote");
|
|
||||||
distCpDeepDirectoryStructure(localFS, localDir, remoteFS, remoteDir);
|
|
||||||
distCpUpdateUseTrashDeepDirectoryStructure(remoteDir);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUpdateDeepDirectoryStructureNoChange() throws Exception {
|
public void testUpdateDeepDirectoryStructureNoChange() throws Exception {
|
||||||
describe("update an unchanged directory structure"
|
describe("update an unchanged directory structure"
|
||||||
|
@ -293,10 +284,6 @@ public abstract class AbstractContractDistCpTest
|
||||||
|
|
||||||
modifySourceDirectories();
|
modifySourceDirectories();
|
||||||
|
|
||||||
ContractTestUtils.assertPathsDoNotExist(localFS,
|
|
||||||
"Paths for test are wrong",
|
|
||||||
inputFile1, inputFile3, inputSubDir4);
|
|
||||||
|
|
||||||
Job job = distCpUpdate(srcDir, destDir);
|
Job job = distCpUpdate(srcDir, destDir);
|
||||||
|
|
||||||
Path outputFileNew1 = new Path(outputSubDir2, "newfile1");
|
Path outputFileNew1 = new Path(outputSubDir2, "newfile1");
|
||||||
|
@ -335,73 +322,6 @@ public abstract class AbstractContractDistCpTest
|
||||||
.withOverwrite(false)));
|
.withOverwrite(false)));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Do a distcp -update -delete -useTrash.
|
|
||||||
* @param destDir output directory used by the initial distcp
|
|
||||||
* @return the distcp job
|
|
||||||
*/
|
|
||||||
protected Job distCpUpdateUseTrashDeepDirectoryStructure(final Path destDir)
|
|
||||||
throws Exception {
|
|
||||||
describe("Incremental update with deletion-use-trash of missing files");
|
|
||||||
Path srcDir = inputDir;
|
|
||||||
LOG.info("Source directory = {}, dest={}", srcDir, destDir);
|
|
||||||
|
|
||||||
ContractTestUtils.assertPathsExist(localFS,
|
|
||||||
"Paths for test are wrong",
|
|
||||||
inputFile1, inputFile2, inputFile3, inputFile4, inputFile5);
|
|
||||||
|
|
||||||
modifySourceDirectories();
|
|
||||||
ContractTestUtils.assertPathsDoNotExist(localFS, "deleted right now",
|
|
||||||
inputFile1, inputFile3, inputSubDir4);
|
|
||||||
|
|
||||||
Path trashRootDir = remoteFS.getTrashRoot(null);
|
|
||||||
ContractTestUtils.assertDeleted(remoteFS, trashRootDir, true);
|
|
||||||
|
|
||||||
|
|
||||||
Job job = distCpUpdateDeleteUseTrash(inputDir, inputDirUnderOutputDir);
|
|
||||||
lsR("Updated Remote", remoteFS, destDir);
|
|
||||||
|
|
||||||
ContractTestUtils.assertPathsDoNotExist(remoteFS,
|
|
||||||
"DistCP should have deleted",
|
|
||||||
outputFile1, outputFile3, outputFile4, outputSubDir4);
|
|
||||||
ContractTestUtils.assertPathExists(remoteFS,
|
|
||||||
"Path delete does not use trash", trashRootDir);
|
|
||||||
Path trashFile1 = new Path(trashRootDir,
|
|
||||||
"Current" + outputFile1.toUri().getPath());
|
|
||||||
Path trashFile3 = new Path(trashRootDir,
|
|
||||||
"Current" + outputFile3.toUri().getPath());
|
|
||||||
Path trashFile4 = new Path(trashRootDir,
|
|
||||||
"Current" + outputFile4.toUri().getPath());
|
|
||||||
Path trashFile5 = new Path(trashRootDir,
|
|
||||||
"Current" + outputFile5.toUri().getPath());
|
|
||||||
ContractTestUtils.assertPathsExist(remoteFS,
|
|
||||||
"Path delete does not use trash",
|
|
||||||
trashFile1, trashFile3, trashFile4, trashFile5);
|
|
||||||
return job;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Run distcp -update -delete -useTrash.
|
|
||||||
* @param srcDir local source directory
|
|
||||||
* @param destDir remote destination directory.
|
|
||||||
* @return the completed job
|
|
||||||
* @throws Exception any failure.
|
|
||||||
*/
|
|
||||||
private Job distCpUpdateDeleteUseTrash(final Path srcDir, final Path destDir)
|
|
||||||
throws Exception {
|
|
||||||
describe("\nDistcp -update from " + srcDir + " to " + destDir);
|
|
||||||
lsR("Local to update", localFS, srcDir);
|
|
||||||
lsR("Remote before update", remoteFS, destDir);
|
|
||||||
return runDistCp(buildWithStandardOptions(
|
|
||||||
new DistCpOptions.Builder(
|
|
||||||
Collections.singletonList(srcDir), destDir)
|
|
||||||
.withDeleteMissing(true)
|
|
||||||
.withDeleteUseTrash(true)
|
|
||||||
.withSyncFolder(true)
|
|
||||||
.withCRC(true)
|
|
||||||
.withOverwrite(false)));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update the source directories as various tests expect,
|
* Update the source directories as various tests expect,
|
||||||
* including adding a new file.
|
* including adding a new file.
|
||||||
|
@ -416,11 +336,6 @@ public abstract class AbstractContractDistCpTest
|
||||||
// add one new file
|
// add one new file
|
||||||
Path inputFileNew1 = new Path(inputSubDir2, "newfile1");
|
Path inputFileNew1 = new Path(inputSubDir2, "newfile1");
|
||||||
ContractTestUtils.touch(localFS, inputFileNew1);
|
ContractTestUtils.touch(localFS, inputFileNew1);
|
||||||
|
|
||||||
ContractTestUtils.assertPathsDoNotExist(localFS, "deleted right now",
|
|
||||||
inputFile1, inputFile3, inputSubDir4);
|
|
||||||
ContractTestUtils.assertPathsExist(localFS, "touched right now",
|
|
||||||
inputFileNew1);
|
|
||||||
return inputFileNew1;
|
return inputFileNew1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,8 +21,6 @@ package org.apache.hadoop.tools.mapred;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
||||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -73,7 +71,6 @@ public class TestCopyCommitter {
|
||||||
public static void create() throws IOException {
|
public static void create() throws IOException {
|
||||||
config = getJobForClient().getConfiguration();
|
config = getJobForClient().getConfiguration();
|
||||||
config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, 0);
|
config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, 0);
|
||||||
config.setLong(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 10);
|
|
||||||
cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).format(true)
|
cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).format(true)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
@ -207,61 +204,6 @@ public class TestCopyCommitter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testDeleteUseTrash() throws IOException {
|
|
||||||
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
|
|
||||||
JobContext jobContext = new JobContextImpl(
|
|
||||||
taskAttemptContext.getConfiguration(),
|
|
||||||
taskAttemptContext.getTaskAttemptID().getJobID());
|
|
||||||
Configuration conf = jobContext.getConfiguration();
|
|
||||||
|
|
||||||
String sourceBase;
|
|
||||||
String targetBase;
|
|
||||||
FileSystem fs = null;
|
|
||||||
try {
|
|
||||||
OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
|
|
||||||
fs = FileSystem.get(conf);
|
|
||||||
sourceBase = TestDistCpUtils.createTestSetup(fs);
|
|
||||||
targetBase = TestDistCpUtils.createTestSetup(fs);
|
|
||||||
String targetBaseAdd = TestDistCpUtils.createTestSetup(fs);
|
|
||||||
ContractTestUtils.assertRenameOutcome(fs, new Path(targetBaseAdd),
|
|
||||||
new Path(targetBase),true);
|
|
||||||
|
|
||||||
DistCpOptions.Builder builder = new DistCpOptions.Builder(
|
|
||||||
Arrays.asList(new Path(sourceBase)), new Path("/out"));
|
|
||||||
builder.withSyncFolder(true);
|
|
||||||
builder.withDeleteMissing(true);
|
|
||||||
builder.withDeleteUseTrash(true);
|
|
||||||
builder.build().appendToConf(conf);
|
|
||||||
DistCpContext cpContext = new DistCpContext(builder.build());
|
|
||||||
|
|
||||||
CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
|
|
||||||
Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
|
|
||||||
listing.buildListing(listingFile, cpContext);
|
|
||||||
|
|
||||||
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
|
|
||||||
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
|
|
||||||
|
|
||||||
Path trashRootDir = fs.getTrashRoot(null);
|
|
||||||
if (fs.exists(trashRootDir)) {
|
|
||||||
fs.delete(trashRootDir, true);
|
|
||||||
}
|
|
||||||
committer.commitJob(jobContext);
|
|
||||||
|
|
||||||
verifyFoldersAreInSync(fs, targetBase, sourceBase);
|
|
||||||
verifyFoldersAreInSync(fs, sourceBase, targetBase);
|
|
||||||
|
|
||||||
Assert.assertTrue("Path delete does not use trash",
|
|
||||||
fs.exists(trashRootDir));
|
|
||||||
Path trashDir = new Path(trashRootDir, "Current" + targetBaseAdd);
|
|
||||||
verifyFoldersAreInSync(fs, trashDir.toString(), sourceBase);
|
|
||||||
} finally {
|
|
||||||
TestDistCpUtils.delete(fs, "/tmp1");
|
|
||||||
conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
|
|
||||||
conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING_USETRASH, "false");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDeleteMissingFlatInterleavedFiles() throws IOException {
|
public void testDeleteMissingFlatInterleavedFiles() throws IOException {
|
||||||
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
|
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
|
||||||
|
|
Loading…
Reference in New Issue