NIFI-13896 Removed legacy state migration from TailFile (#9425)

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Lehel Boér 2024-10-21 15:51:09 -05:00 committed by GitHub
parent d693293c43
commit a44fb528f6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 51 additions and 178 deletions

View File

@ -415,41 +415,20 @@ public class TailFile extends AbstractProcessor {
final Scope scope = getStateScope(context);
final StateMap stateMap = context.getStateManager().getState(scope);
final String startPosition = context.getProperty(START_POSITION).getValue();
if (stateMap.getStateVersion().isEmpty() || stateMap.toMap().isEmpty()) {
//state has been cleared or never stored so recover as 'empty state'
initStates(filesToTail, Collections.emptyMap(), true, startPosition);
recoverState(context, filesToTail, Collections.emptyMap());
initStates(filesToTail, Collections.emptyMap(), true);
recoverState(filesToTail, Collections.emptyMap());
return;
}
Map<String, String> statesMap = stateMap.toMap();
final Map<String, String> statesMap = stateMap.toMap();
if (statesMap.containsKey(TailFileState.StateKeys.FILENAME)
&& statesMap.keySet().stream().noneMatch(key -> key.startsWith(MAP_PREFIX))) {
// If statesMap contains "filename" key without "file.0." prefix,
// and there's no key with "file." prefix, then
// it indicates that the statesMap is created with earlier version of NiFi.
// In this case, we need to migrate the state by adding prefix indexed with 0.
final Map<String, String> migratedStatesMap = new HashMap<>(statesMap.size());
for (String key : statesMap.keySet()) {
migratedStatesMap.put(MAP_PREFIX + "0." + key, statesMap.get(key));
initStates(filesToTail, statesMap, false);
recoverState(filesToTail, statesMap);
}
// LENGTH is added from NiFi 1.1.0. Set the value with using the last position so that we can use existing state
// to avoid sending duplicated log data after updating NiFi.
migratedStatesMap.put(MAP_PREFIX + "0." + TailFileState.StateKeys.LENGTH, statesMap.get(TailFileState.StateKeys.POSITION));
statesMap = Map.copyOf(migratedStatesMap);
getLogger().info("statesMap has been migrated. {}", migratedStatesMap);
}
initStates(filesToTail, statesMap, false, startPosition);
recoverState(context, filesToTail, statesMap);
}
private void initStates(final List<String> filesToTail, final Map<String, String> statesMap, final boolean isCleared, final String startPosition) {
private void initStates(final List<String> filesToTail, final Map<String, String> statesMap, final boolean isCleared) {
int fileIndex = 0;
if (isCleared) {
@ -460,29 +439,33 @@ public class TailFile extends AbstractProcessor {
// put back the files we already know about in 'states' object before
// doing the recovery
if (states.isEmpty() && !statesMap.isEmpty()) {
for (String key : statesMap.keySet()) {
if (key.endsWith(TailFileState.StateKeys.FILENAME) && filesToTail.contains(statesMap.get(key))) {
for (Entry<String, String> entry : statesMap.entrySet()) {
final String key = entry.getKey();
final String value = entry.getValue();
if (key.endsWith(TailFileState.StateKeys.FILENAME) && filesToTail.contains(value)) {
int index = Integer.parseInt(key.split("\\.")[1]);
states.put(statesMap.get(key), new TailFileObject(index, statesMap, preAllocatedBufferSize));
states.put(value, new TailFileObject(index, statesMap, preAllocatedBufferSize));
}
}
}
// first, we remove the files that are no longer present
final List<String> toBeRemoved = new ArrayList<>();
for (String file : states.keySet()) {
if (!filesToTail.contains(file)) {
toBeRemoved.add(file);
cleanReader(states.get(file));
for (Entry<String, TailFileObject> entry : states.entrySet()) {
final String filePath = entry.getKey();
final TailFileObject tailFileObject = entry.getValue();
if (!filesToTail.contains(filePath)) {
toBeRemoved.add(filePath);
cleanReader(tailFileObject);
}
}
states.keySet().removeAll(toBeRemoved);
toBeRemoved.forEach(states.keySet()::remove);
// then we need to get the highest ID used so far to be sure
// we don't mix different files in case we add new files to tail
for (String file : states.keySet()) {
if (fileIndex <= states.get(file).getFilenameIndex()) {
fileIndex = states.get(file).getFilenameIndex() + 1;
for (TailFileObject tfo : states.values()) {
if (fileIndex <= tfo.getFilenameIndex()) {
fileIndex = tfo.getFilenameIndex() + 1;
}
}
@ -498,9 +481,9 @@ public class TailFile extends AbstractProcessor {
}
}
private void recoverState(final ProcessContext context, final List<String> filesToTail, final Map<String, String> map) throws IOException {
private void recoverState(final List<String> filesToTail, final Map<String, String> map) throws IOException {
for (String file : filesToTail) {
recoverState(context, map, file);
recoverState(map, file);
}
}
@ -549,7 +532,6 @@ public class TailFile extends AbstractProcessor {
* checksum, so that we are ready to proceed with the
* {@link #onTrigger(ProcessContext, ProcessSession)} call.
*
* @param context the ProcessContext
* @param stateValues the values that were recovered from state that was
* previously stored. This Map should be populated with the keys defined in
* {@link TailFileState.StateKeys}.
@ -557,23 +539,14 @@ public class TailFile extends AbstractProcessor {
* @throws IOException if unable to seek to the appropriate location in the
* tailed file.
*/
private void recoverState(final ProcessContext context, final Map<String, String> stateValues, final String filePath) throws IOException {
private void recoverState(final Map<String, String> stateValues, final String filePath) throws IOException {
final TailFileObject tailFileObject = states.get(filePath);
final String prefix = MAP_PREFIX + tailFileObject.getFilenameIndex() + '.';
final String prefix = MAP_PREFIX + states.get(filePath).getFilenameIndex() + '.';
if (!stateValues.containsKey(prefix + TailFileState.StateKeys.FILENAME)) {
resetState(filePath);
return;
}
if (!stateValues.containsKey(prefix + TailFileState.StateKeys.POSITION)) {
resetState(filePath);
return;
}
if (!stateValues.containsKey(prefix + TailFileState.StateKeys.TIMESTAMP)) {
resetState(filePath);
return;
}
if (!stateValues.containsKey(prefix + TailFileState.StateKeys.LENGTH)) {
if (!stateValues.containsKey(prefix + TailFileState.StateKeys.FILENAME)
|| !stateValues.containsKey(prefix + TailFileState.StateKeys.POSITION)
|| !stateValues.containsKey(prefix + TailFileState.StateKeys.TIMESTAMP)
|| !stateValues.containsKey(prefix + TailFileState.StateKeys.LENGTH)) {
resetState(filePath);
return;
}
@ -589,7 +562,7 @@ public class TailFile extends AbstractProcessor {
File tailFile = null;
if (checksumPresent && filePath.equals(storedStateFilename)) {
states.get(filePath).setExpectedRecoveryChecksum(Long.parseLong(checksumValue));
tailFileObject.setExpectedRecoveryChecksum(Long.parseLong(checksumValue));
// We have an expected checksum and the currently configured filename is the same as the state file.
// We need to check if the existing file is the same as the one referred to in the state file based on
@ -597,11 +570,11 @@ public class TailFile extends AbstractProcessor {
final Checksum checksum = new CRC32();
final File existingTailFile = new File(storedStateFilename);
if (existingTailFile.length() >= position) {
try (final InputStream tailFileIs = new FileInputStream(existingTailFile);
try (final InputStream tailFileIs = Files.newInputStream(existingTailFile.toPath());
final CheckedInputStream in = new CheckedInputStream(tailFileIs, checksum)) {
try {
StreamUtils.copy(in, new NullOutputStream(), states.get(filePath).getState().getPosition());
StreamUtils.copy(in, new NullOutputStream(), tailFileObject.getState().getPosition());
} catch (final EOFException eof) {
// If we hit EOFException, then the file is smaller than we expected. Assume rollover.
getLogger().debug("When recovering state, file being tailed has less data than was stored in the state. "
@ -609,7 +582,7 @@ public class TailFile extends AbstractProcessor {
}
final long checksumResult = in.getChecksum().getValue();
if (checksumResult == states.get(filePath).getExpectedRecoveryChecksum()) {
if (checksumResult == tailFileObject.getExpectedRecoveryChecksum()) {
// Checksums match. This means that we want to resume reading from where we left off.
// So we will populate the reader object so that it will be used in onTrigger. If the
// checksums do not match, then we will leave the reader object null, so that the next
@ -632,12 +605,12 @@ public class TailFile extends AbstractProcessor {
+ "this indicates that the file has rotated. Will begin tailing current file from beginning.", existingTailFile.length(), position);
}
states.get(filePath).setState(new TailFileState(filePath, tailFile, reader, position, timestamp, length, checksum, ByteBuffer.allocate(preAllocatedBufferSize)));
tailFileObject.setState(new TailFileState(filePath, tailFile, reader, position, timestamp, length, checksum, ByteBuffer.allocate(preAllocatedBufferSize)));
} else {
resetState(filePath);
}
getLogger().debug("Recovered state {}", states.get(filePath).getState());
getLogger().debug("Recovered state {}", tailFileObject.getState());
}
private void resetState(final String filePath) {
@ -685,7 +658,7 @@ public class TailFile extends AbstractProcessor {
final List<String> filesToTail = lookup(context);
final Scope scope = getStateScope(context);
final StateMap stateMap = session.getState(scope);
initStates(filesToTail, stateMap.toMap(), false, context.getProperty(START_POSITION).getValue());
initStates(filesToTail, stateMap.toMap(), false);
} catch (IOException e) {
getLogger().error("Exception raised while attempting to recover state about where the tailing last left off", e);
context.yield();
@ -734,7 +707,7 @@ public class TailFile extends AbstractProcessor {
StateMap sessionStateMap = session.getState(scope);
Map<String, String> sessionStates = new HashMap<>(sessionStateMap.toMap());
List<String> keysToRemove = collectKeysToBeRemoved(sessionStates);
sessionStates.keySet().removeAll(keysToRemove);
keysToRemove.forEach(sessionStates.keySet()::remove);
getLogger().debug("Removed {} references to nonexistent files from session's state map",
keysToRemove.size());
session.setState(sessionStates, scope);
@ -747,13 +720,13 @@ public class TailFile extends AbstractProcessor {
private List<String> collectKeysToBeRemoved(Map<String, String> sessionStates) {
List<String> keysToRemove = new ArrayList<>();
List<String> filesToRemove = sessionStates.entrySet().stream()
.filter(entry -> entry.getKey().endsWith("filename")
.filter(entry -> entry.getKey().endsWith(StateKeys.FILENAME)
&& !states.containsKey(entry.getValue()))
.map(Entry::getKey)
.toList();
for (String key : filesToRemove) {
final String prefix = StringUtils.substringBefore(key, "filename");
final String prefix = StringUtils.substringBefore(key, StateKeys.FILENAME);
keysToRemove.add(prefix + StateKeys.FILENAME);
keysToRemove.add(prefix + StateKeys.LENGTH);
keysToRemove.add(prefix + StateKeys.POSITION);
@ -1180,33 +1153,19 @@ public class TailFile extends AbstractProcessor {
final File file = path.toFile();
final long lastMod = file.lastModified();
if (file.lastModified() < minTimestamp) {
if (lastMod >= minTimestamp && !file.equals(tailFile)) {
rolledOffFiles.add(file);
} else {
getLogger().debug("Found rolled off file {} but its last modified timestamp is before the cutoff (Last Mod = {}, Cutoff = {}) so will not consume it",
file, lastMod, minTimestamp);
continue;
} else if (file.equals(tailFile)) {
continue;
}
rolledOffFiles.add(file);
}
}
// Sort files based on last modified timestamp. If same timestamp, use filename as a secondary sort, as often
// files that are rolled over are given a naming scheme that sorts lexicographically in the same order as the
// timestamp, such as yyyy-MM-dd-HH-mm-ss
rolledOffFiles.sort(new Comparator<>() {
@Override
public int compare(final File o1, final File o2) {
final int lastModifiedComp = Long.compare(o1.lastModified(), o2.lastModified());
if (lastModifiedComp != 0) {
return lastModifiedComp;
}
return o1.getName().compareTo(o2.getName());
}
});
rolledOffFiles.sort(Comparator.comparingLong(File::lastModified).thenComparing(File::getName));
return rolledOffFiles;
}
@ -1228,21 +1187,8 @@ public class TailFile extends AbstractProcessor {
try {
final Scope scope = getStateScope(context);
final StateMap oldState = session == null ? context.getStateManager().getState(scope) : session.getState(scope);
Map<String, String> updatedState = new HashMap<>();
for (String key : oldState.toMap().keySet()) {
// These states are stored by older version of NiFi, and won't be used anymore.
// New states have 'file.<index>.' prefix.
if (TailFileState.StateKeys.CHECKSUM.equals(key)
|| TailFileState.StateKeys.FILENAME.equals(key)
|| TailFileState.StateKeys.POSITION.equals(key)
|| TailFileState.StateKeys.TIMESTAMP.equals(key)) {
getLogger().info("Removed state {}={} stored by older version of NiFi.", key, oldState.get(key));
continue;
}
updatedState.put(key, oldState.get(key));
}
Map<String, String> updatedState = new HashMap<>(oldState.toMap());
updatedState.putAll(state);
if (session == null) {
@ -1251,7 +1197,7 @@ public class TailFile extends AbstractProcessor {
session.setState(updatedState, scope);
}
} catch (final IOException e) {
getLogger().warn("Some data may be duplicated on restart of NiFi since failed to store state", e);
getLogger().warn("Failed to store state due to {}; some data may be duplicated on restart of NiFi", e);
}
}
@ -1403,7 +1349,7 @@ public class TailFile extends AbstractProcessor {
final long millisSinceModified = getCurrentTimeMs() - newestFile.lastModified();
if (millisSinceModified < postRolloverTailMillis) {
getLogger().debug("Rolled over file {} (size={}, lastModified={}) was modified {} millis ago, which isn't long enough to consume file fully without taking line endings into " +
"account. Will do nothing will file for now.", newestFile, newestFile.length(), newestFile.lastModified(), millisSinceModified);
"account. Will do nothing for now.", newestFile, newestFile.length(), newestFile.lastModified(), millisSinceModified);
return true;
}
@ -1718,7 +1664,7 @@ public class TailFile extends AbstractProcessor {
public Map<String, String> toStateMap(int index) {
final String prefix = MAP_PREFIX + index + '.';
final Map<String, String> map = new HashMap<>(4);
final Map<String, String> map = HashMap.newHashMap(4);
map.put(prefix + StateKeys.FILENAME, filename);
map.put(prefix + StateKeys.POSITION, String.valueOf(position));
map.put(prefix + StateKeys.LENGTH, String.valueOf(length));
@ -1741,7 +1687,7 @@ public class TailFile extends AbstractProcessor {
}
@Override
public Throwable fillInStackTrace() {
public synchronized Throwable fillInStackTrace() {
return this;
}
}

View File

@ -24,7 +24,6 @@ import java.util.Map.Entry;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.processors.standard.TailFile.TailFileState;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
@ -36,7 +35,6 @@ import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
@ -44,7 +42,6 @@ import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.RandomAccessFile;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -57,7 +54,6 @@ import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@ -1295,75 +1291,6 @@ public class TestTailFile {
runner.clearTransferState();
}
/**
 * Exercises migration of state written in the legacy layout, where the keys
 * {@code filename}, {@code checksum}, {@code position} and {@code timestamp}
 * carry no {@code file.<index>.} prefix.
 *
 * <p>The test seeds LOCAL-scope state with the un-prefixed keys, runs the
 * processor once, and expects a single FlowFile on {@code REL_SUCCESS} whose
 * content is "Line 3" followed by a line separator. It then checks that the
 * un-prefixed keys are gone and that {@code file.0.}-prefixed keys are present,
 * and that a second run transfers nothing.
 *
 * <p>NOTE(review): presumably {@code target/existing-log.txt} is created by
 * test setup outside this view; the expected checksum and position values
 * depend on that fixture's content - confirm.
 *
 * @throws IOException declared because the buffered writer and the state manager calls may throw it
 */
@DisabledOnOs(OS.WINDOWS)
@Test
public void testMigrateFrom100To110() throws IOException {
runner.setProperty(TailFile.FILENAME, "target/existing-log.txt");
final MockStateManager stateManager = runner.getStateManager();
// Before NiFi 1.1.0, TailFile only handles single file
// and state key doesn't have index in it.
final Map<String, String> state = new HashMap<>();
state.put("filename", "target/existing-log.txt");
// Simulate that it has been tailed up to the 2nd line.
state.put("checksum", "2279929157");
state.put("position", "14");
state.put("timestamp", "1480639134000");
stateManager.setState(state, Scope.LOCAL);
runner.run();
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).iterator().next();
// Build the expected content with the same writer API so that the line separator matches.
final ByteArrayOutputStream bos = new ByteArrayOutputStream();
try (final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(bos))) {
writer.write("Line 3");
writer.newLine();
}
flowFile.assertContentEquals(bos.toByteArray());
// The old states should be replaced with new ones.
final StateMap updatedState = stateManager.getState(Scope.LOCAL);
assertNull(updatedState.get("filename"));
assertNull(updatedState.get("checksum"));
assertNull(updatedState.get("position"));
assertNull(updatedState.get("timestamp"));
assertEquals("target/existing-log.txt", updatedState.get("file.0.filename"));
assertEquals("3380848603", updatedState.get("file.0.checksum"));
assertEquals("21", updatedState.get("file.0.position"));
assertNotNull(updatedState.get("file.0.timestamp"));
// When it runs again, the state is already migrated, so it shouldn't emit any flow files.
runner.clearTransferState();
runner.run();
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
}
/**
 * Seeds LOCAL-scope state in the legacy layout (un-prefixed {@code filename},
 * {@code checksum}, {@code position} and {@code timestamp} keys) for the path
 * {@code target/not-existing-log.txt}, runs the processor once, and expects
 * no FlowFile on {@code REL_SUCCESS}.
 *
 * <p>NOTE(review): the method name and path suggest the file is absent on
 * disk; nothing in this view creates or deletes it - confirm.
 *
 * @throws IOException declared because setting state on the state manager may throw it
 */
@DisabledOnOs(OS.WINDOWS)
@Test
public void testMigrateFrom100To110FileNotFound() throws IOException {
runner.setProperty(TailFile.FILENAME, "target/not-existing-log.txt");
final MockStateManager stateManager = runner.getStateManager();
// Before NiFi 1.1.0, TailFile only handles single file
// and state key doesn't have index in it.
final Map<String, String> state = new HashMap<>();
state.put("filename", "target/not-existing-log.txt");
// Simulate that it has been tailed up to the 2nd line.
state.put("checksum", "2279929157");
state.put("position", "14");
state.put("timestamp", "1480639134000");
stateManager.setState(state, Scope.LOCAL);
runner.run();
runner.assertTransferCount(TailFile.REL_SUCCESS, 0);
}
private void assertNumberOfStateMapEntries(int expectedNumberOfLogFiles) throws IOException {
final int numberOfStateKeysPerFile = 6;
StateMap states = runner.getStateManager().getState(Scope.LOCAL);