mirror of https://github.com/apache/nifi.git
NIFI-4607, NIFI-3975, NIFI-4602, NIFI-4606: This closes #2272. Fixed bug in TailFile that caused new Primary Node to not pull current Clustered State when File Location was set to Remote. Fixed bug that caused TailFile to occasionally become 'stuck' when the file it is tailing is renamed and a new file is created with the same name. Removed the 'Rolling Strategy' property from TailFile because it is not actually used in the processor anymore. Deleted MonitorMemoryTest because the unit test was testing the behavior of FlowController more than the behavior of the reporting task itself and in order to do so had a dependency in the pom.xml on nifi-framework-core, which means that it no longer compiles when FlowController is modified.
Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
parent
3b15ed855c
commit
45df23b1e0
|
@ -16,39 +16,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
|
||||||
import org.apache.nifi.annotation.behavior.Restricted;
|
|
||||||
import org.apache.nifi.annotation.behavior.Stateful;
|
|
||||||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
|
||||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
|
||||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
|
||||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
|
||||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
|
||||||
import org.apache.nifi.components.AllowableValue;
|
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
|
||||||
import org.apache.nifi.components.ValidationContext;
|
|
||||||
import org.apache.nifi.components.ValidationResult;
|
|
||||||
import org.apache.nifi.components.state.Scope;
|
|
||||||
import org.apache.nifi.components.state.StateMap;
|
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
|
||||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
|
||||||
import org.apache.nifi.processor.AbstractProcessor;
|
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
|
||||||
import org.apache.nifi.processor.Relationship;
|
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
|
||||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
|
||||||
import org.apache.nifi.stream.io.NullOutputStream;
|
|
||||||
import org.apache.nifi.stream.io.StreamUtils;
|
|
||||||
|
|
||||||
import java.io.BufferedOutputStream;
|
import java.io.BufferedOutputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.EOFException;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -77,6 +47,37 @@ import java.util.zip.CRC32;
|
||||||
import java.util.zip.CheckedInputStream;
|
import java.util.zip.CheckedInputStream;
|
||||||
import java.util.zip.Checksum;
|
import java.util.zip.Checksum;
|
||||||
|
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.Restricted;
|
||||||
|
import org.apache.nifi.annotation.behavior.Stateful;
|
||||||
|
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||||
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||||
|
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
|
||||||
|
import org.apache.nifi.components.AllowableValue;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.components.ValidationContext;
|
||||||
|
import org.apache.nifi.components.ValidationResult;
|
||||||
|
import org.apache.nifi.components.state.Scope;
|
||||||
|
import org.apache.nifi.components.state.StateMap;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
|
import org.apache.nifi.processor.AbstractProcessor;
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.apache.nifi.stream.io.NullOutputStream;
|
||||||
|
import org.apache.nifi.stream.io.StreamUtils;
|
||||||
|
|
||||||
// note: it is important that this Processor is not marked as @SupportsBatching because the session commits must complete before persisting state locally; otherwise, data loss may occur
|
// note: it is important that this Processor is not marked as @SupportsBatching because the session commits must complete before persisting state locally; otherwise, data loss may occur
|
||||||
@TriggerSerially
|
@TriggerSerially
|
||||||
@InputRequirement(Requirement.INPUT_FORBIDDEN)
|
@InputRequirement(Requirement.INPUT_FORBIDDEN)
|
||||||
|
@ -89,9 +90,7 @@ import java.util.zip.Checksum;
|
||||||
+ "ingesting files that have been compressed when 'rolled over'.")
|
+ "ingesting files that have been compressed when 'rolled over'.")
|
||||||
@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "Stores state about where in the Tailed File it left off so that on restart it does not have to duplicate data. "
|
@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "Stores state about where in the Tailed File it left off so that on restart it does not have to duplicate data. "
|
||||||
+ "State is stored either local or clustered depend on the <File Location> property.")
|
+ "State is stored either local or clustered depend on the <File Location> property.")
|
||||||
@WritesAttributes({
|
|
||||||
@WritesAttribute(attribute = "tailfile.original.path", description = "Path of the original file the flow file comes from.")
|
@WritesAttribute(attribute = "tailfile.original.path", description = "Path of the original file the flow file comes from.")
|
||||||
})
|
|
||||||
@Restricted("Provides operator the ability to read from any file that NiFi has access to.")
|
@Restricted("Provides operator the ability to read from any file that NiFi has access to.")
|
||||||
public class TailFile extends AbstractProcessor {
|
public class TailFile extends AbstractProcessor {
|
||||||
|
|
||||||
|
@ -195,15 +194,6 @@ public class TailFile extends AbstractProcessor {
|
||||||
.required(true)
|
.required(true)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
static final PropertyDescriptor ROLLING_STRATEGY = new PropertyDescriptor.Builder()
|
|
||||||
.name("tailfile-rolling-strategy")
|
|
||||||
.displayName("Rolling Strategy")
|
|
||||||
.description("Specifies if the files to tail have a fixed name or not.")
|
|
||||||
.required(true)
|
|
||||||
.allowableValues(FIXED_NAME, CHANGING_NAME)
|
|
||||||
.defaultValue(FIXED_NAME.getValue())
|
|
||||||
.build();
|
|
||||||
|
|
||||||
static final PropertyDescriptor LOOKUP_FREQUENCY = new PropertyDescriptor.Builder()
|
static final PropertyDescriptor LOOKUP_FREQUENCY = new PropertyDescriptor.Builder()
|
||||||
.name("tailfile-lookup-frequency")
|
.name("tailfile-lookup-frequency")
|
||||||
.displayName("Lookup frequency")
|
.displayName("Lookup frequency")
|
||||||
|
@ -234,6 +224,7 @@ public class TailFile extends AbstractProcessor {
|
||||||
private volatile Map<String, TailFileObject> states = new HashMap<String, TailFileObject>();
|
private volatile Map<String, TailFileObject> states = new HashMap<String, TailFileObject>();
|
||||||
private volatile AtomicLong lastLookup = new AtomicLong(0L);
|
private volatile AtomicLong lastLookup = new AtomicLong(0L);
|
||||||
private volatile AtomicBoolean isMultiChanging = new AtomicBoolean(false);
|
private volatile AtomicBoolean isMultiChanging = new AtomicBoolean(false);
|
||||||
|
private volatile boolean requireStateLookup = true;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
@ -245,7 +236,6 @@ public class TailFile extends AbstractProcessor {
|
||||||
properties.add(START_POSITION);
|
properties.add(START_POSITION);
|
||||||
properties.add(STATE_LOCATION);
|
properties.add(STATE_LOCATION);
|
||||||
properties.add(RECURSIVE);
|
properties.add(RECURSIVE);
|
||||||
properties.add(ROLLING_STRATEGY);
|
|
||||||
properties.add(LOOKUP_FREQUENCY);
|
properties.add(LOOKUP_FREQUENCY);
|
||||||
properties.add(MAXIMUM_AGE);
|
properties.add(MAXIMUM_AGE);
|
||||||
return properties;
|
return properties;
|
||||||
|
@ -270,43 +260,28 @@ public class TailFile extends AbstractProcessor {
|
||||||
if (context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())) {
|
if (context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())) {
|
||||||
String path = context.getProperty(BASE_DIRECTORY).evaluateAttributeExpressions().getValue();
|
String path = context.getProperty(BASE_DIRECTORY).evaluateAttributeExpressions().getValue();
|
||||||
if (path == null) {
|
if (path == null) {
|
||||||
results.add(new ValidationResult.Builder().subject(BASE_DIRECTORY.getName()).valid(false)
|
results.add(new ValidationResult.Builder()
|
||||||
.explanation("Base directory property cannot be empty in Multifile mode.").build());
|
.subject(BASE_DIRECTORY.getName())
|
||||||
|
.valid(false)
|
||||||
|
.explanation("Base directory property cannot be empty in Multifile mode.")
|
||||||
|
.build());
|
||||||
} else if (!new File(path).isDirectory()) {
|
} else if (!new File(path).isDirectory()) {
|
||||||
results.add(new ValidationResult.Builder().subject(BASE_DIRECTORY.getName()).valid(false)
|
results.add(new ValidationResult.Builder()
|
||||||
.explanation(path + " is not a directory.").build());
|
.subject(BASE_DIRECTORY.getName())
|
||||||
}
|
.valid(false)
|
||||||
|
.explanation(path + " is not a directory.")
|
||||||
if(context.getProperty(ROLLING_STRATEGY).getValue().equals(CHANGING_NAME.getValue())) {
|
.build());
|
||||||
String freq = context.getProperty(LOOKUP_FREQUENCY).getValue();
|
|
||||||
if(freq == null) {
|
|
||||||
results.add(new ValidationResult.Builder().subject(LOOKUP_FREQUENCY.getName()).valid(false)
|
|
||||||
.explanation("In Multiple files mode and Changing name rolling strategy, lookup frequency "
|
|
||||||
+ "property must be specified.").build());
|
|
||||||
}
|
|
||||||
String maxAge = context.getProperty(MAXIMUM_AGE).getValue();
|
|
||||||
if(maxAge == null) {
|
|
||||||
results.add(new ValidationResult.Builder().subject(MAXIMUM_AGE.getName()).valid(false)
|
|
||||||
.explanation("In Multiple files mode and Changing name rolling strategy, maximum age "
|
|
||||||
+ "property must be specified.").build());
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
long max = context.getProperty(MAXIMUM_AGE).getValue() == null ? Long.MAX_VALUE : context.getProperty(MAXIMUM_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
|
|
||||||
List<String> filesToTail = getFilesToTail(context.getProperty(BASE_DIRECTORY).evaluateAttributeExpressions().getValue(),
|
|
||||||
context.getProperty(FILENAME).evaluateAttributeExpressions().getValue(),
|
|
||||||
context.getProperty(RECURSIVE).asBoolean(),
|
|
||||||
max);
|
|
||||||
|
|
||||||
if(filesToTail.isEmpty()) {
|
|
||||||
results.add(new ValidationResult.Builder().subject(FILENAME.getName()).valid(false)
|
|
||||||
.explanation("There is no file to tail. Files must exist when starting this processor.").build());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@OnPrimaryNodeStateChange
|
||||||
|
public void onPrimaryNodeChange() {
|
||||||
|
this.requireStateLookup = true;
|
||||||
|
}
|
||||||
|
|
||||||
@OnScheduled
|
@OnScheduled
|
||||||
public void recoverState(final ProcessContext context) throws IOException {
|
public void recoverState(final ProcessContext context) throws IOException {
|
||||||
// set isMultiChanging
|
// set isMultiChanging
|
||||||
|
@ -429,10 +404,10 @@ public class TailFile extends AbstractProcessor {
|
||||||
* @return List of files to tail
|
* @return List of files to tail
|
||||||
*/
|
*/
|
||||||
private List<String> getFilesToTail(final String baseDir, String fileRegex, boolean isRecursive, long maxAge) {
|
private List<String> getFilesToTail(final String baseDir, String fileRegex, boolean isRecursive, long maxAge) {
|
||||||
Collection<File> files = FileUtils.listFiles(new File(baseDir), null, isRecursive);
|
final Collection<File> files = FileUtils.listFiles(new File(baseDir), null, isRecursive);
|
||||||
List<String> result = new ArrayList<String>();
|
final List<String> result = new ArrayList<String>();
|
||||||
|
|
||||||
String baseDirNoTrailingSeparator = baseDir.endsWith(File.separator) ? baseDir.substring(0, baseDir.length() -1) : baseDir;
|
final String baseDirNoTrailingSeparator = baseDir.endsWith(File.separator) ? baseDir.substring(0, baseDir.length() - 1) : baseDir;
|
||||||
final String fullRegex;
|
final String fullRegex;
|
||||||
if (File.separator.equals("/")) {
|
if (File.separator.equals("/")) {
|
||||||
// handle unix-style paths
|
// handle unix-style paths
|
||||||
|
@ -441,10 +416,10 @@ public class TailFile extends AbstractProcessor {
|
||||||
// handle windows-style paths, need to quote backslash characters
|
// handle windows-style paths, need to quote backslash characters
|
||||||
fullRegex = baseDirNoTrailingSeparator + Pattern.quote(File.separator) + fileRegex;
|
fullRegex = baseDirNoTrailingSeparator + Pattern.quote(File.separator) + fileRegex;
|
||||||
}
|
}
|
||||||
Pattern p = Pattern.compile(fullRegex);
|
final Pattern p = Pattern.compile(fullRegex);
|
||||||
|
|
||||||
for (File file : files) {
|
for (File file : files) {
|
||||||
String path = file.getPath();
|
final String path = file.getPath();
|
||||||
if (p.matcher(path).matches()) {
|
if (p.matcher(path).matches()) {
|
||||||
if (isMultiChanging.get()) {
|
if (isMultiChanging.get()) {
|
||||||
if ((new Date().getTime() - file.lastModified()) < maxAge) {
|
if ((new Date().getTime() - file.lastModified()) < maxAge) {
|
||||||
|
@ -515,7 +490,14 @@ public class TailFile extends AbstractProcessor {
|
||||||
if (existingTailFile.length() >= position) {
|
if (existingTailFile.length() >= position) {
|
||||||
try (final InputStream tailFileIs = new FileInputStream(existingTailFile);
|
try (final InputStream tailFileIs = new FileInputStream(existingTailFile);
|
||||||
final CheckedInputStream in = new CheckedInputStream(tailFileIs, checksum)) {
|
final CheckedInputStream in = new CheckedInputStream(tailFileIs, checksum)) {
|
||||||
|
|
||||||
|
try {
|
||||||
StreamUtils.copy(in, new NullOutputStream(), states.get(filePath).getState().getPosition());
|
StreamUtils.copy(in, new NullOutputStream(), states.get(filePath).getState().getPosition());
|
||||||
|
} catch (final EOFException eof) {
|
||||||
|
// If we hit EOFException, then the file is smaller than we expected. Assume rollover.
|
||||||
|
getLogger().debug("When recovering state, file being tailed has less data than was stored in the state. "
|
||||||
|
+ "Assuming rollover. Will begin tailing current file from beginning.");
|
||||||
|
}
|
||||||
|
|
||||||
final long checksumResult = in.getChecksum().getValue();
|
final long checksumResult = in.getChecksum().getValue();
|
||||||
if (checksumResult == states.get(filePath).getExpectedRecoveryChecksum()) {
|
if (checksumResult == states.get(filePath).getExpectedRecoveryChecksum()) {
|
||||||
|
@ -589,16 +571,30 @@ public class TailFile extends AbstractProcessor {
|
||||||
try {
|
try {
|
||||||
recoverState(context);
|
recoverState(context);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
getLogger().error("Exception raised while looking up for new files", e);
|
getLogger().error("Exception raised while attempting to recover state about where the tailing last left off", e);
|
||||||
context.yield();
|
context.yield();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (requireStateLookup) {
|
||||||
|
try {
|
||||||
|
recoverState(context);
|
||||||
|
} catch (IOException e) {
|
||||||
|
getLogger().error("Exception raised while attempting to recover state about where the tailing last left off", e);
|
||||||
|
context.yield();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
requireStateLookup = false;
|
||||||
|
}
|
||||||
|
|
||||||
if (states.isEmpty()) {
|
if (states.isEmpty()) {
|
||||||
context.yield();
|
context.yield();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (String tailFile : states.keySet()) {
|
for (String tailFile : states.keySet()) {
|
||||||
processTailFile(context, session, tailFile);
|
processTailFile(context, session, tailFile);
|
||||||
}
|
}
|
||||||
|
@ -688,9 +684,38 @@ public class TailFile extends AbstractProcessor {
|
||||||
final long startNanos = System.nanoTime();
|
final long startNanos = System.nanoTime();
|
||||||
|
|
||||||
// Check if file has rotated
|
// Check if file has rotated
|
||||||
if (rolloverOccurred
|
// We determine that the file has rotated if any of the following conditions are met:
|
||||||
|| (timestamp <= file.lastModified() && length > file.length())) {
|
// 1. 'rolloverOccured' == true, which indicates that we have found a new file matching the rollover pattern.
|
||||||
|
// 2. The file was modified after the timestamp in our state, AND the file is smaller than we expected. This satisfies
|
||||||
|
// the case where we are tailing File A, and that file is then renamed (say to B) and a new file named A is created
|
||||||
|
// and is written to. In such a case, File A may have a file size smaller than we have in our state, so we know that
|
||||||
|
// it rolled over.
|
||||||
|
// 3. The File Channel that we have indicates that the size of the file is different than file.length() indicates, AND
|
||||||
|
// the File Channel also indicates that we have read all data in the file. This case may also occur in the same scenario
|
||||||
|
// as #2, above. In this case, the File Channel is pointing to File A, but the 'file' object is pointing to File B. They
|
||||||
|
// both have the same name but are different files. As a result, once we have consumed all data from the File Channel,
|
||||||
|
// we want to roll over and consume data from the new file.
|
||||||
|
boolean rotated = rolloverOccurred;
|
||||||
|
if (!rotated) {
|
||||||
|
final long fileLength = file.length();
|
||||||
|
if (length > fileLength) {
|
||||||
|
rotated = true;
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
final long readerSize = reader.size();
|
||||||
|
final long readerPosition = reader.position();
|
||||||
|
|
||||||
|
if (readerSize == readerPosition && readerSize != fileLength) {
|
||||||
|
rotated = true;
|
||||||
|
}
|
||||||
|
} catch (final IOException e) {
|
||||||
|
getLogger().warn("Failed to determined the size or position of the File Channel when "
|
||||||
|
+ "determining if the file has rolled over. Will assume that the file being tailed has not rolled over", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rotated) {
|
||||||
// Since file has rotated, we close the reader, create a new one, and then reset our state.
|
// Since file has rotated, we close the reader, create a new one, and then reset our state.
|
||||||
try {
|
try {
|
||||||
reader.close();
|
reader.close();
|
||||||
|
|
|
@ -105,28 +105,23 @@
|
||||||
and new log messages are always appended in log file of the current day.
|
and new log messages are always appended in log file of the current day.
|
||||||
</p>
|
</p>
|
||||||
<p>
|
<p>
|
||||||
If the processor is configured with '<b>Multiple files</b>' mode and '<b>Fixed name</b>' rolling strategy, the processor will list the
|
If the processor is configured with '<b>Multiple files</b>' mode, two additional properties are relevant:
|
||||||
files to tail in the 'Base directory' (recursively or not) and matching the regular expression. This listing will only occur once when
|
|
||||||
the processor is started. In this configuration, the processor will act as in 'Single file' mode for all files listed by the processor
|
|
||||||
when started.
|
|
||||||
</p>
|
|
||||||
<p>
|
|
||||||
If the processor is configured with '<b>Multiple files</b>' mode and '<b>Changing name</b>' rolling strategy, two new properties are
|
|
||||||
mandatory:
|
|
||||||
</p>
|
</p>
|
||||||
<ul>
|
<ul>
|
||||||
<li><b>Lookup frequency</b>: specifies the minimum duration the processor will wait before listing again the files to tail.</li>
|
<li><b>Lookup frequency</b>: specifies the minimum duration the processor will wait before listing again the files to tail.</li>
|
||||||
<li><b>Maximum age</b>: specifies the necessary minimum duration to consider that no new messages will be appended in a file
|
<li><b>Maximum age</b>: specifies the necessary minimum duration to consider that no new messages will be appended in a file
|
||||||
regarding its last modification date.</li>
|
regarding its last modification date. If the amount of time that has elapsed since the file was modified is larger than this
|
||||||
|
period of time, the file will not be tailed. For example, if a file was modified 24 hours ago and this property is set to 12 hours,
|
||||||
|
the file will not be tailed. But if this property is set to 36 hours, then the file will continue to be tailed.</li>
|
||||||
</ul>
|
</ul>
|
||||||
<p>
|
<p>
|
||||||
It is necessary to pay attention to 'Lookup frequency' and 'Maximum age' properties as well as the frequency at which the processor is
|
It is necessary to pay attention to 'Lookup frequency' and 'Maximum age' properties, as well as the frequency at which the processor is
|
||||||
triggered in order to keep good performances. It is recommended to keep 'Maximum age' > 'Lookup frequency' > processor scheduling
|
triggered, in order to achieve high performance. It is recommended to keep 'Maximum age' > 'Lookup frequency' > processor scheduling
|
||||||
frequency to avoid loosing messages. It also recommended not to set 'Maximum Age' too low because if messages are appended in a file
|
frequency to avoid missing data. It also recommended not to set 'Maximum Age' too low because if messages are appended in a file
|
||||||
after this file has been considered "too old", all the messages in the file may be read again leading to data duplication.
|
after this file has been considered "too old", all the messages in the file may be read again, leading to data duplication.
|
||||||
</p>
|
</p>
|
||||||
<p>
|
<p>
|
||||||
Besides, if the processor is configured with '<b>Multiple files</b>' mode and '<b>Changing name</b>' rolling strategy, the 'Rolling
|
If the processor is configured with '<b>Multiple files</b>' mode, the 'Rolling
|
||||||
filename pattern' property must be specific enough to ensure that only the rolling files will be listed and not other currently tailed
|
filename pattern' property must be specific enough to ensure that only the rolling files will be listed and not other currently tailed
|
||||||
files in the same directory (this can be achieved using ${filename} tag).
|
files in the same directory (this can be achieved using ${filename} tag).
|
||||||
</p>
|
</p>
|
||||||
|
|
|
@ -319,6 +319,42 @@ public class TestTailFile {
|
||||||
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("1\n");
|
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("1\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRolloverWriteMoreDataThanPrevious() throws IOException, InterruptedException {
|
||||||
|
// If we have read all data in a file, and that file does not end with a new-line, then the last line
|
||||||
|
// in the file will have been read, added to the checksum, and then we would re-seek to "unread" that
|
||||||
|
// last line since it didn't have a new-line. We need to ensure that if the data is then rolled over
|
||||||
|
// that our checksum does not take into account those bytes that have been "unread."
|
||||||
|
|
||||||
|
// this mimics the case when we are reading a log file that rolls over while processor is running.
|
||||||
|
runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
|
||||||
|
runner.run(1, false, true);
|
||||||
|
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
|
||||||
|
|
||||||
|
raf.write("hello\n".getBytes());
|
||||||
|
runner.run(1, true, false);
|
||||||
|
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
|
||||||
|
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
|
||||||
|
runner.clearTransferState();
|
||||||
|
|
||||||
|
raf.write("world".getBytes());
|
||||||
|
|
||||||
|
runner.run(1);
|
||||||
|
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0); // should not pull in data because no \n
|
||||||
|
|
||||||
|
raf.close();
|
||||||
|
file.renameTo(new File("target/log.1"));
|
||||||
|
|
||||||
|
raf = new RandomAccessFile(new File("target/log.txt"), "rw");
|
||||||
|
raf.write("longer than hello\n".getBytes());
|
||||||
|
runner.run(1);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
|
||||||
|
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
|
||||||
|
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("longer than hello\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMultipleRolloversAfterHavingReadAllData() throws IOException, InterruptedException {
|
public void testMultipleRolloversAfterHavingReadAllData() throws IOException, InterruptedException {
|
||||||
// this mimics the case when we are reading a log file that rolls over while processor is running.
|
// this mimics the case when we are reading a log file that rolls over while processor is running.
|
||||||
|
@ -670,7 +706,6 @@ public class TestTailFile {
|
||||||
runner.setProperty(TailFile.LOOKUP_FREQUENCY, "1 sec");
|
runner.setProperty(TailFile.LOOKUP_FREQUENCY, "1 sec");
|
||||||
runner.setProperty(TailFile.FILENAME, "log_[0-9]*\\.txt");
|
runner.setProperty(TailFile.FILENAME, "log_[0-9]*\\.txt");
|
||||||
runner.setProperty(TailFile.RECURSIVE, "false");
|
runner.setProperty(TailFile.RECURSIVE, "false");
|
||||||
runner.setProperty(TailFile.ROLLING_STRATEGY, TailFile.FIXED_NAME);
|
|
||||||
|
|
||||||
initializeFile("target/log_1.txt", "firstLine\n");
|
initializeFile("target/log_1.txt", "firstLine\n");
|
||||||
|
|
||||||
|
@ -795,7 +830,6 @@ public class TestTailFile {
|
||||||
public void testMultipleFilesChangingNameStrategy() throws IOException, InterruptedException {
|
public void testMultipleFilesChangingNameStrategy() throws IOException, InterruptedException {
|
||||||
runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_FILE);
|
runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_FILE);
|
||||||
runner.setProperty(TailFile.MODE, TailFile.MODE_MULTIFILE);
|
runner.setProperty(TailFile.MODE, TailFile.MODE_MULTIFILE);
|
||||||
runner.setProperty(TailFile.ROLLING_STRATEGY, TailFile.CHANGING_NAME);
|
|
||||||
runner.setProperty(TailFile.BASE_DIRECTORY, "target");
|
runner.setProperty(TailFile.BASE_DIRECTORY, "target");
|
||||||
runner.setProperty(TailFile.FILENAME, ".*app-.*.log");
|
runner.setProperty(TailFile.FILENAME, ".*app-.*.log");
|
||||||
runner.setProperty(TailFile.LOOKUP_FREQUENCY, "2s");
|
runner.setProperty(TailFile.LOOKUP_FREQUENCY, "2s");
|
||||||
|
|
|
@ -52,12 +52,6 @@
|
||||||
<artifactId>mockito-all</artifactId>
|
<artifactId>mockito-all</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.nifi</groupId>
|
|
||||||
<artifactId>nifi-framework-core</artifactId>
|
|
||||||
<version>${project.version}</version>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.nifi</groupId>
|
<groupId>org.apache.nifi</groupId>
|
||||||
<artifactId>nifi-nar-utils</artifactId>
|
<artifactId>nifi-nar-utils</artifactId>
|
||||||
|
|
|
@ -1,167 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.nifi.controller;
|
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
|
||||||
import org.apache.nifi.admin.service.AuditService;
|
|
||||||
import org.apache.nifi.authorization.Authorizer;
|
|
||||||
import org.apache.nifi.bundle.Bundle;
|
|
||||||
import org.apache.nifi.controller.repository.FlowFileEventRepository;
|
|
||||||
import org.apache.nifi.nar.ExtensionManager;
|
|
||||||
import org.apache.nifi.nar.SystemBundle;
|
|
||||||
import org.apache.nifi.provenance.MockProvenanceRepository;
|
|
||||||
import org.apache.nifi.util.CapturingLogger;
|
|
||||||
import org.apache.nifi.util.NiFiProperties;
|
|
||||||
import org.apache.nifi.util.Tuple;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Ignore;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.lang.reflect.Field;
|
|
||||||
import java.lang.reflect.Modifier;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
|
|
||||||
public class MonitorMemoryTest {
|
|
||||||
|
|
||||||
private FlowController fc;
|
|
||||||
private Bundle systemBundle;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void before() throws Exception {
|
|
||||||
System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
|
|
||||||
final Map<String, String> addProps = new HashMap<>();
|
|
||||||
addProps.put(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION, "1 sec");
|
|
||||||
addProps.put(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE, "target/test-classes/state-management.xml");
|
|
||||||
addProps.put(NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID, "local-provider");
|
|
||||||
|
|
||||||
final Tuple<FlowController, Bundle> tuple = this.buildFlowControllerForTest(addProps);
|
|
||||||
fc = tuple.getKey();
|
|
||||||
systemBundle = tuple.getValue();
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void after() throws Exception {
|
|
||||||
fc.shutdown(true);
|
|
||||||
FileUtils.deleteDirectory(new File("./target/test-repo"));
|
|
||||||
FileUtils.deleteDirectory(new File("./target/content_repository"));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(expected = IllegalStateException.class)
|
|
||||||
public void validatevalidationKicksInOnWrongPoolNames() throws Exception {
|
|
||||||
ReportingTaskNode reportingTask = fc.createReportingTask(MonitorMemory.class.getName(), systemBundle.getBundleDetails().getCoordinate());
|
|
||||||
|
|
||||||
Map<String,String> props = new HashMap<>();
|
|
||||||
props.put(MonitorMemory.MEMORY_POOL_PROPERTY.getName(), "foo");
|
|
||||||
reportingTask.setProperties(props);
|
|
||||||
ProcessScheduler ps = fc.getProcessScheduler();
|
|
||||||
ps.schedule(reportingTask);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
@Ignore // temporarily ignoring it since it fails intermittently due to
|
|
||||||
// unpredictability during full build
|
|
||||||
// still keeping it for local testing
|
|
||||||
public void validateWarnWhenPercentThresholdReached() throws Exception {
|
|
||||||
this.doValidate("10%");
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* We're ignoring this tests as it is practically impossible to run it
|
|
||||||
* reliably together with automated Maven build since we can't control the
|
|
||||||
* state of the JVM on each machine during the build. However, you can run
|
|
||||||
* it selectively for further validation
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
@Ignore
|
|
||||||
public void validateWarnWhenSizeThresholdReached() throws Exception {
|
|
||||||
this.doValidate("10 MB");
|
|
||||||
}
|
|
||||||
|
|
||||||
public void doValidate(String threshold) throws Exception {
|
|
||||||
CapturingLogger capturingLogger = this.wrapAndReturnCapturingLogger();
|
|
||||||
ReportingTaskNode reportingTask = fc.createReportingTask(MonitorMemory.class.getName(), systemBundle.getBundleDetails().getCoordinate());
|
|
||||||
reportingTask.setSchedulingPeriod("1 sec");
|
|
||||||
|
|
||||||
Map<String,String> props = new HashMap<>();
|
|
||||||
props.put(MonitorMemory.MEMORY_POOL_PROPERTY.getName(), "PS Old Gen");
|
|
||||||
props.put(MonitorMemory.REPORTING_INTERVAL.getName(), "100 millis");
|
|
||||||
props.put(MonitorMemory.THRESHOLD_PROPERTY.getName(), threshold);
|
|
||||||
reportingTask.setProperties(props);
|
|
||||||
|
|
||||||
ProcessScheduler ps = fc.getProcessScheduler();
|
|
||||||
ps.schedule(reportingTask);
|
|
||||||
|
|
||||||
Thread.sleep(2000);
|
|
||||||
// ensure no memory warning were issued
|
|
||||||
assertTrue(capturingLogger.getWarnMessages().size() == 0);
|
|
||||||
|
|
||||||
// throw something on the heap
|
|
||||||
@SuppressWarnings("unused")
|
|
||||||
byte[] b = new byte[Integer.MAX_VALUE / 3];
|
|
||||||
Thread.sleep(200);
|
|
||||||
assertTrue(capturingLogger.getWarnMessages().size() > 0);
|
|
||||||
assertTrue(capturingLogger.getWarnMessages().get(0).getMsg()
|
|
||||||
.startsWith("Memory Pool 'PS Old Gen' has exceeded the configured Threshold of " + threshold));
|
|
||||||
|
|
||||||
// now try to clear the heap and see memory being reclaimed
|
|
||||||
b = null;
|
|
||||||
System.gc();
|
|
||||||
Thread.sleep(1000);
|
|
||||||
assertTrue(capturingLogger.getInfoMessages().get(0).getMsg().startsWith(
|
|
||||||
"Memory Pool 'PS Old Gen' is no longer exceeding the configured Threshold of " + threshold));
|
|
||||||
}
|
|
||||||
|
|
||||||
private CapturingLogger wrapAndReturnCapturingLogger() throws Exception {
|
|
||||||
Field loggerField = MonitorMemory.class.getDeclaredField("logger");
|
|
||||||
Field modifiersField = Field.class.getDeclaredField("modifiers");
|
|
||||||
modifiersField.setAccessible(true);
|
|
||||||
modifiersField.setInt(loggerField, loggerField.getModifiers() & ~Modifier.FINAL);
|
|
||||||
|
|
||||||
loggerField.setAccessible(true);
|
|
||||||
CapturingLogger capturingLogger = new CapturingLogger((Logger) loggerField.get(null));
|
|
||||||
loggerField.set(null, capturingLogger);
|
|
||||||
return capturingLogger;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Tuple<FlowController, Bundle> buildFlowControllerForTest(final Map<String, String> addProps) throws Exception {
|
|
||||||
addProps.put(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, MockProvenanceRepository.class.getName());
|
|
||||||
addProps.put("nifi.remote.input.socket.port", "");
|
|
||||||
addProps.put("nifi.remote.input.secure", "");
|
|
||||||
final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(null, addProps);
|
|
||||||
|
|
||||||
// build the system bundle
|
|
||||||
final Bundle bundle = SystemBundle.create(nifiProperties);
|
|
||||||
ExtensionManager.discoverExtensions(bundle, Collections.emptySet());
|
|
||||||
|
|
||||||
return new Tuple<>(FlowController.createStandaloneInstance(
|
|
||||||
mock(FlowFileEventRepository.class),
|
|
||||||
nifiProperties,
|
|
||||||
mock(Authorizer.class),
|
|
||||||
mock(AuditService.class),
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null), bundle);
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue