HADOOP-15209. DistCp to eliminate needless deletion of files under already-deleted directories.

Contributed by Steve Loughran.
This commit is contained in:
Steve Loughran 2018-03-15 18:05:14 +00:00
parent 78b05fde6c
commit 1976e0066e
20 changed files with 1508 additions and 209 deletions

View File

@ -228,9 +228,9 @@ public class ContractTestUtils extends Assert {
public static void verifyFileContents(FileSystem fs, public static void verifyFileContents(FileSystem fs,
Path path, Path path,
byte[] original) throws IOException { byte[] original) throws IOException {
assertIsFile(fs, path);
FileStatus stat = fs.getFileStatus(path); FileStatus stat = fs.getFileStatus(path);
String statText = stat.toString(); String statText = stat.toString();
assertTrue("not a file " + statText, stat.isFile());
assertEquals("wrong length " + statText, original.length, stat.getLen()); assertEquals("wrong length " + statText, original.length, stat.getLen());
byte[] bytes = readDataset(fs, path, original.length); byte[] bytes = readDataset(fs, path, original.length);
compareByteArrays(original, bytes, original.length); compareByteArrays(original, bytes, original.length);
@ -853,6 +853,36 @@ public class ContractTestUtils extends Assert {
status.isSymlink()); status.isSymlink());
} }
/**
 * Assert that every path in a varargs list exists.
 * Each entry is probed in order through {@code assertPathExists}.
 * @param fs filesystem
 * @param message message for exceptions
 * @param paths paths
 * @throws IOException IO failure
 */
public static void assertPathsExist(FileSystem fs,
    String message,
    Path... paths) throws IOException {
  // walk the array by index; entries are checked in declaration order
  for (int i = 0; i < paths.length; i++) {
    assertPathExists(fs, message, paths[i]);
  }
}
/**
 * Assert that none of the paths in a varargs list exist.
 * Each entry is probed in order through {@code assertPathDoesNotExist}.
 * @param fs filesystem
 * @param message message for exceptions
 * @param paths paths
 * @throws IOException IO failure
 */
public static void assertPathsDoNotExist(FileSystem fs,
    String message,
    Path... paths) throws IOException {
  // walk the array by index; entries are checked in declaration order
  for (int i = 0; i < paths.length; i++) {
    assertPathDoesNotExist(fs, message, paths[i]);
  }
}
/** /**
* Create a dataset for use in the tests; all data is in the range * Create a dataset for use in the tests; all data is in the range
* base to (base+modulo-1) inclusive. * base to (base+modulo-1) inclusive.

View File

@ -18,11 +18,15 @@
package org.apache.hadoop.fs.contract.s3a; package org.apache.hadoop.fs.contract.s3a;
import java.io.IOException;
import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS; import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard; import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.FailureInjectionPolicy;
import org.apache.hadoop.tools.contract.AbstractContractDistCpTest; import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
/** /**
@ -57,4 +61,17 @@ public class ITestS3AContractDistCp extends AbstractContractDistCpTest {
protected S3AContract createContract(Configuration conf) { protected S3AContract createContract(Configuration conf) {
return new S3AContract(conf); return new S3AContract(conf);
} }
/**
 * Always inject the delay path in, so that if the destination is
 * inconsistent and uses this key, the inconsistency is triggered.
 * The returned path is the superclass's path for {@code filepath} with
 * {@link FailureInjectionPolicy#DEFAULT_DELAY_KEY_SUBSTRING} appended as a
 * child element.
 * @param filepath path string in
 * @return path on the remote FS for distcp
 * @throws IOException IO failure
 */
@Override
protected Path path(final String filepath) throws IOException {
  return new Path(super.path(filepath),
      FailureInjectionPolicy.DEFAULT_DELAY_KEY_SUBSTRING);
}
} }

View File

