NIFI-2457 removed old state file mechanism and fixed state reset logic

This closes #768.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
joewitt 2016-08-01 23:57:56 -04:00 committed by Bryan Bende
parent e23b235617
commit 6bcc415eb8
2 changed files with 157 additions and 215 deletions

View File

@ -14,14 +14,11 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
@ -84,8 +81,8 @@ import org.apache.nifi.stream.io.StreamUtils;
+ "State is stored either local or clustered depend on the <File Location> property.") + "State is stored either local or clustered depend on the <File Location> property.")
public class TailFile extends AbstractProcessor { public class TailFile extends AbstractProcessor {
static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", "Local", "File is located on a local disk drive. Each node in a cluster will tail a different file."); static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", "Local", "State is stored locally. Each node in a cluster will tail a different file.");
static final AllowableValue LOCATION_REMOTE = new AllowableValue("Remote", "Remote", "File is located on a remote resource. This Processor will store state across the cluster so that " static final AllowableValue LOCATION_REMOTE = new AllowableValue("Remote", "Remote", "State is located on a remote resource. This Processor will store state across the cluster so that "
+ "it can be run on Primary Node Only and a new Primary Node can pick up where the last one left off."); + "it can be run on Primary Node Only and a new Primary Node can pick up where the last one left off.");
static final AllowableValue START_BEGINNING_OF_TIME = new AllowableValue("Beginning of Time", "Beginning of Time", static final AllowableValue START_BEGINNING_OF_TIME = new AllowableValue("Beginning of Time", "Beginning of Time",
@ -111,20 +108,15 @@ public class TailFile extends AbstractProcessor {
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.required(false) .required(false)
.build(); .build();
static final PropertyDescriptor FILE_LOCATION = new PropertyDescriptor.Builder() static final PropertyDescriptor STATE_LOCATION = new PropertyDescriptor.Builder()
.name("File Location") .displayName("State Location")
.description("Specifies where the file is located, so that state can be stored appropriately in order to ensure that all data is consumed without duplicating data upon restart of NiFi") .name("File Location") //retained name of property for backward compatibility of configs
.description("Specifies where the state is located either local or cluster so that state can be stored "
+ "appropriately in order to ensure that all data is consumed without duplicating data upon restart of NiFi")
.required(true) .required(true)
.allowableValues(LOCATION_LOCAL, LOCATION_REMOTE) .allowableValues(LOCATION_LOCAL, LOCATION_REMOTE)
.defaultValue(LOCATION_LOCAL.getValue()) .defaultValue(LOCATION_LOCAL.getValue())
.build(); .build();
static final PropertyDescriptor STATE_FILE = new PropertyDescriptor.Builder()
.name("State File")
.description("Specifies the file that should be used for storing state about what data has been ingested so that upon restart NiFi can resume from where it left off")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.required(false)
.build();
static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder() static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder()
.name("Initial Start Position") .name("Initial Start Position")
.description("When the Processor first begins to tail data, this property specifies where the Processor should begin reading data. Once data has been ingested from the file, " .description("When the Processor first begins to tail data, this property specifies where the Processor should begin reading data. Once data has been ingested from the file, "
@ -148,9 +140,8 @@ public class TailFile extends AbstractProcessor {
final List<PropertyDescriptor> properties = new ArrayList<>(); final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(FILENAME); properties.add(FILENAME);
properties.add(ROLLING_FILENAME_PATTERN); properties.add(ROLLING_FILENAME_PATTERN);
properties.add(STATE_FILE);
properties.add(START_POSITION); properties.add(START_POSITION);
properties.add(FILE_LOCATION); properties.add(STATE_LOCATION);
return properties; return properties;
} }
@ -167,22 +158,13 @@ public class TailFile extends AbstractProcessor {
} }
} }
@OnScheduled @OnScheduled
public void recoverState(final ProcessContext context) throws IOException { public void recoverState(final ProcessContext context) throws IOException {
// Before the State Manager existed, we had to store state in a local file. Now, we want to use the State Manager
// instead. So we will need to recover the state that is stored in the file (if any), and then store that in our
// State Manager. But we do this only if nothing has ever been stored in the State Manager.
final Scope scope = getStateScope(context); final Scope scope = getStateScope(context);
final StateMap stateMap = context.getStateManager().getState(scope); final StateMap stateMap = context.getStateManager().getState(scope);
if (stateMap.getVersion() == -1L) { if (stateMap.getVersion() == -1L) {
// State has never been stored in the State Manager. Try to recover state from a file, if one exists. //state has been cleared or never stored so recover as 'empty state'
final Map<String, String> stateFromFile = recoverStateValuesFromFile(context); recoverState(context, Collections.EMPTY_MAP);
if (!stateFromFile.isEmpty()) {
persistState(stateFromFile, context);
recoverState(context, stateFromFile);
}
return; return;
} }
@ -190,81 +172,33 @@ public class TailFile extends AbstractProcessor {
} }
/** /**
* Recovers values for the State that was stored in a local file. * Updates member variables to reflect the "expected recovery checksum" and
* * seek to the appropriate location in the tailed file, updating our
* @param context the ProcessContext that indicates where the state is stored * checksum, so that we are ready to proceed with the
* @return a Map that contains the keys defined in {@link TailFileState.StateKeys} * {@link #onTrigger(ProcessContext, ProcessSession)} call.
* @throws IOException if the state file exists but was unable to be read
*/
private Map<String, String> recoverStateValuesFromFile(final ProcessContext context) throws IOException {
final String stateFilename = context.getProperty(STATE_FILE).getValue();
if (stateFilename == null) {
return Collections.emptyMap();
}
final Map<String, String> stateValues = new HashMap<>(4);
final File stateFile = new File(stateFilename);
try (final FileInputStream fis = new FileInputStream(stateFile);
final DataInputStream dis = new DataInputStream(fis)) {
final int encodingVersion = dis.readInt();
if (encodingVersion > 0) {
throw new IOException("Unable to recover state because State File was encoded in a more recent version than Version 1");
}
if (encodingVersion == 0) {
final String filename = dis.readUTF();
long position = dis.readLong();
final long timestamp = dis.readLong();
final boolean checksumPresent = dis.readBoolean();
final Long checksumValue;
if (checksumPresent) {
checksumValue = dis.readLong();
} else {
checksumValue = null;
}
stateValues.put(TailFileState.StateKeys.FILENAME, filename);
stateValues.put(TailFileState.StateKeys.POSITION, String.valueOf(position));
stateValues.put(TailFileState.StateKeys.TIMESTAMP, String.valueOf(timestamp));
stateValues.put(TailFileState.StateKeys.CHECKSUM, checksumValue == null ? null : String.valueOf(checksumValue));
} else {
// encoding Version == -1... no data in file. Just move on.
}
} catch (final FileNotFoundException fnfe) {
}
return stateValues;
}
/**
* Updates member variables to reflect the "expected recovery checksum" and seek to the appropriate location in the
* tailed file, updating our checksum, so that we are ready to proceed with the {@link #onTrigger(ProcessContext, ProcessSession)} call.
* *
* @param context the ProcessContext * @param context the ProcessContext
* @param stateValues the values that were recovered from state that was previously stored. This Map should be populated with the keys defined * @param stateValues the values that were recovered from state that was
* in {@link TailFileState.StateKeys}. * previously stored. This Map should be populated with the keys defined in
* @throws IOException if unable to seek to the appropriate location in the tailed file. * {@link TailFileState.StateKeys}.
* @throws IOException if unable to seek to the appropriate location in the
* tailed file.
*/ */
private void recoverState(final ProcessContext context, final Map<String, String> stateValues) throws IOException { private void recoverState(final ProcessContext context, final Map<String, String> stateValues) throws IOException {
if (stateValues == null) { final String currentFilename = context.getProperty(FILENAME).getValue();
return;
}
if (!stateValues.containsKey(TailFileState.StateKeys.FILENAME)) { if (!stateValues.containsKey(TailFileState.StateKeys.FILENAME)) {
resetState(currentFilename);
return; return;
} }
if (!stateValues.containsKey(TailFileState.StateKeys.POSITION)) { if (!stateValues.containsKey(TailFileState.StateKeys.POSITION)) {
resetState(currentFilename);
return; return;
} }
if (!stateValues.containsKey(TailFileState.StateKeys.TIMESTAMP)) { if (!stateValues.containsKey(TailFileState.StateKeys.TIMESTAMP)) {
resetState(currentFilename);
return; return;
} }
final String currentFilename = context.getProperty(FILENAME).getValue();
final String checksumValue = stateValues.get(TailFileState.StateKeys.CHECKSUM); final String checksumValue = stateValues.get(TailFileState.StateKeys.CHECKSUM);
final boolean checksumPresent = (checksumValue != null); final boolean checksumPresent = (checksumValue != null);
final String storedStateFilename = stateValues.get(TailFileState.StateKeys.FILENAME); final String storedStateFilename = stateValues.get(TailFileState.StateKeys.FILENAME);
@ -297,7 +231,7 @@ public class TailFile extends AbstractProcessor {
getLogger().debug("When recovering state, checksum of tailed file matches the stored checksum. Will resume where left off."); getLogger().debug("When recovering state, checksum of tailed file matches the stored checksum. Will resume where left off.");
tailFile = existingTailFile; tailFile = existingTailFile;
reader = FileChannel.open(tailFile.toPath(), StandardOpenOption.READ); reader = FileChannel.open(tailFile.toPath(), StandardOpenOption.READ);
getLogger().debug("Created FileChannel {} for {} in recoverState", new Object[] {reader, tailFile}); getLogger().debug("Created FileChannel {} for {} in recoverState", new Object[]{reader, tailFile});
reader.position(position); reader.position(position);
} else { } else {
@ -308,22 +242,22 @@ public class TailFile extends AbstractProcessor {
} else { } else {
// fewer bytes than our position, so we know we weren't already reading from this file. Keep reader at a position of 0. // fewer bytes than our position, so we know we weren't already reading from this file. Keep reader at a position of 0.
getLogger().debug("When recovering state, existing file to tail is only {} bytes but position flag is {}; " getLogger().debug("When recovering state, existing file to tail is only {} bytes but position flag is {}; "
+ "this indicates that the file has rotated. Will begin tailing current file from beginning.", new Object[] {existingTailFile.length(), position}); + "this indicates that the file has rotated. Will begin tailing current file from beginning.", new Object[]{existingTailFile.length(), position});
} }
state = new TailFileState(currentFilename, tailFile, reader, position, timestamp, checksum, ByteBuffer.allocate(65536)); state = new TailFileState(currentFilename, tailFile, reader, position, timestamp, checksum, ByteBuffer.allocate(65536));
} else { } else {
// If filename changed or there is no checksum present, then we have no expected checksum to use for recovery. resetState(currentFilename);
expectedRecoveryChecksum = null; }
// tailing a new file since the state file was written out. We will reset state. getLogger().debug("Recovered state {}", new Object[]{state});
}
private void resetState(final String currentFilename) {
expectedRecoveryChecksum = null;
state = new TailFileState(currentFilename, null, null, 0L, 0L, null, ByteBuffer.allocate(65536)); state = new TailFileState(currentFilename, null, null, 0L, 0L, null, ByteBuffer.allocate(65536));
} }
getLogger().debug("Recovered state {}", new Object[] {state});
}
@OnStopped @OnStopped
public void cleanup() { public void cleanup() {
final TailFileState state = this.state; final TailFileState state = this.state;
@ -342,13 +276,11 @@ public class TailFile extends AbstractProcessor {
getLogger().warn("Failed to close file handle during cleanup"); getLogger().warn("Failed to close file handle during cleanup");
} }
getLogger().debug("Closed FileChannel {}", new Object[] {reader}); getLogger().debug("Closed FileChannel {}", new Object[]{reader});
this.state = new TailFileState(state.getFilename(), state.getFile(), null, state.getPosition(), state.getTimestamp(), state.getChecksum(), state.getBuffer()); this.state = new TailFileState(state.getFilename(), state.getFile(), null, state.getPosition(), state.getTimestamp(), state.getChecksum(), state.getBuffer());
} }
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// If user changes the file that is being tailed, we need to consume the already-rolled-over data according // If user changes the file that is being tailed, we need to consume the already-rolled-over data according
@ -369,7 +301,7 @@ public class TailFile extends AbstractProcessor {
try { try {
final FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ); final FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
getLogger().debug("Created FileChannel {} for {}", new Object[] {fileChannel, file}); getLogger().debug("Created FileChannel {} for {}", new Object[]{fileChannel, file});
final Checksum checksum = new CRC32(); final Checksum checksum = new CRC32();
final long position = file.length(); final long position = file.length();
@ -384,7 +316,7 @@ public class TailFile extends AbstractProcessor {
cleanup(); cleanup();
state = new TailFileState(filename, file, fileChannel, position, timestamp, checksum, state.getBuffer()); state = new TailFileState(filename, file, fileChannel, position, timestamp, checksum, state.getBuffer());
} catch (final IOException ioe) { } catch (final IOException ioe) {
getLogger().error("Attempted to position Reader at current position in file {} but failed to do so due to {}", new Object[] {file, ioe.toString()}, ioe); getLogger().error("Attempted to position Reader at current position in file {} but failed to do so due to {}", new Object[]{file, ioe.toString()}, ioe);
context.yield(); context.yield();
return; return;
} }
@ -435,9 +367,9 @@ public class TailFile extends AbstractProcessor {
// Since file has rotated, we close the reader, create a new one, and then reset our state. // Since file has rotated, we close the reader, create a new one, and then reset our state.
try { try {
reader.close(); reader.close();
getLogger().debug("Closed FileChannel {}", new Object[] {reader, reader}); getLogger().debug("Closed FileChannel {}", new Object[]{reader, reader});
} catch (final IOException ioe) { } catch (final IOException ioe) {
getLogger().warn("Failed to close reader for {} due to {}", new Object[] {file, ioe}); getLogger().warn("Failed to close reader for {} due to {}", new Object[]{file, ioe});
} }
reader = createReader(file, 0L); reader = createReader(file, 0L);
@ -504,7 +436,7 @@ public class TailFile extends AbstractProcessor {
// operating system file last mod precision), then we could set the timestamp to a smaller value, which could result in reading in the // operating system file last mod precision), then we could set the timestamp to a smaller value, which could result in reading in the
// rotated file a second time. // rotated file a second time.
timestamp = Math.max(state.getTimestamp(), file.lastModified()); timestamp = Math.max(state.getTimestamp(), file.lastModified());
getLogger().debug("Created {} and routed to success", new Object[] {flowFile}); getLogger().debug("Created {} and routed to success", new Object[]{flowFile});
} }
// Create a new state object to represent our current position, timestamp, etc. // Create a new state object to represent our current position, timestamp, etc.
@ -516,21 +448,22 @@ public class TailFile extends AbstractProcessor {
persistState(updatedState, context); persistState(updatedState, context);
} }
/** /**
* Read new lines from the given FileChannel, copying it to the given Output Stream. The Checksum is used in order to later determine whether or not * Read new lines from the given FileChannel, copying it to the given Output
* Stream. The Checksum is used in order to later determine whether or not
* data has been consumed. * data has been consumed.
* *
* @param reader The FileChannel to read data from * @param reader The FileChannel to read data from
* @param buffer the buffer to use for copying data * @param buffer the buffer to use for copying data
* @param out the OutputStream to copy the data to * @param out the OutputStream to copy the data to
* @param checksum the Checksum object to use in order to calculate checksum for recovery purposes * @param checksum the Checksum object to use in order to calculate checksum
* for recovery purposes
* *
* @return The new position after the lines have been read * @return The new position after the lines have been read
* @throws java.io.IOException if an I/O error occurs. * @throws java.io.IOException if an I/O error occurs.
*/ */
private long readLines(final FileChannel reader, final ByteBuffer buffer, final OutputStream out, final Checksum checksum) throws IOException { private long readLines(final FileChannel reader, final ByteBuffer buffer, final OutputStream out, final Checksum checksum) throws IOException {
getLogger().debug("Reading lines starting at position {}", new Object[] {reader.position()}); getLogger().debug("Reading lines starting at position {}", new Object[]{reader.position()});
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
long pos = reader.position(); long pos = reader.position();
@ -554,7 +487,7 @@ public class TailFile extends AbstractProcessor {
baos.writeTo(out); baos.writeTo(out);
checksum.update(baos.getUnderlyingBuffer(), 0, baos.size()); checksum.update(baos.getUnderlyingBuffer(), 0, baos.size());
if (getLogger().isTraceEnabled()) { if (getLogger().isTraceEnabled()) {
getLogger().trace("Checksum updated to {}", new Object[] {checksum.getValue()}); getLogger().trace("Checksum updated to {}", new Object[]{checksum.getValue()});
} }
baos.reset(); baos.reset();
@ -571,7 +504,7 @@ public class TailFile extends AbstractProcessor {
baos.writeTo(out); baos.writeTo(out);
checksum.update(baos.getUnderlyingBuffer(), 0, baos.size()); checksum.update(baos.getUnderlyingBuffer(), 0, baos.size());
if (getLogger().isTraceEnabled()) { if (getLogger().isTraceEnabled()) {
getLogger().trace("Checksum updated to {}", new Object[] {checksum.getValue()}); getLogger().trace("Checksum updated to {}", new Object[]{checksum.getValue()});
} }
linesRead++; linesRead++;
@ -588,7 +521,7 @@ public class TailFile extends AbstractProcessor {
} }
if (rePos < reader.position()) { if (rePos < reader.position()) {
getLogger().debug("Read {} lines; repositioning reader from {} to {}", new Object[] {linesRead, pos, rePos}); getLogger().debug("Read {} lines; repositioning reader from {} to {}", new Object[]{linesRead, pos, rePos});
reader.position(rePos); // Ensure we can re-read if necessary reader.position(rePos); // Ensure we can re-read if necessary
} }
@ -596,22 +529,25 @@ public class TailFile extends AbstractProcessor {
} }
} }
/** /**
* Returns a list of all Files that match the following criteria: * Returns a list of all Files that match the following criteria:
* *
* <ul> * <ul>
* <li>Filename matches the Rolling Filename Pattern</li> * <li>Filename matches the Rolling Filename Pattern</li>
* <li>Filename does not match the actual file being tailed</li> * <li>Filename does not match the actual file being tailed</li>
* <li>The Last Modified Time on the file is equal to or later than the given minimum timestamp</li> * <li>The Last Modified Time on the file is equal to or later than the
* given minimum timestamp</li>
* </ul> * </ul>
* *
* <p> * <p>
* The List that is returned will be ordered by file timestamp, providing the oldest file first. * The List that is returned will be ordered by file timestamp, providing
* the oldest file first.
* </p> * </p>
* *
* @param context the ProcessContext to use in order to determine Processor configuration * @param context the ProcessContext to use in order to determine Processor
* @param minTimestamp any file with a Last Modified Time before this timestamp will not be returned * configuration
* @param minTimestamp any file with a Last Modified Time before this
* timestamp will not be returned
* @return a list of all Files that have rolled over * @return a list of all Files that have rolled over
* @throws IOException if unable to perform the listing of files * @throws IOException if unable to perform the listing of files
*/ */
@ -636,7 +572,7 @@ public class TailFile extends AbstractProcessor {
if (file.lastModified() < minTimestamp) { if (file.lastModified() < minTimestamp) {
getLogger().debug("Found rolled off file {} but its last modified timestamp is before the cutoff (Last Mod = {}, Cutoff = {}) so will not consume it", getLogger().debug("Found rolled off file {} but its last modified timestamp is before the cutoff (Last Mod = {}, Cutoff = {}) so will not consume it",
new Object[] {file, lastMod, minTimestamp}); new Object[]{file, lastMod, minTimestamp});
continue; continue;
} else if (file.equals(tailFile)) { } else if (file.equals(tailFile)) {
@ -665,9 +601,8 @@ public class TailFile extends AbstractProcessor {
return rolledOffFiles; return rolledOffFiles;
} }
private Scope getStateScope(final ProcessContext context) { private Scope getStateScope(final ProcessContext context) {
final String location = context.getProperty(FILE_LOCATION).getValue(); final String location = context.getProperty(STATE_LOCATION).getValue();
if (LOCATION_REMOTE.getValue().equalsIgnoreCase(location)) { if (LOCATION_REMOTE.getValue().equalsIgnoreCase(location)) {
return Scope.CLUSTER; return Scope.CLUSTER;
} }
@ -683,31 +618,30 @@ public class TailFile extends AbstractProcessor {
try { try {
context.getStateManager().setState(state, getStateScope(context)); context.getStateManager().setState(state, getStateScope(context));
} catch (final IOException e) { } catch (final IOException e) {
getLogger().warn("Failed to store state due to {}; some data may be duplicated on restart of NiFi", new Object[] {e}); getLogger().warn("Failed to store state due to {}; some data may be duplicated on restart of NiFi", new Object[]{e});
} }
} }
private FileChannel createReader(final File file, final long position) { private FileChannel createReader(final File file, final long position) {
final FileChannel reader; final FileChannel reader;
try { try {
reader = FileChannel.open(file.toPath(), StandardOpenOption.READ); reader = FileChannel.open(file.toPath(), StandardOpenOption.READ);
} catch (final IOException ioe) { } catch (final IOException ioe) {
getLogger().warn("Unable to open file {}; will attempt to access file again after the configured Yield Duration has elapsed: {}", new Object[] {file, ioe}); getLogger().warn("Unable to open file {}; will attempt to access file again after the configured Yield Duration has elapsed: {}", new Object[]{file, ioe});
return null; return null;
} }
getLogger().debug("Created FileChannel {} for {}", new Object[] {reader, file}); getLogger().debug("Created FileChannel {} for {}", new Object[]{reader, file});
try { try {
reader.position(position); reader.position(position);
} catch (final IOException ioe) { } catch (final IOException ioe) {
getLogger().error("Failed to read from {} due to {}", new Object[] {file, ioe}); getLogger().error("Failed to read from {} due to {}", new Object[]{file, ioe});
try { try {
reader.close(); reader.close();
getLogger().debug("Closed FileChannel {}", new Object[] {reader}); getLogger().debug("Closed FileChannel {}", new Object[]{reader});
} catch (final IOException ioe2) { } catch (final IOException ioe2) {
} }
@ -722,19 +656,27 @@ public class TailFile extends AbstractProcessor {
return state; return state;
} }
/** /**
* Finds any files that have rolled over and have not yet been ingested by this Processor. Each of these files that is found will be * Finds any files that have rolled over and have not yet been ingested by
* ingested as its own FlowFile. If a file is found that has been partially ingested, the rest of the file will be ingested as a * this Processor. Each of these files that is found will be ingested as its
* single FlowFile but the data that already has been ingested will not be ingested again. * own FlowFile. If a file is found that has been partially ingested, the
* rest of the file will be ingested as a single FlowFile but the data that
* already has been ingested will not be ingested again.
* *
* @param context the ProcessContext to use in order to obtain Processor configuration. * @param context the ProcessContext to use in order to obtain Processor
* @param session the ProcessSession to use in order to interact with FlowFile creation and content. * configuration.
* @param expectedChecksum the checksum value that is expected for the oldest file from offset 0 through &lt;position&gt;. * @param session the ProcessSession to use in order to interact with
* @param timestamp the latest Last Modified Timestamp that has been consumed. Any data that was written before this data will not be ingested. * FlowFile creation and content.
* @param position the byte offset in the file being tailed, where tailing last left off. * @param expectedChecksum the checksum value that is expected for the
* oldest file from offset 0 through &lt;position&gt;.
* @param timestamp the latest Last Modified Timestamp that has been
* consumed. Any data that was written before this data will not be
* ingested.
* @param position the byte offset in the file being tailed, where tailing
* last left off.
* *
* @return <code>true</code> if the file being tailed has rolled over, <code>false</code> otherwise * @return <code>true</code> if the file being tailed has rolled over,
* <code>false</code> otherwise
*/ */
private boolean recoverRolledFiles(final ProcessContext context, final ProcessSession session, final Long expectedChecksum, final long timestamp, final long position) { private boolean recoverRolledFiles(final ProcessContext context, final ProcessSession session, final Long expectedChecksum, final long timestamp, final long position) {
try { try {
@ -745,28 +687,37 @@ public class TailFile extends AbstractProcessor {
final List<File> rolledOffFiles = getRolledOffFiles(context, timestamp); final List<File> rolledOffFiles = getRolledOffFiles(context, timestamp);
return recoverRolledFiles(context, session, rolledOffFiles, expectedChecksum, timestamp, position); return recoverRolledFiles(context, session, rolledOffFiles, expectedChecksum, timestamp, position);
} catch (final IOException e) { } catch (final IOException e) {
getLogger().error("Failed to recover files that have rolled over due to {}", new Object[] {e}); getLogger().error("Failed to recover files that have rolled over due to {}", new Object[]{e});
return false; return false;
} }
} }
/** /**
* Finds any files that have rolled over and have not yet been ingested by this Processor. Each of these files that is found will be * Finds any files that have rolled over and have not yet been ingested by
* ingested as its own FlowFile. If a file is found that has been partially ingested, the rest of the file will be ingested as a * this Processor. Each of these files that is found will be ingested as its
* single FlowFile but the data that already has been ingested will not be ingested again. * own FlowFile. If a file is found that has been partially ingested, the
* rest of the file will be ingested as a single FlowFile but the data that
* already has been ingested will not be ingested again.
* *
* @param context the ProcessContext to use in order to obtain Processor configuration. * @param context the ProcessContext to use in order to obtain Processor
* @param session the ProcessSession to use in order to interact with FlowFile creation and content. * configuration.
* @param expectedChecksum the checksum value that is expected for the oldest file from offset 0 through &lt;position&gt;. * @param session the ProcessSession to use in order to interact with
* @param timestamp the latest Last Modfiied Timestamp that has been consumed. Any data that was written before this data will not be ingested. * FlowFile creation and content.
* @param position the byte offset in the file being tailed, where tailing last left off. * @param expectedChecksum the checksum value that is expected for the
* oldest file from offset 0 through &lt;position&gt;.
* @param timestamp the latest Last Modfiied Timestamp that has been
* consumed. Any data that was written before this data will not be
* ingested.
* @param position the byte offset in the file being tailed, where tailing
* last left off.
* *
* @return <code>true</code> if the file being tailed has rolled over, false otherwise * @return <code>true</code> if the file being tailed has rolled over, false
* otherwise
*/ */
private boolean recoverRolledFiles(final ProcessContext context, final ProcessSession session, final List<File> rolledOffFiles, final Long expectedChecksum, private boolean recoverRolledFiles(final ProcessContext context, final ProcessSession session, final List<File> rolledOffFiles, final Long expectedChecksum,
final long timestamp, final long position) { final long timestamp, final long position) {
try { try {
getLogger().debug("Recovering Rolled Off Files; total number of files rolled off = {}", new Object[] {rolledOffFiles.size()}); getLogger().debug("Recovering Rolled Off Files; total number of files rolled off = {}", new Object[]{rolledOffFiles.size()});
// For first file that we find, it may or may not be the file that we were last reading from. // For first file that we find, it may or may not be the file that we were last reading from.
// As a result, we have to read up to the position we stored, while calculating the checksum. If the checksums match, // As a result, we have to read up to the position we stored, while calculating the checksum. If the checksums match,
@ -785,7 +736,7 @@ public class TailFile extends AbstractProcessor {
final long checksumResult = in.getChecksum().getValue(); final long checksumResult = in.getChecksum().getValue();
if (checksumResult == expectedChecksum) { if (checksumResult == expectedChecksum) {
getLogger().debug("Checksum for {} matched expected checksum. Will skip first {} bytes", new Object[] {firstFile, position}); getLogger().debug("Checksum for {} matched expected checksum. Will skip first {} bytes", new Object[]{firstFile, position});
// This is the same file that we were reading when we shutdown. Start reading from this point on. // This is the same file that we were reading when we shutdown. Start reading from this point on.
rolledOffFiles.remove(0); rolledOffFiles.remove(0);
@ -802,7 +753,7 @@ public class TailFile extends AbstractProcessor {
session.getProvenanceReporter().receive(flowFile, firstFile.toURI().toString(), "FlowFile contains bytes 0 through " + position + " of source file", session.getProvenanceReporter().receive(flowFile, firstFile.toURI().toString(), "FlowFile contains bytes 0 through " + position + " of source file",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos)); TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
getLogger().debug("Created {} from rolled over file {} and routed to success", new Object[] {flowFile, firstFile}); getLogger().debug("Created {} from rolled over file {} and routed to success", new Object[]{flowFile, firstFile});
// use a timestamp of lastModified() + 1 so that we do not ingest this file again. // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
cleanup(); cleanup();
@ -814,7 +765,7 @@ public class TailFile extends AbstractProcessor {
} }
} else { } else {
getLogger().debug("Checksum for {} did not match expected checksum. Checksum for file was {} but expected {}. Will consume entire file", getLogger().debug("Checksum for {} did not match expected checksum. Checksum for file was {} but expected {}. Will consume entire file",
new Object[] {firstFile, checksumResult, expectedChecksum}); new Object[]{firstFile, checksumResult, expectedChecksum});
} }
} }
} }
@ -830,22 +781,23 @@ public class TailFile extends AbstractProcessor {
return rolloverOccurred; return rolloverOccurred;
} catch (final IOException e) { } catch (final IOException e) {
getLogger().error("Failed to recover files that have rolled over due to {}", new Object[] {e}); getLogger().error("Failed to recover files that have rolled over due to {}", new Object[]{e});
return false; return false;
} }
} }
/** /**
* Creates a new FlowFile that contains the entire contents of the given file and transfers that FlowFile to success. This method * Creates a new FlowFile that contains the entire contents of the given
* will commit the given session and emit an appropriate Provenance Event. * file and transfers that FlowFile to success. This method will commit the
* given session and emit an appropriate Provenance Event.
* *
* @param file the file to ingest * @param file the file to ingest
* @param context the ProcessContext * @param context the ProcessContext
* @param session the ProcessSession * @param session the ProcessSession
* @param state the current state * @param state the current state
* *
* @return the new, updated state that reflects that the given file has been ingested. * @return the new, updated state that reflects that the given file has been
* ingested.
*/ */
private TailFileState consumeFileFully(final File file, final ProcessContext context, final ProcessSession session, TailFileState state) { private TailFileState consumeFileFully(final File file, final ProcessContext context, final ProcessSession session, TailFileState state) {
FlowFile flowFile = session.create(); FlowFile flowFile = session.create();
@ -856,7 +808,7 @@ public class TailFile extends AbstractProcessor {
flowFile = session.putAttribute(flowFile, "filename", file.getName()); flowFile = session.putAttribute(flowFile, "filename", file.getName());
session.getProvenanceReporter().receive(flowFile, file.toURI().toString()); session.getProvenanceReporter().receive(flowFile, file.toURI().toString());
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
getLogger().debug("Created {} from {} and routed to success", new Object[] {flowFile, file}); getLogger().debug("Created {} from {} and routed to success", new Object[]{flowFile, file});
// use a timestamp of lastModified() + 1 so that we do not ingest this file again. // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
cleanup(); cleanup();
@ -871,9 +823,11 @@ public class TailFile extends AbstractProcessor {
} }
/** /**
* A simple Java class to hold information about our state so that we can maintain this state across multiple invocations of the Processor * A simple Java class to hold information about our state so that we can
* maintain this state across multiple invocations of the Processor
*/ */
static class TailFileState { static class TailFileState {
private final String filename; // hold onto filename and not just File because we want to match that against the user-defined filename to recover from private final String filename; // hold onto filename and not just File because we want to match that against the user-defined filename to recover from
private final File file; private final File file;
private final FileChannel reader; private final FileChannel reader;
@ -883,6 +837,7 @@ public class TailFile extends AbstractProcessor {
private final ByteBuffer buffer; private final ByteBuffer buffer;
private static class StateKeys { private static class StateKeys {
public static final String FILENAME = "filename"; public static final String FILENAME = "filename";
public static final String POSITION = "position"; public static final String POSITION = "position";
public static final String TIMESTAMP = "timestamp"; public static final String TIMESTAMP = "timestamp";

View File

@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -36,6 +35,7 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
public class TestTailFile { public class TestTailFile {
private File file; private File file;
private TailFile processor; private TailFile processor;
private RandomAccessFile raf; private RandomAccessFile raf;
@ -61,14 +61,9 @@ public class TestTailFile {
file.delete(); file.delete();
assertTrue(file.createNewFile()); assertTrue(file.createNewFile());
final File stateFile = new File("target/tail-file.state");
stateFile.delete();
Assert.assertFalse(stateFile.exists());
processor = new TailFile(); processor = new TailFile();
runner = TestRunners.newTestRunner(processor); runner = TestRunners.newTestRunner(processor);
runner.setProperty(TailFile.FILENAME, "target/log.txt"); runner.setProperty(TailFile.FILENAME, "target/log.txt");
runner.setProperty(TailFile.STATE_FILE, "target/tail-file.state");
runner.assertValid(); runner.assertValid();
raf = new RandomAccessFile(file, "rw"); raf = new RandomAccessFile(file, "rw");
@ -83,7 +78,6 @@ public class TestTailFile {
processor.cleanup(); processor.cleanup();
} }
@Test @Test
public void testConsumeAfterTruncationStartAtBeginningOfFile() throws IOException, InterruptedException { public void testConsumeAfterTruncationStartAtBeginningOfFile() throws IOException, InterruptedException {
runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*"); runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*");
@ -148,7 +142,6 @@ public class TestTailFile {
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("HELLO\n"); runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("HELLO\n");
} }
@Test @Test
public void testStartAtBeginningOfFile() throws IOException, InterruptedException { public void testStartAtBeginningOfFile() throws IOException, InterruptedException {
runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_FILE.getValue()); runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_FILE.getValue());
@ -201,7 +194,6 @@ public class TestTailFile {
assertTrue(world); assertTrue(world);
} }
@Test @Test
public void testRemainderOfFileRecoveredAfterRestart() throws IOException { public void testRemainderOfFileRecoveredAfterRestart() throws IOException {
runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log*.txt"); runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log*.txt");
@ -227,7 +219,6 @@ public class TestTailFile {
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("new file\n"); runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("new file\n");
} }
@Test @Test
public void testRemainderOfFileRecoveredIfRolledOverWhileRunning() throws IOException { public void testRemainderOfFileRecoveredIfRolledOverWhileRunning() throws IOException {
// this mimics the case when we are reading a log file that rolls over while processor is running. // this mimics the case when we are reading a log file that rolls over while processor is running.
@ -291,7 +282,6 @@ public class TestTailFile {
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("1\n"); runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("1\n");
} }
@Test @Test
public void testMultipleRolloversAfterHavingReadAllData() throws IOException, InterruptedException { public void testMultipleRolloversAfterHavingReadAllData() throws IOException, InterruptedException {
// this mimics the case when we are reading a log file that rolls over while processor is running. // this mimics the case when we are reading a log file that rolls over while processor is running.
@ -339,7 +329,6 @@ public class TestTailFile {
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("1\n"); runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("1\n");
} }
@Test @Test
public void testMultipleRolloversAfterHavingReadAllDataWhileStillRunning() throws IOException, InterruptedException { public void testMultipleRolloversAfterHavingReadAllDataWhileStillRunning() throws IOException, InterruptedException {
// this mimics the case when we are reading a log file that rolls over while processor is running. // this mimics the case when we are reading a log file that rolls over while processor is running.
@ -387,7 +376,6 @@ public class TestTailFile {
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("1\n"); runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("1\n");
} }
@Test @Test
public void testMultipleRolloversWithLongerFileLength() throws IOException, InterruptedException { public void testMultipleRolloversWithLongerFileLength() throws IOException, InterruptedException {
// this mimics the case when we are reading a log file that rolls over while processor is running. // this mimics the case when we are reading a log file that rolls over while processor is running.
@ -433,7 +421,6 @@ public class TestTailFile {
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("This is a longer line than the other files had.\n"); runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("This is a longer line than the other files had.\n");
} }
@Test @Test
public void testConsumeWhenNewLineFound() throws IOException, InterruptedException { public void testConsumeWhenNewLineFound() throws IOException, InterruptedException {
runner.run(); runner.run();