diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java index efe551f3db..494f22737f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java @@ -363,11 +363,12 @@ public abstract class AbstractListProcessor extends Ab return; } - int listCount = 0; Long latestListingTimestamp = null; + final List newEntries = new ArrayList<>(); for (final T entity : entityList) { - final boolean list = (minTimestamp == null || entity.getTimestamp() > minTimestamp - || (entity.getTimestamp() == minTimestamp && !latestIdentifiersListed.contains(entity.getIdentifier()))); + final boolean newTimestamp = minTimestamp == null || entity.getTimestamp() > minTimestamp; + final boolean newEntryForTimestamp = minTimestamp != null && entity.getTimestamp() == minTimestamp && !latestIdentifiersListed.contains(entity.getIdentifier()); + final boolean list = newTimestamp || newEntryForTimestamp; // Create the FlowFile for this path. if (list) { @@ -375,7 +376,14 @@ public abstract class AbstractListProcessor extends Ab FlowFile flowFile = session.create(); flowFile = session.putAllAttributes(flowFile, attributes); session.transfer(flowFile, REL_SUCCESS); - listCount++; + + // If we don't have a new timestamp but just have a new entry, we need to + // add all of the previous entries to our entityList. If we have a new timestamp, + // then the previous entries can go away. 
+ if (!newTimestamp) { + newEntries.addAll(entityList); + } + newEntries.add(entity); if (latestListingTimestamp == null || entity.getTimestamp() > latestListingTimestamp) { latestListingTimestamp = entity.getTimestamp(); @@ -383,6 +391,7 @@ public abstract class AbstractListProcessor extends Ab } } + final int listCount = newEntries.size(); if (listCount > 0) { getLogger().info("Successfully created listing with {} new objects", new Object[] {listCount}); session.commit(); @@ -395,9 +404,9 @@ public abstract class AbstractListProcessor extends Ab // previously Primary Node left off. // We also store the state locally so that if the node is restarted, and the node cannot contact // the distributed state cache, the node can continue to run (if it is primary node). - final Set identifiers = new HashSet<>(entityList.size()); + final Set identifiers = new HashSet<>(newEntries.size()); try { - for (final T entity : entityList) { + for (final T entity : newEntries) { identifiers.add(entity.getIdentifier()); } persist(latestListingTimestamp, identifiers, context.getStateManager()); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java index 1da2b4d1b6..3a432e7bf4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; @@ -30,6 +31,7 @@ import java.util.UUID; import 
org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
@@ -154,6 +156,55 @@ public class TestAbstractListProcessor {
         assertEquals(1, cache.fetchCount);
     }

+    /**
+     * Verifies that the persisted cluster state holds only the identifiers that belong to the
+     * most recent listing timestamp: two entities sharing a timestamp are both recorded, and
+     * once an entity with a newer timestamp is listed, the older identifiers are dropped.
+     */
+    @Test
+    public void testOnlyNewStateStored() throws IOException {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+
+        // Nothing has been added yet, so the first run must not emit any FlowFiles.
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+
+        // Two entities that share the same timestamp are both listed on the next run.
+        proc.addEntity("name", "id", 1492L);
+        proc.addEntity("name", "id2", 1492L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+        runner.clearTransferState();
+
+        final StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER);
+        assertEquals(1, stateMap.getVersion());
+
+        // Expect the timestamp plus one entry per identifier. Which identifier is stored under
+        // which numbered key is not asserted; keys and values are checked independently.
+        final Map<String, String> map = stateMap.toMap();
+        assertEquals(3, map.size());
+        assertEquals("1492", map.get("timestamp"));
+        assertTrue(map.containsKey("id.1"));
+        assertTrue(map.containsKey("id.2"));
+        assertTrue(map.containsValue("id"));
+        assertTrue(map.containsValue("id2"));
+
+        // A newer timestamp replaces the previously stored identifiers.
+        proc.addEntity("new name", "new id", 1493L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+
+        final StateMap updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER);
+        assertEquals(2, updatedStateMap.getVersion());
+
+        final Map<String, String> updatedValues = updatedStateMap.toMap();
+        assertEquals(2, updatedValues.size());
+        assertEquals("1493", updatedValues.get("timestamp"));
+        assertEquals("new id", updatedValues.get("id.1"));
+    }
+
+
     private static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient {
         private final Map stored = new HashMap<>();
         private int fetchCount = 0;