@ -147,5 +147,24 @@
<version>${okhttp.version}</version> <version>${okhttp.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.adl.live;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
/**
 * Test DistCP operations, using {@link AdlStorageContract} as the
 * filesystem contract for the inherited test cases.
 */
public class TestAdlContractDistCpLive extends AbstractContractDistCpTest {

  /**
   * Create the filesystem contract for the tests.
   * @param configuration configuration passed to the contract
   * @return a new {@link AdlStorageContract}
   */
  @Override
  protected AbstractFSContract createContract(Configuration configuration) {
    final AbstractFSContract contract = new AdlStorageContract(configuration);
    return contract;
  }
}

View File

@ -143,7 +143,6 @@ public abstract class CopyListing extends Configured {
throws DuplicateFileException, IOException { throws DuplicateFileException, IOException {
Configuration config = getConf(); Configuration config = getConf();
FileSystem fs = pathToListFile.getFileSystem(config);
final boolean splitLargeFile = context.splitLargeFile(); final boolean splitLargeFile = context.splitLargeFile();
@ -153,7 +152,7 @@ public abstract class CopyListing extends Configured {
// <chunkOffset, chunkLength> is continuous. // <chunkOffset, chunkLength> is continuous.
// //
Path checkPath = splitLargeFile? Path checkPath = splitLargeFile?
pathToListFile : DistCpUtils.sortListing(fs, config, pathToListFile); pathToListFile : DistCpUtils.sortListing(config, pathToListFile);
SequenceFile.Reader reader = new SequenceFile.Reader( SequenceFile.Reader reader = new SequenceFile.Reader(
config, SequenceFile.Reader.file(checkPath)); config, SequenceFile.Reader.file(checkPath));

View File

@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntry;
@ -46,8 +47,18 @@ import com.google.common.collect.Maps;
/** /**
* CopyListingFileStatus is a view of {@link FileStatus}, recording additional * CopyListingFileStatus is a view of {@link FileStatus}, recording additional
* data members useful to distcp. * data members useful to distcp.
*
* This is the datastructure persisted in the sequence files generated
* in the CopyCommitter when deleting files.
* Any tool working with these generated files needs to be aware of an
* important stability guarantee: there is none; expect it to change
* across minor Hadoop releases without any support for reading the files of
* different versions.
* Tools parsing the listings must be built and tested against the point
* release of Hadoop which they intend to support.
*/ */
@InterfaceAudience.Private @InterfaceAudience.LimitedPrivate("Distcp support tools")
@InterfaceStability.Unstable
public final class CopyListingFileStatus implements Writable { public final class CopyListingFileStatus implements Writable {
private static final byte NO_ACL_ENTRIES = -1; private static final byte NO_ACL_ENTRIES = -1;

View File

@ -18,12 +18,19 @@
package org.apache.hadoop.tools; package org.apache.hadoop.tools;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
/** /**
* Utility class to hold commonly used constants. * Utility class to hold commonly used constants.
*/ */
public class DistCpConstants { @InterfaceAudience.LimitedPrivate("Distcp support tools")
@InterfaceStability.Evolving
public final class DistCpConstants {
private DistCpConstants() {
}
/* Default number of threads to use for building file listing */ /* Default number of threads to use for building file listing */
public static final int DEFAULT_LISTSTATUS_THREADS = 1; public static final int DEFAULT_LISTSTATUS_THREADS = 1;
@ -52,6 +59,8 @@ public class DistCpConstants {
"distcp.preserve.rawxattrs"; "distcp.preserve.rawxattrs";
public static final String CONF_LABEL_SYNC_FOLDERS = "distcp.sync.folders"; public static final String CONF_LABEL_SYNC_FOLDERS = "distcp.sync.folders";
public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source"; public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source";
public static final String CONF_LABEL_TRACK_MISSING =
"distcp.track.missing.source";
public static final String CONF_LABEL_LISTSTATUS_THREADS = "distcp.liststatus.threads"; public static final String CONF_LABEL_LISTSTATUS_THREADS = "distcp.liststatus.threads";
public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps"; public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps";
public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing"; public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing";
@ -148,4 +157,13 @@ public class DistCpConstants {
static final String HDFS_DISTCP_DIFF_DIRECTORY_NAME = ".distcp.diff.tmp"; static final String HDFS_DISTCP_DIFF_DIRECTORY_NAME = ".distcp.diff.tmp";
public static final int COPY_BUFFER_SIZE_DEFAULT = 8 * 1024; public static final int COPY_BUFFER_SIZE_DEFAULT = 8 * 1024;
/** Filename of the sorted source listing saved when tracking is enabled. */
public static final String SOURCE_SORTED_FILE = "source_sorted.seq";
/** Filename of unsorted target listing. */
public static final String TARGET_LISTING_FILE = "target_listing.seq";
/** Filename of sorted target listing. */
public static final String TARGET_SORTED_FILE = "target_sorted.seq";
} }

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.tools; package org.apache.hadoop.tools;
import org.apache.commons.cli.Option; import org.apache.commons.cli.Option;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
/** /**
@ -66,7 +67,7 @@ public enum DistCpOptionSwitch {
" files or directories")), " files or directories")),
/** /**
* Deletes missing files in target that are missing from source * Deletes missing files in target that are missing from source.
* This allows the target to be in sync with the source contents * This allows the target to be in sync with the source contents
* Typically used in conjunction with SYNC_FOLDERS * Typically used in conjunction with SYNC_FOLDERS
* Incompatible with ATOMIC_COMMIT * Incompatible with ATOMIC_COMMIT
@ -74,6 +75,21 @@ public enum DistCpOptionSwitch {
DELETE_MISSING(DistCpConstants.CONF_LABEL_DELETE_MISSING, DELETE_MISSING(DistCpConstants.CONF_LABEL_DELETE_MISSING,
new Option("delete", false, "Delete from target, " + new Option("delete", false, "Delete from target, " +
"files missing in source. Delete is applicable only with update or overwrite options")), "files missing in source. Delete is applicable only with update or overwrite options")),
/**
* Track files in the target that are missing from the source.
* This allows for other applications to complete the synchronization,
* possibly with object-store-specific delete algorithms.
* Typically used in conjunction with SYNC_FOLDERS
* Incompatible with ATOMIC_COMMIT
*/
@InterfaceStability.Unstable
TRACK_MISSING(DistCpConstants.CONF_LABEL_TRACK_MISSING,
new Option("xtrack", true,
"Save information about missing source files to the"
+ " specified directory")),
/** /**
* Number of threads for building source file listing (before map-reduce * Number of threads for building source file listing (before map-reduce
* phase, max one listStatus per thread at a time). * phase, max one listStatus per thread at a time).

View File

@ -24,6 +24,8 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.util.DistCpUtils; import org.apache.hadoop.tools.util.DistCpUtils;
@ -43,6 +45,8 @@ import java.util.Set;
* *
* This class is immutable. * This class is immutable.
*/ */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class DistCpOptions { public final class DistCpOptions {
private static final Logger LOG = LoggerFactory.getLogger(Builder.class); private static final Logger LOG = LoggerFactory.getLogger(Builder.class);
public static final int MAX_NUM_LISTSTATUS_THREADS = 40; public static final int MAX_NUM_LISTSTATUS_THREADS = 40;
@ -68,6 +72,9 @@ public final class DistCpOptions {
/** Whether source and target folder contents be sync'ed up. */ /** Whether source and target folder contents be sync'ed up. */
private final boolean syncFolder; private final boolean syncFolder;
/** Path to save source/dest sequence files to, if non-null. */
private final Path trackPath;
/** Whether files only present in target should be deleted. */ /** Whether files only present in target should be deleted. */
private boolean deleteMissing; private boolean deleteMissing;
@ -208,6 +215,7 @@ public final class DistCpOptions {
this.copyBufferSize = builder.copyBufferSize; this.copyBufferSize = builder.copyBufferSize;
this.verboseLog = builder.verboseLog; this.verboseLog = builder.verboseLog;
this.trackPath = builder.trackPath;
} }
public Path getSourceFileListing() { public Path getSourceFileListing() {
@ -331,6 +339,10 @@ public final class DistCpOptions {
return verboseLog; return verboseLog;
} }
/**
 * Get the path to save the source/dest sequence files to.
 * @return the tracking path; null if none was set on the builder
 */
public Path getTrackPath() {
  return this.trackPath;
}
/** /**
* Add options to configuration. These will be used in the Mapper/committer * Add options to configuration. These will be used in the Mapper/committer
* *
@ -371,6 +383,11 @@ public final class DistCpOptions {
String.valueOf(copyBufferSize)); String.valueOf(copyBufferSize));
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.VERBOSE_LOG, DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.VERBOSE_LOG,
String.valueOf(verboseLog)); String.valueOf(verboseLog));
if (trackPath != null) {
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.TRACK_MISSING,
String.valueOf(trackPath));
}
} }
/** /**
@ -441,6 +458,7 @@ public final class DistCpOptions {
private String filtersFile; private String filtersFile;
private Path logPath; private Path logPath;
private Path trackPath;
private String copyStrategy = DistCpConstants.UNIFORMSIZE; private String copyStrategy = DistCpConstants.UNIFORMSIZE;
private int numListstatusThreads = 0; // 0 indicates that flag is not set. private int numListstatusThreads = 0; // 0 indicates that flag is not set.
@ -641,6 +659,11 @@ public final class DistCpOptions {
return this; return this;
} }
/**
 * Set the path which the built options will return from
 * {@code getTrackPath()}.
 * @param path tracking path
 * @return this builder
 */
public Builder withTrackMissing(Path path) {
  trackPath = path;
  return this;
}
public Builder withCopyStrategy(String newCopyStrategy) { public Builder withCopyStrategy(String newCopyStrategy) {
this.copyStrategy = newCopyStrategy; this.copyStrategy = newCopyStrategy;
return this; return this;

View File

@ -145,6 +145,12 @@ public class OptionsParser {
builder.withAtomicWorkPath(new Path(workPath)); builder.withAtomicWorkPath(new Path(workPath));
} }
} }
if (command.hasOption(DistCpOptionSwitch.TRACK_MISSING.getSwitch())) {
builder.withTrackMissing(
new Path(getVal(
command,
DistCpOptionSwitch.TRACK_MISSING.getSwitch())));
}
if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) { if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
try { try {

View File

@ -18,8 +18,9 @@
package org.apache.hadoop.tools.mapred; package org.apache.hadoop.tools.mapred;
import org.apache.commons.logging.Log; import org.slf4j.Logger;
import org.apache.commons.logging.LogFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -49,6 +50,8 @@ import java.util.EnumSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import static org.apache.hadoop.tools.DistCpConstants.*;
/** /**
* The CopyCommitter class is DistCp's OutputCommitter implementation. It is * The CopyCommitter class is DistCp's OutputCommitter implementation. It is
* responsible for handling the completion/cleanup of the DistCp run. * responsible for handling the completion/cleanup of the DistCp run.
@ -62,7 +65,8 @@ import java.util.List;
* 5. Cleanup of any partially copied files, from previous, failed attempts. * 5. Cleanup of any partially copied files, from previous, failed attempts.
*/ */
public class CopyCommitter extends FileOutputCommitter { public class CopyCommitter extends FileOutputCommitter {
private static final Log LOG = LogFactory.getLog(CopyCommitter.class); private static final Logger LOG =
LoggerFactory.getLogger(CopyCommitter.class);
private final TaskAttemptContext taskAttemptContext; private final TaskAttemptContext taskAttemptContext;
private boolean syncFolder = false; private boolean syncFolder = false;
@ -111,6 +115,9 @@ public class CopyCommitter extends FileOutputCommitter {
deleteMissing(conf); deleteMissing(conf);
} else if (conf.getBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false)) { } else if (conf.getBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false)) {
commitData(conf); commitData(conf);
} else if (conf.get(CONF_LABEL_TRACK_MISSING) != null) {
// save missing information to a directory
trackMissing(conf);
} }
taskAttemptContext.setStatus("Commit Successful"); taskAttemptContext.setStatus("Commit Successful");
} }
@ -334,40 +341,64 @@ public class CopyCommitter extends FileOutputCommitter {
LOG.info("Preserved status on " + preservedEntries + " dir entries on target"); LOG.info("Preserved status on " + preservedEntries + " dir entries on target");
} }
// This method deletes "extra" files from the target, if they're not /**
// available at the source. * Track all the missing files by saving the listings to the tracking
* directory.
* This is the same as listing phase of the
* {@link #deleteMissing(Configuration)} operation.
* @param conf configuration to read options from, and for FS instantiation.
* @throws IOException IO failure
*/
private void trackMissing(Configuration conf) throws IOException {
  // every file produced here is saved under the tracking directory
  final String trackDirName =
      conf.get(DistCpConstants.CONF_LABEL_TRACK_MISSING);
  final Path trackDir = new Path(trackDirName);

  // the source listing already exists; its location is in the configuration
  final String sourceListingName =
      conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH);
  final Path sourceListing = new Path(sourceListingName);
  LOG.info("Tracking file changes to directory {}", trackDir);

  // sort the source listing straight into the tracking directory
  final Path sortedSource =
      new Path(trackDir, DistCpConstants.SOURCE_SORTED_FILE);
  LOG.info("Source listing {}", sortedSource);
  DistCpUtils.sortListing(conf, sourceListing, sortedSource);

  // build the unsorted target listing and its sorted counterpart,
  // both under the tracking directory
  final Path unsortedTarget = new Path(trackDir, TARGET_LISTING_FILE);
  final Path sortedTarget = new Path(trackDir, TARGET_SORTED_FILE);
  listTargetFiles(conf, unsortedTarget, sortedTarget);
  LOG.info("Target listing {}", sortedTarget);

  // the unsorted listing is removed once the sort has been done
  final FileSystem trackFS = unsortedTarget.getFileSystem(conf);
  trackFS.delete(unsortedTarget, false);
}
/**
* Deletes "extra" files and directories from the target, if they're not
* available at the source.
* @param conf configuration to read options from, and for FS instantiation.
* @throws IOException IO failure
*/
private void deleteMissing(Configuration conf) throws IOException { private void deleteMissing(Configuration conf) throws IOException {
LOG.info("-delete option is enabled. About to remove entries from " + LOG.info("-delete option is enabled. About to remove entries from " +
"target that are missing in source"); "target that are missing in source");
long listingStart = System.currentTimeMillis();
// Sort the source-file listing alphabetically. // Sort the source-file listing alphabetically.
Path sourceListing = new Path(conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH)); Path sourceListing = new Path(conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
FileSystem clusterFS = sourceListing.getFileSystem(conf); FileSystem clusterFS = sourceListing.getFileSystem(conf);
Path sortedSourceListing = DistCpUtils.sortListing(clusterFS, conf, sourceListing); Path sortedSourceListing = DistCpUtils.sortListing(conf, sourceListing);
// Similarly, create the listing of target-files. Sort alphabetically. // Similarly, create the listing of target-files. Sort alphabetically.
Path targetListing = new Path(sourceListing.getParent(), "targetListing.seq"); Path targetListing = new Path(sourceListing.getParent(), "targetListing.seq");
CopyListing target = new GlobbedCopyListing(new Configuration(conf), null); Path sortedTargetListing = new Path(targetListing.toString() + "_sorted");
List<Path> targets = new ArrayList<Path>(1); Path targetFinalPath = listTargetFiles(conf,
Path targetFinalPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH)); targetListing, sortedTargetListing);
targets.add(targetFinalPath);
Path resultNonePath = Path.getPathWithoutSchemeAndAuthority(targetFinalPath)
.toString().startsWith(DistCpConstants.HDFS_RESERVED_RAW_DIRECTORY_NAME)
? DistCpConstants.RAW_NONE_PATH : DistCpConstants.NONE_PATH;
//
// Set up options to be the same from the CopyListing.buildListing's perspective,
// so to collect similar listings as when doing the copy
//
DistCpOptions options = new DistCpOptions.Builder(targets, resultNonePath)
.withOverwrite(overwrite)
.withSyncFolder(syncFolder)
.build();
DistCpContext distCpContext = new DistCpContext(options);
distCpContext.setTargetPathExists(targetPathExists);
target.buildListing(targetListing, distCpContext);
Path sortedTargetListing = DistCpUtils.sortListing(clusterFS, conf, targetListing);
long totalLen = clusterFS.getFileStatus(sortedTargetListing).getLen(); long totalLen = clusterFS.getFileStatus(sortedTargetListing).getLen();
SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf, SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf,
@ -377,41 +408,153 @@ public class CopyCommitter extends FileOutputCommitter {
// Walk both source and target file listings. // Walk both source and target file listings.
// Delete all from target that doesn't also exist on source. // Delete all from target that doesn't also exist on source.
long deletionStart = System.currentTimeMillis();
LOG.info("Listing completed in {}",
formatDuration(deletionStart - listingStart));
long deletedEntries = 0; long deletedEntries = 0;
long filesDeleted = 0;
long missingDeletes = 0;
long failedDeletes = 0;
long skippedDeletes = 0;
long deletedDirectories = 0;
// this is an arbitrary constant.
final DeletedDirTracker tracker = new DeletedDirTracker(1000);
try { try {
CopyListingFileStatus srcFileStatus = new CopyListingFileStatus(); CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
Text srcRelPath = new Text(); Text srcRelPath = new Text();
CopyListingFileStatus trgtFileStatus = new CopyListingFileStatus(); CopyListingFileStatus trgtFileStatus = new CopyListingFileStatus();
Text trgtRelPath = new Text(); Text trgtRelPath = new Text();
FileSystem targetFS = targetFinalPath.getFileSystem(conf); final FileSystem targetFS = targetFinalPath.getFileSystem(conf);
boolean showProgress;
boolean srcAvailable = sourceReader.next(srcRelPath, srcFileStatus); boolean srcAvailable = sourceReader.next(srcRelPath, srcFileStatus);
while (targetReader.next(trgtRelPath, trgtFileStatus)) { while (targetReader.next(trgtRelPath, trgtFileStatus)) {
// Skip sources that don't exist on target. // Skip sources that don't exist on target.
while (srcAvailable && trgtRelPath.compareTo(srcRelPath) > 0) { while (srcAvailable && trgtRelPath.compareTo(srcRelPath) > 0) {
srcAvailable = sourceReader.next(srcRelPath, srcFileStatus); srcAvailable = sourceReader.next(srcRelPath, srcFileStatus);
} }
Path targetEntry = trgtFileStatus.getPath();
LOG.debug("Comparing {} and {}",
srcFileStatus.getPath(), targetEntry);
if (srcAvailable && trgtRelPath.equals(srcRelPath)) continue; if (srcAvailable && trgtRelPath.equals(srcRelPath)) continue;
// Target doesn't exist at source. Delete. // Target doesn't exist at source. Try to delete it.
boolean result = targetFS.delete(trgtFileStatus.getPath(), true) if (tracker.shouldDelete(trgtFileStatus)) {
|| !targetFS.exists(trgtFileStatus.getPath()); showProgress = true;
if (result) { try {
LOG.info("Deleted " + trgtFileStatus.getPath() + " - Missing at source"); if (targetFS.delete(targetEntry, true)) {
// the delete worked. Unless the file is actually missing, this is the
LOG.info("Deleted " + targetEntry + " - missing at source");
deletedEntries++; deletedEntries++;
if (trgtFileStatus.isDirectory()) {
deletedDirectories++;
} else { } else {
throw new IOException("Unable to delete " + trgtFileStatus.getPath()); filesDeleted++;
} }
} else {
// delete returned false.
// For all the filestores which implement the FS spec properly,
// this means "the file wasn't there".
// so track but don't worry about it.
LOG.info("delete({}) returned false ({})",
targetEntry, trgtFileStatus);
missingDeletes++;
}
} catch (IOException e) {
if (!ignoreFailures) {
throw e;
} else {
// failed to delete, but ignoring errors. So continue
LOG.info("Failed to delete {}, ignoring exception {}",
targetEntry, e.toString());
LOG.debug("Failed to delete {}", targetEntry, e);
// count the failure and carry on with the next entry
failedDeletes++;
}
}
} else {
LOG.debug("Skipping deletion of {}", targetEntry);
skippedDeletes++;
showProgress = false;
}
if (showProgress) {
// update progress if there's been any FS IO/files deleted.
taskAttemptContext.progress(); taskAttemptContext.progress();
taskAttemptContext.setStatus("Deleting missing files from target. [" + taskAttemptContext.setStatus("Deleting removed files from target. [" +
targetReader.getPosition() * 100 / totalLen + "%]"); targetReader.getPosition() * 100 / totalLen + "%]");
} }
}
// if the FS toString() call prints statistics, they get logged here
LOG.info("Completed deletion of files from {}", targetFS);
} finally { } finally {
IOUtils.closeStream(sourceReader); IOUtils.closeStream(sourceReader);
IOUtils.closeStream(targetReader); IOUtils.closeStream(targetReader);
} }
LOG.info("Deleted " + deletedEntries + " from target: " + targets.get(0)); long deletionEnd = System.currentTimeMillis();
long deletedFileCount = deletedEntries - deletedDirectories;
LOG.info("Deleted from target: files: {} directories: {};"
+ " skipped deletions {}; deletions already missing {};"
+ " failed deletes {}",
deletedFileCount, deletedDirectories, skippedDeletes,
missingDeletes, failedDeletes);
LOG.info("Number of tracked deleted directories {}", tracker.size());
LOG.info("Duration of deletions: {}",
formatDuration(deletionEnd - deletionStart));
LOG.info("Total duration of deletion operation: {}",
formatDuration(deletionEnd - listingStart));
}
/**
 * Take a duration and return a human-readable duration of
 * hours:minutes:seconds.millis.
 * A negative duration is reported as zero. The values passed in are
 * differences of {@code System.currentTimeMillis()} samples, and a negative
 * value would otherwise produce a negative millisecond field.
 * @param duration duration in milliseconds
 * @return a string for logging.
 */
private String formatDuration(long duration) {
  // clamp once, so that every field is derived from the same
  // non-negative value
  final long millis = Math.max(duration, 0L);
  final long seconds = millis / 1000;
  final long minutes = seconds / 60;
  final long hours = minutes / 60;
  return String.format("%d:%02d:%02d.%03d",
      hours, minutes % 60, seconds % 60, millis % 1000);
}
/**
* Build a listing of the target files, sorted and unsorted.
* @param conf configuration to work with
* @param targetListing target listing
* @param sortedTargetListing sorted version of the listing
* @return the target path of the operation
* @throws IOException IO failure.
*/
private Path listTargetFiles(final Configuration conf,
final Path targetListing,
final Path sortedTargetListing) throws IOException {
CopyListing target = new GlobbedCopyListing(new Configuration(conf), null);
Path targetFinalPath = new Path(
conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
List<Path> targets = new ArrayList<>(1);
targets.add(targetFinalPath);
Path resultNonePath = Path.getPathWithoutSchemeAndAuthority(targetFinalPath)
.toString().startsWith(DistCpConstants.HDFS_RESERVED_RAW_DIRECTORY_NAME)
? DistCpConstants.RAW_NONE_PATH
: DistCpConstants.NONE_PATH;
//
// Set up options to be the same from the CopyListing.buildListing's
// perspective, so to collect similar listings as when doing the copy
//
DistCpOptions options = new DistCpOptions.Builder(targets, resultNonePath)
.withOverwrite(overwrite)
.withSyncFolder(syncFolder)
.build();
DistCpContext distCpContext = new DistCpContext(options);
distCpContext.setTargetPathExists(targetPathExists);
target.buildListing(targetListing, distCpContext);
DistCpUtils.sortListing(conf, targetListing, sortedTargetListing);
return targetFinalPath;
} }
private void commitData(Configuration conf) throws IOException { private void commitData(Configuration conf) throws IOException {

View File

@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.mapred;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.CopyListingFileStatus;
/**
 * Tracks directories which have been deleted, and answers the question
 * "does this path lie under a directory that has already been deleted?".
 *
 * Assumptions.
 * <ol>
 *   <li>A sorted list of deletions is processed, where directories come
 *   before their children/descendants.</li>
 *   <li>Deep directory trees are being deleted.</li>
 *   <li>The total number of directories deleted is very much
 *   less than the number of files.</li>
 *   <li>Most deleted files are in directories which have
 *   been deleted.</li>
 *   <li>The cost of issuing a delete() call is less than that
 *   of creating Path entries for parent directories and looking them
 *   up in a hash table.</li>
 *   <li>That a modest cache is sufficient to identify whether or not
 *   a parent directory has been deleted.</li>
 *   <li>And that if a path has been evicted from the cache, the cost of
 *   the extra deletions incurred is not significant.</li>
 * </ol>
 *
 * The directory structure this algorithm is intended to optimize for is
 * the deletion of datasets partitioned/bucketed into a directory tree,
 * and deleted in bulk.
 *
 * The ordering of deletions comes from the merge sort of the copy listings;
 * we rely on this placing a path "/dir1" ahead of "/dir1/file1",
 * "/dir1/dir2/file2", and other descendants.
 * We do not rely on parent entries being added immediately before children,
 * as sorting may place "/dir12" between "/dir1" and its descendants.
 *
 * Algorithm
 *
 * <ol>
 *   <li>
 *   Before deleting a directory or file, a check is made to see if an
 *   ancestor is in the cache of deleted directories.
 *   </li>
 *   <li>
 *   If an ancestor is found: skip the delete.
 *   </li>
 *   <li>
 *   If an ancestor is not found: delete the file/dir.
 *   </li>
 *   <li>
 *   When the entry probed is a directory, it is always added to the cache of
 *   directories, irrespective of the search for an ancestor.
 *   This is to speed up scans of files directly underneath the path.
 *   </li>
 * </ol>
 */
final class DeletedDirTracker {

  /**
   * Size-bounded cache of the directories recorded as deleted.
   * Every entry maps a directory path to itself.
   */
  private final Cache<Path, Path> directories;

  /**
   * Upper bound on the number of cache entries.
   */
  private final int cacheSize;

  /**
   * Create a tracker.
   * @param cacheSize maximum cache size.
   */
  DeletedDirTracker(int cacheSize) {
    this.cacheSize = cacheSize;
    this.directories = CacheBuilder.newBuilder()
        .maximumSize(cacheSize)
        .build();
  }

  /**
   * Walk from a directory up through its ancestors, looking for any of
   * them in the cache of deleted paths.
   * @param dir directory to start from; null is treated as "above the root".
   * @return true iff the path or one of its ancestors is in the cache.
   */
  boolean isDirectoryOrAncestorDeleted(Path dir) {
    // getParent() yields null once the root has been passed, ending the walk
    for (Path current = dir; current != null; current = current.getParent()) {
      if (isContained(current)) {
        // cache hit
        return true;
      }
    }
    // every level up to the root was a cache miss
    return false;
  }

  /**
   * Should a file or directory be deleted?
   * When the status references a directory, its path is recorded in the
   * cache of deleted directories whatever the outcome of the probe.
   * @param status file/path to check; must not be the root directory.
   * @return true if the path should be deleted.
   */
  boolean shouldDelete(CopyListingFileStatus status) {
    final Path path = status.getPath();
    Preconditions.checkArgument(!path.isRoot(), "Root Dir");
    if (!status.isDirectory()) {
      // a file is covered only if its parent or a higher ancestor is deleted
      return !isDirectoryOrAncestorDeleted(path.getParent());
    }
    final boolean alreadyCovered = isDirectoryOrAncestorDeleted(path);
    // record the directory even when an ancestor has already been deleted
    directories.put(path, path);
    return !alreadyCovered;
  }

  /**
   * Is a path itself recorded in the set of deleted directories?
   * Ancestors are not consulted.
   * @param dir directory to probe
   * @return true if this directory is recorded as being deleted.
   */
  boolean isContained(Path dir) {
    return directories.getIfPresent(dir) != null;
  }

  @Override
  public String toString() {
    return "DeletedDirTracker{"
        + "maximum size=" + cacheSize
        + "; current size=" + directories.size()
        + '}';
  }

  /**
   * Return the current size of the tracker, as in #of entries in the cache.
   * @return tracker size.
   */
  long size() {
    return directories.size();
  }
}

View File

@ -433,24 +433,45 @@ public class DistCpUtils {
} }
/** /**
* Sort sequence file containing FileStatus and Text as key and value respecitvely * Sort sequence file containing FileStatus and Text as key and value
* respectively.
* *
* @param fs - File System
* @param conf - Configuration * @param conf - Configuration
* @param sourceListing - Source listing file * @param sourceListing - Source listing file
* @return Path of the sorted file. Is source file with _sorted appended to the name * @return Path of the sorted file. Is source file with _sorted appended to the name
* @throws IOException - Any exception during sort. * @throws IOException - Any exception during sort.
*/ */
public static Path sortListing(FileSystem fs, Configuration conf, Path sourceListing) public static Path sortListing(Configuration conf,
Path sourceListing)
throws IOException { throws IOException {
Path output = new Path(sourceListing.toString() + "_sorted");
sortListing(conf, sourceListing, output);
return output;
}
/**
* Sort sequence file containing FileStatus and Text as key and value
* respectively, saving the result to the {@code output} path, which
* will be deleted first.
*
* @param conf - Configuration
* @param sourceListing - Source listing file
* @param output output path
* @throws IOException - Any exception during sort.
*/
public static void sortListing(final Configuration conf,
final Path sourceListing,
final Path output) throws IOException {
FileSystem fs = sourceListing.getFileSystem(conf);
// force verify that the destination FS matches the input
fs.makeQualified(output);
SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class, SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class,
CopyListingFileStatus.class, conf); CopyListingFileStatus.class, conf);
Path output = new Path(sourceListing.toString() + "_sorted");
fs.delete(output, false); fs.delete(output, false);
sorter.sort(sourceListing, output); sorter.sort(sourceListing, output);
return output;
} }
/** /**
@ -547,9 +568,13 @@ public class DistCpUtils {
throws IOException { throws IOException {
FileChecksum targetChecksum = null; FileChecksum targetChecksum = null;
try { try {
sourceChecksum = sourceChecksum != null ? sourceChecksum : sourceFS sourceChecksum = sourceChecksum != null
.getFileChecksum(source); ? sourceChecksum
: sourceFS.getFileChecksum(source);
if (sourceChecksum != null) {
// iff there's a source checksum, look for one at the destination.
targetChecksum = targetFS.getFileChecksum(target); targetChecksum = targetFS.getFileChecksum(target);
}
} catch (IOException e) { } catch (IOException e) {
LOG.error("Unable to retrieve checksum for " + source + " or " + target, e); LOG.error("Unable to retrieve checksum for " + source + " or " + target, e);
} }

View File

@ -20,22 +20,35 @@ package org.apache.hadoop.tools.contract;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*; import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase; import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCp; import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions; import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.mapred.CopyMapper;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TestName; import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Contract test suite covering a file system's integration with DistCp. The * Contract test suite covering a file system's integration with DistCp. The
@ -48,13 +61,70 @@ import org.junit.rules.TestName;
public abstract class AbstractContractDistCpTest public abstract class AbstractContractDistCpTest
extends AbstractFSContractTestBase { extends AbstractFSContractTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractContractDistCpTest.class);
public static final String SCALE_TEST_DISTCP_FILE_SIZE_KB
= "scale.test.distcp.file.size.kb";
public static final int DEFAULT_DISTCP_SIZE_KB = 1024;
protected static final int MB = 1024 * 1024;
@Rule @Rule
public TestName testName = new TestName(); public TestName testName = new TestName();
/**
 * Timeout for each test: fifteen minutes, expressed in milliseconds.
 * The value is deliberately generous so that large updates are allowed
 * to take time, especially to remote stores.
 * @return the current test timeout
 */
protected int getTestTimeoutMillis() {
  final int minutes = 15;
  return minutes * 60 * 1000;
}
private Configuration conf; private Configuration conf;
private FileSystem localFS, remoteFS; private FileSystem localFS, remoteFS;
private Path localDir, remoteDir; private Path localDir, remoteDir;
private Path inputDir;
private Path inputSubDir1;
private Path inputSubDir2;
private Path inputSubDir4;
private Path inputFile1;
private Path inputFile2;
private Path inputFile3;
private Path inputFile4;
private Path inputFile5;
private Path outputDir;
private Path outputSubDir1;
private Path outputSubDir2;
private Path outputSubDir4;
private Path outputFile1;
private Path outputFile2;
private Path outputFile3;
private Path outputFile4;
private Path outputFile5;
private Path inputDirUnderOutputDir;
@Override @Override
protected Configuration createConfiguration() { protected Configuration createConfiguration() {
Configuration newConf = new Configuration(); Configuration newConf = new Configuration();
@ -73,20 +143,307 @@ public abstract class AbstractContractDistCpTest
// All paths are fully qualified including scheme (not taking advantage of // All paths are fully qualified including scheme (not taking advantage of
// default file system), so if something fails, the messages will make it // default file system), so if something fails, the messages will make it
// clear which paths are local and which paths are remote. // clear which paths are local and which paths are remote.
Path testSubDir = new Path(getClass().getSimpleName(), String className = getClass().getSimpleName();
testName.getMethodName()); String testSubDir = className + "/" + testName.getMethodName();
localDir = localFS.makeQualified(new Path(new Path( localDir =
GenericTestUtils.getTestDir().toURI()), testSubDir)); localFS.makeQualified(new Path(new Path(
GenericTestUtils.getTestDir().toURI()), testSubDir + "/local"));
mkdirs(localFS, localDir); mkdirs(localFS, localDir);
remoteDir = remoteFS.makeQualified( remoteDir = path(testSubDir + "/remote");
new Path(getContract().getTestPath(), testSubDir));
mkdirs(remoteFS, remoteDir); mkdirs(remoteFS, remoteDir);
// test teardown does this, but IDE-based test debugging can skip
// that teardown; this guarantees the initial state is clean
remoteFS.delete(remoteDir, true);
localFS.delete(localDir, true);
}
/**
 * Initialize every input and output path field in one call.
 * @param src source tree; root of the input paths
 * @param dest dest tree; root of the output paths
 */
protected void initPathFields(final Path src, final Path dest) {
  // input side first, then the output side
  initInputFields(src);
  initOutputFields(dest);
}
/**
 * Set up the output path fields.
 * The layout is an "outputDir" directory holding an "inputDir" directory,
 * under which the file and subdirectory names repeat those of the
 * input fields.
 * @param path path under which "outputDir" is placed
 */
protected void initOutputFields(final Path path) {
  // roots
  outputDir = new Path(path, "outputDir");
  inputDirUnderOutputDir = new Path(outputDir, "inputDir");

  // subdirectories of the copied tree
  outputSubDir1 = new Path(inputDirUnderOutputDir, "subDir1");
  outputSubDir2 = new Path(inputDirUnderOutputDir, "subDir2/subDir2");
  outputSubDir4 = new Path(inputDirUnderOutputDir, "subDir4/subDir4");

  // files, each under its parent directory
  outputFile1 = new Path(inputDirUnderOutputDir, "file1");
  outputFile2 = new Path(outputSubDir1, "file2");
  outputFile3 = new Path(outputSubDir2, "file3");
  outputFile4 = new Path(outputSubDir4, "file4");
  outputFile5 = new Path(outputSubDir4, "file5");
}
/**
 * Set up the input path fields.
 * The same paths are used across different methods (copy, update, track),
 * which is why they are held in fields.
 * @param srcDir source directory for these to go under.
 */
protected void initInputFields(final Path srcDir) {
  // root of the input tree
  inputDir = new Path(srcDir, "inputDir");

  // subdirectories
  inputSubDir1 = new Path(inputDir, "subDir1");
  inputSubDir2 = new Path(inputDir, "subDir2/subDir2");
  inputSubDir4 = new Path(inputDir, "subDir4/subDir4");

  // files, each under its parent directory
  inputFile1 = new Path(inputDir, "file1");
  inputFile2 = new Path(inputSubDir1, "file2");
  inputFile3 = new Path(inputSubDir2, "file3");
  inputFile4 = new Path(inputSubDir4, "file4");
  inputFile5 = new Path(inputSubDir4, "file5");
}
/**
 * Get the local filesystem used by the tests.
 * @return the value of the {@code localFS} field
 */
protected FileSystem getLocalFS() {
  return this.localFS;
}
/**
 * Get the remote filesystem used by the tests.
 * @return the value of the {@code remoteFS} field
 */
protected FileSystem getRemoteFS() {
  return this.remoteFS;
}
/**
 * Get the local test directory.
 * @return the value of the {@code localDir} field
 */
protected Path getLocalDir() {
  return this.localDir;
}
protected Path getRemoteDir() {
return remoteDir;
} }
@Test @Test
public void deepDirectoryStructureToRemote() throws Exception { public void testUpdateDeepDirectoryStructureToRemote() throws Exception {
describe("update a deep directory structure from local to remote");
distCpDeepDirectoryStructure(localFS, localDir, remoteFS, remoteDir);
distCpUpdateDeepDirectoryStructure(inputDirUnderOutputDir);
}
/**
 * Copy the deep directory tree from local to remote, then run a
 * {@code -update} without touching the source: files must be skipped
 * and no bytes copied.
 */
@Test
public void testUpdateDeepDirectoryStructureNoChange() throws Exception {
  describe("update an unchanged directory structure"
      + " from local to remote; expect no copy");
  final Path copiedTo = distCpDeepDirectoryStructure(
      localFS, localDir, remoteFS, remoteDir);
  describe("\nExecuting Update\n");
  final Job updateJob = distCpUpdate(localDir, copiedTo);
  // at least one file skipped, no upper bound
  assertCounterInRange(updateJob, CopyMapper.Counter.SKIP, 1, -1);
  // exactly zero bytes copied
  assertCounterInRange(updateJob, CopyMapper.Counter.BYTESCOPIED, 0, 0);
}
/**
 * Assert that a counter is in a range; min and max values are inclusive.
 * A negative bound disables that side of the check.
 * @param job job to query
 * @param counter counter to examine
 * @param min min value, if negative "no minimum"
 * @param max max value, if negative "no maximum"
 * @throws IOException IO problem
 */
void assertCounterInRange(Job job, Enum<?> counter, long min, long max)
    throws IOException {
  Counter c = job.getCounters().findCounter(counter);
  long value = c.getValue();
  // shared prefix for both failure messages: counter name and actual value
  String description =
      String.format("%s value %s", c.getDisplayName(), value);
  if (min >= 0) {
    assertTrue(description + " below minimum " + min,
        value >= min);
  }
  if (max >= 0) {
    assertTrue(description + " above maximum " + max,
        value <= max);
  }
}
/**
* Do a distcp from the local source to the destination filesystem.
* This is executed as part of
* {@link #testUpdateDeepDirectoryStructureToRemote()}; it's designed to be
* overridden or wrapped by subclasses which wish to add more assertions.
*
* Life is complicated here by the way that the src/dest paths
* on a distcp are different with -update.
*
* The sequence is: verify the source files exist, modify the source tree,
* run the update, then assert on the remote tree and the job counters.
* @param destDir output directory used by the initial distcp
* @return the distcp job
* @throws Exception any failure of the copy or of an assertion
*/
protected Job distCpUpdateDeepDirectoryStructure(final Path destDir)
throws Exception {
describe("Now do an incremental update with deletion of missing files");
Path srcDir = inputDir;
LOG.info("Source directory = {}, dest={}", srcDir, destDir);
// sanity check: all five source files must exist before they are modified
ContractTestUtils.assertPathsExist(localFS,
"Paths for test are wrong",
inputFile1, inputFile2, inputFile3, inputFile4, inputFile5);
// change the local source tree; the returned path is not needed here
modifySourceDirectories();
Job job = distCpUpdate(srcDir, destDir);
Path outputFileNew1 = new Path(outputSubDir2, "newfile1");
lsR("Updated Remote", remoteFS, destDir);
// the remote tree must now match the modified source tree
ContractTestUtils.assertPathDoesNotExist(remoteFS,
" deleted from " + inputFile1, outputFile1);
ContractTestUtils.assertIsFile(remoteFS, outputFileNew1);
ContractTestUtils.assertPathsDoNotExist(remoteFS,
"DistCP should have deleted",
outputFile3, outputFile4, outputSubDir4);
// exactly one copy; at least one skip, with no upper bound
assertCounterInRange(job, CopyMapper.Counter.COPY, 1, 1);
assertCounterInRange(job, CopyMapper.Counter.SKIP, 1, -1);
return job;
}
/**
 * Run distcp -update srcDir destDir.
 * Both trees are listed to the log before the job starts. The job is
 * configured with delete-missing, sync-folder and CRC options set to true
 * and overwrite set to false, plus the standard test options.
 * @param srcDir local source directory
 * @param destDir remote destination directory.
 * @return the completed job
 * @throws Exception any failure.
 */
private Job distCpUpdate(final Path srcDir, final Path destDir)
    throws Exception {
  describe("\nDistcp -update from " + srcDir + " to " + destDir);
  lsR("Local to update", localFS, srcDir);
  lsR("Remote before update", remoteFS, destDir);
  final DistCpOptions.Builder updateOptions =
      new DistCpOptions.Builder(Collections.singletonList(srcDir), destDir)
          .withDeleteMissing(true)
          .withSyncFolder(true)
          .withCRC(true)
          .withOverwrite(false);
  return runDistCp(buildWithStandardOptions(updateOptions));
}
/**
 * Change the local source tree in the way the update/track tests expect:
 * remove file1 and file3, remove the whole of subDir4 recursively,
 * and add one new empty file, "newfile1", under the second subdirectory.
 * @return the path to the newly created file
 * @throws IOException IO failure
 */
private Path modifySourceDirectories() throws IOException {
  // two single-file deletions
  localFS.delete(inputFile1, false);
  localFS.delete(inputFile3, false);
  // recursive delete of subdir4, taking input files 4 and 5 with it
  localFS.delete(inputSubDir4, true);
  // the one addition to the tree
  final Path addedFile = new Path(inputSubDir2, "newfile1");
  ContractTestUtils.touch(localFS, addedFile);
  return addedFile;
}
@Test
public void testTrackDeepDirectoryStructureToRemote() throws Exception {
describe("copy a deep directory structure from local to remote"); describe("copy a deep directory structure from local to remote");
deepDirectoryStructure(localFS, localDir, remoteFS, remoteDir);
Path destDir = distCpDeepDirectoryStructure(localFS, localDir, remoteFS,
remoteDir);
ContractTestUtils.assertIsDirectory(remoteFS, destDir);
describe("Now do an incremental update and save of missing files");
Path srcDir = inputDir;
// same path setup as in distCpDeepDirectoryStructure()
Path trackDir = new Path(localDir, "trackDir");
describe("\nDirectories\n");
lsR("Local to update", localFS, srcDir);
lsR("Remote before update", remoteFS, destDir);
ContractTestUtils.assertPathsExist(localFS,
"Paths for test are wrong",
inputFile2, inputFile3, inputFile4, inputFile5);
Path inputFileNew1 = modifySourceDirectories();
// Distcp set to track but not delete
runDistCp(buildWithStandardOptions(
new DistCpOptions.Builder(
Collections.singletonList(srcDir),
inputDirUnderOutputDir)
.withTrackMissing(trackDir)
.withSyncFolder(true)
.withOverwrite(false)));
lsR("tracked udpate", remoteFS, destDir);
// new file went over
Path outputFileNew1 = new Path(outputSubDir2, "newfile1");
ContractTestUtils.assertIsFile(remoteFS, outputFileNew1);
ContractTestUtils.assertPathExists(localFS, "tracking directory",
trackDir);
// now read in the listings
Path sortedSourceListing = new Path(trackDir,
DistCpConstants.SOURCE_SORTED_FILE);
ContractTestUtils.assertIsFile(localFS, sortedSourceListing);
Path sortedTargetListing = new Path(trackDir,
DistCpConstants.TARGET_SORTED_FILE);
ContractTestUtils.assertIsFile(localFS, sortedTargetListing);
// deletion didn't happen
ContractTestUtils.assertPathsExist(remoteFS,
"DistCP should have retained",
outputFile2, outputFile3, outputFile4, outputSubDir4);
// now scan the table and see that things are there.
Map<String, Path> sourceFiles = new HashMap<>(10);
Map<String, Path> targetFiles = new HashMap<>(10);
try (SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf,
SequenceFile.Reader.file(sortedSourceListing));
SequenceFile.Reader targetReader = new SequenceFile.Reader(conf,
SequenceFile.Reader.file(sortedTargetListing))) {
CopyListingFileStatus copyStatus = new CopyListingFileStatus();
Text name = new Text();
while(sourceReader.next(name, copyStatus)) {
String key = name.toString();
Path path = copyStatus.getPath();
LOG.info("{}: {}", key, path);
sourceFiles.put(key, path);
}
while(targetReader.next(name, copyStatus)) {
String key = name.toString();
Path path = copyStatus.getPath();
LOG.info("{}: {}", key, path);
targetFiles.put(name.toString(), copyStatus.getPath());
}
}
// look for the new file in both lists
assertTrue("No " + outputFileNew1 + " in source listing",
sourceFiles.containsValue(inputFileNew1));
assertTrue("No " + outputFileNew1 + " in target listing",
targetFiles.containsValue(outputFileNew1));
assertTrue("No " + outputSubDir4 + " in target listing",
targetFiles.containsValue(outputSubDir4));
assertFalse("Found " + inputSubDir4 + " in source listing",
sourceFiles.containsValue(inputSubDir4));
}
public void lsR(final String description,
final FileSystem fs,
final Path dir) throws IOException {
RemoteIterator<LocatedFileStatus> files = fs.listFiles(dir, true);
LOG.info("{}: {}:", description, dir);
StringBuilder sb = new StringBuilder();
while(files.hasNext()) {
LocatedFileStatus status = files.next();
sb.append(String.format(" %s; type=%s; length=%d",
status.getPath(),
status.isDirectory()? "dir" : "file",
status.getLen()));
}
LOG.info("{}", sb);
} }
@Test @Test
@ -96,34 +453,35 @@ public abstract class AbstractContractDistCpTest
} }
@Test @Test
public void deepDirectoryStructureFromRemote() throws Exception { public void testDeepDirectoryStructureFromRemote() throws Exception {
describe("copy a deep directory structure from remote to local"); describe("copy a deep directory structure from remote to local");
deepDirectoryStructure(remoteFS, remoteDir, localFS, localDir); distCpDeepDirectoryStructure(remoteFS, remoteDir, localFS, localDir);
} }
@Test @Test
public void largeFilesFromRemote() throws Exception { public void testLargeFilesFromRemote() throws Exception {
describe("copy multiple large files from remote to local"); describe("copy multiple large files from remote to local");
largeFiles(remoteFS, remoteDir, localFS, localDir); largeFiles(remoteFS, remoteDir, localFS, localDir);
} }
/** /**
* Executes a test using a file system sub-tree with multiple nesting levels. * Executes a DistCp using a file system sub-tree with multiple nesting
* levels.
* The filenames are those of the fields initialized in setup.
* *
* @param srcFS source FileSystem * @param srcFS source FileSystem
* @param srcDir source directory * @param srcDir source directory
* @param dstFS destination FileSystem * @param dstFS destination FileSystem
* @param dstDir destination directory * @param dstDir destination directory
* @return the target directory of the copy
* @throws Exception if there is a failure * @throws Exception if there is a failure
*/ */
private void deepDirectoryStructure(FileSystem srcFS, Path srcDir, private Path distCpDeepDirectoryStructure(FileSystem srcFS,
FileSystem dstFS, Path dstDir) throws Exception { Path srcDir,
Path inputDir = new Path(srcDir, "inputDir"); FileSystem dstFS,
Path inputSubDir1 = new Path(inputDir, "subDir1"); Path dstDir) throws Exception {
Path inputSubDir2 = new Path(inputDir, "subDir2/subDir3"); initPathFields(srcDir, dstDir);
Path inputFile1 = new Path(inputDir, "file1");
Path inputFile2 = new Path(inputSubDir1, "file2");
Path inputFile3 = new Path(inputSubDir2, "file3");
mkdirs(srcFS, inputSubDir1); mkdirs(srcFS, inputSubDir1);
mkdirs(srcFS, inputSubDir2); mkdirs(srcFS, inputSubDir2);
byte[] data1 = dataset(100, 33, 43); byte[] data1 = dataset(100, 33, 43);
@ -132,14 +490,18 @@ public abstract class AbstractContractDistCpTest
createFile(srcFS, inputFile2, true, data2); createFile(srcFS, inputFile2, true, data2);
byte[] data3 = dataset(300, 53, 63); byte[] data3 = dataset(300, 53, 63);
createFile(srcFS, inputFile3, true, data3); createFile(srcFS, inputFile3, true, data3);
createFile(srcFS, inputFile4, true, dataset(400, 53, 63));
createFile(srcFS, inputFile5, true, dataset(500, 53, 63));
Path target = new Path(dstDir, "outputDir"); Path target = new Path(dstDir, "outputDir");
runDistCp(inputDir, target); runDistCp(inputDir, target);
ContractTestUtils.assertIsDirectory(dstFS, target); ContractTestUtils.assertIsDirectory(dstFS, target);
lsR("Destination tree after distcp", dstFS, target);
verifyFileContents(dstFS, new Path(target, "inputDir/file1"), data1); verifyFileContents(dstFS, new Path(target, "inputDir/file1"), data1);
verifyFileContents(dstFS, verifyFileContents(dstFS,
new Path(target, "inputDir/subDir1/file2"), data2); new Path(target, "inputDir/subDir1/file2"), data2);
verifyFileContents(dstFS, verifyFileContents(dstFS,
new Path(target, "inputDir/subDir2/subDir3/file3"), data3); new Path(target, "inputDir/subDir2/subDir2/file3"), data3);
return target;
} }
/** /**
@ -153,20 +515,21 @@ public abstract class AbstractContractDistCpTest
*/ */
private void largeFiles(FileSystem srcFS, Path srcDir, FileSystem dstFS, private void largeFiles(FileSystem srcFS, Path srcDir, FileSystem dstFS,
Path dstDir) throws Exception { Path dstDir) throws Exception {
Path inputDir = new Path(srcDir, "inputDir"); initPathFields(srcDir, dstDir);
Path inputFile1 = new Path(inputDir, "file1"); Path largeFile1 = new Path(inputDir, "file1");
Path inputFile2 = new Path(inputDir, "file2"); Path largeFile2 = new Path(inputDir, "file2");
Path inputFile3 = new Path(inputDir, "file3"); Path largeFile3 = new Path(inputDir, "file3");
mkdirs(srcFS, inputDir); mkdirs(srcFS, inputDir);
int fileSizeKb = conf.getInt("scale.test.distcp.file.size.kb", 10 * 1024); int fileSizeKb = conf.getInt(SCALE_TEST_DISTCP_FILE_SIZE_KB,
DEFAULT_DISTCP_SIZE_KB);
int fileSizeMb = fileSizeKb / 1024; int fileSizeMb = fileSizeKb / 1024;
getLog().info("{} with file size {}", testName.getMethodName(), fileSizeMb); getLog().info("{} with file size {}", testName.getMethodName(), fileSizeMb);
byte[] data1 = dataset((fileSizeMb + 1) * 1024 * 1024, 33, 43); byte[] data1 = dataset((fileSizeMb + 1) * MB, 33, 43);
createFile(srcFS, inputFile1, true, data1); createFile(srcFS, largeFile1, true, data1);
byte[] data2 = dataset((fileSizeMb + 2) * 1024 * 1024, 43, 53); byte[] data2 = dataset((fileSizeMb + 2) * MB, 43, 53);
createFile(srcFS, inputFile2, true, data2); createFile(srcFS, largeFile2, true, data2);
byte[] data3 = dataset((fileSizeMb + 3) * 1024 * 1024, 53, 63); byte[] data3 = dataset((fileSizeMb + 3) * MB, 53, 63);
createFile(srcFS, inputFile3, true, data3); createFile(srcFS, largeFile3, true, data3);
Path target = new Path(dstDir, "outputDir"); Path target = new Path(dstDir, "outputDir");
runDistCp(inputDir, target); runDistCp(inputDir, target);
ContractTestUtils.assertIsDirectory(dstFS, target); ContractTestUtils.assertIsDirectory(dstFS, target);
@ -183,12 +546,34 @@ public abstract class AbstractContractDistCpTest
* @throws Exception if there is a failure * @throws Exception if there is a failure
*/ */
private void runDistCp(Path src, Path dst) throws Exception { private void runDistCp(Path src, Path dst) throws Exception {
DistCpOptions options = new DistCpOptions.Builder( runDistCp(buildWithStandardOptions(
Collections.singletonList(src), dst).build(); new DistCpOptions.Builder(Collections.singletonList(src), dst)));
}
/**
* Run the distcp job.
* @param options distcp options
* @return the job. It will have already completed.
* @throws Exception failure
*/
private Job runDistCp(final DistCpOptions options) throws Exception {
Job job = new DistCp(conf, options).execute(); Job job = new DistCp(conf, options).execute();
assertNotNull("Unexpected null job returned from DistCp execution.", job); assertNotNull("Unexpected null job returned from DistCp execution.", job);
assertTrue("DistCp job did not complete.", job.isComplete()); assertTrue("DistCp job did not complete.", job.isComplete());
assertTrue("DistCp job did not complete successfully.", job.isSuccessful()); assertTrue("DistCp job did not complete successfully.", job.isSuccessful());
return job;
}
/**
* Add any standard options and then build.
* @param builder DistCp option builder
* @return the build options
*/
private DistCpOptions buildWithStandardOptions(
DistCpOptions.Builder builder) {
return builder
.withNumListstatusThreads(8)
.build();
} }
/** /**

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.localfs.LocalFSContract;
/**
 * Runs every test in {@link AbstractContractDistCpTest} against the
 * local filesystem contract.
 * As such, it acts as an in-module validation of this contract test itself.
 */
public class TestLocalContractDistCp extends AbstractContractDistCpTest {

  /**
   * Supply the local filesystem contract to the test suite.
   * @param conf configuration handed to the contract
   * @return a new {@link LocalFSContract}
   */
  @Override
  protected AbstractFSContract createContract(Configuration conf) {
    final LocalFSContract contract = new LocalFSContract(conf);
    return contract;
  }
}

View File

@ -43,6 +43,9 @@ import org.junit.*;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.tools.util.TestDistCpUtils.*;
public class TestCopyCommitter { public class TestCopyCommitter {
private static final Log LOG = LogFactory.getLog(TestCopyCommitter.class); private static final Log LOG = LogFactory.getLog(TestCopyCommitter.class);
@ -80,56 +83,42 @@ public class TestCopyCommitter {
} }
@Before @Before
public void createMetaFolder() { public void createMetaFolder() throws IOException {
config.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta"); config.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta");
// Unset listing file path since the config is shared by // Unset listing file path since the config is shared by
// multiple tests, and some test doesn't set it, such as // multiple tests, and some test doesn't set it, such as
// testNoCommitAction, but the distcp code will check it. // testNoCommitAction, but the distcp code will check it.
config.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, ""); config.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
Path meta = new Path("/meta"); Path meta = new Path("/meta");
try {
cluster.getFileSystem().mkdirs(meta); cluster.getFileSystem().mkdirs(meta);
} catch (IOException e) {
LOG.error("Exception encountered while creating meta folder", e);
Assert.fail("Unable to create meta folder");
}
} }
@After @After
public void cleanupMetaFolder() { public void cleanupMetaFolder() throws IOException {
Path meta = new Path("/meta"); Path meta = new Path("/meta");
try {
if (cluster.getFileSystem().exists(meta)) { if (cluster.getFileSystem().exists(meta)) {
cluster.getFileSystem().delete(meta, true); cluster.getFileSystem().delete(meta, true);
Assert.fail("Expected meta folder to be deleted"); Assert.fail("Expected meta folder to be deleted");
} }
} catch (IOException e) {
LOG.error("Exception encountered while cleaning up folder", e);
Assert.fail("Unable to clean up meta folder");
}
} }
@Test @Test
public void testNoCommitAction() { public void testNoCommitAction() throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(), JobContext jobContext = new JobContextImpl(
taskAttemptContext.getConfiguration(),
taskAttemptContext.getTaskAttemptID().getJobID()); taskAttemptContext.getTaskAttemptID().getJobID());
try {
OutputCommitter committer = new CopyCommitter(null, taskAttemptContext); OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
committer.commitJob(jobContext); committer.commitJob(jobContext);
Assert.assertEquals(taskAttemptContext.getStatus(), "Commit Successful"); Assert.assertEquals("Commit Successful", taskAttemptContext.getStatus());
//Test for idempotent commit //Test for idempotent commit
committer.commitJob(jobContext); committer.commitJob(jobContext);
Assert.assertEquals(taskAttemptContext.getStatus(), "Commit Successful"); Assert.assertEquals("Commit Successful", taskAttemptContext.getStatus());
} catch (IOException e) {
LOG.error("Exception encountered ", e);
Assert.fail("Commit failed");
}
} }
@Test @Test
public void testPreserveStatus() { public void testPreserveStatus() throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(), JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
taskAttemptContext.getTaskAttemptID().getJobID()); taskAttemptContext.getTaskAttemptID().getJobID());
@ -161,19 +150,12 @@ public class TestCopyCommitter {
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase); conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
committer.commitJob(jobContext); committer.commitJob(jobContext);
if (!checkDirectoryPermissions(fs, targetBase, sourcePerm)) { checkDirectoryPermissions(fs, targetBase, sourcePerm);
Assert.fail("Permission don't match");
}
//Test for idempotent commit //Test for idempotent commit
committer.commitJob(jobContext); committer.commitJob(jobContext);
if (!checkDirectoryPermissions(fs, targetBase, sourcePerm)) { checkDirectoryPermissions(fs, targetBase, sourcePerm);
Assert.fail("Permission don't match");
}
} catch (IOException e) {
LOG.error("Exception encountered while testing for preserve status", e);
Assert.fail("Preserve status failure");
} finally { } finally {
TestDistCpUtils.delete(fs, "/tmp1"); TestDistCpUtils.delete(fs, "/tmp1");
conf.unset(DistCpConstants.CONF_LABEL_PRESERVE_STATUS); conf.unset(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
@ -182,7 +164,7 @@ public class TestCopyCommitter {
} }
@Test @Test
public void testDeleteMissing() { public void testDeleteMissing() throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(), JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
taskAttemptContext.getTaskAttemptID().getJobID()); taskAttemptContext.getTaskAttemptID().getJobID());
@ -213,24 +195,13 @@ public class TestCopyCommitter {
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase); conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
committer.commitJob(jobContext); committer.commitJob(jobContext);
if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) { verifyFoldersAreInSync(fs, targetBase, sourceBase);
Assert.fail("Source and target folders are not in sync"); verifyFoldersAreInSync(fs, sourceBase, targetBase);
}
if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, sourceBase, targetBase)) {
Assert.fail("Source and target folders are not in sync");
}
//Test for idempotent commit //Test for idempotent commit
committer.commitJob(jobContext); committer.commitJob(jobContext);
if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) { verifyFoldersAreInSync(fs, targetBase, sourceBase);
Assert.fail("Source and target folders are not in sync"); verifyFoldersAreInSync(fs, sourceBase, targetBase);
}
if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, sourceBase, targetBase)) {
Assert.fail("Source and target folders are not in sync");
}
} catch (Throwable e) {
LOG.error("Exception encountered while testing for delete missing", e);
Assert.fail("Delete missing failure");
} finally { } finally {
TestDistCpUtils.delete(fs, "/tmp1"); TestDistCpUtils.delete(fs, "/tmp1");
conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false"); conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
@ -238,7 +209,7 @@ public class TestCopyCommitter {
} }
@Test @Test
public void testDeleteMissingFlatInterleavedFiles() { public void testDeleteMissingFlatInterleavedFiles() throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(), JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
taskAttemptContext.getTaskAttemptID().getJobID()); taskAttemptContext.getTaskAttemptID().getJobID());
@ -253,20 +224,20 @@ public class TestCopyCommitter {
fs = FileSystem.get(conf); fs = FileSystem.get(conf);
sourceBase = "/tmp1/" + String.valueOf(rand.nextLong()); sourceBase = "/tmp1/" + String.valueOf(rand.nextLong());
targetBase = "/tmp1/" + String.valueOf(rand.nextLong()); targetBase = "/tmp1/" + String.valueOf(rand.nextLong());
TestDistCpUtils.createFile(fs, sourceBase + "/1"); createFile(fs, sourceBase + "/1");
TestDistCpUtils.createFile(fs, sourceBase + "/3"); createFile(fs, sourceBase + "/3");
TestDistCpUtils.createFile(fs, sourceBase + "/4"); createFile(fs, sourceBase + "/4");
TestDistCpUtils.createFile(fs, sourceBase + "/5"); createFile(fs, sourceBase + "/5");
TestDistCpUtils.createFile(fs, sourceBase + "/7"); createFile(fs, sourceBase + "/7");
TestDistCpUtils.createFile(fs, sourceBase + "/8"); createFile(fs, sourceBase + "/8");
TestDistCpUtils.createFile(fs, sourceBase + "/9"); createFile(fs, sourceBase + "/9");
TestDistCpUtils.createFile(fs, targetBase + "/2"); createFile(fs, targetBase + "/2");
TestDistCpUtils.createFile(fs, targetBase + "/4"); createFile(fs, targetBase + "/4");
TestDistCpUtils.createFile(fs, targetBase + "/5"); createFile(fs, targetBase + "/5");
TestDistCpUtils.createFile(fs, targetBase + "/7"); createFile(fs, targetBase + "/7");
TestDistCpUtils.createFile(fs, targetBase + "/9"); createFile(fs, targetBase + "/9");
TestDistCpUtils.createFile(fs, targetBase + "/A"); createFile(fs, targetBase + "/A");
final DistCpOptions options = new DistCpOptions.Builder( final DistCpOptions options = new DistCpOptions.Builder(
Collections.singletonList(new Path(sourceBase)), new Path("/out")) Collections.singletonList(new Path(sourceBase)), new Path("/out"))
@ -282,20 +253,13 @@ public class TestCopyCommitter {
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase); conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
committer.commitJob(jobContext); committer.commitJob(jobContext);
if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) { verifyFoldersAreInSync(fs, targetBase, sourceBase);
Assert.fail("Source and target folders are not in sync"); Assert.assertEquals(4, fs.listStatus(new Path(targetBase)).length);
}
Assert.assertEquals(fs.listStatus(new Path(targetBase)).length, 4);
//Test for idempotent commit //Test for idempotent commit
committer.commitJob(jobContext); committer.commitJob(jobContext);
if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) { verifyFoldersAreInSync(fs, targetBase, sourceBase);
Assert.fail("Source and target folders are not in sync"); Assert.assertEquals(4, fs.listStatus(new Path(targetBase)).length);
}
Assert.assertEquals(fs.listStatus(new Path(targetBase)).length, 4);
} catch (IOException e) {
LOG.error("Exception encountered while testing for delete missing", e);
Assert.fail("Delete missing failure");
} finally { } finally {
TestDistCpUtils.delete(fs, "/tmp1"); TestDistCpUtils.delete(fs, "/tmp1");
conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false"); conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
@ -304,7 +268,7 @@ public class TestCopyCommitter {
} }
@Test @Test
public void testAtomicCommitMissingFinal() { public void testAtomicCommitMissingFinal() throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(), JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
taskAttemptContext.getTaskAttemptID().getJobID()); taskAttemptContext.getTaskAttemptID().getJobID());
@ -322,19 +286,16 @@ public class TestCopyCommitter {
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath); conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true); conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
Assert.assertTrue(fs.exists(new Path(workPath))); assertPathExists(fs, "Work path", new Path(workPath));
Assert.assertFalse(fs.exists(new Path(finalPath))); assertPathDoesNotExist(fs, "Final path", new Path(finalPath));
committer.commitJob(jobContext); committer.commitJob(jobContext);
Assert.assertFalse(fs.exists(new Path(workPath))); assertPathDoesNotExist(fs, "Work path", new Path(workPath));
Assert.assertTrue(fs.exists(new Path(finalPath))); assertPathExists(fs, "Final path", new Path(finalPath));
//Test for idempotent commit //Test for idempotent commit
committer.commitJob(jobContext); committer.commitJob(jobContext);
Assert.assertFalse(fs.exists(new Path(workPath))); assertPathDoesNotExist(fs, "Work path", new Path(workPath));
Assert.assertTrue(fs.exists(new Path(finalPath))); assertPathExists(fs, "Final path", new Path(finalPath));
} catch (IOException e) {
LOG.error("Exception encountered while testing for preserve status", e);
Assert.fail("Atomic commit failure");
} finally { } finally {
TestDistCpUtils.delete(fs, workPath); TestDistCpUtils.delete(fs, workPath);
TestDistCpUtils.delete(fs, finalPath); TestDistCpUtils.delete(fs, finalPath);
@ -343,7 +304,7 @@ public class TestCopyCommitter {
} }
@Test @Test
public void testAtomicCommitExistingFinal() { public void testAtomicCommitExistingFinal() throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(), JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
taskAttemptContext.getTaskAttemptID().getJobID()); taskAttemptContext.getTaskAttemptID().getJobID());
@ -363,20 +324,17 @@ public class TestCopyCommitter {
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath); conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true); conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
Assert.assertTrue(fs.exists(new Path(workPath))); assertPathExists(fs, "Work path", new Path(workPath));
Assert.assertTrue(fs.exists(new Path(finalPath))); assertPathExists(fs, "Final path", new Path(finalPath));
try { try {
committer.commitJob(jobContext); committer.commitJob(jobContext);
Assert.fail("Should not be able to atomic-commit to pre-existing path."); Assert.fail("Should not be able to atomic-commit to pre-existing path.");
} catch(Exception exception) { } catch(Exception exception) {
Assert.assertTrue(fs.exists(new Path(workPath))); assertPathExists(fs, "Work path", new Path(workPath));
Assert.assertTrue(fs.exists(new Path(finalPath))); assertPathExists(fs, "Final path", new Path(finalPath));
LOG.info("Atomic-commit Test pass."); LOG.info("Atomic-commit Test pass.");
} }
} catch (IOException e) {
LOG.error("Exception encountered while testing for atomic commit.", e);
Assert.fail("Atomic commit failure");
} finally { } finally {
TestDistCpUtils.delete(fs, workPath); TestDistCpUtils.delete(fs, workPath);
TestDistCpUtils.delete(fs, finalPath); TestDistCpUtils.delete(fs, finalPath);
@ -389,11 +347,11 @@ public class TestCopyCommitter {
new TaskAttemptID("200707121733", 1, TaskType.MAP, 1, 1)); new TaskAttemptID("200707121733", 1, TaskType.MAP, 1, 1));
} }
private boolean checkDirectoryPermissions(FileSystem fs, String targetBase, private void checkDirectoryPermissions(FileSystem fs, String targetBase,
FsPermission sourcePerm) throws IOException { FsPermission sourcePerm) throws IOException {
Path base = new Path(targetBase); Path base = new Path(targetBase);
Stack<Path> stack = new Stack<Path>(); Stack<Path> stack = new Stack<>();
stack.push(base); stack.push(base);
while (!stack.isEmpty()) { while (!stack.isEmpty()) {
Path file = stack.pop(); Path file = stack.pop();
@ -404,11 +362,10 @@ public class TestCopyCommitter {
for (FileStatus status : fStatus) { for (FileStatus status : fStatus) {
if (status.isDirectory()) { if (status.isDirectory()) {
stack.push(status.getPath()); stack.push(status.getPath());
Assert.assertEquals(status.getPermission(), sourcePerm); Assert.assertEquals(sourcePerm, status.getPermission());
} }
} }
} }
return true;
} }
private static class NullInputFormat extends InputFormat { private static class NullInputFormat extends InputFormat {

View File

@ -0,0 +1,250 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.mapred;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.CopyListingFileStatus;
/**
 * Unit tests of the deleted directory tracker.
 *
 * Each test feeds {@link CopyListingFileStatus} entries to a
 * {@code DeletedDirTracker} and checks the answer of
 * {@code shouldDelete()}: an entry should only be proposed for deletion
 * when none of its ancestor directories has already been proposed.
 * No filesystem is involved; the statuses are synthetic.
 */
@SuppressWarnings("RedundantThrows")
public class TestDeletedDirTracker extends Assert {

  private static final Logger LOG =
      LoggerFactory.getLogger(TestDeletedDirTracker.class);

  public static final Path ROOT = new Path("hdfs://namenode/");

  public static final Path DIR1 = new Path(ROOT, "dir1");

  public static final Path FILE0 = new Path(ROOT, "file0");

  public static final Path DIR1_FILE1 = new Path(DIR1, "file1");

  public static final Path DIR1_FILE2 = new Path(DIR1, "file2");

  public static final Path DIR1_DIR3 = new Path(DIR1, "dir3");

  public static final Path DIR1_DIR3_DIR4 = new Path(DIR1_DIR3, "dir4");

  // NOTE: despite the "_FILE_3" suffix, the final path element is "file1".
  public static final Path DIR1_DIR3_DIR4_FILE_3 =
      new Path(DIR1_DIR3_DIR4, "file1");

  /** Tracker under test; recreated for every test case. */
  private DeletedDirTracker tracker;

  /**
   * Create a tracker constructed with the argument 1000.
   * Individual tests replace it when they need a different value.
   */
  @Before
  public void setup() {
    tracker = new DeletedDirTracker(1000);
  }

  /**
   * Log the final state of the tracker to help diagnose failures.
   */
  @After
  public void teardown() {
    LOG.info("{}", tracker);
  }

  /**
   * The root path must be rejected when submitted as a directory.
   */
  @Test(expected = IllegalArgumentException.class)
  public void testNoRootDir() throws Throwable {
    shouldDelete(ROOT, true);
  }

  /**
   * The root path must be rejected when submitted as a file.
   */
  @Test(expected = IllegalArgumentException.class)
  public void testNoRootFile() throws Throwable {
    // Submit a *file* status: a directory status would merely repeat
    // testNoRootDir and leave the file code path untested.
    shouldDelete(fileStatus(ROOT));
  }

  /**
   * A file directly under the root, with no deleted ancestor, is always
   * to be deleted, however often the question is asked.
   */
  @Test
  public void testFileInRootDir() throws Throwable {
    expectShouldDelete(FILE0, false);
    expectShouldDelete(FILE0, false);
  }

  /**
   * Once DIR1 has been proposed for deletion, a second request for it and
   * every request for a path underneath it must be declined.
   * Directories which are submitted end up in the cache; files do not.
   */
  @Test
  public void testDeleteDir1() throws Throwable {
    expectShouldDelete(DIR1, true);
    expectShouldNotDelete(DIR1, true);
    expectShouldNotDelete(DIR1_FILE1, false);
    expectNotCached(DIR1_FILE1);
    expectShouldNotDelete(DIR1_DIR3, true);
    expectCached(DIR1_DIR3);
    expectShouldNotDelete(DIR1_FILE2, false);
    expectShouldNotDelete(DIR1_DIR3_DIR4_FILE_3, false);
    expectShouldNotDelete(DIR1_DIR3_DIR4, true);
    expectShouldNotDelete(DIR1_DIR3_DIR4, true);
  }

  /**
   * A file several levels below a deleted directory is declined even
   * though none of the intermediate directories was ever submitted.
   */
  @Test
  public void testDeleteDirDeep() throws Throwable {
    expectShouldDelete(DIR1, true);
    expectShouldNotDelete(DIR1_DIR3_DIR4_FILE_3, false);
  }

  /**
   * Larger scale test over a sorted listing, with the tracker sized to
   * the full length of that listing.
   */
  @Test
  public void testDeletePerfectCache() throws Throwable {
    // run a larger scale test. Also use the ordering we'd expect for a sorted
    // listing, which we implement by sorting the paths
    List<CopyListingFileStatus> statusList = buildStatusList();
    // tracker is sized to the whole listing (files as well as directories)
    tracker = new DeletedDirTracker(statusList.size());
    AtomicInteger deletedFiles = new AtomicInteger(0);
    AtomicInteger deletedDirs = new AtomicInteger(0);
    deletePaths(statusList, deletedFiles, deletedDirs);
    assertEquals(0, deletedFiles.get());
  }

  /**
   * Larger scale test over a sorted listing, using the tracker
   * created in {@link #setup()}.
   */
  @Test
  public void testDeleteFullCache() throws Throwable {
    // run a larger scale test. Also use the ordering we'd expect for a sorted
    // listing, which we implement by sorting the paths
    AtomicInteger deletedFiles = new AtomicInteger(0);
    AtomicInteger deletedDirs = new AtomicInteger(0);
    deletePaths(buildStatusList(), deletedFiles, deletedDirs);
    assertEquals(0, deletedFiles.get());
  }

  /**
   * Larger scale test over a sorted listing, with a tracker
   * constructed with the argument 100.
   */
  @Test
  public void testDeleteMediumCache() throws Throwable {
    tracker = new DeletedDirTracker(100);
    AtomicInteger deletedFiles = new AtomicInteger(0);
    AtomicInteger deletedDirs = new AtomicInteger(0);
    deletePaths(buildStatusList(), deletedFiles, deletedDirs);
    assertEquals(0, deletedFiles.get());
  }

  /**
   * Larger scale test over a sorted listing, with a tracker
   * constructed with the argument 10.
   */
  @Test
  public void testDeleteFullSmallCache() throws Throwable {
    tracker = new DeletedDirTracker(10);
    AtomicInteger deletedFiles = new AtomicInteger(0);
    AtomicInteger deletedDirs = new AtomicInteger(0);
    deletePaths(buildStatusList(), deletedFiles, deletedDirs);
    assertEquals(0, deletedFiles.get());
  }

  /**
   * Offer every entry of the list to the tracker, in list order, and count
   * how many files and directories the tracker says should be deleted.
   * @param statusList entries to offer
   * @param deletedFiles counter incremented for each file to delete
   * @param deletedDirs counter incremented for each directory to delete
   */
  protected void deletePaths(final List<CopyListingFileStatus> statusList,
      final AtomicInteger deletedFiles, final AtomicInteger deletedDirs) {
    for (CopyListingFileStatus status : statusList) {
      if (shouldDelete(status)) {
        AtomicInteger r = status.isDirectory() ? deletedDirs : deletedFiles;
        r.incrementAndGet();
        LOG.info("Delete {}", status.getPath());
      }
    }
    LOG.info("After proposing to delete {} paths, {} directories and {} files"
            + " were explicitly deleted from a cache {}",
        statusList.size(), deletedDirs, deletedFiles, tracker);
  }

  /**
   * Build a large YEAR/MONTH/DAY status list: 21 years (0-20), each with
   * 12 months, each with 29 days (1-29); every day directory holds
   * 24 files. That is 7581 directories and 175392 files.
   * All paths are relative.
   * @return a list sorted by path.
   */
  protected List<CopyListingFileStatus> buildStatusList() {
    List<CopyListingFileStatus> statusList = new ArrayList<>();
    // recursive create of many files
    for (int y = 0; y <= 20; y++) {
      Path yp = new Path(String.format("YEAR=%d", y));
      statusList.add(dirStatus(yp));
      for (int m = 1; m <= 12; m++) {
        Path ymp = new Path(yp, String.format("MONTH=%d", m));
        statusList.add(dirStatus(ymp));
        for (int d = 1; d < 30; d++) {
          Path dir = new Path(ymp, String.format("DAY=%02d", d));
          statusList.add(dirStatus(dir));
          for (int h = 0; h < 24; h++) {
            statusList.add(fileStatus(new Path(dir,
                String.format("%02d00.avro", h))));
          }
        }
      }
    }
    // sort on paths, once, after the whole list has been built:
    // re-sorting after every year gives the same result at a higher cost.
    Collections.sort(statusList,
        (l, r) -> l.getPath().compareTo(r.getPath()));
    return statusList;
  }

  /**
   * Assert that the tracker wants the path deleted.
   * @param path path to offer
   * @param isDir is the path a directory?
   */
  private void expectShouldDelete(final Path path, boolean isDir) {
    expectShouldDelete(newStatus(path, isDir));
  }

  /**
   * Assert that the tracker wants the entry deleted.
   * @param status entry to offer
   */
  private void expectShouldDelete(CopyListingFileStatus status) {
    assertTrue("Expected shouldDelete of " + status.getPath(),
        shouldDelete(status));
  }

  /**
   * Offer a path to the tracker.
   * @param path path to offer
   * @param isDir is the path a directory?
   * @return the tracker's answer
   */
  private boolean shouldDelete(final Path path, final boolean isDir) {
    return shouldDelete(newStatus(path, isDir));
  }

  /**
   * Offer an entry to the tracker.
   * @param status entry to offer
   * @return the tracker's answer
   */
  private boolean shouldDelete(final CopyListingFileStatus status) {
    return tracker.shouldDelete(status);
  }

  /**
   * Assert that the tracker does not want the path deleted.
   * @param path path to offer
   * @param isDir is the path a directory?
   */
  private void expectShouldNotDelete(final Path path, boolean isDir) {
    expectShouldNotDelete(newStatus(path, isDir));
  }

  /**
   * Assert that the tracker does not want the entry deleted.
   * @param status entry to offer
   */
  private void expectShouldNotDelete(CopyListingFileStatus status) {
    assertFalse("Expected !shouldDelete of " + status.getPath()
            + " but got true",
        shouldDelete(status));
  }

  /**
   * Create a synthetic status entry; every numeric field is zero.
   * @param path path of the entry
   * @param isDir is the entry a directory?
   * @return the new entry
   */
  private CopyListingFileStatus newStatus(final Path path,
      final boolean isDir) {
    return new CopyListingFileStatus(new FileStatus(0, isDir, 0, 0, 0, path));
  }

  /**
   * Create a synthetic directory entry.
   * @param path path of the directory
   * @return the new entry
   */
  private CopyListingFileStatus dirStatus(final Path path) {
    return newStatus(path, true);
  }

  /**
   * Create a synthetic file entry.
   * @param path path of the file
   * @return the new entry
   */
  private CopyListingFileStatus fileStatus(final Path path) {
    return newStatus(path, false);
  }

  /**
   * Assert that the tracker's cache contains the path.
   * @param path path to look for
   */
  private void expectCached(final Path path) {
    assertTrue("Path " + path + " is not in the cache of " + tracker,
        tracker.isContained(path));
  }

  /**
   * Assert that the tracker's cache does not contain the path.
   * @param path path to look for
   */
  private void expectNotCached(final Path path) {
    assertFalse("Path " + path + " is in the cache of " + tracker,
        tracker.isContained(path));
  }
}

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.INodeFile;
@ -1187,26 +1188,33 @@ public class TestDistCpUtils {
} }
} }
public static boolean checkIfFoldersAreInSync(FileSystem fs, String targetBase, String sourceBase) public static void verifyFoldersAreInSync(FileSystem fs, String targetBase,
throws IOException { String sourceBase) throws IOException {
Path base = new Path(targetBase); Path base = new Path(targetBase);
Stack<Path> stack = new Stack<Path>(); Stack<Path> stack = new Stack<>();
stack.push(base); stack.push(base);
while (!stack.isEmpty()) { while (!stack.isEmpty()) {
Path file = stack.pop(); Path file = stack.pop();
if (!fs.exists(file)) continue; if (!fs.exists(file)) {
continue;
}
FileStatus[] fStatus = fs.listStatus(file); FileStatus[] fStatus = fs.listStatus(file);
if (fStatus == null || fStatus.length == 0) continue; if (fStatus == null || fStatus.length == 0) {
continue;
}
for (FileStatus status : fStatus) { for (FileStatus status : fStatus) {
if (status.isDirectory()) { if (status.isDirectory()) {
stack.push(status.getPath()); stack.push(status.getPath());
} }
Assert.assertTrue(fs.exists(new Path(sourceBase + "/" + Path p = new Path(sourceBase + "/" +
DistCpUtils.getRelativePath(new Path(targetBase), status.getPath())))); DistCpUtils.getRelativePath(new Path(targetBase),
status.getPath()));
ContractTestUtils.assertPathExists(fs,
"path in sync with " + status.getPath(), p);
} }
} }
return true;
} }
} }

