HDFS-14788. Use dynamic regex filter to ignore copy of source files in Distcp.
Contributed by Mukund Thakur. Change-Id: I781387ddce95ee300c12a160dc9a0f7d602403c3
This commit is contained in:
parent
d81d45ff2f
commit
819159fa06
|
@ -17,6 +17,11 @@
|
|||
*/
|
||||
package org.apache.hadoop.tools;
|
||||
|
||||
import java.lang.reflect.Constructor;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
|
@ -26,6 +31,8 @@ import org.apache.hadoop.fs.Path;
|
|||
*/
|
||||
public abstract class CopyFilter {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(CopyFilter.class);
|
||||
|
||||
/**
|
||||
* Default initialize method does nothing.
|
||||
*/
|
||||
|
@ -47,6 +54,30 @@ public abstract class CopyFilter {
|
|||
* @return An instance of the appropriate CopyFilter
|
||||
*/
|
||||
public static CopyFilter getCopyFilter(Configuration conf) {
|
||||
String filtersClassName = conf
|
||||
.get(DistCpConstants.CONF_LABEL_FILTERS_CLASS);
|
||||
if (filtersClassName != null) {
|
||||
try {
|
||||
Class<? extends CopyFilter> filtersClass = conf
|
||||
.getClassByName(filtersClassName)
|
||||
.asSubclass(CopyFilter.class);
|
||||
filtersClassName = filtersClass.getName();
|
||||
Constructor<? extends CopyFilter> constructor = filtersClass
|
||||
.getDeclaredConstructor(Configuration.class);
|
||||
return constructor.newInstance(conf);
|
||||
} catch (Exception e) {
|
||||
LOG.error(DistCpConstants.CLASS_INSTANTIATION_ERROR_MSG +
|
||||
filtersClassName, e);
|
||||
throw new RuntimeException(
|
||||
DistCpConstants.CLASS_INSTANTIATION_ERROR_MSG +
|
||||
filtersClassName, e);
|
||||
}
|
||||
} else {
|
||||
return getDefaultCopyFilter(conf);
|
||||
}
|
||||
}
|
||||
|
||||
private static CopyFilter getDefaultCopyFilter(Configuration conf) {
|
||||
String filtersFilename = conf.get(DistCpConstants.CONF_LABEL_FILTERS_FILE);
|
||||
|
||||
if (filtersFilename == null) {
|
||||
|
|
|
@ -120,6 +120,17 @@ public final class DistCpConstants {
|
|||
/* DistCp CopyListing class override param */
|
||||
public static final String CONF_LABEL_COPY_LISTING_CLASS = "distcp.copy.listing.class";
|
||||
|
||||
/**
|
||||
* DistCp Filter class override param.
|
||||
*/
|
||||
public static final String CONF_LABEL_FILTERS_CLASS = "distcp.filters.class";
|
||||
|
||||
/**
|
||||
* Distcp exclude file regex override param.
|
||||
*/
|
||||
public static final String DISTCP_EXCLUDE_FILE_REGEX =
|
||||
"distcp.exclude-file-regex";
|
||||
|
||||
/* DistCp Copy Buffer Size */
|
||||
public static final String CONF_LABEL_COPY_BUFFER_SIZE =
|
||||
"distcp.copy.buffer.size";
|
||||
|
@ -177,4 +188,7 @@ public final class DistCpConstants {
|
|||
|
||||
public static final String CHECKSUM_MISMATCH_ERROR_MSG =
|
||||
"Checksum mismatch between ";
|
||||
|
||||
public static final String CLASS_INSTANTIATION_ERROR_MSG =
|
||||
"Unable to instantiate ";
|
||||
}
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.tools;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
|
||||
/**
|
||||
* Implementation of regex based filter for DistCp.
|
||||
* {@link DistCpConstants#CONF_LABEL_FILTERS_CLASS} needs to be set
|
||||
* in {@link Configuration} when launching a distcp job.
|
||||
*/
|
||||
public class RegexpInConfigurationFilter extends CopyFilter {
|
||||
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(RegexpInConfigurationFilter.class);
|
||||
|
||||
/**
|
||||
* Regex which can used to filter source files.
|
||||
* {@link DistCpConstants#DISTCP_EXCLUDE_FILE_REGEX} can be set
|
||||
* in {@link Configuration} when launching a DistCp job.
|
||||
* If not set no files will be filtered.
|
||||
*/
|
||||
private String excludeFileRegex;
|
||||
|
||||
private List<Pattern> filters = new ArrayList<>();
|
||||
|
||||
protected RegexpInConfigurationFilter(Configuration conf) {
|
||||
excludeFileRegex = conf
|
||||
.getTrimmed(DistCpConstants.DISTCP_EXCLUDE_FILE_REGEX, "");
|
||||
if (!excludeFileRegex.isEmpty()) {
|
||||
Pattern pattern = Pattern.compile(excludeFileRegex);
|
||||
filters.add(pattern);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldCopy(Path path) {
|
||||
for (Pattern filter : filters) {
|
||||
if (filter.matcher(path.toString()).matches()) {
|
||||
LOG.debug("Skipping {} as it matches the filter regex",
|
||||
path.toString());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -440,6 +440,23 @@ $H3 Copy-listing Generator
|
|||
of DistCp differs here from the legacy DistCp, in how paths are considered
|
||||
for copy.
|
||||
|
||||
One may also customize the filtering of files which shouldn't be copied
|
||||
by passing the current supported implementation of CopyFilter interface
|
||||
or a new implementation can be written. This can be specified by setting the
|
||||
`distcp.filters.class` in the DistCpOptions:
|
||||
|
||||
1. `distcp.filters.class` to "RegexCopyFilter". If you are using this implementation,
|
||||
you will have to pass along "CopyFilter" `distcp.filters.file` which contains the
|
||||
regex used for filtering. Support regular expressions specified by
|
||||
java.util.regex.Pattern.
|
||||
2. `distcp.filters.class` to "RegexpInConfigurationFilter". If you are using this
|
||||
implementation, you will have to pass along the regex also using
|
||||
`distcp.exclude-file-regex` parameter in "DistCpOptions". Support regular
|
||||
expressions specified by java.util.regex.Pattern. This is a more dynamic approach
|
||||
as compared to "RegexCopyFilter".
|
||||
3. `distcp.filters.class` to "TrueCopyFilter". This is used as a default
|
||||
implementation if none of the above options are specified.
|
||||
|
||||
The legacy implementation only lists those paths that must definitely be
|
||||
copied on to target. E.g. if a file already exists at the target (and
|
||||
`-overwrite` isn't specified), the file isn't even considered in the
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.tools;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Test {@link CopyFilter}.
|
||||
*/
|
||||
public class TestCopyFilter {
|
||||
|
||||
@Test
|
||||
public void testGetCopyFilterTrueCopyFilter() {
|
||||
Configuration configuration = new Configuration(false);
|
||||
CopyFilter copyFilter = CopyFilter.getCopyFilter(configuration);
|
||||
assertTrue("copyFilter should be instance of TrueCopyFilter",
|
||||
copyFilter instanceof TrueCopyFilter);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetCopyFilterRegexCopyFilter() {
|
||||
Configuration configuration = new Configuration(false);
|
||||
configuration.set(DistCpConstants.CONF_LABEL_FILTERS_FILE, "random");
|
||||
CopyFilter copyFilter = CopyFilter.getCopyFilter(configuration);
|
||||
assertTrue("copyFilter should be instance of RegexCopyFilter",
|
||||
copyFilter instanceof RegexCopyFilter);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetCopyFilterRegexpInConfigurationFilter() {
|
||||
final String filterName =
|
||||
"org.apache.hadoop.tools.RegexpInConfigurationFilter";
|
||||
Configuration configuration = new Configuration(false);
|
||||
configuration.set(DistCpConstants.CONF_LABEL_FILTERS_CLASS, filterName);
|
||||
CopyFilter copyFilter = CopyFilter.getCopyFilter(configuration);
|
||||
assertTrue("copyFilter should be instance of RegexpInConfigurationFilter",
|
||||
copyFilter instanceof RegexpInConfigurationFilter);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetCopyFilterNonExistingClass() throws Exception {
|
||||
final String filterName =
|
||||
"org.apache.hadoop.tools.RegexpInConfigurationWrongFilter";
|
||||
Configuration configuration = new Configuration(false);
|
||||
configuration.set(DistCpConstants.CONF_LABEL_FILTERS_CLASS, filterName);
|
||||
intercept(RuntimeException.class,
|
||||
DistCpConstants.CLASS_INSTANTIATION_ERROR_MSG + filterName,
|
||||
() -> CopyFilter.getCopyFilter(configuration));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetCopyFilterWrongClassType() throws Exception {
|
||||
final String filterName =
|
||||
"org.apache.hadoop.tools." +
|
||||
"TestCopyFilter.FilterNotExtendingCopyFilter";
|
||||
Configuration configuration = new Configuration(false);
|
||||
configuration.set(DistCpConstants.CONF_LABEL_FILTERS_CLASS, filterName);
|
||||
intercept(RuntimeException.class,
|
||||
DistCpConstants.CLASS_INSTANTIATION_ERROR_MSG + filterName,
|
||||
() -> CopyFilter.getCopyFilter(configuration));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetCopyFilterEmptyString() throws Exception {
|
||||
final String filterName = "";
|
||||
Configuration configuration = new Configuration(false);
|
||||
configuration.set(DistCpConstants.CONF_LABEL_FILTERS_CLASS, filterName);
|
||||
intercept(RuntimeException.class,
|
||||
DistCpConstants.CLASS_INSTANTIATION_ERROR_MSG + filterName,
|
||||
() -> CopyFilter.getCopyFilter(configuration));
|
||||
}
|
||||
|
||||
private class FilterNotExtendingCopyFilter {
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.tools;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Test {@link RegexpInConfigurationFilter}.
|
||||
*/
|
||||
public class TestRegexpInConfigurationFilter {
|
||||
|
||||
@Test
|
||||
public void testShouldCopy() {
|
||||
Configuration configuration = new Configuration(false);
|
||||
configuration.set(DistCpConstants.DISTCP_EXCLUDE_FILE_REGEX,
|
||||
"\\/.*_COPYING_$|\\/.*_COPYING$|^.*\\/\\.[^\\/]*$|" +
|
||||
"\\/_temporary$|\\/\\_temporary\\/|.*\\/\\.Trash\\/.*");
|
||||
RegexpInConfigurationFilter defaultCopyFilter =
|
||||
new RegexpInConfigurationFilter(configuration);
|
||||
Path shouldCopyPath = new Path("/user/bar");
|
||||
assertTrue(shouldCopyPath.toString() + " should be copied",
|
||||
defaultCopyFilter.shouldCopy(shouldCopyPath));
|
||||
shouldCopyPath = new Path("/user/bar/_COPYING");
|
||||
assertFalse(shouldCopyPath.toString() + " shouldn't be copied",
|
||||
defaultCopyFilter.shouldCopy(shouldCopyPath));
|
||||
shouldCopyPath = new Path("/user/bar/_COPYING_");
|
||||
assertFalse(shouldCopyPath.toString() + " shouldn't be copied",
|
||||
defaultCopyFilter.shouldCopy(shouldCopyPath));
|
||||
shouldCopyPath = new Path("/temp/");
|
||||
assertTrue(shouldCopyPath.toString() + " should be copied",
|
||||
defaultCopyFilter.shouldCopy(shouldCopyPath));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue