diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java index 1660e06dba..1cbdc66365 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java @@ -415,41 +415,20 @@ public class TailFile extends AbstractProcessor { final Scope scope = getStateScope(context); final StateMap stateMap = context.getStateManager().getState(scope); - final String startPosition = context.getProperty(START_POSITION).getValue(); - if (stateMap.getStateVersion().isEmpty() || stateMap.toMap().isEmpty()) { //state has been cleared or never stored so recover as 'empty state' - initStates(filesToTail, Collections.emptyMap(), true, startPosition); - recoverState(context, filesToTail, Collections.emptyMap()); + initStates(filesToTail, Collections.emptyMap(), true); + recoverState(filesToTail, Collections.emptyMap()); return; } - Map statesMap = stateMap.toMap(); + final Map statesMap = stateMap.toMap(); - if (statesMap.containsKey(TailFileState.StateKeys.FILENAME) - && statesMap.keySet().stream().noneMatch(key -> key.startsWith(MAP_PREFIX))) { - // If statesMap contains "filename" key without "file.0." prefix, - // and there's no key with "file." prefix, then - // it indicates that the statesMap is created with earlier version of NiFi. - // In this case, we need to migrate the state by adding prefix indexed with 0. - final Map migratedStatesMap = new HashMap<>(statesMap.size()); - for (String key : statesMap.keySet()) { - migratedStatesMap.put(MAP_PREFIX + "0." + key, statesMap.get(key)); - } - - // LENGTH is added from NiFi 1.1.0. Set the value with using the last position so that we can use existing state - // to avoid sending duplicated log data after updating NiFi. - migratedStatesMap.put(MAP_PREFIX + "0." + TailFileState.StateKeys.LENGTH, statesMap.get(TailFileState.StateKeys.POSITION)); - statesMap = Map.copyOf(migratedStatesMap); - - getLogger().info("statesMap has been migrated. {}", migratedStatesMap); - } - - initStates(filesToTail, statesMap, false, startPosition); - recoverState(context, filesToTail, statesMap); + initStates(filesToTail, statesMap, false); + recoverState(filesToTail, statesMap); } - private void initStates(final List filesToTail, final Map statesMap, final boolean isCleared, final String startPosition) { + private void initStates(final List filesToTail, final Map statesMap, final boolean isCleared) { int fileIndex = 0; if (isCleared) { @@ -460,29 +439,33 @@ public class TailFile extends AbstractProcessor { // put back the files we already know about in 'states' object before // doing the recovery if (states.isEmpty() && !statesMap.isEmpty()) { - for (String key : statesMap.keySet()) { - if (key.endsWith(TailFileState.StateKeys.FILENAME) && filesToTail.contains(statesMap.get(key))) { + for (Entry entry : statesMap.entrySet()) { + final String key = entry.getKey(); + final String value = entry.getValue(); + if (key.endsWith(TailFileState.StateKeys.FILENAME) && filesToTail.contains(value)) { int index = Integer.parseInt(key.split("\\.")[1]); - states.put(statesMap.get(key), new TailFileObject(index, statesMap, preAllocatedBufferSize)); + states.put(value, new TailFileObject(index, statesMap, preAllocatedBufferSize)); } } } // first, we remove the files that are no longer present final List toBeRemoved = new ArrayList<>(); - for (String file : states.keySet()) { - if (!filesToTail.contains(file)) { - toBeRemoved.add(file); - cleanReader(states.get(file)); + for (Entry entry : states.entrySet()) { + final String filePath = entry.getKey(); + final TailFileObject tailFileObject = entry.getValue(); + if (!filesToTail.contains(filePath)) { + toBeRemoved.add(filePath); + cleanReader(tailFileObject); } } - states.keySet().removeAll(toBeRemoved); + toBeRemoved.forEach(states.keySet()::remove); // then we need to get the highest ID used so far to be sure // we don't mix different files in case we add new files to tail - for (String file : states.keySet()) { - if (fileIndex <= states.get(file).getFilenameIndex()) { - fileIndex = states.get(file).getFilenameIndex() + 1; + for (TailFileObject tfo : states.values()) { + if (fileIndex <= tfo.getFilenameIndex()) { + fileIndex = tfo.getFilenameIndex() + 1; } } @@ -498,9 +481,9 @@ public class TailFile extends AbstractProcessor { } } - private void recoverState(final ProcessContext context, final List filesToTail, final Map map) throws IOException { + private void recoverState(final List filesToTail, final Map map) throws IOException { for (String file : filesToTail) { - recoverState(context, map, file); + recoverState(map, file); } } @@ -549,7 +532,6 @@ public class TailFile extends AbstractProcessor { * checksum, so that we are ready to proceed with the * {@link #onTrigger(ProcessContext, ProcessSession)} call. * - * @param context the ProcessContext * @param stateValues the values that were recovered from state that was * previously stored. This Map should be populated with the keys defined in * {@link TailFileState.StateKeys}. @@ -557,23 +539,14 @@ public class TailFile extends AbstractProcessor { * @throws IOException if unable to seek to the appropriate location in the * tailed file. */ - private void recoverState(final ProcessContext context, final Map stateValues, final String filePath) throws IOException { + private void recoverState(final Map stateValues, final String filePath) throws IOException { + final TailFileObject tailFileObject = states.get(filePath); + final String prefix = MAP_PREFIX + tailFileObject.getFilenameIndex() + '.'; - final String prefix = MAP_PREFIX + states.get(filePath).getFilenameIndex() + '.'; - - if (!stateValues.containsKey(prefix + TailFileState.StateKeys.FILENAME)) { - resetState(filePath); - return; - } - if (!stateValues.containsKey(prefix + TailFileState.StateKeys.POSITION)) { - resetState(filePath); - return; - } - if (!stateValues.containsKey(prefix + TailFileState.StateKeys.TIMESTAMP)) { - resetState(filePath); - return; - } - if (!stateValues.containsKey(prefix + TailFileState.StateKeys.LENGTH)) { + if (!stateValues.containsKey(prefix + TailFileState.StateKeys.FILENAME) + || !stateValues.containsKey(prefix + TailFileState.StateKeys.POSITION) + || !stateValues.containsKey(prefix + TailFileState.StateKeys.TIMESTAMP) + || !stateValues.containsKey(prefix + TailFileState.StateKeys.LENGTH)) { resetState(filePath); return; } @@ -589,7 +562,7 @@ public class TailFile extends AbstractProcessor { File tailFile = null; if (checksumPresent && filePath.equals(storedStateFilename)) { - states.get(filePath).setExpectedRecoveryChecksum(Long.parseLong(checksumValue)); + tailFileObject.setExpectedRecoveryChecksum(Long.parseLong(checksumValue)); // We have an expected checksum and the currently configured filename is the same as the state file. // We need to check if the existing file is the same as the one referred to in the state file based on @@ -597,19 +570,19 @@ public class TailFile extends AbstractProcessor { final Checksum checksum = new CRC32(); final File existingTailFile = new File(storedStateFilename); if (existingTailFile.length() >= position) { - try (final InputStream tailFileIs = new FileInputStream(existingTailFile); - final CheckedInputStream in = new CheckedInputStream(tailFileIs, checksum)) { + try (final InputStream tailFileIs = Files.newInputStream(existingTailFile.toPath()); + final CheckedInputStream in = new CheckedInputStream(tailFileIs, checksum)) { try { - StreamUtils.copy(in, new NullOutputStream(), states.get(filePath).getState().getPosition()); + StreamUtils.copy(in, new NullOutputStream(), tailFileObject.getState().getPosition()); } catch (final EOFException eof) { // If we hit EOFException, then the file is smaller than we expected. Assume rollover. getLogger().debug("When recovering state, file being tailed has less data than was stored in the state. " - + "Assuming rollover. Will begin tailing current file from beginning."); + + "Assuming rollover. Will begin tailing current file from beginning."); } final long checksumResult = in.getChecksum().getValue(); - if (checksumResult == states.get(filePath).getExpectedRecoveryChecksum()) { + if (checksumResult == tailFileObject.getExpectedRecoveryChecksum()) { // Checksums match. This means that we want to resume reading from where we left off. // So we will populate the reader object so that it will be used in onTrigger. If the // checksums do not match, then we will leave the reader object null, so that the next @@ -632,12 +605,12 @@ public class TailFile extends AbstractProcessor { + "this indicates that the file has rotated. Will begin tailing current file from beginning.", existingTailFile.length(), position); } - states.get(filePath).setState(new TailFileState(filePath, tailFile, reader, position, timestamp, length, checksum, ByteBuffer.allocate(preAllocatedBufferSize))); + tailFileObject.setState(new TailFileState(filePath, tailFile, reader, position, timestamp, length, checksum, ByteBuffer.allocate(preAllocatedBufferSize))); } else { resetState(filePath); } - getLogger().debug("Recovered state {}", states.get(filePath).getState()); + getLogger().debug("Recovered state {}", tailFileObject.getState()); } private void resetState(final String filePath) { @@ -685,7 +658,7 @@ public class TailFile extends AbstractProcessor { final List filesToTail = lookup(context); final Scope scope = getStateScope(context); final StateMap stateMap = session.getState(scope); - initStates(filesToTail, stateMap.toMap(), false, context.getProperty(START_POSITION).getValue()); + initStates(filesToTail, stateMap.toMap(), false); } catch (IOException e) { getLogger().error("Exception raised while attempting to recover state about where the tailing last left off", e); context.yield(); @@ -734,7 +707,7 @@ public class TailFile extends AbstractProcessor { StateMap sessionStateMap = session.getState(scope); Map sessionStates = new HashMap<>(sessionStateMap.toMap()); List keysToRemove = collectKeysToBeRemoved(sessionStates); - sessionStates.keySet().removeAll(keysToRemove); + keysToRemove.forEach(sessionStates.keySet()::remove); getLogger().debug("Removed {} references to nonexistent files from session's state map", keysToRemove.size()); session.setState(sessionStates, scope); @@ -747,13 +720,13 @@ public class TailFile extends AbstractProcessor { private List collectKeysToBeRemoved(Map sessionStates) { List keysToRemove = new ArrayList<>(); List filesToRemove = sessionStates.entrySet().stream() - .filter(entry -> entry.getKey().endsWith("filename") + .filter(entry -> entry.getKey().endsWith(StateKeys.FILENAME) && !states.containsKey(entry.getValue())) .map(Entry::getKey) .toList(); for (String key : filesToRemove) { - final String prefix = StringUtils.substringBefore(key, "filename"); + final String prefix = StringUtils.substringBefore(key, StateKeys.FILENAME); keysToRemove.add(prefix + StateKeys.FILENAME); keysToRemove.add(prefix + StateKeys.LENGTH); keysToRemove.add(prefix + StateKeys.POSITION); @@ -1180,33 +1153,19 @@ public class TailFile extends AbstractProcessor { final File file = path.toFile(); final long lastMod = file.lastModified(); - if (file.lastModified() < minTimestamp) { + if (lastMod >= minTimestamp && !file.equals(tailFile)) { + rolledOffFiles.add(file); + } else { getLogger().debug("Found rolled off file {} but its last modified timestamp is before the cutoff (Last Mod = {}, Cutoff = {}) so will not consume it", file, lastMod, minTimestamp); - - continue; - } else if (file.equals(tailFile)) { - continue; } - - rolledOffFiles.add(file); } } // Sort files based on last modified timestamp. If same timestamp, use filename as a secondary sort, as often // files that are rolled over are given a naming scheme that is lexicographically sort in the same order as the // timestamp, such as yyyy-MM-dd-HH-mm-ss - rolledOffFiles.sort(new Comparator<>() { - @Override - public int compare(final File o1, final File o2) { - final int lastModifiedComp = Long.compare(o1.lastModified(), o2.lastModified()); - if (lastModifiedComp != 0) { - return lastModifiedComp; - } - - return o1.getName().compareTo(o2.getName()); - } - }); + rolledOffFiles.sort(Comparator.comparingLong(File::lastModified).thenComparing(File::getName)); return rolledOffFiles; } @@ -1228,21 +1187,8 @@ public class TailFile extends AbstractProcessor { try { final Scope scope = getStateScope(context); final StateMap oldState = session == null ? context.getStateManager().getState(scope) : session.getState(scope); - Map updatedState = new HashMap<>(); - - for (String key : oldState.toMap().keySet()) { - // These states are stored by older version of NiFi, and won't be used anymore. - // New states have 'file..' prefix. - if (TailFileState.StateKeys.CHECKSUM.equals(key) - || TailFileState.StateKeys.FILENAME.equals(key) - || TailFileState.StateKeys.POSITION.equals(key) - || TailFileState.StateKeys.TIMESTAMP.equals(key)) { - getLogger().info("Removed state {}={} stored by older version of NiFi.", key, oldState.get(key)); - continue; - } - updatedState.put(key, oldState.get(key)); - } + Map updatedState = new HashMap<>(oldState.toMap()); updatedState.putAll(state); if (session == null) { @@ -1251,7 +1197,7 @@ public class TailFile extends AbstractProcessor { session.setState(updatedState, scope); } } catch (final IOException e) { - getLogger().warn("Some data may be duplicated on restart of NiFi since failed to store state", e); + getLogger().warn("Failed to store state due to {}; some data may be duplicated on restart of NiFi", e); } } @@ -1403,7 +1349,7 @@ public class TailFile extends AbstractProcessor { final long millisSinceModified = getCurrentTimeMs() - newestFile.lastModified(); if (millisSinceModified < postRolloverTailMillis) { getLogger().debug("Rolled over file {} (size={}, lastModified={}) was modified {} millis ago, which isn't long enough to consume file fully without taking line endings into " + - "account. Will do nothing will file for now.", newestFile, newestFile.length(), newestFile.lastModified(), millisSinceModified); + "account. Will do nothing for now.", newestFile, newestFile.length(), newestFile.lastModified(), millisSinceModified); return true; } @@ -1718,7 +1664,7 @@ public class TailFile extends AbstractProcessor { public Map toStateMap(int index) { final String prefix = MAP_PREFIX + index + '.'; - final Map map = new HashMap<>(4); + final Map map = HashMap.newHashMap(4); map.put(prefix + StateKeys.FILENAME, filename); map.put(prefix + StateKeys.POSITION, String.valueOf(position)); map.put(prefix + StateKeys.LENGTH, String.valueOf(length)); @@ -1741,7 +1687,7 @@ public class TailFile extends AbstractProcessor { } @Override - public Throwable fillInStackTrace() { + public synchronized Throwable fillInStackTrace() { return this; } } diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java index 1354e2dea2..7f3ddb59f4 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java @@ -24,7 +24,6 @@ import java.util.Map.Entry; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.processors.standard.TailFile.TailFileState; -import org.apache.nifi.state.MockStateManager; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.TestRunner; @@ -36,7 +35,6 @@ import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; import java.io.BufferedWriter; -import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.FilenameFilter; @@ -44,7 +42,6 @@ import java.io.IOException; import java.io.OutputStreamWriter; import java.io.RandomAccessFile; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -57,7 +54,6 @@ import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -1295,75 +1291,6 @@ public class TestTailFile { runner.clearTransferState(); } - @DisabledOnOs(OS.WINDOWS) - @Test - public void testMigrateFrom100To110() throws IOException { - runner.setProperty(TailFile.FILENAME, "target/existing-log.txt"); - - final MockStateManager stateManager = runner.getStateManager(); - - // Before NiFi 1.1.0, TailFile only handles single file - // and state key doesn't have index in it. - final Map state = new HashMap<>(); - state.put("filename", "target/existing-log.txt"); - // Simulate that it has been tailed up to the 2nd line. - state.put("checksum", "2279929157"); - state.put("position", "14"); - state.put("timestamp", "1480639134000"); - stateManager.setState(state, Scope.LOCAL); - - runner.run(); - - runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1); - final MockFlowFile flowFile = runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).iterator().next(); - - final ByteArrayOutputStream bos = new ByteArrayOutputStream(); - try (final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(bos))) { - writer.write("Line 3"); - writer.newLine(); - } - - flowFile.assertContentEquals(bos.toByteArray()); - - // The old states should be replaced with new ones. - final StateMap updatedState = stateManager.getState(Scope.LOCAL); - assertNull(updatedState.get("filename")); - assertNull(updatedState.get("checksum")); - assertNull(updatedState.get("position")); - assertNull(updatedState.get("timestamp")); - assertEquals("target/existing-log.txt", updatedState.get("file.0.filename")); - assertEquals("3380848603", updatedState.get("file.0.checksum")); - assertEquals("21", updatedState.get("file.0.position")); - assertNotNull(updatedState.get("file.0.timestamp")); - - // When it runs again, the state is already migrated, so it shouldn't emit any flow files. - runner.clearTransferState(); - runner.run(); - runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0); - } - - @DisabledOnOs(OS.WINDOWS) - @Test - public void testMigrateFrom100To110FileNotFound() throws IOException { - runner.setProperty(TailFile.FILENAME, "target/not-existing-log.txt"); - - final MockStateManager stateManager = runner.getStateManager(); - - // Before NiFi 1.1.0, TailFile only handles single file - // and state key doesn't have index in it. - final Map state = new HashMap<>(); - state.put("filename", "target/not-existing-log.txt"); - // Simulate that it has been tailed up to the 2nd line. - state.put("checksum", "2279929157"); - state.put("position", "14"); - state.put("timestamp", "1480639134000"); - stateManager.setState(state, Scope.LOCAL); - - runner.run(); - - runner.assertTransferCount(TailFile.REL_SUCCESS, 0); - } - private void assertNumberOfStateMapEntries(int expectedNumberOfLogFiles) throws IOException { final int numberOfStateKeysPerFile = 6; StateMap states = runner.getStateManager().getState(Scope.LOCAL);