View File

@ -0,0 +1,128 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<!--
This is a bit ugly: it is a copy of
hadoop-common-project/hadoop-common/src/test/resources/contract/localfs.xml
Why is it needed? The DistCp contract tests are downstream of hadoop-common,
so they cannot be run there, but the test XML configuration lives in the test
resources of hadoop-common, which are not pulled into the *-test.jar.
-->
<property>
<name>fs.contract.is-case-sensitive</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-unix-permissions</name>
<value>true</value>
</property>
<!--
The remaining options are static: they are fixed values which do not
depend on the platform the tests run on.
-->
<property>
<name>fs.contract.test.root-tests-enabled</name>
<value>false</value>
</property>
<property>
<name>fs.contract.test.random-seek-count</name>
<value>1000</value>
</property>
<property>
<name>fs.contract.rename-creates-dest-dirs</name>
<value>true</value>
</property>
<property>
<name>fs.contract.rename-overwrites-dest</name>
<value>true</value>
</property>
<property>
<name>fs.contract.rename-remove-dest-if-empty-dir</name>
<value>true</value>
</property>
<!--
checksummed filesystems do not support append; see HADOOP-4292
-->
<property>
<name>fs.contract.supports-append</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-atomic-directory-delete</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-atomic-rename</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-block-locality</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-concat</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-seek</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-seek-on-closed-file</name>
<value>true</value>
</property>
<!-- checksum FS doesn't allow seeking past EOF -->
<property>
<name>fs.contract.rejects-seek-past-eof</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-strict-exceptions</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-settimes</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-getfilestatus</name>
<value>true</value>
</property>
</configuration>

View File

@ -16,7 +16,18 @@
# #
# log4j configuration used during build and unit tests # log4j configuration used during build and unit tests
log4j.rootLogger=debug,stdout log4j.rootLogger=info,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
log4j.logger.org.apache.hadoop.conf.Configuration.deprecation=WARN
log4j.logger.org.apache.hadoop.metrics2=ERROR
log4j.logger.org.apache.hadoop.mapreduce.JobResourceUploader=ERROR
log4j.logger.org.apache.hadoop.yarn.util.ProcfsBasedProcessTree=ERROR
log4j.logger.org.apache.commons.beanutils.FluentPropertyBeanIntrospector=ERROR
log4j.logger.org.apache.commons.configuration2.AbstractConfiguration=ERROR
# Debug level logging of distcp in test runs.
log4j.logger.org.apache.hadoop.tools.mapred=DEBUG