From 705c65c86cc15b88b27e4e5651c1c0ec319e38f6 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 3 Nov 2021 15:37:54 -0400 Subject: [PATCH] NIFI-9362: Ensure that we update the StateMap in AbstractListProcessor to hold any files whose date matches the latest before setting cluster-wide state Signed-off-by: Matthew Burgess This closes #5507 --- .../util/list/AbstractListProcessor.java | 25 ++++++----- .../util/list/TestAbstractListProcessor.java | 41 +++++++++++++++++++ 2 files changed, 55 insertions(+), 11 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java index 4fcb862d28..dce4c24b77 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java @@ -851,6 +851,20 @@ public abstract class AbstractListProcessor extends Ab if (latestListedEntryTimestampThisCycleMillis != null) { final boolean processedNewFiles = entitiesListed > 0; + if (processedNewFiles) { + // If there have been files created, update the last timestamp we processed. + // Retrieving lastKey instead of using latestListedEntryTimestampThisCycleMillis is intentional here, + // because latestListedEntryTimestampThisCycleMillis might be removed if it's not old enough. + if (!orderedEntries.lastKey().equals(lastProcessedLatestEntryTimestampMillis)) { + // If the latest timestamp at this cycle becomes different than the previous one, we need to clear identifiers. + // If it didn't change, we need to add identifiers. + latestIdentifiersProcessed.clear(); + } + // Capture latestIdentifierProcessed. + latestIdentifiersProcessed.addAll(orderedEntries.lastEntry().getValue().stream().map(T::getIdentifier).collect(Collectors.toList())); + lastProcessedLatestEntryTimestampMillis = orderedEntries.lastKey(); + } + if (!latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis) || processedNewFiles) { // We have performed a listing and pushed any FlowFiles out that may have been generated // Now, we need to persist state about the Last Modified timestamp of the newest file @@ -870,17 +884,6 @@ public abstract class AbstractListProcessor extends Ab } if (processedNewFiles) { - // If there have been files created, update the last timestamp we processed. - // Retrieving lastKey instead of using latestListedEntryTimestampThisCycleMillis is intentional here, - // because latestListedEntryTimestampThisCycleMillis might be removed if it's not old enough. - if (!orderedEntries.lastKey().equals(lastProcessedLatestEntryTimestampMillis)) { - // If the latest timestamp at this cycle becomes different than the previous one, we need to clear identifiers. - // If it didn't change, we need to add identifiers. - latestIdentifiersProcessed.clear(); - } - // Capture latestIdentifierProcessed. - latestIdentifiersProcessed.addAll(orderedEntries.lastEntry().getValue().stream().map(T::getIdentifier).collect(Collectors.toList())); - lastProcessedLatestEntryTimestampMillis = orderedEntries.lastKey(); getLogger().info("Successfully created listing with {} new objects", new Object[]{entitiesListed}); session.commitAsync(); } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java index 4f78e8c1fc..75ada70ba7 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java @@ -24,6 +24,7 @@ import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.Validator; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; import org.apache.nifi.context.PropertyContext; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.distributed.cache.client.Deserializer; @@ -115,6 +116,46 @@ public class TestAbstractListProcessor { @Rule public final TemporaryFolder testFolder = new TemporaryFolder(); + @Test + public void testStateMigratedWhenPrimaryNodeSwitch() throws IOException { + // add a few entities + for (int i=0; i < 5; i++) { + proc.addEntity(String.valueOf(i), String.valueOf(i), 88888L); + } + + // Add an entity with a later timestamp + proc.addEntity("10", "10", 99999999L); + + // Run the processor. All 6 should be listed. + runner.run(); + runner.assertAllFlowFilesTransferred(AbstractListProcessor.REL_SUCCESS, 6); + + // Now, we want to mimic Primary Node changing. To do so, we'll capture the Cluster State from the State Manager, + // create a new Processor, and set the state to be the same, and update the processor in order to produce the same listing. + final ConcreteListProcessor secondProc = new ConcreteListProcessor(); + // Add same listing to the new processor + for (int i=0; i < 5; i++) { + secondProc.addEntity(String.valueOf(i), String.valueOf(i), 88888L); + } + secondProc.addEntity("10", "10", 99999999L); + + // Create new runner for the second processor and update its state to match that of the last TestRunner. + final StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER); + runner = TestRunners.newTestRunner(secondProc); + runner.getStateManager().setState(stateMap.toMap(), Scope.CLUSTER); + + // Run several times, ensuring that nothing is emitted. + for (int i=0; i < 10; i++) { + runner.run(); + runner.assertAllFlowFilesTransferred(AbstractListProcessor.REL_SUCCESS, 0); + } + + // Add one more entry and ensure that it's emitted. + secondProc.addEntity("new", "new", 999999990L); + runner.run(); + runner.assertAllFlowFilesTransferred(AbstractListProcessor.REL_SUCCESS, 1); + } + @Test public void testStateMigratedFromCacheService() throws InitializationException {