HADOOP-1540. Support file exclusion list in distcp. Contributed by Rich Haase.

(cherry picked from commit 0790275f05)
This commit is contained in:
Jing Zhao 2015-05-18 13:24:35 -07:00
parent 3ceb2ffe54
commit 5caea4cd46
15 changed files with 615 additions and 191 deletions

View File

@ -112,6 +112,8 @@ Release 2.8.0 - UNRELEASED
HADOOP-11944. add option to test-patch to avoid relocating patch process HADOOP-11944. add option to test-patch to avoid relocating patch process
directory (Sean Busbey via aw) directory (Sean Busbey via aw)
HADOOP-1540. Support file exclusion list in distcp. (Rich Haase via jing9)
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp HADOOP-11785. Reduce the number of listStatus operation in distcp

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
/**
* Interface for excluding files from DistCp.
*
*/
public abstract class CopyFilter {

  /**
   * Default initialize method does nothing. Subclasses that need to load
   * state (e.g. a filters file) override this.
   */
  public void initialize() {}

  /**
   * Predicate to determine if a file can be excluded from copy.
   *
   * @param path a Path to be considered for copying
   * @return boolean, true to copy, false to exclude
   */
  public abstract boolean shouldCopy(Path path);

  /**
   * Public factory method which returns the appropriate implementation of
   * CopyFilter.
   *
   * @param conf DistCp configuration
   * @return An instance of the appropriate CopyFilter
   */
  public static CopyFilter getCopyFilter(Configuration conf) {
    // Look the key up once; the previous version read the same
    // configuration entry twice (once per branch).
    String filtersFilename = conf.get(DistCpConstants.CONF_LABEL_FILTERS_FILE);
    if (filtersFilename == null) {
      // No filters file configured: copy everything.
      return new TrueCopyFilter();
    }
    return new RegexCopyFilter(filtersFilename);
  }
}

View File

