NIFI-2956 - GetHDFS - fixed directory path evaluation

commit 7fbc23639a
parent 4acc9ad288
@@ -415,7 +415,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
         try {
             final FileSystem hdfs = getFileSystem();
             // get listing
-            listing = selectFiles(hdfs, processorConfig.getConfiguredRootDirPath(), null);
+            listing = selectFiles(hdfs, new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue()), null);
             lastPollTime.set(System.currentTimeMillis());
         } finally {
             listingLock.unlock();
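This hunk is the heart of the fix. Previously selectFiles() received
processorConfig.getConfiguredRootDirPath(), a Path computed once in the
ProcessorConfiguration constructor, so any expression language in the
Directory property was evaluated a single time. A minimal sketch of the
per-poll resolution, assuming a Directory value that uses expression
language (the helper name resolveDirectory and the example expression are
illustrative, not part of the commit):

    import org.apache.hadoop.fs.Path;
    import org.apache.nifi.processor.ProcessContext;

    // Sketch: resolve the Directory property on every poll. With a value
    // such as /data/${now():format('yyyy-MM-dd')}, this yields the current
    // day's path each time, whereas a Path cached at configure time would
    // stay frozen at its first evaluation.
    private Path resolveDirectory(final ProcessContext context) {
        final String evaluated = context.getProperty(DIRECTORY)
                .evaluateAttributeExpressions()
                .getValue();
        return new Path(evaluated);
    }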
@@ -460,7 +460,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
             if (file.isDirectory() && processorConfig.getRecurseSubdirs()) {
                 files.addAll(selectFiles(hdfs, canonicalFile, filesVisited));

-            } else if (!file.isDirectory() && processorConfig.getPathFilter().accept(canonicalFile)) {
+            } else if (!file.isDirectory() && processorConfig.getPathFilter(dir).accept(canonicalFile)) {
                 final long fileAge = System.currentTimeMillis() - file.getModificationTime();
                 if (processorConfig.getMinimumAge() < fileAge && fileAge < processorConfig.getMaximumAge()) {
                     files.add(canonicalFile);
@@ -480,17 +480,14 @@ public class GetHDFS extends AbstractHadoopProcessor {
      */
     protected static class ProcessorConfiguration {

-        final private Path configuredRootDirPath;
         final private Pattern fileFilterPattern;
         final private boolean ignoreDottedFiles;
         final private boolean filterMatchBasenameOnly;
         final private long minimumAge;
         final private long maximumAge;
         final private boolean recurseSubdirs;
-        final private PathFilter pathFilter;

         ProcessorConfiguration(final ProcessContext context) {
-            configuredRootDirPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
             ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean();
             final String fileFilterRegex = context.getProperty(FILE_FILTER_REGEX).getValue();
             fileFilterPattern = (fileFilterRegex == null) ? null : Pattern.compile(fileFilterRegex);
@@ -500,7 +497,22 @@ public class GetHDFS extends AbstractHadoopProcessor {
             final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
             maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
             recurseSubdirs = context.getProperty(RECURSE_SUBDIRS).asBoolean();
-            pathFilter = new PathFilter() {
+        }
+
+        protected long getMinimumAge() {
+            return minimumAge;
+        }
+
+        protected long getMaximumAge() {
+            return maximumAge;
+        }
+
+        public boolean getRecurseSubdirs() {
+            return recurseSubdirs;
+        }
+
+        protected PathFilter getPathFilter(final Path dir) {
+            return new PathFilter() {

                 @Override
                 public boolean accept(Path path) {
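With getPathFilter(final Path dir), the filter is built per call and closes
over the directory being listed instead of a PathFilter field cached at
configuration time. A hedged usage sketch (hdfs, dir, files, and
processorConfig stand in for the locals of selectFiles; this fragment is
illustrative, not the commit's exact loop):

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.PathFilter;

    // Sketch: each listing pass constructs its own filter from the evaluated
    // directory, so relative-path matching tracks the directory actually
    // being scanned rather than a Path frozen at configuration time.
    final PathFilter filter = processorConfig.getPathFilter(dir);
    for (final FileStatus status : hdfs.listStatus(dir)) {
        if (!status.isDirectory() && filter.accept(status.getPath())) {
            files.add(status.getPath());
        }
    }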
@@ -512,7 +524,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
                         pathToCompare = path.getName();
                     } else {
                         // figure out portion of path that does not include the provided root dir.
-                        String relativePath = getPathDifference(configuredRootDirPath, path);
+                        String relativePath = getPathDifference(dir, path);
                         if (relativePath.length() == 0) {
                             pathToCompare = path.getName();
                         } else {
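The getPathDifference() helper now receives the evaluated directory rather
than the removed configuredRootDirPath field, keeping full-path filter
matching consistent with whatever the Directory expression resolved to for
this poll. A rough sketch of the behavior assumed of that helper (an
illustrative reimplementation, not the actual code):

    import org.apache.hadoop.fs.Path;

    // Assumed behavior: return the portion of path's parent directory below
    // root, or "" when the file sits directly in root (or outside it). Per
    // the surrounding hunk, the filter joins a non-empty result with the
    // file name before applying the filter regex.
    static String pathDifference(final Path root, final Path path) {
        final String rootStr = root.toUri().getPath();
        final Path parent = path.getParent();
        if (parent == null) {
            return "";
        }
        final String parentStr = parent.toUri().getPath();
        if (!parentStr.startsWith(rootStr)) {
            return "";
        }
        final String diff = parentStr.substring(rootStr.length());
        return diff.startsWith("/") ? diff.substring(1) : diff;
    }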
@@ -528,25 +540,5 @@ public class GetHDFS extends AbstractHadoopProcessor {

             };
         }
-
-        public Path getConfiguredRootDirPath() {
-            return configuredRootDirPath;
-        }
-
-        protected long getMinimumAge() {
-            return minimumAge;
-        }
-
-        protected long getMaximumAge() {
-            return maximumAge;
-        }
-
-        public boolean getRecurseSubdirs() {
-            return recurseSubdirs;
-        }
-
-        protected PathFilter getPathFilter() {
-            return pathFilter;
-        }
     }
 }