diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java index b9ff2497c9..ac6e68c793 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java @@ -78,13 +78,11 @@ public interface FlowFileRepository extends Closeable { * * @param queueProvider the provider of FlowFile Queues into which the * FlowFiles should be enqueued - * @param minimumSequenceNumber specifies the minimum value that should be - * returned by a call to {@link #getNextFlowFileSequence()} * * @return index of highest flow file identifier * @throws IOException if load fails */ - long loadFlowFiles(QueueProvider queueProvider, long minimumSequenceNumber) throws IOException; + long loadFlowFiles(QueueProvider queueProvider) throws IOException; /** * @return true if the Repository is volatile (i.e., its data @@ -104,6 +102,13 @@ public interface FlowFileRepository extends Closeable { */ long getMaxFlowFileIdentifier() throws IOException; + /** + * Notifies the FlowFile Repository that the given identifier has been identified as the maximum value that + * has been encountered for an 'external' (swapped out) FlowFile. + * @param maxId the max id of any FlowFile encountered + */ + void updateMaxFlowFileIdentifier(long maxId); + /** * Updates the Repository to indicate that the given FlowFileRecords were * Swapped Out of memory diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java index b2717c28d1..8a0aa3ba68 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java @@ -298,6 +298,12 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } } + final boolean validLocation = flowFileRepository.isValidSwapLocationSuffix(swapFile.getName()); + if (!validLocation) { + logger.warn("Encountered unknown Swap File {}; will ignore this Swap File. This file should be cleaned up manually", swapFile); + continue; + } + swapLocations.add(swapFile.getAbsolutePath()); continue; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 8ab7e69d54..b731610561 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -748,6 +748,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node // get all connections/queues and recover from swap files. final List connections = flowManager.getRootGroup().findAllConnections(); + flowFileRepository.loadFlowFiles(new StandardQueueProvider(this)); + long maxIdFromSwapFiles = -1L; if (flowFileRepository.isVolatile()) { for (final Connection connection : connections) { @@ -771,7 +773,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node } } - flowFileRepository.loadFlowFiles(new StandardQueueProvider(this), maxIdFromSwapFiles + 1); + flowFileRepository.updateMaxFlowFileIdentifier(maxIdFromSwapFiles + 1); // Begin expiring FlowFiles that are old final RepositoryContextFactory contextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository, diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java index da714a652a..979a22e78a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java @@ -114,11 +114,25 @@ public class VolatileFlowFileRepository implements FlowFileRepository { } @Override - public long loadFlowFiles(final QueueProvider queueProvider, final long minimumSequenceNumber) throws IOException { - idGenerator.set(minimumSequenceNumber); + public long loadFlowFiles(final QueueProvider queueProvider) throws IOException { return 0; } + @Override + public void updateMaxFlowFileIdentifier(final long maxId) { + while (true) { + final long currentId = idGenerator.get(); + if (currentId >= maxId) { + return; + } + + final boolean updated = idGenerator.compareAndSet(currentId, maxId); + if (updated) { + return; + } + } + } + @Override public long getNextFlowFileSequence() { return idGenerator.getAndIncrement(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index d8e45f2c95..2fc285566d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -585,7 +585,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis } @Override - public long loadFlowFiles(final QueueProvider queueProvider, final long minimumSequenceNumber) throws IOException { + public long loadFlowFiles(final QueueProvider queueProvider) throws IOException { final Map queueMap = new HashMap<>(); for (final FlowFileQueue queue : queueProvider.getAllQueues()) { queueMap.put(queue.getIdentifier(), queue); @@ -630,7 +630,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis // Determine the next sequence number for FlowFiles int numFlowFilesMissingQueue = 0; - long maxId = minimumSequenceNumber; + long maxId = 0; for (final RepositoryRecord record : recordList) { final long recordId = serdeFactory.getRecordIdentifier(record); if (recordId > maxId) { @@ -675,6 +675,21 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis return maxId; } + @Override + public void updateMaxFlowFileIdentifier(final long maxId) { + while (true) { + final long currentId = flowFileSequenceGenerator.get(); + if (currentId >= maxId) { + return; + } + + final boolean updated = flowFileSequenceGenerator.compareAndSet(currentId, maxId); + if (updated) { + return; + } + } + } + @Override public long getNextFlowFileSequence() { return flowFileSequenceGenerator.getAndIncrement(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 7cd2fd62d2..42e61d3dec 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -2104,6 +2104,10 @@ public class TestStandardProcessSession { return 0L; } + @Override + public void updateMaxFlowFileIdentifier(final long maxId) { + } + @Override public void updateRepository(Collection records) throws IOException { if (failOnUpdate) { @@ -2137,7 +2141,7 @@ public class TestStandardProcessSession { } @Override - public long loadFlowFiles(QueueProvider queueProvider, long minimumSequenceNumber) throws IOException { + public long loadFlowFiles(QueueProvider queueProvider) throws IOException { return 0; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java index 1761bd8465..cd3ee1cfda 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java @@ -422,7 +422,7 @@ public class TestWriteAheadFlowFileRepository { repo.initialize(new StandardResourceClaimManager()); final TestQueueProvider queueProvider = new TestQueueProvider(); - repo.loadFlowFiles(queueProvider, 0L); + repo.loadFlowFiles(queueProvider); final Connection connection = Mockito.mock(Connection.class); when(connection.getIdentifier()).thenReturn("1234"); @@ -449,7 +449,7 @@ public class TestWriteAheadFlowFileRepository { // restore final WriteAheadFlowFileRepository repo2 = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null)); repo2.initialize(new StandardResourceClaimManager()); - repo2.loadFlowFiles(queueProvider, 0L); + repo2.loadFlowFiles(queueProvider); assertTrue(repo2.isValidSwapLocationSuffix("swap123")); assertFalse(repo2.isValidSwapLocationSuffix("other")); repo2.close(); @@ -466,7 +466,7 @@ public class TestWriteAheadFlowFileRepository { repo.initialize(new StandardResourceClaimManager()); final TestQueueProvider queueProvider = new TestQueueProvider(); - repo.loadFlowFiles(queueProvider, 0L); + repo.loadFlowFiles(queueProvider); final Connection connection = Mockito.mock(Connection.class); when(connection.getIdentifier()).thenReturn("1234"); @@ -519,7 +519,7 @@ public class TestWriteAheadFlowFileRepository { // resource claims' counts should be updated for both the swapped out FlowFile and the non-swapped out FlowFile try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null))) { repo.initialize(claimManager); - repo.loadFlowFiles(queueProvider, -1L); + repo.loadFlowFiles(queueProvider); // Create a Repository Record that indicates that a FlowFile was created final FlowFileRecord flowFile1 = new StandardFlowFileRecord.Builder() @@ -554,7 +554,7 @@ public class TestWriteAheadFlowFileRepository { final ResourceClaimManager recoveryClaimManager = new StandardResourceClaimManager(); try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null))) { repo.initialize(recoveryClaimManager); - final long largestId = repo.loadFlowFiles(queueProvider, 0L); + final long largestId = repo.loadFlowFiles(queueProvider); // largest ID known is 1 because this doesn't take into account the FlowFiles that have been swapped out assertEquals(1, largestId); @@ -587,7 +587,7 @@ public class TestWriteAheadFlowFileRepository { repo.initialize(new StandardResourceClaimManager()); final TestQueueProvider queueProvider = new TestQueueProvider(); - repo.loadFlowFiles(queueProvider, 0L); + repo.loadFlowFiles(queueProvider); final List flowFileCollection = new ArrayList<>(); @@ -639,7 +639,7 @@ public class TestWriteAheadFlowFileRepository { // restore final WriteAheadFlowFileRepository repo2 = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null)); repo2.initialize(new StandardResourceClaimManager()); - repo2.loadFlowFiles(queueProvider, 0L); + repo2.loadFlowFiles(queueProvider); assertEquals(1, flowFileCollection.size()); final FlowFileRecord flowFile = flowFileCollection.get(0);