@ -59,7 +59,8 @@ public class DistCpConstants {
public static final String CONF_LABEL_APPEND = "distcp.copy.append"; public static final String CONF_LABEL_APPEND = "distcp.copy.append";
public static final String CONF_LABEL_DIFF = "distcp.copy.diff"; public static final String CONF_LABEL_DIFF = "distcp.copy.diff";
public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb"; public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
public static final String CONF_LABEL_FILTERS_FILE =
"distcp.filters.file";
public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE = public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE =
"distcp.dynamic.max.chunks.tolerable"; "distcp.dynamic.max.chunks.tolerable";
public static final String CONF_LABEL_MAX_CHUNKS_IDEAL = public static final String CONF_LABEL_MAX_CHUNKS_IDEAL =

View File

@ -177,7 +177,16 @@ public enum DistCpOptionSwitch {
* Specify bandwidth per map in MB * Specify bandwidth per map in MB
*/ */
BANDWIDTH(DistCpConstants.CONF_LABEL_BANDWIDTH_MB, BANDWIDTH(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
new Option("bandwidth", true, "Specify bandwidth per map in MB")); new Option("bandwidth", true, "Specify bandwidth per map in MB")),
/**
* Path containing a list of strings, which when found in the path of
* a file to be copied excludes that file from the copy job.
*/
FILTERS(DistCpConstants.CONF_LABEL_FILTERS_FILE,
new Option("filters", true, "The path to a file containing a list of"
+ " strings for paths to be excluded from the copy."));
public static final String PRESERVE_STATUS_DEFAULT = "-prbugpct"; public static final String PRESERVE_STATUS_DEFAULT = "-prbugpct";
private final String confLabel; private final String confLabel;

View File

@ -69,7 +69,12 @@ public class DistCpOptions {
private Path targetPath; private Path targetPath;
// targetPathExist is a derived field, it's initialized in the /**
* The path to a file containing a list of paths to filter out of the copy.
*/
private String filtersFile;
// targetPathExist is a derived field, it's initialized in the
// beginning of distcp. // beginning of distcp.
private boolean targetPathExists = true; private boolean targetPathExists = true;
@ -139,6 +144,7 @@ public DistCpOptions(DistCpOptions that) {
this.sourcePaths = that.getSourcePaths(); this.sourcePaths = that.getSourcePaths();
this.targetPath = that.getTargetPath(); this.targetPath = that.getTargetPath();
this.targetPathExists = that.getTargetPathExists(); this.targetPathExists = that.getTargetPathExists();
this.filtersFile = that.getFiltersFile();
} }
} }
@ -549,6 +555,23 @@ public boolean setTargetPathExists(boolean targetPathExists) {
return this.targetPathExists = targetPathExists; return this.targetPathExists = targetPathExists;
} }
/**
* File path that contains the list of patterns
* for paths to be filtered from the file copy.
* @return - Filter file path.
*/
public final String getFiltersFile() {
return filtersFile;
}
/**
* Set filtersFile.
* @param filtersFilename The path to a list of patterns to exclude from copy.
*/
public final void setFiltersFile(String filtersFilename) {
this.filtersFile = filtersFilename;
}
public void validate(DistCpOptionSwitch option, boolean value) { public void validate(DistCpOptionSwitch option, boolean value) {
boolean syncFolder = (option == DistCpOptionSwitch.SYNC_FOLDERS ? boolean syncFolder = (option == DistCpOptionSwitch.SYNC_FOLDERS ?
@ -623,6 +646,10 @@ public void appendToConf(Configuration conf) {
String.valueOf(mapBandwidth)); String.valueOf(mapBandwidth));
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.PRESERVE_STATUS, DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.PRESERVE_STATUS,
DistCpUtils.packAttributes(preserveStatus)); DistCpUtils.packAttributes(preserveStatus));
if (filtersFile != null) {
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.FILTERS,
filtersFile);
}
} }
/** /**
@ -645,6 +672,7 @@ public String toString() {
", targetPath=" + targetPath + ", targetPath=" + targetPath +
", targetPathExists=" + targetPathExists + ", targetPathExists=" + targetPathExists +
", preserveRawXattrs=" + preserveRawXattrs + ", preserveRawXattrs=" + preserveRawXattrs +
", filtersFile='" + filtersFile + '\'' +
'}'; '}';
} }

View File

@ -86,37 +86,7 @@ public static DistCpOptions parse(String args[]) throws IllegalArgumentException
Arrays.toString(args), e); Arrays.toString(args), e);
} }
DistCpOptions option; DistCpOptions option = parseSourceAndTargetPaths(command);
Path targetPath;
List<Path> sourcePaths = new ArrayList<Path>();
String leftOverArgs[] = command.getArgs();
if (leftOverArgs == null || leftOverArgs.length < 1) {
throw new IllegalArgumentException("Target path not specified");
}
//Last Argument is the target path
targetPath = new Path(leftOverArgs[leftOverArgs.length -1].trim());
//Copy any source paths in the arguments to the list
for (int index = 0; index < leftOverArgs.length - 1; index++) {
sourcePaths.add(new Path(leftOverArgs[index].trim()));
}
/* If command has source file listing, use it else, fall back on source paths in args
If both are present, throw exception and bail */
if (command.hasOption(DistCpOptionSwitch.SOURCE_FILE_LISTING.getSwitch())) {
if (!sourcePaths.isEmpty()) {
throw new IllegalArgumentException("Both source file listing and source paths present");
}
option = new DistCpOptions(new Path(getVal(command, DistCpOptionSwitch.
SOURCE_FILE_LISTING.getSwitch())), targetPath);
} else {
if (sourcePaths.isEmpty()) {
throw new IllegalArgumentException("Neither source file listing nor source paths present");
}
option = new DistCpOptions(sourcePaths, targetPath);
}
//Process all the other option switches and set options appropriately //Process all the other option switches and set options appropriately
if (command.hasOption(DistCpOptionSwitch.IGNORE_FAILURES.getSwitch())) { if (command.hasOption(DistCpOptionSwitch.IGNORE_FAILURES.getSwitch())) {
@ -165,54 +135,95 @@ public static DistCpOptions parse(String args[]) throws IllegalArgumentException
option.setBlocking(false); option.setBlocking(false);
} }
if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) { parseBandwidth(command, option);
try {
Integer mapBandwidth = Integer.parseInt(
getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()).trim());
if (mapBandwidth.intValue() <= 0) {
throw new IllegalArgumentException("Bandwidth specified is not positive: " +
mapBandwidth);
}
option.setMapBandwidth(mapBandwidth);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Bandwidth specified is invalid: " +
getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()), e);
}
}
if (command.hasOption(DistCpOptionSwitch.SSL_CONF.getSwitch())) { if (command.hasOption(DistCpOptionSwitch.SSL_CONF.getSwitch())) {
option.setSslConfigurationFile(command. option.setSslConfigurationFile(command.
getOptionValue(DistCpOptionSwitch.SSL_CONF.getSwitch())); getOptionValue(DistCpOptionSwitch.SSL_CONF.getSwitch()));
} }
if (command.hasOption(DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch())) { parseNumListStatusThreads(command, option);
try {
Integer numThreads = Integer.parseInt(getVal(command,
DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()).trim());
option.setNumListstatusThreads(numThreads);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"Number of liststatus threads is invalid: " + getVal(command,
DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()), e);
}
}
if (command.hasOption(DistCpOptionSwitch.MAX_MAPS.getSwitch())) { parseMaxMaps(command, option);
try {
Integer maps = Integer.parseInt(
getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()).trim());
option.setMaxMaps(maps);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Number of maps is invalid: " +
getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()), e);
}
}
if (command.hasOption(DistCpOptionSwitch.COPY_STRATEGY.getSwitch())) { if (command.hasOption(DistCpOptionSwitch.COPY_STRATEGY.getSwitch())) {
option.setCopyStrategy( option.setCopyStrategy(
getVal(command, DistCpOptionSwitch.COPY_STRATEGY.getSwitch())); getVal(command, DistCpOptionSwitch.COPY_STRATEGY.getSwitch()));
} }
parsePreserveStatus(command, option);
if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) {
String[] snapshots = getVals(command, DistCpOptionSwitch.DIFF.getSwitch());
Preconditions.checkArgument(snapshots != null && snapshots.length == 2,
"Must provide both the starting and ending snapshot names");
option.setUseDiff(true, snapshots[0], snapshots[1]);
}
parseFileLimit(command);
parseSizeLimit(command);
if (command.hasOption(DistCpOptionSwitch.FILTERS.getSwitch())) {
option.setFiltersFile(getVal(command,
DistCpOptionSwitch.FILTERS.getSwitch()));
}
return option;
}
/**
* parseSizeLimit is a helper method for parsing the deprecated
* argument SIZE_LIMIT.
*
* @param command command line arguments
*/
private static void parseSizeLimit(CommandLine command) {
if (command.hasOption(DistCpOptionSwitch.SIZE_LIMIT.getSwitch())) {
String sizeLimitString = getVal(command,
DistCpOptionSwitch.SIZE_LIMIT.getSwitch().trim());
try {
Long.parseLong(sizeLimitString);
}
catch (NumberFormatException e) {
throw new IllegalArgumentException("Size-limit is invalid: "
+ sizeLimitString, e);
}
LOG.warn(DistCpOptionSwitch.SIZE_LIMIT.getSwitch() + " is a deprecated" +
" option. Ignoring.");
}
}
/**
* parseFileLimit is a helper method for parsing the deprecated
* argument FILE_LIMIT.
*
* @param command command line arguments
*/
private static void parseFileLimit(CommandLine command) {
if (command.hasOption(DistCpOptionSwitch.FILE_LIMIT.getSwitch())) {
String fileLimitString = getVal(command,
DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim());
try {
Integer.parseInt(fileLimitString);
}
catch (NumberFormatException e) {
throw new IllegalArgumentException("File-limit is invalid: "
+ fileLimitString, e);
}
LOG.warn(DistCpOptionSwitch.FILE_LIMIT.getSwitch() + " is a deprecated" +
" option. Ignoring.");
}
}
/**
* parsePreserveStatus is a helper method for parsing PRESERVE_STATUS.
*
* @param command command line arguments
* @param option parsed distcp options
*/
private static void parsePreserveStatus(CommandLine command,
DistCpOptions option) {
if (command.hasOption(DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) { if (command.hasOption(DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) {
String attributes = String attributes =
getVal(command, DistCpOptionSwitch.PRESERVE_STATUS.getSwitch()); getVal(command, DistCpOptionSwitch.PRESERVE_STATUS.getSwitch());
@ -227,42 +238,118 @@ public static DistCpOptions parse(String args[]) throws IllegalArgumentException
} }
} }
} }
}
if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) { /**
String[] snapshots = getVals(command, DistCpOptionSwitch.DIFF.getSwitch()); * parseMaxMaps is a helper method for parsing MAX_MAPS.
Preconditions.checkArgument(snapshots != null && snapshots.length == 2, *
"Must provide both the starting and ending snapshot names"); * @param command command line arguments
option.setUseDiff(true, snapshots[0], snapshots[1]); * @param option parsed distcp options
} */
private static void parseMaxMaps(CommandLine command,
if (command.hasOption(DistCpOptionSwitch.FILE_LIMIT.getSwitch())) { DistCpOptions option) {
String fileLimitString = getVal(command, if (command.hasOption(DistCpOptionSwitch.MAX_MAPS.getSwitch())) {
DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim());
try { try {
Integer.parseInt(fileLimitString); Integer maps = Integer.parseInt(
getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()).trim());
option.setMaxMaps(maps);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Number of maps is invalid: " +
getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()), e);
} }
catch (NumberFormatException e) {
throw new IllegalArgumentException("File-limit is invalid: "
+ fileLimitString, e);
}
LOG.warn(DistCpOptionSwitch.FILE_LIMIT.getSwitch() + " is a deprecated" +
" option. Ignoring.");
} }
}
if (command.hasOption(DistCpOptionSwitch.SIZE_LIMIT.getSwitch())) { /**
String sizeLimitString = getVal(command, * parseNumListStatusThreads is a helper method for parsing
DistCpOptionSwitch.SIZE_LIMIT.getSwitch().trim()); * NUM_LISTSTATUS_THREADS.
*
* @param command command line arguments
* @param option parsed distcp options
*/
private static void parseNumListStatusThreads(CommandLine command,
DistCpOptions option) {
if (command.hasOption(
DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch())) {
try { try {
Long.parseLong(sizeLimitString); Integer numThreads = Integer.parseInt(getVal(command,
DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()).trim());
option.setNumListstatusThreads(numThreads);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"Number of liststatus threads is invalid: " + getVal(command,
DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()), e);
} }
catch (NumberFormatException e) { }
throw new IllegalArgumentException("Size-limit is invalid: " }
+ sizeLimitString, e);
/**
* parseBandwidth is a helper method for parsing BANDWIDTH.
*
* @param command command line arguments
* @param option parsed distcp options
*/
private static void parseBandwidth(CommandLine command,
DistCpOptions option) {
if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
try {
Integer mapBandwidth = Integer.parseInt(
getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()).trim());
if (mapBandwidth <= 0) {
throw new IllegalArgumentException("Bandwidth specified is not " +
"positive: " + mapBandwidth);
}
option.setMapBandwidth(mapBandwidth);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Bandwidth specified is invalid: " +
getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()), e);
} }
LOG.warn(DistCpOptionSwitch.SIZE_LIMIT.getSwitch() + " is a deprecated" + }
" option. Ignoring."); }
/**
* parseSourceAndTargetPaths is a helper method for parsing the source
* and target paths.
*
* @param command command line arguments
* @return DistCpOptions
*/
private static DistCpOptions parseSourceAndTargetPaths(
CommandLine command) {
DistCpOptions option;
Path targetPath;
List<Path> sourcePaths = new ArrayList<Path>();
String[] leftOverArgs = command.getArgs();
if (leftOverArgs == null || leftOverArgs.length < 1) {
throw new IllegalArgumentException("Target path not specified");
} }
//Last Argument is the target path
targetPath = new Path(leftOverArgs[leftOverArgs.length - 1].trim());
//Copy any source paths in the arguments to the list
for (int index = 0; index < leftOverArgs.length - 1; index++) {
sourcePaths.add(new Path(leftOverArgs[index].trim()));
}
/* If command has source file listing, use it else, fall back on source
paths in args. If both are present, throw exception and bail */
if (command.hasOption(
DistCpOptionSwitch.SOURCE_FILE_LISTING.getSwitch())) {
if (!sourcePaths.isEmpty()) {
throw new IllegalArgumentException("Both source file listing and " +
"source paths present");
}
option = new DistCpOptions(new Path(getVal(command, DistCpOptionSwitch.
SOURCE_FILE_LISTING.getSwitch())), targetPath);
} else {
if (sourcePaths.isEmpty()) {
throw new IllegalArgumentException("Neither source file listing nor " +
"source paths present");
}
option = new DistCpOptions(sourcePaths, targetPath);
}
return option; return option;
} }

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import java.io.*;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

