mirror of https://github.com/apache/nifi.git
NIFI-3008 Enabled expression language on BASE_DIRECTORY and FILENAME properties in TailFile
This closes #1190
This commit is contained in:
parent
3e892c55ec
commit
cc2fbcaac4
|
@ -126,7 +126,7 @@ public class TailFile extends AbstractProcessor {
|
||||||
.name("tail-base-directory")
|
.name("tail-base-directory")
|
||||||
.displayName("Base directory")
|
.displayName("Base directory")
|
||||||
.description("Base directory used to look for files to tail. This property is required when using Multifile mode.")
|
.description("Base directory used to look for files to tail. This property is required when using Multifile mode.")
|
||||||
.expressionLanguageSupported(false)
|
.expressionLanguageSupported(true)
|
||||||
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
|
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
|
||||||
.required(false)
|
.required(false)
|
||||||
.build();
|
.build();
|
||||||
|
@ -148,8 +148,8 @@ public class TailFile extends AbstractProcessor {
|
||||||
.description("Path of the file to tail in case of single file mode. If using multifile mode, regular expression to find files "
|
.description("Path of the file to tail in case of single file mode. If using multifile mode, regular expression to find files "
|
||||||
+ "to tail in the base directory. In case recursivity is set to true, the regular expression will be used to match the "
|
+ "to tail in the base directory. In case recursivity is set to true, the regular expression will be used to match the "
|
||||||
+ "path starting from the base directory (see additional details for examples).")
|
+ "path starting from the base directory (see additional details for examples).")
|
||||||
.expressionLanguageSupported(false)
|
.expressionLanguageSupported(true)
|
||||||
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
|
.addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true))
|
||||||
.required(true)
|
.required(true)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
@ -267,7 +267,7 @@ public class TailFile extends AbstractProcessor {
|
||||||
final List<ValidationResult> results = new ArrayList<>(super.customValidate(context));
|
final List<ValidationResult> results = new ArrayList<>(super.customValidate(context));
|
||||||
|
|
||||||
if(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())) {
|
if(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())) {
|
||||||
String path = context.getProperty(BASE_DIRECTORY).getValue();
|
String path = context.getProperty(BASE_DIRECTORY).evaluateAttributeExpressions().getValue();
|
||||||
if(path == null) {
|
if(path == null) {
|
||||||
results.add(new ValidationResult.Builder().subject(BASE_DIRECTORY.getName()).valid(false)
|
results.add(new ValidationResult.Builder().subject(BASE_DIRECTORY.getName()).valid(false)
|
||||||
.explanation("Base directory property cannot be empty in Multifile mode.").build());
|
.explanation("Base directory property cannot be empty in Multifile mode.").build());
|
||||||
|
@ -291,8 +291,8 @@ public class TailFile extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
long max = context.getProperty(MAXIMUM_AGE).getValue() == null ? Long.MAX_VALUE : context.getProperty(MAXIMUM_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
|
long max = context.getProperty(MAXIMUM_AGE).getValue() == null ? Long.MAX_VALUE : context.getProperty(MAXIMUM_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
|
||||||
List<String> filesToTail = getFilesToTail(context.getProperty(BASE_DIRECTORY).getValue(),
|
List<String> filesToTail = getFilesToTail(context.getProperty(BASE_DIRECTORY).evaluateAttributeExpressions().getValue(),
|
||||||
context.getProperty(FILENAME).getValue(),
|
context.getProperty(FILENAME).evaluateAttributeExpressions().getValue(),
|
||||||
context.getProperty(RECURSIVE).asBoolean(),
|
context.getProperty(RECURSIVE).asBoolean(),
|
||||||
max);
|
max);
|
||||||
|
|
||||||
|
@ -322,12 +322,12 @@ public class TailFile extends AbstractProcessor {
|
||||||
List<String> filesToTail = new ArrayList<String>();
|
List<String> filesToTail = new ArrayList<String>();
|
||||||
|
|
||||||
if(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())) {
|
if(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())) {
|
||||||
filesToTail.addAll(getFilesToTail(context.getProperty(BASE_DIRECTORY).getValue(),
|
filesToTail.addAll(getFilesToTail(context.getProperty(BASE_DIRECTORY).evaluateAttributeExpressions().getValue(),
|
||||||
context.getProperty(FILENAME).getValue(),
|
context.getProperty(FILENAME).evaluateAttributeExpressions().getValue(),
|
||||||
context.getProperty(RECURSIVE).asBoolean(),
|
context.getProperty(RECURSIVE).asBoolean(),
|
||||||
maxAge));
|
maxAge));
|
||||||
} else {
|
} else {
|
||||||
filesToTail.add(context.getProperty(FILENAME).getValue());
|
filesToTail.add(context.getProperty(FILENAME).evaluateAttributeExpressions().getValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1130,7 +1130,8 @@ public class TailFile extends AbstractProcessor {
|
||||||
|
|
||||||
// use a timestamp of lastModified() + 1 so that we do not ingest this file again.
|
// use a timestamp of lastModified() + 1 so that we do not ingest this file again.
|
||||||
cleanup();
|
cleanup();
|
||||||
tfo.setState(new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, file.lastModified() + 1L, file.length(), null, tfo.getState().getBuffer()));
|
tfo.setState(new TailFileState(context.getProperty(FILENAME).evaluateAttributeExpressions().getValue(), null, null, 0L, file.lastModified() + 1L, file.length(), null,
|
||||||
|
tfo.getState().getBuffer()));
|
||||||
|
|
||||||
// must ensure that we do session.commit() before persisting state in order to avoid data loss.
|
// must ensure that we do session.commit() before persisting state in order to avoid data loss.
|
||||||
session.commit();
|
session.commit();
|
||||||
|
|
|
@ -620,6 +620,27 @@ public class TestTailFile {
|
||||||
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(4).assertContentEquals("1\n");
|
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(4).assertContentEquals("1\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleFilesWithBasedirAndFilenameEL() throws IOException, InterruptedException {
|
||||||
|
runner.setVariable("vrBaseDirectory", "target");
|
||||||
|
runner.setProperty(TailFile.BASE_DIRECTORY, "${vrBaseDirectory}");
|
||||||
|
runner.setProperty(TailFile.MODE, TailFile.MODE_MULTIFILE);
|
||||||
|
runner.setVariable("vrFilename", "(testDir/)?log(ging)?.txt");
|
||||||
|
runner.setProperty(TailFile.FILENAME, "${vrFilename}");
|
||||||
|
runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "${filename}.?");
|
||||||
|
runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_FILE);
|
||||||
|
runner.setProperty(TailFile.RECURSIVE, "true");
|
||||||
|
|
||||||
|
runner.run(1);
|
||||||
|
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
|
||||||
|
|
||||||
|
otherRaf.write("hi\n".getBytes());
|
||||||
|
raf.write("hello\n".getBytes());
|
||||||
|
|
||||||
|
runner.run(1);
|
||||||
|
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This test is used to check the case where we have multiple files in the same directory
|
* This test is used to check the case where we have multiple files in the same directory
|
||||||
* and where it is not possible to specify a single rolling pattern for all files.
|
* and where it is not possible to specify a single rolling pattern for all files.
|
||||||
|
|
Loading…
Reference in New Issue