mirror of https://github.com/apache/nifi.git
NIFI-208 refined logic for identifying files to pull
This commit is contained in:
parent
ad90fbf24f
commit
3004ff1720
|
@ -67,7 +67,7 @@ import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
|
||||||
@TriggerWhenEmpty
|
@TriggerWhenEmpty
|
||||||
@Tags({"local", "files", "filesystem", "ingest", "ingress", "get", "source", "input"})
|
@Tags({"local", "files", "filesystem", "ingest", "ingress", "get", "source", "input"})
|
||||||
@CapabilityDescription("Creates FlowFiles from files in a directory")
|
@CapabilityDescription("Creates FlowFiles from files in a directory. NiFi will ignore files it doesn't have at least read permissions for.")
|
||||||
public class GetFile extends AbstractProcessor {
|
public class GetFile extends AbstractProcessor {
|
||||||
|
|
||||||
public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
|
||||||
|
@ -86,7 +86,10 @@ public class GetFile extends AbstractProcessor {
|
||||||
.build();
|
.build();
|
||||||
public static final PropertyDescriptor KEEP_SOURCE_FILE = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor KEEP_SOURCE_FILE = new PropertyDescriptor.Builder()
|
||||||
.name("Keep Source File")
|
.name("Keep Source File")
|
||||||
.description("If true, the file is not deleted after it has been copied to the Content Repository; this causes the file to be picked up continually and is useful for testing purposes")
|
.description("If true, the file is not deleted after it has been copied to the Content Repository; "
|
||||||
|
+ "this causes the file to be picked up continually and is useful for testing purposes. "
|
||||||
|
+ "If not keeping original NiFi will need write permissions on the directory it is pulling "
|
||||||
|
+ "from otherwise it will ignore the file.")
|
||||||
.required(true)
|
.required(true)
|
||||||
.allowableValues("true", "false")
|
.allowableValues("true", "false")
|
||||||
.defaultValue("false")
|
.defaultValue("false")
|
||||||
|
@ -224,6 +227,7 @@ public class GetFile extends AbstractProcessor {
|
||||||
final boolean recurseDirs = context.getProperty(RECURSE).asBoolean();
|
final boolean recurseDirs = context.getProperty(RECURSE).asBoolean();
|
||||||
final String pathPatternStr = context.getProperty(PATH_FILTER).getValue();
|
final String pathPatternStr = context.getProperty(PATH_FILTER).getValue();
|
||||||
final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr);
|
final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr);
|
||||||
|
final boolean keepOriginal = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
|
||||||
|
|
||||||
return new FileFilter() {
|
return new FileFilter() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -252,6 +256,15 @@ public class GetFile extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
//Verify that we have at least read permissions on the file we're considering grabbing
|
||||||
|
if(!Files.isReadable(file.toPath())){
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Verify that if we're not keeping original that we have write permissions on the directory the file is in
|
||||||
|
if(keepOriginal == false && !Files.isWritable(file.toPath().getParent())){
|
||||||
|
return false;
|
||||||
|
}
|
||||||
return filePattern.matcher(file.getName()).matches();
|
return filePattern.matcher(file.getName()).matches();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -375,12 +388,11 @@ public class GetFile extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
final ListIterator<File> itr = files.listIterator();
|
final ListIterator<File> itr = files.listIterator();
|
||||||
File file = null;
|
|
||||||
FlowFile flowFile = null;
|
FlowFile flowFile = null;
|
||||||
try {
|
try {
|
||||||
final Path directoryPath = directory.toPath();
|
final Path directoryPath = directory.toPath();
|
||||||
while (itr.hasNext()) {
|
while (itr.hasNext()) {
|
||||||
file = itr.next();
|
final File file = itr.next();
|
||||||
final Path filePath = file.toPath();
|
final Path filePath = file.toPath();
|
||||||
final Path relativePath = directoryPath.relativize(filePath.getParent());
|
final Path relativePath = directoryPath.relativize(filePath.getParent());
|
||||||
String relativePathString = relativePath.toString() + "/";
|
String relativePathString = relativePath.toString() + "/";
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
<body>
|
<body>
|
||||||
<!-- Processor Documentation ================================================== -->
|
<!-- Processor Documentation ================================================== -->
|
||||||
<h2>Description:</h2>
|
<h2>Description:</h2>
|
||||||
<p>This processor obtains FlowFiles from a local directory.</p>
|
<p>This processor obtains FlowFiles from a local directory. NiFi will need at least read permissions on the files it will pull otherwise it will ignore them.</p>
|
||||||
|
|
||||||
<p>
|
<p>
|
||||||
<strong>Modifies Attributes:</strong>
|
<strong>Modifies Attributes:</strong>
|
||||||
|
@ -105,7 +105,9 @@
|
||||||
<li>A Boolean value (true/false), indicating whether to leave a
|
<li>A Boolean value (true/false), indicating whether to leave a
|
||||||
copy of the file in the input directory after retrieving it; this
|
copy of the file in the input directory after retrieving it; this
|
||||||
causes the file to be picked up continually and is useful for
|
causes the file to be picked up continually and is useful for
|
||||||
testing purposes.</li>
|
testing purposes. If not keeping source file NiFi will need
|
||||||
|
at least write permissions to the directory it is pulling from
|
||||||
|
otherwise it will ignore the file.</li>
|
||||||
<li>Default value: false</li>
|
<li>Default value: false</li>
|
||||||
<li>Supports expression language: false</li>
|
<li>Supports expression language: false</li>
|
||||||
</ul></li>
|
</ul></li>
|
||||||
|
|
Loading…
Reference in New Issue