import com.google.common.annotations.VisibleForTesting;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
/**
* A CopyFilter which compares Java Regex Patterns to each Path to determine
* whether a file should be copied.
*/
public class RegexCopyFilter extends CopyFilter {

  private static final Log LOG = LogFactory.getLog(RegexCopyFilter.class);
  // Local file containing one Java regex per line.
  private final File filtersFile;
  // Compiled patterns; a path matching ANY of them is excluded from copy.
  private List<Pattern> filters;

  /**
   * Constructor, sets up a File object to read filter patterns from and
   * the List to store the patterns.
   *
   * @param filtersFilename local path of the file holding one regex per line
   */
  protected RegexCopyFilter(String filtersFilename) {
    filtersFile = new File(filtersFilename);
    filters = new ArrayList<>();
  }

  /**
   * Loads a list of filter patterns for use in shouldCopy.
   * NOTE(review): read failures are logged and swallowed, so a missing or
   * unreadable filters file leaves the filter list empty (everything is
   * copied) — confirm this best-effort contract is intended.
   */
  @Override
  public void initialize() {
    BufferedReader reader = null;
    try {
      InputStream is = new FileInputStream(filtersFile);
      // StandardCharsets.UTF_8 is the idiomatic, exception-free equivalent
      // of Charset.forName("UTF-8").
      reader = new BufferedReader(new InputStreamReader(is,
          StandardCharsets.UTF_8));
      String line;
      while ((line = reader.readLine()) != null) {
        Pattern pattern = Pattern.compile(line);
        filters.add(pattern);
      }
    } catch (FileNotFoundException notFound) {
      LOG.error("Can't find filters file " + filtersFile);
    } catch (IOException cantRead) {
      LOG.error("An error occurred while attempting to read from " +
          filtersFile);
    } finally {
      IOUtils.cleanup(LOG, reader);
    }
  }

