NIFI-1170 - Improved TailFile processor to support multiple files tailing

This closes #980
This commit is contained in:
Pierre Villard 2016-08-31 18:53:26 +02:00 committed by Oleg Zhurakousky
parent 68291636cb
commit 930e95aa00
3 changed files with 916 additions and 111 deletions

View File

@ -29,29 +29,38 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.zip.CRC32; import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream; import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum; import java.util.zip.Checksum;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap; import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
@ -71,7 +80,7 @@ import org.apache.nifi.stream.io.StreamUtils;
@TriggerSerially @TriggerSerially
@InputRequirement(Requirement.INPUT_FORBIDDEN) @InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"tail", "file", "log", "text", "source"}) @Tags({"tail", "file", "log", "text", "source"})
@CapabilityDescription("\"Tails\" a file, ingesting data from the file as it is written to the file. The file is expected to be textual. Data is ingested only when a " @CapabilityDescription("\"Tails\" a file, or a list of files, ingesting data from the file as it is written to the file. The file is expected to be textual. Data is ingested only when a "
+ "new line is encountered (carriage return or new-line character or combination). If the file to tail is periodically \"rolled over\", as is generally the case " + "new line is encountered (carriage return or new-line character or combination). If the file to tail is periodically \"rolled over\", as is generally the case "
+ "with log files, an optional Rolling Filename Pattern can be used to retrieve data from files that have rolled over, even if the rollover occurred while NiFi " + "with log files, an optional Rolling Filename Pattern can be used to retrieve data from files that have rolled over, even if the rollover occurred while NiFi "
+ "was not running (provided that the data still exists upon restart of NiFi). It is generally advisable to set the Run Schedule to a few seconds, rather than running " + "was not running (provided that the data still exists upon restart of NiFi). It is generally advisable to set the Run Schedule to a few seconds, rather than running "
@ -79,35 +88,83 @@ import org.apache.nifi.stream.io.StreamUtils;
+ "ingesting files that have been compressed when 'rolled over'.") + "ingesting files that have been compressed when 'rolled over'.")
@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "Stores state about where in the Tailed File it left off so that on restart it does not have to duplicate data. " @Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "Stores state about where in the Tailed File it left off so that on restart it does not have to duplicate data. "
+ "State is stored either local or clustered depend on the <File Location> property.") + "State is stored either local or clustered depend on the <File Location> property.")
@WritesAttributes({
@WritesAttribute(attribute = "tailfile.original.path", description = "Path of the original file the flow file comes from.")
})
public class TailFile extends AbstractProcessor { public class TailFile extends AbstractProcessor {
static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", "Local", "State is stored locally. Each node in a cluster will tail a different file."); static final String MAP_PREFIX = "file.";
static final AllowableValue LOCATION_REMOTE = new AllowableValue("Remote", "Remote", "State is located on a remote resource. This Processor will store state across the cluster so that "
static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", "Local",
"State is stored locally. Each node in a cluster will tail a different file.");
static final AllowableValue LOCATION_REMOTE = new AllowableValue("Remote", "Remote",
"State is located on a remote resource. This Processor will store state across the cluster so that "
+ "it can be run on Primary Node Only and a new Primary Node can pick up where the last one left off."); + "it can be run on Primary Node Only and a new Primary Node can pick up where the last one left off.");
static final AllowableValue MODE_SINGLEFILE = new AllowableValue("Single file", "Single file",
"In this mode, only the one file indicated in the 'Files to tail' property will be watched by the processor."
+ " In this mode, the file may not exist when starting the processor.");
static final AllowableValue MODE_MULTIFILE = new AllowableValue("Multiple files", "Multiple files",
"In this mode, the 'Files to tail' property accepts a regular expression and the processor will look"
+ " for files in 'Base directory' to list the files to tail by the processor. In this mode, only the files existing"
+ " when starting the processor will be used.");
static final AllowableValue FIXED_NAME = new AllowableValue("Fixed name", "Fixed name", "With this rolling strategy, the files "
+ "where the log messages are appended have always the same name.");
static final AllowableValue CHANGING_NAME = new AllowableValue("Changing name", "Changing name", "With this rolling strategy, "
+ "the files where the log messages are appended have not a fixed name (for example: filename contaning the current day.");
static final AllowableValue START_BEGINNING_OF_TIME = new AllowableValue("Beginning of Time", "Beginning of Time", static final AllowableValue START_BEGINNING_OF_TIME = new AllowableValue("Beginning of Time", "Beginning of Time",
"Start with the oldest data that matches the Rolling Filename Pattern and then begin reading from the File to Tail"); "Start with the oldest data that matches the Rolling Filename Pattern and then begin reading from the File to Tail");
static final AllowableValue START_CURRENT_FILE = new AllowableValue("Beginning of File", "Beginning of File", static final AllowableValue START_CURRENT_FILE = new AllowableValue("Beginning of File", "Beginning of File",
"Start with the beginning of the File to Tail. Do not ingest any data that has already been rolled over"); "Start with the beginning of the File to Tail. Do not ingest any data that has already been rolled over");
static final AllowableValue START_CURRENT_TIME = new AllowableValue("Current Time", "Current Time", static final AllowableValue START_CURRENT_TIME = new AllowableValue("Current Time", "Current Time",
"Start with the data at the end of the File to Tail. Do not ingest any data thas has already been rolled over or any data in the File to Tail that has already been written."); "Start with the data at the end of the File to Tail. Do not ingest any data thas has already been rolled over or any "
+ "data in the File to Tail that has already been written.");
static final PropertyDescriptor BASE_DIRECTORY = new PropertyDescriptor.Builder()
.name("tail-base-directory")
.displayName("Base directory")
.description("Base directory used to look for files to tail. This property is required when using Multifile mode.")
.expressionLanguageSupported(false)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.required(false)
.build();
static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
.name("tail-mode")
.displayName("Tailing mode")
.description("Mode to use: single file will tail only one gile, multiple file will look for a list of file. In Multiple mode"
+ " the Base directory is required.")
.expressionLanguageSupported(false)
.required(true)
.allowableValues(MODE_SINGLEFILE, MODE_MULTIFILE)
.defaultValue(MODE_SINGLEFILE.getValue())
.build();
static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder() static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
.displayName("File(s) to Tail")
.name("File to Tail") .name("File to Tail")
.description("Fully-qualified filename of the file that should be tailed") .description("Path of the file to tail in case of single file mode. If using multifile mode, regular expression to find files "
+ "to tail in the base directory. In case recursivity is set to true, the regular expression will be used to match the "
+ "path starting from the base directory (see additional details for examples).")
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.required(true) .required(true)
.build(); .build();
static final PropertyDescriptor ROLLING_FILENAME_PATTERN = new PropertyDescriptor.Builder() static final PropertyDescriptor ROLLING_FILENAME_PATTERN = new PropertyDescriptor.Builder()
.name("Rolling Filename Pattern") .name("Rolling Filename Pattern")
.description("If the file to tail \"rolls over\" as would be the case with log files, this filename pattern will be used to " .description("If the file to tail \"rolls over\" as would be the case with log files, this filename pattern will be used to "
+ "identify files that have rolled over so that if NiFi is restarted, and the file has rolled over, it will be able to pick up where it left off. " + "identify files that have rolled over so that if NiFi is restarted, and the file has rolled over, it will be able to pick up where it left off. "
+ "This pattern supports wildcard characters * and ? and will assume that the files that have rolled over live in the same directory as the file being tailed.") + "This pattern supports wildcard characters * and ?, it also supports the notation ${filename} to specify a pattern based on the name of the file "
+ "(without extension), and will assume that the files that have rolled over live in the same directory as the file being tailed. "
+ "The same glob pattern will be used for all files.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.required(false) .required(false)
.build(); .build();
static final PropertyDescriptor STATE_LOCATION = new PropertyDescriptor.Builder() static final PropertyDescriptor STATE_LOCATION = new PropertyDescriptor.Builder()
.displayName("State Location") .displayName("State Location")
.name("File Location") //retained name of property for backward compatibility of configs .name("File Location") //retained name of property for backward compatibility of configs
@ -117,31 +174,79 @@ public class TailFile extends AbstractProcessor {
.allowableValues(LOCATION_LOCAL, LOCATION_REMOTE) .allowableValues(LOCATION_LOCAL, LOCATION_REMOTE)
.defaultValue(LOCATION_LOCAL.getValue()) .defaultValue(LOCATION_LOCAL.getValue())
.build(); .build();
static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder() static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder()
.name("Initial Start Position") .name("Initial Start Position")
.description("When the Processor first begins to tail data, this property specifies where the Processor should begin reading data. Once data has been ingested from the file, " .description("When the Processor first begins to tail data, this property specifies where the Processor should begin reading data. Once data has been ingested from a file, "
+ "the Processor will continue from the last point from which it has received data.") + "the Processor will continue from the last point from which it has received data.")
.allowableValues(START_BEGINNING_OF_TIME, START_CURRENT_FILE, START_CURRENT_TIME) .allowableValues(START_BEGINNING_OF_TIME, START_CURRENT_FILE, START_CURRENT_TIME)
.defaultValue(START_CURRENT_FILE.getValue()) .defaultValue(START_CURRENT_FILE.getValue())
.required(true) .required(true)
.build(); .build();
static final PropertyDescriptor RECURSIVE = new PropertyDescriptor.Builder()
.name("tailfile-recursive-lookup")
.displayName("Recursive lookup")
.description("When using Multiple files mode, this property defines if files must be listed recursively or not"
+ " in the base directory.")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
static final PropertyDescriptor ROLLING_STRATEGY = new PropertyDescriptor.Builder()
.name("tailfile-rolling-strategy")
.displayName("Rolling Strategy")
.description("Specifies if the files to tail have a fixed name or not.")
.required(true)
.allowableValues(FIXED_NAME, CHANGING_NAME)
.defaultValue(FIXED_NAME.getValue())
.build();
static final PropertyDescriptor LOOKUP_FREQUENCY = new PropertyDescriptor.Builder()
.name("tailfile-lookup-frequency")
.displayName("Lookup frequency")
.description("Only used in Multiple files mode and Changing name rolling strategy. It specifies the minimum "
+ "duration the processor will wait before listing again the files to tail.")
.required(false)
.defaultValue("10 minutes")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
static final PropertyDescriptor MAXIMUM_AGE = new PropertyDescriptor.Builder()
.name("tailfile-maximum-age")
.displayName("Maximum age")
.description("Only used in Multiple files mode and Changing name rolling strategy. It specifies the necessary "
+ "minimum duration to consider that no new messages will be appended in a file regarding its last "
+ "modification date. This should not be set too low to avoid duplication of data in case new messages "
+ "are appended at a lower frequency.")
.required(false)
.defaultValue("24 hours")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder() static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
.description("All FlowFiles are routed to this Relationship.") .description("All FlowFiles are routed to this Relationship.")
.build(); .build();
private volatile TailFileState state = new TailFileState(null, null, null, 0L, 0L, null, ByteBuffer.allocate(65536)); private volatile Map<String, TailFileObject> states = new HashMap<String, TailFileObject>();
private volatile Long expectedRecoveryChecksum; private volatile AtomicLong lastLookup = new AtomicLong(0L);
private volatile boolean tailFileChanged = false; private volatile AtomicBoolean isMultiChanging = new AtomicBoolean(false);
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(); final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(MODE);
properties.add(FILENAME); properties.add(FILENAME);
properties.add(ROLLING_FILENAME_PATTERN); properties.add(ROLLING_FILENAME_PATTERN);
properties.add(BASE_DIRECTORY);
properties.add(START_POSITION); properties.add(START_POSITION);
properties.add(STATE_LOCATION); properties.add(STATE_LOCATION);
properties.add(RECURSIVE);
properties.add(ROLLING_STRATEGY);
properties.add(LOOKUP_FREQUENCY);
properties.add(MAXIMUM_AGE);
return properties; return properties;
} }
@ -153,22 +258,178 @@ public class TailFile extends AbstractProcessor {
@Override @Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
if (isConfigurationRestored() && FILENAME.equals(descriptor)) { if (isConfigurationRestored() && FILENAME.equals(descriptor)) {
state = new TailFileState(newValue, null, null, 0L, 0L, null, ByteBuffer.allocate(65536)); states = new HashMap<String, TailFileObject>();
tailFileChanged = true;
} }
} }
@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(context));
if(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())) {
String path = context.getProperty(BASE_DIRECTORY).getValue();
if(path == null) {
results.add(new ValidationResult.Builder().subject(BASE_DIRECTORY.getName()).valid(false)
.explanation("Base directory property cannot be empty in Multifile mode.").build());
} else if (!new File(path).isDirectory()) {
results.add(new ValidationResult.Builder().subject(BASE_DIRECTORY.getName()).valid(false)
.explanation(path + " is not a directory.").build());
}
if(context.getProperty(ROLLING_STRATEGY).getValue().equals(CHANGING_NAME.getValue())) {
String freq = context.getProperty(LOOKUP_FREQUENCY).getValue();
if(freq == null) {
results.add(new ValidationResult.Builder().subject(LOOKUP_FREQUENCY.getName()).valid(false)
.explanation("In Multiple files mode and Changing name rolling strategy, lookup frequency "
+ "property must be specified.").build());
}
String maxAge = context.getProperty(MAXIMUM_AGE).getValue();
if(maxAge == null) {
results.add(new ValidationResult.Builder().subject(MAXIMUM_AGE.getName()).valid(false)
.explanation("In Multiple files mode and Changing name rolling strategy, maximum age "
+ "property must be specified.").build());
}
} else {
long max = context.getProperty(MAXIMUM_AGE).getValue() == null ? Long.MAX_VALUE : context.getProperty(MAXIMUM_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
List<String> filesToTail = getFilesToTail(context.getProperty(BASE_DIRECTORY).getValue(),
context.getProperty(FILENAME).getValue(),
context.getProperty(RECURSIVE).asBoolean(),
max);
if(filesToTail.isEmpty()) {
results.add(new ValidationResult.Builder().subject(FILENAME.getName()).valid(false)
.explanation("There is no file to tail. Files must exist when starting this processor.").build());
}
}
}
return results;
}
@OnScheduled @OnScheduled
public void recoverState(final ProcessContext context) throws IOException { public void recoverState(final ProcessContext context) throws IOException {
// set isMultiChanging
isMultiChanging.set(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())
&& context.getProperty(ROLLING_STRATEGY).getValue().equals(CHANGING_NAME.getValue()));
// set last lookup to now
lastLookup.set(new Date().getTime());
// maxAge
long maxAge = context.getProperty(MAXIMUM_AGE).getValue() == null ? Long.MAX_VALUE : context.getProperty(MAXIMUM_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
// get list of files to tail
List<String> filesToTail = new ArrayList<String>();
if(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())) {
filesToTail.addAll(getFilesToTail(context.getProperty(BASE_DIRECTORY).getValue(),
context.getProperty(FILENAME).getValue(),
context.getProperty(RECURSIVE).asBoolean(),
maxAge));
} else {
filesToTail.add(context.getProperty(FILENAME).getValue());
}
final Scope scope = getStateScope(context); final Scope scope = getStateScope(context);
final StateMap stateMap = context.getStateManager().getState(scope); final StateMap stateMap = context.getStateManager().getState(scope);
if (stateMap.getVersion() == -1L) { if (stateMap.getVersion() == -1L) {
//state has been cleared or never stored so recover as 'empty state' //state has been cleared or never stored so recover as 'empty state'
recoverState(context, Collections.EMPTY_MAP); initStates(filesToTail, Collections.emptyMap(), true);
recoverState(context, filesToTail, Collections.emptyMap());
return; return;
} }
recoverState(context, stateMap.toMap()); Map<String, String> statesMap = stateMap.toMap();
initStates(filesToTail, statesMap, false);
recoverState(context, filesToTail, statesMap);
}
private void initStates(List<String> filesToTail, Map<String, String> statesMap, boolean isCleared) {
int i = 0;
if(isCleared) {
states.clear();
} else {
// we have to deal with the case where NiFi has been restarted. In this
// case 'states' object is empty but the statesMap is not. So we have to
// put back the files we already know about in 'states' object before
// doing the recovery
if(states.isEmpty() && !statesMap.isEmpty()) {
for(String key : statesMap.keySet()) {
if(key.endsWith(TailFileState.StateKeys.FILENAME)) {
int index = Integer.valueOf(key.split("\\.")[1]);
states.put(statesMap.get(key), new TailFileObject(index, statesMap));
}
}
}
// first, we remove the files that are no longer present
List<String> toBeRemoved = new ArrayList<String>();
for(String file : states.keySet()) {
if(!filesToTail.contains(file)) {
toBeRemoved.add(file);
cleanReader(states.get(file));
}
}
states.keySet().removeAll(toBeRemoved);
// then we need to get the highest ID used so far to be sure
// we don't mix different files in case we add new files to tail
for(String file : states.keySet()) {
if(i <= states.get(file).getFilenameIndex()) {
i = states.get(file).getFilenameIndex() + 1;
}
}
}
for (String file : filesToTail) {
if(isCleared || !states.containsKey(file)) {
states.put(file, new TailFileObject(i));
i++;
}
}
}
private void recoverState(final ProcessContext context, final List<String> filesToTail, final Map<String, String> map) throws IOException {
for (String file : filesToTail) {
recoverState(context, map, file);
}
}
/**
* Method to list the files to tail according to the given base directory
* and using the user-provided regular expression
* @param baseDir base directory to recursively look into
* @param fileRegex expression regular used to match files to tail
* @param isRecursive true if looking for file recursively, false otherwise
* @return List of files to tail
*/
private List<String> getFilesToTail(final String baseDir, String fileRegex, boolean isRecursive, long maxAge) {
Collection<File> files = FileUtils.listFiles(new File(baseDir), null, isRecursive);
List<String> result = new ArrayList<String>();
String fullRegex = baseDir.endsWith(File.separator) ? baseDir + fileRegex : baseDir + File.separator + fileRegex;
Pattern p = Pattern.compile(fullRegex);
for(File file : files) {
String path = file.getPath();
if(p.matcher(path).matches()) {
if(isMultiChanging.get()) {
if((new Date().getTime() - file.lastModified()) < maxAge) {
result.add(path);
}
} else {
result.add(path);
}
}
}
return result;
} }
/** /**
@ -181,35 +442,43 @@ public class TailFile extends AbstractProcessor {
* @param stateValues the values that were recovered from state that was * @param stateValues the values that were recovered from state that was
* previously stored. This Map should be populated with the keys defined in * previously stored. This Map should be populated with the keys defined in
* {@link TailFileState.StateKeys}. * {@link TailFileState.StateKeys}.
* @param filePath the file of the file for which state must be recovered
* @throws IOException if unable to seek to the appropriate location in the * @throws IOException if unable to seek to the appropriate location in the
* tailed file. * tailed file.
*/ */
private void recoverState(final ProcessContext context, final Map<String, String> stateValues) throws IOException { private void recoverState(final ProcessContext context, final Map<String, String> stateValues, final String filePath) throws IOException {
final String currentFilename = context.getProperty(FILENAME).getValue();
if (!stateValues.containsKey(TailFileState.StateKeys.FILENAME)) { final String prefix = MAP_PREFIX + states.get(filePath).getFilenameIndex() + '.';
resetState(currentFilename);
if (!stateValues.containsKey(prefix + TailFileState.StateKeys.FILENAME)) {
resetState(filePath);
return; return;
} }
if (!stateValues.containsKey(TailFileState.StateKeys.POSITION)) { if (!stateValues.containsKey(prefix + TailFileState.StateKeys.POSITION)) {
resetState(currentFilename); resetState(filePath);
return; return;
} }
if (!stateValues.containsKey(TailFileState.StateKeys.TIMESTAMP)) { if (!stateValues.containsKey(prefix + TailFileState.StateKeys.TIMESTAMP)) {
resetState(currentFilename); resetState(filePath);
return;
}
if (!stateValues.containsKey(prefix + TailFileState.StateKeys.LENGTH)) {
resetState(filePath);
return; return;
} }
final String checksumValue = stateValues.get(TailFileState.StateKeys.CHECKSUM); final String checksumValue = stateValues.get(prefix + TailFileState.StateKeys.CHECKSUM);
final boolean checksumPresent = (checksumValue != null); final boolean checksumPresent = (checksumValue != null);
final String storedStateFilename = stateValues.get(TailFileState.StateKeys.FILENAME); final String storedStateFilename = stateValues.get(prefix + TailFileState.StateKeys.FILENAME);
final long position = Long.parseLong(stateValues.get(TailFileState.StateKeys.POSITION)); final long position = Long.parseLong(stateValues.get(prefix + TailFileState.StateKeys.POSITION));
final long timestamp = Long.parseLong(stateValues.get(TailFileState.StateKeys.TIMESTAMP)); final long timestamp = Long.parseLong(stateValues.get(prefix + TailFileState.StateKeys.TIMESTAMP));
final long length = Long.parseLong(stateValues.get(prefix + TailFileState.StateKeys.LENGTH));
FileChannel reader = null; FileChannel reader = null;
File tailFile = null; File tailFile = null;
if (checksumPresent && currentFilename.equals(storedStateFilename)) { if (checksumPresent && filePath.equals(storedStateFilename)) {
expectedRecoveryChecksum = Long.parseLong(checksumValue); states.get(filePath).setExpectedRecoveryChecksum(Long.parseLong(checksumValue));
// We have an expected checksum and the currently configured filename is the same as the state file. // We have an expected checksum and the currently configured filename is the same as the state file.
// We need to check if the existing file is the same as the one referred to in the state file based on // We need to check if the existing file is the same as the one referred to in the state file based on
@ -219,10 +488,10 @@ public class TailFile extends AbstractProcessor {
if (existingTailFile.length() >= position) { if (existingTailFile.length() >= position) {
try (final InputStream tailFileIs = new FileInputStream(existingTailFile); try (final InputStream tailFileIs = new FileInputStream(existingTailFile);
final CheckedInputStream in = new CheckedInputStream(tailFileIs, checksum)) { final CheckedInputStream in = new CheckedInputStream(tailFileIs, checksum)) {
StreamUtils.copy(in, new NullOutputStream(), state.getPosition()); StreamUtils.copy(in, new NullOutputStream(), states.get(filePath).getState().getPosition());
final long checksumResult = in.getChecksum().getValue(); final long checksumResult = in.getChecksum().getValue();
if (checksumResult == expectedRecoveryChecksum) { if (checksumResult == states.get(filePath).getExpectedRecoveryChecksum()) {
// Checksums match. This means that we want to resume reading from where we left off. // Checksums match. This means that we want to resume reading from where we left off.
// So we will populate the reader object so that it will be used in onTrigger. If the // So we will populate the reader object so that it will be used in onTrigger. If the
// checksums do not match, then we will leave the reader object null, so that the next // checksums do not match, then we will leave the reader object null, so that the next
@ -245,58 +514,86 @@ public class TailFile extends AbstractProcessor {
+ "this indicates that the file has rotated. Will begin tailing current file from beginning.", new Object[]{existingTailFile.length(), position}); + "this indicates that the file has rotated. Will begin tailing current file from beginning.", new Object[]{existingTailFile.length(), position});
} }
state = new TailFileState(currentFilename, tailFile, reader, position, timestamp, checksum, ByteBuffer.allocate(65536)); states.get(filePath).setState(new TailFileState(filePath, tailFile, reader, position, timestamp, length, checksum, ByteBuffer.allocate(65536)));
} else { } else {
resetState(currentFilename); resetState(filePath);
} }
getLogger().debug("Recovered state {}", new Object[]{state}); getLogger().debug("Recovered state {}", new Object[]{states.get(filePath).getState()});
} }
private void resetState(final String currentFilename) { private void resetState(final String filePath) {
expectedRecoveryChecksum = null; states.get(filePath).setExpectedRecoveryChecksum(null);
state = new TailFileState(currentFilename, null, null, 0L, 0L, null, ByteBuffer.allocate(65536)); states.get(filePath).setState(new TailFileState(filePath, null, null, 0L, 0L, 0L, null, ByteBuffer.allocate(65536)));
} }
@OnStopped @OnStopped
public void cleanup() { public void cleanup() {
final TailFileState state = this.state; for (TailFileObject tfo : states.values()) {
if (state == null) { cleanReader(tfo);
final TailFileState state = tfo.getState();
tfo.setState(new TailFileState(state.getFilename(), state.getFile(), null, state.getPosition(), state.getTimestamp(), state.getLength(), state.getChecksum(), state.getBuffer()));
}
}
private void cleanReader(TailFileObject tfo) {
if (tfo.getState() == null) {
return; return;
} }
final FileChannel reader = state.getReader(); final FileChannel reader = tfo.getState().getReader();
if (reader == null) { if (reader == null) {
return; return;
} }
try { try {
reader.close(); reader.close();
getLogger().debug("Closed FileChannel {}", new Object[]{reader});
} catch (final IOException ioe) { } catch (final IOException ioe) {
getLogger().warn("Failed to close file handle during cleanup"); getLogger().warn("Failed to close file handle during cleanup");
} }
getLogger().debug("Closed FileChannel {}", new Object[]{reader});
this.state = new TailFileState(state.getFilename(), state.getFile(), null, state.getPosition(), state.getTimestamp(), state.getChecksum(), state.getBuffer());
} }
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
if(isMultiChanging.get()) {
long timeSinceLastLookup = new Date().getTime() - lastLookup.get();
if(timeSinceLastLookup > context.getProperty(LOOKUP_FREQUENCY).asTimePeriod(TimeUnit.MILLISECONDS)) {
try {
recoverState(context);
} catch (IOException e) {
getLogger().error("Exception raised while looking up for new files", e);
context.yield();
return;
}
}
}
if(states.isEmpty()) {
context.yield();
return;
}
for (String tailFile : states.keySet()) {
processTailFile(context, session, tailFile);
}
}
private void processTailFile(final ProcessContext context, final ProcessSession session, final String tailFile) {
// If user changes the file that is being tailed, we need to consume the already-rolled-over data according // If user changes the file that is being tailed, we need to consume the already-rolled-over data according
// to the Initial Start Position property // to the Initial Start Position property
boolean rolloverOccurred; boolean rolloverOccurred;
if (tailFileChanged) { TailFileObject tfo = states.get(tailFile);
if (tfo.isTailFileChanged()) {
rolloverOccurred = false; rolloverOccurred = false;
final String recoverPosition = context.getProperty(START_POSITION).getValue(); final String recoverPosition = context.getProperty(START_POSITION).getValue();
if (START_BEGINNING_OF_TIME.getValue().equals(recoverPosition)) { if (START_BEGINNING_OF_TIME.getValue().equals(recoverPosition)) {
recoverRolledFiles(context, session, this.expectedRecoveryChecksum, state.getTimestamp(), state.getPosition()); recoverRolledFiles(context, session, tailFile, tfo.getExpectedRecoveryChecksum(), tfo.getState().getTimestamp(), tfo.getState().getPosition());
} else if (START_CURRENT_FILE.getValue().equals(recoverPosition)) { } else if (START_CURRENT_FILE.getValue().equals(recoverPosition)) {
cleanup(); cleanup();
state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 0L, null, state.getBuffer()); tfo.setState(new TailFileState(tailFile, null, null, 0L, 0L, 0L, null, tfo.getState().getBuffer()));
} else { } else {
final String filename = context.getProperty(FILENAME).getValue(); final String filename = tailFile;
final File file = new File(filename); final File file = new File(filename);
try { try {
@ -314,7 +611,7 @@ public class TailFile extends AbstractProcessor {
fileChannel.position(position); fileChannel.position(position);
cleanup(); cleanup();
state = new TailFileState(filename, file, fileChannel, position, timestamp, checksum, state.getBuffer()); tfo.setState(new TailFileState(filename, file, fileChannel, position, timestamp, file.length(), checksum, tfo.getState().getBuffer()));
} catch (final IOException ioe) { } catch (final IOException ioe) {
getLogger().error("Attempted to position Reader at current position in file {} but failed to do so due to {}", new Object[]{file, ioe.toString()}, ioe); getLogger().error("Attempted to position Reader at current position in file {} but failed to do so due to {}", new Object[]{file, ioe.toString()}, ioe);
context.yield(); context.yield();
@ -322,25 +619,25 @@ public class TailFile extends AbstractProcessor {
} }
} }
tailFileChanged = false; tfo.setTailFileChanged(false);
} else { } else {
// Recover any data that may have rolled over since the last time that this processor ran. // Recover any data that may have rolled over since the last time that this processor ran.
// If expectedRecoveryChecksum != null, that indicates that this is the first iteration since processor was started, so use whatever checksum value // If expectedRecoveryChecksum != null, that indicates that this is the first iteration since processor was started, so use whatever checksum value
// was present when the state was last persisted. In this case, we must then null out the value so that the next iteration won't keep using the "recovered" // was present when the state was last persisted. In this case, we must then null out the value so that the next iteration won't keep using the "recovered"
// value. If the value is null, then we know that either the processor has already recovered that data, or there was no state persisted. In either case, // value. If the value is null, then we know that either the processor has already recovered that data, or there was no state persisted. In either case,
// use whatever checksum value is currently in the state. // use whatever checksum value is currently in the state.
Long expectedChecksumValue = expectedRecoveryChecksum; Long expectedChecksumValue = tfo.getExpectedRecoveryChecksum();
if (expectedChecksumValue == null) { if (expectedChecksumValue == null) {
expectedChecksumValue = state.getChecksum() == null ? null : state.getChecksum().getValue(); expectedChecksumValue = tfo.getState().getChecksum() == null ? null : tfo.getState().getChecksum().getValue();
} }
rolloverOccurred = recoverRolledFiles(context, session, expectedChecksumValue, state.getTimestamp(), state.getPosition()); rolloverOccurred = recoverRolledFiles(context, session, tailFile, expectedChecksumValue, tfo.getState().getTimestamp(), tfo.getState().getPosition());
expectedRecoveryChecksum = null; tfo.setExpectedRecoveryChecksum(null);
} }
// initialize local variables from state object; this is done so that we can easily change the values throughout // initialize local variables from state object; this is done so that we can easily change the values throughout
// the onTrigger method and then create a new state object after we finish processing the files. // the onTrigger method and then create a new state object after we finish processing the files.
TailFileState state = this.state; TailFileState state = tfo.getState();
File file = state.getFile(); File file = state.getFile();
FileChannel reader = state.getReader(); FileChannel reader = state.getReader();
Checksum checksum = state.getChecksum(); Checksum checksum = state.getChecksum();
@ -349,10 +646,11 @@ public class TailFile extends AbstractProcessor {
} }
long position = state.getPosition(); long position = state.getPosition();
long timestamp = state.getTimestamp(); long timestamp = state.getTimestamp();
long length = state.getLength();
// Create a reader if necessary. // Create a reader if necessary.
if (file == null || reader == null) { if (file == null || reader == null) {
file = new File(context.getProperty(FILENAME).getValue()); file = new File(tailFile);
reader = createReader(file, position); reader = createReader(file, position);
if (reader == null) { if (reader == null) {
context.yield(); context.yield();
@ -363,7 +661,10 @@ public class TailFile extends AbstractProcessor {
final long startNanos = System.nanoTime(); final long startNanos = System.nanoTime();
// Check if file has rotated // Check if file has rotated
if (rolloverOccurred) { if (rolloverOccurred
|| (timestamp <= file.lastModified() && length > file.length())
|| (timestamp < file.lastModified() && length >= file.length())) {
// Since file has rotated, we close the reader, create a new one, and then reset our state. // Since file has rotated, we close the reader, create a new one, and then reset our state.
try { try {
reader.close(); reader.close();
@ -377,13 +678,11 @@ public class TailFile extends AbstractProcessor {
checksum.reset(); checksum.reset();
} }
if (file.length() == position) { if (file.length() == position || !file.exists()) {
// no data to consume so rather than continually running, yield to allow other processors to use the thread. // no data to consume so rather than continually running, yield to allow other processors to use the thread.
// In this case, the state should not have changed, and we will have created no FlowFiles, so we don't have to
// persist the state or commit the session; instead, just return here.
getLogger().debug("No data to consume; created no FlowFiles"); getLogger().debug("No data to consume; created no FlowFiles");
state = this.state = new TailFileState(context.getProperty(FILENAME).getValue(), file, reader, position, timestamp, checksum, state.getBuffer()); tfo.setState(new TailFileState(tailFile, file, reader, position, timestamp, length, checksum, state.getBuffer()));
persistState(state, context); persistState(tfo, context);
context.yield(); context.yield();
return; return;
} }
@ -420,9 +719,10 @@ public class TailFile extends AbstractProcessor {
flowFileName = baseName + "." + position + "-" + positionHolder.get(); flowFileName = baseName + "." + position + "-" + positionHolder.get();
} }
final Map<String, String> attributes = new HashMap<>(2); final Map<String, String> attributes = new HashMap<>(3);
attributes.put(CoreAttributes.FILENAME.key(), flowFileName); attributes.put(CoreAttributes.FILENAME.key(), flowFileName);
attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
attributes.put("tailfile.original.path", tailFile);
flowFile = session.putAllAttributes(flowFile, attributes); flowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), "FlowFile contains bytes " + position + " through " + positionHolder.get() + " of source file", session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), "FlowFile contains bytes " + position + " through " + positionHolder.get() + " of source file",
@ -436,16 +736,16 @@ public class TailFile extends AbstractProcessor {
// operating system file last mod precision), then we could set the timestamp to a smaller value, which could result in reading in the // operating system file last mod precision), then we could set the timestamp to a smaller value, which could result in reading in the
// rotated file a second time. // rotated file a second time.
timestamp = Math.max(state.getTimestamp(), file.lastModified()); timestamp = Math.max(state.getTimestamp(), file.lastModified());
length = file.length();
getLogger().debug("Created {} and routed to success", new Object[]{flowFile}); getLogger().debug("Created {} and routed to success", new Object[]{flowFile});
} }
// Create a new state object to represent our current position, timestamp, etc. // Create a new state object to represent our current position, timestamp, etc.
final TailFileState updatedState = new TailFileState(context.getProperty(FILENAME).getValue(), file, reader, position, timestamp, checksum, state.getBuffer()); tfo.setState(new TailFileState(tailFile, file, reader, position, timestamp, length, checksum, state.getBuffer()));
this.state = updatedState;
// We must commit session before persisting state in order to avoid data loss on restart // We must commit session before persisting state in order to avoid data loss on restart
session.commit(); session.commit();
persistState(updatedState, context); persistState(tfo, context);
} }
/** /**
@ -551,17 +851,18 @@ public class TailFile extends AbstractProcessor {
* @return a list of all Files that have rolled over * @return a list of all Files that have rolled over
* @throws IOException if unable to perform the listing of files * @throws IOException if unable to perform the listing of files
*/ */
private List<File> getRolledOffFiles(final ProcessContext context, final long minTimestamp) throws IOException { private List<File> getRolledOffFiles(final ProcessContext context, final long minTimestamp, final String tailFilePath) throws IOException {
final String tailFilename = context.getProperty(FILENAME).getValue(); final File tailFile = new File(tailFilePath);
final File tailFile = new File(tailFilename);
File directory = tailFile.getParentFile(); File directory = tailFile.getParentFile();
if (directory == null) { if (directory == null) {
directory = new File("."); directory = new File(".");
} }
final String rollingPattern = context.getProperty(ROLLING_FILENAME_PATTERN).getValue(); String rollingPattern = context.getProperty(ROLLING_FILENAME_PATTERN).getValue();
if (rollingPattern == null) { if (rollingPattern == null) {
return Collections.emptyList(); return Collections.emptyList();
} else {
rollingPattern = rollingPattern.replace("${filename}", StringUtils.substringBeforeLast(tailFile.getName(), "."));
} }
final List<File> rolledOffFiles = new ArrayList<>(); final List<File> rolledOffFiles = new ArrayList<>();
@ -610,13 +911,21 @@ public class TailFile extends AbstractProcessor {
return Scope.LOCAL; return Scope.LOCAL;
} }
private void persistState(final TailFileState state, final ProcessContext context) { private void persistState(final TailFileObject tfo, final ProcessContext context) {
persistState(state.toStateMap(), context); persistState(tfo.getState().toStateMap(tfo.getFilenameIndex()), context);
} }
private void persistState(final Map<String, String> state, final ProcessContext context) { private void persistState(final Map<String, String> state, final ProcessContext context) {
try { try {
context.getStateManager().setState(state, getStateScope(context)); StateMap oldState = context.getStateManager().getState(getStateScope(context));
Map<String, String> updatedState = new HashMap<String, String>();
for(String key : oldState.toMap().keySet()) {
updatedState.put(key, oldState.get(key));
}
updatedState.putAll(state);
context.getStateManager().setState(updatedState, getStateScope(context));
} catch (final IOException e) { } catch (final IOException e) {
getLogger().warn("Failed to store state due to {}; some data may be duplicated on restart of NiFi", new Object[]{e}); getLogger().warn("Failed to store state due to {}; some data may be duplicated on restart of NiFi", new Object[]{e});
} }
@ -652,8 +961,8 @@ public class TailFile extends AbstractProcessor {
} }
// for testing purposes // for testing purposes
TailFileState getState() { Map<String, TailFileObject> getState() {
return state; return states;
} }
/** /**
@ -678,14 +987,14 @@ public class TailFile extends AbstractProcessor {
* @return <code>true</code> if the file being tailed has rolled over, * @return <code>true</code> if the file being tailed has rolled over,
* <code>false</code> otherwise * <code>false</code> otherwise
*/ */
private boolean recoverRolledFiles(final ProcessContext context, final ProcessSession session, final Long expectedChecksum, final long timestamp, final long position) { private boolean recoverRolledFiles(final ProcessContext context, final ProcessSession session, final String tailFile, final Long expectedChecksum, final long timestamp, final long position) {
try { try {
// Find all files that match our rollover pattern, if any, and order them based on their timestamp and filename. // Find all files that match our rollover pattern, if any, and order them based on their timestamp and filename.
// Ignore any file that has a timestamp earlier than the state that we have persisted. If we were reading from // Ignore any file that has a timestamp earlier than the state that we have persisted. If we were reading from
// a file when we stopped running, then that file that we were reading from should be the first file in this list, // a file when we stopped running, then that file that we were reading from should be the first file in this list,
// assuming that the file still exists on the file system. // assuming that the file still exists on the file system.
final List<File> rolledOffFiles = getRolledOffFiles(context, timestamp); final List<File> rolledOffFiles = getRolledOffFiles(context, timestamp, tailFile);
return recoverRolledFiles(context, session, rolledOffFiles, expectedChecksum, timestamp, position); return recoverRolledFiles(context, session, tailFile, rolledOffFiles, expectedChecksum, timestamp, position);
} catch (final IOException e) { } catch (final IOException e) {
getLogger().error("Failed to recover files that have rolled over due to {}", new Object[]{e}); getLogger().error("Failed to recover files that have rolled over due to {}", new Object[]{e});
return false; return false;
@ -714,10 +1023,11 @@ public class TailFile extends AbstractProcessor {
* @return <code>true</code> if the file being tailed has rolled over, false * @return <code>true</code> if the file being tailed has rolled over, false
* otherwise * otherwise
*/ */
private boolean recoverRolledFiles(final ProcessContext context, final ProcessSession session, final List<File> rolledOffFiles, final Long expectedChecksum, private boolean recoverRolledFiles(final ProcessContext context, final ProcessSession session, final String tailFile, final List<File> rolledOffFiles, final Long expectedChecksum,
final long timestamp, final long position) { final long timestamp, final long position) {
try { try {
getLogger().debug("Recovering Rolled Off Files; total number of files rolled off = {}", new Object[]{rolledOffFiles.size()}); getLogger().debug("Recovering Rolled Off Files; total number of files rolled off = {}", new Object[]{rolledOffFiles.size()});
TailFileObject tfo = states.get(tailFile);
// For first file that we find, it may or may not be the file that we were last reading from. // For first file that we find, it may or may not be the file that we were last reading from.
// As a result, we have to read up to the position we stored, while calculating the checksum. If the checksums match, // As a result, we have to read up to the position we stored, while calculating the checksum. If the checksums match,
@ -746,9 +1056,13 @@ public class TailFile extends AbstractProcessor {
session.remove(flowFile); session.remove(flowFile);
// use a timestamp of lastModified() + 1 so that we do not ingest this file again. // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
cleanup(); cleanup();
state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, firstFile.lastModified() + 1L, null, state.getBuffer()); tfo.setState(new TailFileState(tailFile, null, null, 0L, firstFile.lastModified() + 1L, firstFile.length(), null, tfo.getState().getBuffer()));
} else { } else {
flowFile = session.putAttribute(flowFile, "filename", firstFile.getName()); final Map<String, String> attributes = new HashMap<>(3);
attributes.put(CoreAttributes.FILENAME.key(), firstFile.getName());
attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
attributes.put("tailfile.original.path", tailFile);
flowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().receive(flowFile, firstFile.toURI().toString(), "FlowFile contains bytes 0 through " + position + " of source file", session.getProvenanceReporter().receive(flowFile, firstFile.toURI().toString(), "FlowFile contains bytes 0 through " + position + " of source file",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos)); TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
@ -757,11 +1071,11 @@ public class TailFile extends AbstractProcessor {
// use a timestamp of lastModified() + 1 so that we do not ingest this file again. // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
cleanup(); cleanup();
state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, firstFile.lastModified() + 1L, null, state.getBuffer()); tfo.setState(new TailFileState(tailFile, null, null, 0L, firstFile.lastModified() + 1L, firstFile.length(), null, tfo.getState().getBuffer()));
// must ensure that we do session.commit() before persisting state in order to avoid data loss. // must ensure that we do session.commit() before persisting state in order to avoid data loss.
session.commit(); session.commit();
persistState(state, context); persistState(tfo, context);
} }
} else { } else {
getLogger().debug("Checksum for {} did not match expected checksum. Checksum for file was {} but expected {}. Will consume entire file", getLogger().debug("Checksum for {} did not match expected checksum. Checksum for file was {} but expected {}. Will consume entire file",
@ -776,7 +1090,7 @@ public class TailFile extends AbstractProcessor {
// we were reading when we last stopped, as it may already have been partially consumed. That is taken care of in the // we were reading when we last stopped, as it may already have been partially consumed. That is taken care of in the
// above block of code. // above block of code.
for (final File file : rolledOffFiles) { for (final File file : rolledOffFiles) {
state = consumeFileFully(file, context, session, state); tfo.setState(consumeFileFully(file, context, session, tfo));
} }
return rolloverOccurred; return rolloverOccurred;
@ -794,32 +1108,92 @@ public class TailFile extends AbstractProcessor {
* @param file the file to ingest * @param file the file to ingest
* @param context the ProcessContext * @param context the ProcessContext
* @param session the ProcessSession * @param session the ProcessSession
* @param state the current state * @param tfo the current state
* *
* @return the new, updated state that reflects that the given file has been * @return the new, updated state that reflects that the given file has been
* ingested. * ingested.
*/ */
private TailFileState consumeFileFully(final File file, final ProcessContext context, final ProcessSession session, TailFileState state) { private TailFileState consumeFileFully(final File file, final ProcessContext context, final ProcessSession session, TailFileObject tfo) {
FlowFile flowFile = session.create(); FlowFile flowFile = session.create();
flowFile = session.importFrom(file.toPath(), true, flowFile); flowFile = session.importFrom(file.toPath(), true, flowFile);
if (flowFile.getSize() == 0L) { if (flowFile.getSize() == 0L) {
session.remove(flowFile); session.remove(flowFile);
} else { } else {
flowFile = session.putAttribute(flowFile, "filename", file.getName()); final Map<String, String> attributes = new HashMap<>(3);
attributes.put(CoreAttributes.FILENAME.key(), file.getName());
attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
attributes.put("tailfile.original.path", tfo.getState().getFilename());
flowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().receive(flowFile, file.toURI().toString()); session.getProvenanceReporter().receive(flowFile, file.toURI().toString());
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
getLogger().debug("Created {} from {} and routed to success", new Object[]{flowFile, file}); getLogger().debug("Created {} from {} and routed to success", new Object[]{flowFile, file});
// use a timestamp of lastModified() + 1 so that we do not ingest this file again. // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
cleanup(); cleanup();
state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, file.lastModified() + 1L, null, state.getBuffer()); tfo.setState(new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, file.lastModified() + 1L, file.length(), null, tfo.getState().getBuffer()));
// must ensure that we do session.commit() before persisting state in order to avoid data loss. // must ensure that we do session.commit() before persisting state in order to avoid data loss.
session.commit(); session.commit();
persistState(state, context); persistState(tfo, context);
}
return tfo.getState();
}
static class TailFileObject {
private TailFileState state = new TailFileState(null, null, null, 0L, 0L, 0L, null, ByteBuffer.allocate(65536));
private Long expectedRecoveryChecksum;
private int filenameIndex;
private boolean tailFileChanged = true;
public TailFileObject(int i) {
this.filenameIndex = i;
}
public TailFileObject(int index, Map<String, String> statesMap) {
this.filenameIndex = index;
this.tailFileChanged = false;
final String prefix = MAP_PREFIX + index + '.';
final String filename = statesMap.get(prefix + TailFileState.StateKeys.FILENAME);
final long position = Long.valueOf(statesMap.get(prefix + TailFileState.StateKeys.POSITION));
final long timestamp = Long.valueOf(statesMap.get(prefix + TailFileState.StateKeys.TIMESTAMP));
final long length = Long.valueOf(statesMap.get(prefix + TailFileState.StateKeys.LENGTH));
this.state = new TailFileState(filename, new File(filename), null, position, timestamp, length, null, ByteBuffer.allocate(65536));
}
public int getFilenameIndex() {
return filenameIndex;
}
public void setFilenameIndex(int filenameIndex) {
this.filenameIndex = filenameIndex;
}
public TailFileState getState() {
return state;
}
public void setState(TailFileState state) {
this.state = state;
}
public Long getExpectedRecoveryChecksum() {
return expectedRecoveryChecksum;
}
public void setExpectedRecoveryChecksum(Long expectedRecoveryChecksum) {
this.expectedRecoveryChecksum = expectedRecoveryChecksum;
}
public boolean isTailFileChanged() {
return tailFileChanged;
}
public void setTailFileChanged(boolean tailFileChanged) {
this.tailFileChanged = tailFileChanged;
} }
return state;
} }
/** /**
@ -833,22 +1207,25 @@ public class TailFile extends AbstractProcessor {
private final FileChannel reader; private final FileChannel reader;
private final long position; private final long position;
private final long timestamp; private final long timestamp;
private final long length;
private final Checksum checksum; private final Checksum checksum;
private final ByteBuffer buffer; private final ByteBuffer buffer;
private static class StateKeys { private static class StateKeys {
public static final String FILENAME = "filename"; public static final String FILENAME = "filename";
public static final String POSITION = "position"; public static final String POSITION = "position";
public static final String TIMESTAMP = "timestamp"; public static final String TIMESTAMP = "timestamp";
public static final String CHECKSUM = "checksum"; public static final String CHECKSUM = "checksum";
public static final String LENGTH = "length";
} }
public TailFileState(final String filename, final File file, final FileChannel reader, final long position, final long timestamp, final Checksum checksum, final ByteBuffer buffer) { public TailFileState(final String filename, final File file, final FileChannel reader,
final long position, final long timestamp, final long length, final Checksum checksum, final ByteBuffer buffer) {
this.filename = filename; this.filename = filename;
this.file = file; this.file = file;
this.reader = reader; this.reader = reader;
this.position = position; this.position = position;
this.length = length;
this.timestamp = timestamp; // many operating systems will use only second-level precision for last-modified times so cut off milliseconds this.timestamp = timestamp; // many operating systems will use only second-level precision for last-modified times so cut off milliseconds
this.checksum = checksum; this.checksum = checksum;
this.buffer = buffer; this.buffer = buffer;
@ -874,6 +1251,10 @@ public class TailFile extends AbstractProcessor {
return timestamp; return timestamp;
} }
public long getLength() {
return length;
}
public Checksum getChecksum() { public Checksum getChecksum() {
return checksum; return checksum;
} }
@ -887,12 +1268,14 @@ public class TailFile extends AbstractProcessor {
return "TailFileState[filename=" + filename + ", position=" + position + ", timestamp=" + timestamp + ", checksum=" + (checksum == null ? "null" : checksum.getValue()) + "]"; return "TailFileState[filename=" + filename + ", position=" + position + ", timestamp=" + timestamp + ", checksum=" + (checksum == null ? "null" : checksum.getValue()) + "]";
} }
public Map<String, String> toStateMap() { public Map<String, String> toStateMap(int index) {
final String prefix = MAP_PREFIX + index + '.';
final Map<String, String> map = new HashMap<>(4); final Map<String, String> map = new HashMap<>(4);
map.put(StateKeys.FILENAME, filename); map.put(prefix + StateKeys.FILENAME, filename);
map.put(StateKeys.POSITION, String.valueOf(position)); map.put(prefix + StateKeys.POSITION, String.valueOf(position));
map.put(StateKeys.TIMESTAMP, String.valueOf(timestamp)); map.put(prefix + StateKeys.LENGTH, String.valueOf(length));
map.put(StateKeys.CHECKSUM, checksum == null ? null : String.valueOf(checksum.getValue())); map.put(prefix + StateKeys.TIMESTAMP, String.valueOf(timestamp));
map.put(prefix + StateKeys.CHECKSUM, checksum == null ? null : String.valueOf(checksum.getValue()));
return map; return map;
} }
} }

View File

@ -0,0 +1,134 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>TailFile</title>
<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
</head>
<body>
<h3>Modes</h3>
<p>
This processor is used to tail a file or multiple files according to multiple modes. The
mode to choose depends of the logging pattern followed by the file(s) to tail. In any case, if there
is a rolling pattern, the rolling files must be plain text files (compression is not supported at
the moment).
</p>
<ul>
<li><b>Single file</b>: the processor will tail the file with the path given in 'File(s) to tail' property.</li>
<li><b>Multiple files</b>: the processor will look for files into the 'Base directory'. It will look for file recursively
according to the 'Recursive lookup' property and will tail all the files matching the regular expression
provided in the 'File(s) to tail' property.</li>
</ul>
<h3>Rolling filename pattern</h3>
<p>
In case the 'Rolling filename pattern' property is used, when the processor detects that the file to tail has rolled over, the
processor will look for possible missing messages in the rolled file. To do so, the processor will use the pattern to find the
rolling files in the same directory as the file to tail.
</p>
<p>
In order to keep this property available in the 'Multiple files' mode when multiples files to tail are in the same directory,
it is possible to use the ${filename} tag to reference the name (without extension) of the file to tail. For example, if we have:
</p>
<p>
<code>
/my/path/directory/my-app.log.1<br />
/my/path/directory/my-app.log<br />
/my/path/directory/application.log.1<br />
/my/path/directory/application.log
</code>
</p>
<p>
the 'rolling filename pattern' would be <i>${filename}.log.*</i>.
</p>
<h3>Descriptions for different modes and strategies</h3>
<p>
The '<b>Single file</b>' mode assumes that the file to tail has always the same name even if there is a rolling pattern.
Example:
</p>
<p>
<code>
/my/path/directory/my-app.log.2<br />
/my/path/directory/my-app.log.1<br />
/my/path/directory/my-app.log
</code>
</p>
<p>
and new log messages are always appended in my-app.log file.
</p>
<p>
In case recursivity is set to 'true'. The regular expression for the files to tail must embrace the possible intermediate directories
between the base directory and the files to tail. Example:
</p>
<p>
<code>
/my/path/directory1/my-app1.log<br />
/my/path/directory2/my-app2.log<br />
/my/path/directory3/my-app3.log
</code>
</p>
<p>
<code>
Base directory: /my/path<br />
Files to tail: directory[1-3]/my-app[1-3].log<br />
Recursivity: true
</code>
</p>
<p>
In the '<b>Multiple files</b>' mode, it is possible to specify if the file to tail has always the same name or not. It is done through
the property 'Rolling strategy'. The strategy can be 'Fixed name' in case the files to tail have always the same name (see example above)
or can be 'Changing name' in case the files to tail do not always have the same name. Example:
</p>
<p>
<code>
/my/path/directory/my-app-2016-09-06.log<br />
/my/path/directory/my-app-2016-09-07.log<br />
/my/path/directory/my-app-2016-09-08.log
</code>
</p>
<p>
and new log messages are always appended in log file of the current day.
</p>
<p>
If the processor is configured with '<b>Multiple files</b>' mode and '<b>Fixed name</b>' rolling strategy, the processor will list the
files to tail in the 'Base directory' (recursively or not) and matching the regular expression. This listing will only occur once when
the processor is started. In this configuration, the processor will act as in 'Single file' mode for all files listed by the processor
when started.
</p>
<p>
If the processor is configured with '<b>Multiple files</b>' mode and '<b>Changing name</b>' rolling strategy, two new properties are
mandatory:
</p>
<ul>
<li><b>Lookup frequency</b>: specifies the minimum duration the processor will wait before listing again the files to tail.</li>
<li><b>Maximum age</b>: specifies the necessary minimum duration to consider that no new messages will be appended in a file
regarding its last modification date.</li>
</ul>
<p>
It is necessary to pay attention to 'Lookup frequency' and 'Maximum age' properties as well as the frequency at which the processor is
triggered in order to keep good performances. It is recommended to keep 'Maximum age' > 'Lookup frequency' > processor scheduling
frequency to avoid loosing messages. It also recommended not to set 'Maximum Age' too low because if messages are appended in a file
after this file has been considered "too old", all the messages in the file may be read again leading to data duplication.
</p>
<p>
Besides, if the processor is configured with '<b>Multiple files</b>' mode and '<b>Changing name</b>' rolling strategy, the 'Rolling
filename pattern' property must be specific enough to ensure that only the rolling files will be listed and not other currently tailed
files in the same directory (this can be achieved using ${filename} tag).
</p>
</body>
</html>

View File

@ -37,36 +37,38 @@ import org.junit.Test;
public class TestTailFile { public class TestTailFile {
private File file; private File file;
private TailFile processor; private File otherFile;
private RandomAccessFile raf; private RandomAccessFile raf;
private RandomAccessFile otherRaf;
private TailFile processor;
private TestRunner runner; private TestRunner runner;
@Before @Before
public void setup() throws IOException { public void setup() throws IOException {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard", "TRACE"); System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard", "TRACE");
clean();
final File targetDir = new File("target");
final File[] files = targetDir.listFiles(new FilenameFilter() {
@Override
public boolean accept(final File dir, final String name) {
return name.startsWith("log");
}
});
for (final File file : files) {
file.delete();
}
file = new File("target/log.txt"); file = new File("target/log.txt");
file.delete(); file.delete();
assertTrue(file.createNewFile()); assertTrue(file.createNewFile());
File directory = new File("target/testDir");
if(!directory.exists()) {
assertTrue(directory.mkdirs());
}
otherFile = new File("target/testDir/log.txt");
otherFile.delete();
assertTrue(otherFile.createNewFile());
processor = new TailFile(); processor = new TailFile();
runner = TestRunners.newTestRunner(processor); runner = TestRunners.newTestRunner(processor);
runner.setProperty(TailFile.FILENAME, "target/log.txt"); runner.setProperty(TailFile.FILENAME, "target/log.txt");
runner.assertValid(); runner.assertValid();
raf = new RandomAccessFile(file, "rw"); raf = new RandomAccessFile(file, "rw");
otherRaf = new RandomAccessFile(otherFile, "rw");
} }
@After @After
@ -75,6 +77,10 @@ public class TestTailFile {
raf.close(); raf.close();
} }
if (otherRaf != null) {
otherRaf.close();
}
processor.cleanup(); processor.cleanup();
} }
@ -436,7 +442,7 @@ public class TestTailFile {
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
final TailFileState state = ((TailFile) runner.getProcessor()).getState(); final TailFileState state = ((TailFile) runner.getProcessor()).getState().get("target/log.txt").getState();
assertNotNull(state); assertNotNull(state);
assertEquals("target/log.txt", state.getFilename()); assertEquals("target/log.txt", state.getFilename());
assertTrue(state.getTimestamp() <= System.currentTimeMillis()); assertTrue(state.getTimestamp() <= System.currentTimeMillis());
@ -500,4 +506,286 @@ public class TestTailFile {
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0); runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
} }
@Test
public void testRolloverWhenNoRollingPattern() throws IOException {
// write out some data and ingest it.
raf.write("hello there\n".getBytes());
runner.run();
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
runner.clearTransferState();
// move the file and write data to the new log.txt file.
raf.write("another".getBytes());
raf.close();
file.renameTo(new File("target/log.1"));
raf = new RandomAccessFile(file, "rw");
raf.write("new file\n".getBytes());
// because the roll over pattern has not been set we are not able to get
// data before the file has been moved, but we still want to ingest data
// from the tailed file
runner.run();
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("new file\n");
runner.clearTransferState();
// in the unlikely case where more data is written after the file is moved
// we are not able to detect it is a completely new file, then we continue
// on the tailed file as it never changed
raf.close();
file.renameTo(new File("target/log.2"));
raf = new RandomAccessFile(file, "rw");
raf.write("new file with longer data in the new file\n".getBytes());
runner.run();
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("with longer data in the new file\n");
runner.clearTransferState();
}
@Test
public void testMultipleFiles() throws IOException, InterruptedException {
runner.setProperty(TailFile.BASE_DIRECTORY, "target");
runner.setProperty(TailFile.MODE, TailFile.MODE_MULTIFILE);
runner.setProperty(TailFile.FILENAME, "(testDir/)?log(ging)?.txt");
runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "${filename}.?");
runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_FILE);
runner.setProperty(TailFile.RECURSIVE, "true");
runner.run(1);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
// I manually add a third file to tail here
// I'll remove it later in the test
File thirdFile = new File("target/logging.txt");
if(thirdFile.exists()) {
thirdFile.delete();
}
assertTrue(thirdFile.createNewFile());
RandomAccessFile thirdFileRaf = new RandomAccessFile(thirdFile, "rw");
thirdFileRaf.write("hey\n".getBytes());
otherRaf.write("hi\n".getBytes());
raf.write("hello\n".getBytes());
runner.run(1);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 3);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hey\n");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertAttributeEquals("tailfile.original.path", thirdFile.getPath());
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("hi\n");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertAttributeEquals("tailfile.original.path", otherFile.getPath());
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("hello\n");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertAttributeEquals("tailfile.original.path", file.getPath());
runner.clearTransferState();
otherRaf.write("world!".getBytes());
raf.write("world".getBytes());
Thread.sleep(100L);
runner.run(1);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0); // should not pull in data because no \n
raf.close();
otherRaf.close();
thirdFileRaf.close();
thirdFile.delete();
file.renameTo(new File("target/log.1"));
otherFile.renameTo(new File("target/testDir/log.1"));
raf = new RandomAccessFile(new File("target/log.txt"), "rw");
raf.write("1\n".getBytes());
otherRaf = new RandomAccessFile(new File("target/testDir/log.txt"), "rw");
otherRaf.write("2\n".getBytes());
// I also add a new file here
File fourthFile = new File("target/testDir/logging.txt");
if(fourthFile.exists()) {
fourthFile.delete();
}
assertTrue(fourthFile.createNewFile());
RandomAccessFile fourthFileRaf = new RandomAccessFile(fourthFile, "rw");
fourthFileRaf.write("3\n".getBytes());
fourthFileRaf.close();
runner.run(1);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 5);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("3\n");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("world!");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("2\n");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(3).assertContentEquals("world");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(4).assertContentEquals("1\n");
}
/**
* This test is used to check the case where we have multiple files in the same directory
* and where it is not possible to specify a single rolling pattern for all files.
*/
@Test
public void testMultipleFilesInSameDirectory() throws IOException, InterruptedException {
runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "${filename}.?");
runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_FILE);
runner.setProperty(TailFile.BASE_DIRECTORY, "target");
runner.setProperty(TailFile.FILENAME, "log(ging)?.txt");
runner.setProperty(TailFile.MODE, TailFile.MODE_MULTIFILE);
runner.run(1);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
File myOtherFile = new File("target/logging.txt");
if(myOtherFile.exists()) {
myOtherFile.delete();
}
assertTrue(myOtherFile.createNewFile());
RandomAccessFile myOtherRaf = new RandomAccessFile(myOtherFile, "rw");
myOtherRaf.write("hey\n".getBytes());
raf.write("hello\n".getBytes());
runner.run(1);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hey\n");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertAttributeEquals("tailfile.original.path", myOtherFile.getPath());
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("hello\n");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertAttributeEquals("tailfile.original.path", file.getPath());
runner.clearTransferState();
myOtherRaf.write("guys".getBytes());
raf.write("world".getBytes());
Thread.sleep(100L);
runner.run(1);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0); // should not pull in data because no \n
raf.close();
myOtherRaf.close();
// roll over
myOtherFile.renameTo(new File("target/logging.1"));
file.renameTo(new File("target/log.1"));
raf = new RandomAccessFile(new File("target/log.txt"), "rw");
raf.write("1\n".getBytes());
myOtherRaf = new RandomAccessFile(new File("target/logging.txt"), "rw");
myOtherRaf.write("2\n".getBytes());
myOtherRaf.close();
runner.run(1);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 4);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("guys");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("2\n");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("world");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(3).assertContentEquals("1\n");
}
@Test
public void testMultipleFilesChangingNameStrategy() throws IOException, InterruptedException {
runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_FILE);
runner.setProperty(TailFile.MODE, TailFile.MODE_MULTIFILE);
runner.setProperty(TailFile.ROLLING_STRATEGY, TailFile.CHANGING_NAME);
runner.setProperty(TailFile.BASE_DIRECTORY, "target");
runner.setProperty(TailFile.FILENAME, ".*app-.*.log");
runner.setProperty(TailFile.LOOKUP_FREQUENCY, "2s");
runner.setProperty(TailFile.MAXIMUM_AGE, "5s");
File multiChangeFirstFile = new File("target/app-2016-09-07.log");
if(multiChangeFirstFile.exists()) {
multiChangeFirstFile.delete();
}
assertTrue(multiChangeFirstFile.createNewFile());
RandomAccessFile multiChangeFirstRaf = new RandomAccessFile(multiChangeFirstFile, "rw");
multiChangeFirstRaf.write("hey\n".getBytes());
File multiChangeSndFile = new File("target/my-app-2016-09-07.log");
if(multiChangeSndFile.exists()) {
multiChangeSndFile.delete();
}
assertTrue(multiChangeSndFile.createNewFile());
RandomAccessFile multiChangeSndRaf = new RandomAccessFile(multiChangeSndFile, "rw");
multiChangeSndRaf.write("hello\n".getBytes());
runner.run(1, false);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("hey\n");
runner.clearTransferState();
multiChangeFirstRaf.write("hey2\n".getBytes());
multiChangeSndRaf.write("hello2\n".getBytes());
Thread.sleep(2000);
runner.run(1, false);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello2\n");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("hey2\n");
runner.clearTransferState();
multiChangeFirstRaf.write("hey3\n".getBytes());
multiChangeSndRaf.write("hello3\n".getBytes());
multiChangeFirstRaf.close();
multiChangeSndRaf.close();
multiChangeFirstFile = new File("target/app-2016-09-08.log");
if(multiChangeFirstFile.exists()) {
multiChangeFirstFile.delete();
}
assertTrue(multiChangeFirstFile.createNewFile());
multiChangeFirstRaf = new RandomAccessFile(multiChangeFirstFile, "rw");
multiChangeFirstRaf.write("hey\n".getBytes());
multiChangeSndFile = new File("target/my-app-2016-09-08.log");
if(multiChangeSndFile.exists()) {
multiChangeSndFile.delete();
}
assertTrue(multiChangeSndFile.createNewFile());
multiChangeSndRaf = new RandomAccessFile(multiChangeSndFile, "rw");
multiChangeSndRaf.write("hello\n".getBytes());
Thread.sleep(2000);
runner.run(1);
multiChangeFirstRaf.close();
multiChangeSndRaf.close();
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 4);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello3\n");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("hello\n");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("hey3\n");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(3).assertContentEquals("hey\n");
runner.clearTransferState();
}
private void cleanFiles(String directory) {
final File targetDir = new File(directory);
if(targetDir.exists()) {
final File[] files = targetDir.listFiles(new FilenameFilter() {
@Override
public boolean accept(final File dir, final String name) {
return name.startsWith("log") || name.endsWith("log");
}
});
for (final File file : files) {
file.delete();
}
}
}
private void clean() {
cleanFiles("target");
cleanFiles("target/testDir");
}
} }