NIFI-2859 - This closes #1383. Ignore files starting with a dot in ListHDFS

Author: Pierre Villard, 2017-01-03 16:22:11 +01:00; committed by joewitt
parent 6279fd4184
commit 2d58497c2e
2 changed files with 43 additions and 16 deletions
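The change adds a "File Filter" property to ListHDFS whose default regular expression, [^\.].*, rejects any file name that begins with a dot, which is the behaviour requested in NIFI-2859. As a quick illustration of what that default pattern accepts and rejects (a standalone sketch, not part of the patch; the file names are hypothetical):

    import java.util.regex.Pattern;

    public class DotFileFilterDemo {
        public static void main(String[] args) {
            // Default value of the new File Filter property: the first character
            // must not be a dot, followed by anything (or nothing).
            final Pattern filter = Pattern.compile("[^\\.].*");

            // matches() requires the ENTIRE name to match, just like the
            // matcher(...).matches() call in the processor's PathFilter below.
            System.out.println(filter.matcher("testFile.txt").matches()); // true  -> listed
            System.out.println(filter.matcher(".hidden").matches());      // false -> ignored
            System.out.println(filter.matcher("._COPYING_").matches());   // false -> ignored
        }
    }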

org/apache/nifi/processors/hadoop/ListHDFS.java

@@ -19,6 +19,7 @@ package org.apache.nifi.processors.hadoop;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -42,11 +43,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.hadoop.util.HDFSListing;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
+import org.apache.nifi.processor.util.StandardValidators;
 import java.io.File;
 import java.io.IOException;
@@ -59,6 +56,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 @TriggerSerially
@@ -105,6 +103,13 @@ public class ListHDFS extends AbstractHadoopProcessor {
         .defaultValue("true")
         .build();
 
+    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
+        .name("File Filter")
+        .description("Only files whose names match the given regular expression will be picked up")
+        .required(true)
+        .defaultValue("[^\\.].*")
+        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+        .build();
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
@@ -135,6 +140,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
         props.add(DISTRIBUTED_CACHE_SERVICE);
         props.add(DIRECTORY);
         props.add(RECURSE_SUBDIRS);
+        props.add(FILE_FILTER);
         return props;
     }
@@ -152,18 +158,12 @@ public class ListHDFS extends AbstractHadoopProcessor {
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
         super.onPropertyModified(descriptor, oldValue, newValue);
-        if (isConfigurationRestored() && descriptor.equals(DIRECTORY)) {
+        if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) {
             latestTimestampEmitted = -1L;
             latestTimestampListed = -1L;
         }
     }
 
-    private HDFSListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException {
-        final ObjectMapper mapper = new ObjectMapper();
-        final JsonNode jsonNode = mapper.readTree(serializedState);
-        return mapper.readValue(jsonNode, HDFSListing.class);
-    }
-
     /**
      * Determines which of the given FileStatus's describes a File that should be listed.
      *
@@ -283,7 +283,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
         final Set<FileStatus> statuses;
         try {
             final Path rootPath = new Path(directory);
-            statuses = getStatuses(rootPath, recursive, hdfs);
+            statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context));
             getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
         } catch (final IOException | IllegalArgumentException e) {
             getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e});
@@ -326,17 +326,17 @@ public class ListHDFS extends AbstractHadoopProcessor {
         }
     }
 
-    private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs) throws IOException {
+    private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter) throws IOException {
         final Set<FileStatus> statusSet = new HashSet<>();
 
         getLogger().debug("Fetching listing for {}", new Object[] {path});
-        final FileStatus[] statuses = hdfs.listStatus(path);
+        final FileStatus[] statuses = hdfs.listStatus(path, filter);
 
         for ( final FileStatus status : statuses ) {
             if ( status.isDirectory() ) {
                 if ( recursive ) {
                     try {
-                        statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs));
+                        statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs, filter));
                     } catch (final IOException ioe) {
                         getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe});
                     }
@@ -394,4 +394,14 @@ public class ListHDFS extends AbstractHadoopProcessor {
         return sb.toString();
     }
 
+    private PathFilter createPathFilter(final ProcessContext context) {
+        final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
+        return new PathFilter() {
+            @Override
+            public boolean accept(Path path) {
+                return filePattern.matcher(path.getName()).matches();
+            }
+        };
+    }
 }
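Since the compiled pattern is handed to FileSystem.listStatus() as a PathFilter, it is evaluated against every entry in a directory listing, directories included, and only the final path component (path.getName()) is tested, with a full match required. A minimal standalone sketch of the same mechanism against the local filesystem; the /tmp/test path is illustrative and not from the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    import java.io.IOException;
    import java.util.regex.Pattern;

    public class PathFilterDemo {
        public static void main(String[] args) throws IOException {
            // The local filesystem stands in for HDFS here.
            final FileSystem fs = FileSystem.getLocal(new Configuration());
            final Pattern filePattern = Pattern.compile("[^\\.].*");

            // PathFilter has a single accept(Path) method, so a lambda works
            // on Java 8; the patch uses an anonymous class instead.
            final PathFilter filter = path -> filePattern.matcher(path.getName()).matches();

            // Note: listStatus applies the filter to directories as well as
            // files, so a directory named ".snapshot" would be skipped too.
            for (FileStatus status : fs.listStatus(new Path("/tmp/test"), filter)) {
                System.out.println(status.getPath());
            }
        }
    }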

TestListHDFS.java

@@ -101,6 +101,23 @@ public class TestListHDFS {
         mff.assertAttributeEquals("filename", "testFile.txt");
     }
 
+    @Test
+    public void testListingWithFilter() throws InterruptedException {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+        runner.setProperty(ListHDFS.DIRECTORY, "${literal('/test'):substring(0,5)}");
+        runner.setProperty(ListHDFS.FILE_FILTER, "[^test].*");
+
+        // first iteration will not pick up files because it has to instead check timestamps.
+        // We must then wait long enough to ensure that the listing can be performed safely and
+        // run the Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+    }
+
     @Test
     public void testListingWithInvalidELFunction() throws InterruptedException {
         runner.setProperty(ListHDFS.DIRECTORY, "${literal('/test'):foo()}");
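A note on the new test's expectation: "[^test]" is a character class, so the filter "[^test].*" matches any name whose first character is not 't', 'e', or 's'. "testFile.txt" starts with 't', fails the match, and is filtered out, which is why zero flow files are expected on the success relationship. A standalone check of that reasoning, with a hypothetical second file name:

    import java.util.regex.Pattern;

    public class TestFilterDemo {
        public static void main(String[] args) {
            // Character class: one character that is none of 't', 'e', 's'.
            final Pattern p = Pattern.compile("[^test].*");
            System.out.println(p.matcher("testFile.txt").matches());  // false -> excluded
            System.out.println(p.matcher("otherFile.txt").matches()); // true  -> would be listed
        }
    }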