  /**
   * Sets the list of filters to exclude files from copy.
   * Simplifies testing of the filters feature.
   *
   * @param filtersList a list of Patterns to be excluded
   */
  @VisibleForTesting
  protected final void setFilters(List<Pattern> filtersList) {
    this.filters = filtersList;
  }

  /**
   * A path is copied unless its full string form matches (whole-string
   * {@link java.util.regex.Matcher#matches()}) one of the loaded patterns.
   *
   * @param path the path being considered for copy
   * @return true to copy, false to exclude
   */
  @Override
  public boolean shouldCopy(Path path) {
    for (Pattern filter : filters) {
      if (filter.matcher(path.toString()).matches()) {
        return false;
      }
    }
    return true;
  }
}

View File

@ -58,6 +58,7 @@ public class SimpleCopyListing extends CopyListing {
private long totalBytesToCopy = 0; private long totalBytesToCopy = 0;
private int numListstatusThreads = 1; private int numListstatusThreads = 1;
private final int maxRetries = 3; private final int maxRetries = 3;
private CopyFilter copyFilter;
/** /**
* Protected constructor, to initialize configuration. * Protected constructor, to initialize configuration.
@ -71,6 +72,8 @@ protected SimpleCopyListing(Configuration configuration, Credentials credentials
numListstatusThreads = getConf().getInt( numListstatusThreads = getConf().getInt(
DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS, DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS,
DistCpConstants.DEFAULT_LISTSTATUS_THREADS); DistCpConstants.DEFAULT_LISTSTATUS_THREADS);
copyFilter = CopyFilter.getCopyFilter(getConf());
copyFilter.initialize();
} }
@VisibleForTesting @VisibleForTesting
@ -213,7 +216,7 @@ public void doBuildListing(SequenceFile.Writer fileListWriter,
preserveXAttrs && sourceStatus.isDirectory(), preserveXAttrs && sourceStatus.isDirectory(),
preserveRawXAttrs && sourceStatus.isDirectory()); preserveRawXAttrs && sourceStatus.isDirectory());
writeToFileListing(fileListWriter, sourceCopyListingStatus, writeToFileListing(fileListWriter, sourceCopyListingStatus,
sourcePathRoot, options); sourcePathRoot);
if (sourceStatus.isDirectory()) { if (sourceStatus.isDirectory()) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -264,11 +267,10 @@ private Path computeSourceRootPath(FileStatus sourceStatus,
* Provide an option to skip copy of a path, Allows for exclusion * Provide an option to skip copy of a path, Allows for exclusion
* of files such as {@link org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#SUCCEEDED_FILE_NAME} * of files such as {@link org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#SUCCEEDED_FILE_NAME}
* @param path - Path being considered for copy while building the file listing * @param path - Path being considered for copy while building the file listing
* @param options - Input options passed during DistCp invocation
* @return - True if the path should be considered for copy, false otherwise * @return - True if the path should be considered for copy, false otherwise
*/ */
protected boolean shouldCopy(Path path, DistCpOptions options) { protected boolean shouldCopy(Path path) {
return true; return copyFilter.shouldCopy(path);
} }
/** {@inheritDoc} */ /** {@inheritDoc} */
@ -409,7 +411,7 @@ private void traverseDirectory(SequenceFile.Writer fileListWriter,
preserveXAttrs && child.isDirectory(), preserveXAttrs && child.isDirectory(),
preserveRawXattrs && child.isDirectory()); preserveRawXattrs && child.isDirectory());
writeToFileListing(fileListWriter, childCopyListingStatus, writeToFileListing(fileListWriter, childCopyListingStatus,
sourcePathRoot, options); sourcePathRoot);
} }
if (retry < maxRetries) { if (retry < maxRetries) {
if (child.isDirectory()) { if (child.isDirectory()) {
@ -443,26 +445,23 @@ private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
} }
return; return;
} }
writeToFileListing(fileListWriter, fileStatus, sourcePathRoot, options); writeToFileListing(fileListWriter, fileStatus, sourcePathRoot);
} }
private void writeToFileListing(SequenceFile.Writer fileListWriter, private void writeToFileListing(SequenceFile.Writer fileListWriter,
CopyListingFileStatus fileStatus, CopyListingFileStatus fileStatus,
Path sourcePathRoot, Path sourcePathRoot) throws IOException {
DistCpOptions options) throws IOException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot, LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot,
fileStatus.getPath()) + ", FULL PATH: " + fileStatus.getPath()); fileStatus.getPath()) + ", FULL PATH: " + fileStatus.getPath());
} }
FileStatus status = fileStatus; if (!shouldCopy(fileStatus.getPath())) {
if (!shouldCopy(fileStatus.getPath(), options)) {
return; return;
} }
fileListWriter.append(new Text(DistCpUtils.getRelativePath(sourcePathRoot, fileListWriter.append(new Text(DistCpUtils.getRelativePath(sourcePathRoot,
fileStatus.getPath())), status); fileStatus.getPath())), fileStatus);
fileListWriter.sync(); fileListWriter.sync();
if (!fileStatus.isDirectory()) { if (!fileStatus.isDirectory()) {

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import org.apache.hadoop.fs.Path;
/**
* A CopyFilter which always returns true.
*
*/
public class TrueCopyFilter extends CopyFilter {

  /**
   * Always copy: no path is ever filtered out. Used when no filters file
   * is configured.
   *
   * @param path the path being considered (ignored)
   * @return true unconditionally
   */
  @Override
  public boolean shouldCopy(Path path) {
    return true;
  }
}

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* DistCp is a tool for replicating data using MapReduce jobs for concurrent
* copy operations.
*
* @version 2
*/
package org.apache.hadoop.tools;

View File

@ -94,40 +94,6 @@ protected long getNumberOfPaths() {
return 0; return 0;
} }
@Test(timeout=10000)
public void testSkipCopy() throws Exception {
SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS) {
@Override
protected boolean shouldCopy(Path path, DistCpOptions options) {
return !path.getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME);
}
};
FileSystem fs = FileSystem.get(getConf());
List<Path> srcPaths = new ArrayList<Path>();
srcPaths.add(new Path("/tmp/in4/1"));
srcPaths.add(new Path("/tmp/in4/2"));
Path target = new Path("/tmp/out4/1");
TestDistCpUtils.createFile(fs, "/tmp/in4/1/_SUCCESS");
TestDistCpUtils.createFile(fs, "/tmp/in4/1/file");
TestDistCpUtils.createFile(fs, "/tmp/in4/2");
fs.mkdirs(target);
DistCpOptions options = new DistCpOptions(srcPaths, target);
Path listingFile = new Path("/tmp/list4");
listing.buildListing(listingFile, options);
Assert.assertEquals(listing.getNumberOfPaths(), 3);
SequenceFile.Reader reader = new SequenceFile.Reader(getConf(),
SequenceFile.Reader.file(listingFile));
CopyListingFileStatus fileStatus = new CopyListingFileStatus();
Text relativePath = new Text();
Assert.assertTrue(reader.next(relativePath, fileStatus));
Assert.assertEquals(relativePath.toString(), "/1");
Assert.assertTrue(reader.next(relativePath, fileStatus));
Assert.assertEquals(relativePath.toString(), "/1/file");
Assert.assertTrue(reader.next(relativePath, fileStatus));
Assert.assertEquals(relativePath.toString(), "/2");
Assert.assertFalse(reader.next(relativePath, fileStatus));
}
@Test(timeout=10000) @Test(timeout=10000)
public void testMultipleSrcToFile() { public void testMultipleSrcToFile() {
FileSystem fs = null; FileSystem fs = null;

View File

@ -241,55 +241,6 @@ private void caseMultiFileTargetPresent(boolean sync) {
} }
} }
@Test(timeout=100000)
public void testCustomCopyListing() {
try {
addEntries(listFile, "multifile1/file3", "multifile1/file4", "multifile1/file5");
createFiles("multifile1/file3", "multifile1/file4", "multifile1/file5");
mkdirs(target.toString());
Configuration conf = getConf();
try {
conf.setClass(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS,
CustomCopyListing.class, CopyListing.class);
DistCpOptions options = new DistCpOptions(Arrays.
asList(new Path(root + "/" + "multifile1")), target);
options.setSyncFolder(true);
options.setDeleteMissing(false);
options.setOverwrite(false);
try {
new DistCp(conf, options).execute();
} catch (Exception e) {
LOG.error("Exception encountered ", e);
throw new IOException(e);
}
} finally {
conf.unset(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS);
}
checkResult(target, 2, "file4", "file5");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
private static class CustomCopyListing extends SimpleCopyListing {
public CustomCopyListing(Configuration configuration,
Credentials credentials) {
super(configuration, credentials);
}
@Override
protected boolean shouldCopy(Path path, DistCpOptions options) {
return !path.getName().equals("file3");
}
}
@Test(timeout=100000) @Test(timeout=100000)
public void testMultiFileTargetMissing() { public void testMultiFileTargetMissing() {
caseMultiFileTargetMissing(false); caseMultiFileTargetMissing(false);

View File

@ -400,7 +400,7 @@ public void testToString() {
String val = "DistCpOptions{atomicCommit=false, syncFolder=false, deleteMissing=false, " + String val = "DistCpOptions{atomicCommit=false, syncFolder=false, deleteMissing=false, " +
"ignoreFailures=false, maxMaps=20, sslConfigurationFile='null', copyStrategy='uniformsize', " + "ignoreFailures=false, maxMaps=20, sslConfigurationFile='null', copyStrategy='uniformsize', " +
"sourceFileListing=abc, sourcePaths=null, targetPath=xyz, targetPathExists=true, " + "sourceFileListing=abc, sourcePaths=null, targetPath=xyz, targetPathExists=true, " +
"preserveRawXattrs=false}"; "preserveRawXattrs=false, filtersFile='null'}";
Assert.assertEquals(val, option.toString()); Assert.assertEquals(val, option.toString());
Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(), Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),
DistCpOptionSwitch.ATOMIC_COMMIT.name()); DistCpOptionSwitch.ATOMIC_COMMIT.name());
@ -718,4 +718,19 @@ public void testDiffOption() {
"Diff is valid only with update and delete options", e); "Diff is valid only with update and delete options", e);
} }
} }
  @Test
  public void testExclusionsOption() {
    // Without -filters, no filters file should be recorded in the options.
    DistCpOptions options = OptionsParser.parse(new String[] {
        "hdfs://localhost:8020/source/first",
        "hdfs://localhost:8020/target/"});
    Assert.assertNull(options.getFiltersFile());
    // With -filters, the supplied path is carried through verbatim.
    options = OptionsParser.parse(new String[] {
        "-filters",
        "/tmp/filters.txt",
        "hdfs://localhost:8020/source/first",
        "hdfs://localhost:8020/target/"});
    Assert.assertEquals(options.getFiltersFile(), "/tmp/filters.txt");
  }
} }

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
/**
 * Unit tests for {@link RegexCopyFilter}: paths matching any configured
 * regular expression are excluded from a DistCp copy.
 */
public class TestRegexCopyFilter {

  @Test
  public void testShouldCopyTrue() {
    List<Pattern> filters = new ArrayList<>();
    filters.add(Pattern.compile("user"));
    // Pattern does not exclude "/user/bar", so the path is still copied.
    Assert.assertTrue(newFilter(filters).shouldCopy(new Path("/user/bar")));
  }

  @Test
  public void testShouldCopyFalse() {
    List<Pattern> filters = new ArrayList<>();
    filters.add(Pattern.compile(".*test.*"));
    // "/user/testing" matches ".*test.*" and must be excluded.
    Assert.assertFalse(newFilter(filters).shouldCopy(new Path("/user/testing")));
  }

  @Test
  public void testShouldCopyWithMultipleFilters() {
    List<Pattern> filters = new ArrayList<>();
    filters.add(Pattern.compile(".*test.*"));
    filters.add(Pattern.compile("/user/b.*"));
    filters.add(Pattern.compile(".*_SUCCESS"));
    // Only "/user/foo/bar" and "/mapred/.staging_job" escape all three filters.
    Assert.assertEquals(2, countShouldCopy(newFilter(filters), getTestPaths()));
  }

  @Test
  public void testShouldExcludeAll() {
    List<Pattern> filters = new ArrayList<>();
    filters.add(Pattern.compile(".*test.*"));
    filters.add(Pattern.compile("/user/b.*"));
    filters.add(Pattern.compile(".*")); // exclude everything
    Assert.assertEquals(0, countShouldCopy(newFilter(filters), getTestPaths()));
  }

  /** Creates a filter (backing file name is unused by these tests) with the given patterns. */
  private static RegexCopyFilter newFilter(List<Pattern> filters) {
    RegexCopyFilter regexCopyFilter = new RegexCopyFilter("fakeFile");
    regexCopyFilter.setFilters(filters);
    return regexCopyFilter;
  }

  /** Counts how many of {@code paths} the filter reports as copyable. */
  private static int countShouldCopy(RegexCopyFilter filter, List<Path> paths) {
    int shouldCopyCount = 0;
    for (Path path : paths) {
      if (filter.shouldCopy(path)) {
        shouldCopyCount++;
      }
    }
    return shouldCopyCount;
  }

  /** Sample paths mixing entries that each filter set does and does not exclude. */
  private List<Path> getTestPaths() {
    List<Path> toCopy = new ArrayList<>();
    toCopy.add(new Path("/user/bar"));
    toCopy.add(new Path("/user/foo/_SUCCESS"));
    toCopy.add(new Path("/hive/test_data"));
    toCopy.add(new Path("test"));
    toCopy.add(new Path("/user/foo/bar"));
    toCopy.add(new Path("/mapred/.staging_job"));
    return toCopy;
  }
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Test;
/**
 * Unit tests for {@link TrueCopyFilter}, the default filter that accepts
 * every path.
 */
public class TestTrueCopyFilter {
  @Test
  public void testShouldCopy() {
    Assert.assertTrue(new TrueCopyFilter().shouldCopy(new Path("fake")));
  }

  @Test
  public void testShouldCopyWithNull() {
    // The original test passed new Path("fake") here and never exercised
    // null; pass an actual null so the name matches the behavior tested.
    Assert.assertTrue(new TrueCopyFilter().shouldCopy(null));
  }
}