diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java index 246f71af39..b04deb34d6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java @@ -218,12 +218,6 @@ public abstract class AbstractListProcessor extends Ab } } - // delete the local file, since it is no longer needed - final File localFile = new File(path); - if (localFile.exists() && !localFile.delete()) { - getLogger().warn("Migrated state but failed to delete local persistence file"); - } - // remove entry from Distributed cache server if (client != null) { try { @@ -285,6 +279,11 @@ public abstract class AbstractListProcessor extends Ab latestIdentifiersListed.addAll(listing.getMatchingIdentifiers()); } } + + // delete the local file, since it is no longer needed + if (persistenceFile.exists() && !persistenceFile.delete()) { + getLogger().warn("Migrated state but failed to delete local persistence file"); + } } if (minTimestamp != null) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java index 3a432e7bf4..7544eb8a65 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -29,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.commons.io.Charsets; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; @@ -42,10 +44,16 @@ import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.state.MockStateManager; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; public class TestAbstractListProcessor { + @Rule + public final TemporaryFolder testFolder = new TemporaryFolder(); + @Test public void testOnlyNewEntriesEmitted() { final ConcreteListProcessor proc = new ConcreteListProcessor(); @@ -121,7 +129,7 @@ public class TestAbstractListProcessor { } @Test - public void testStateMigrated() throws InitializationException { + public void testStateMigratedFromCacheService() throws InitializationException { final ConcreteListProcessor proc = new ConcreteListProcessor(); final TestRunner runner = TestRunners.newTestRunner(proc); final DistributedCache cache = new DistributedCache(); @@ -142,6 +150,50 @@ public class TestAbstractListProcessor { stateManager.assertStateEquals(expectedState, Scope.CLUSTER); } + @Test + public void testNoStateToMigrate() throws Exception { + final ConcreteListProcessor proc = new ConcreteListProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); + + runner.run(); + + final MockStateManager stateManager = runner.getStateManager(); + final Map expectedState = new HashMap<>(); + stateManager.assertStateEquals(expectedState, Scope.CLUSTER); + } + + @Test + public void testStateMigratedFromLocalFile() throws Exception { + final ConcreteListProcessor proc = new ConcreteListProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); + + // Create a file that we will populate with the desired state + File persistenceFile = testFolder.newFile(proc.persistenceFilename); + // Override the processor's internal persistence file + proc.persistenceFile = persistenceFile; + + // Local File persistence was a properties file format of = + // Our ConcreteListProcessor is centered around files which are provided for a given path + final String serviceState = proc.getPath(runner.getProcessContext()) + "={\"latestTimestamp\":1492,\"matchingIdentifiers\":[\"id\"]}"; + + // Create a persistence file of the format anticipated + try (FileOutputStream fos = new FileOutputStream(persistenceFile);) { + fos.write(serviceState.getBytes(Charsets.UTF_8)); + } + + runner.run(); + + // Verify the local persistence file is removed + Assert.assertTrue("Failed to remove persistence file", !persistenceFile.exists()); + + // Verify the state manager now maintains the associated state + final Map expectedState = new HashMap<>(); + expectedState.put(AbstractListProcessor.TIMESTAMP, "1492"); + expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", "id"); + + runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); + } + @Test public void testFetchOnStart() throws InitializationException { final ConcreteListProcessor proc = new ConcreteListProcessor(); @@ -239,9 +291,13 @@ public class TestAbstractListProcessor { private static class ConcreteListProcessor extends AbstractListProcessor { private final List entities = new ArrayList<>(); + public final String persistenceFilename = "ListProcessor-local-state-" + UUID.randomUUID().toString() + ".json"; + public String persistenceFolder = "target/"; + public File persistenceFile = new File(persistenceFolder + persistenceFilename); + @Override protected File getPersistenceFile() { - return new File("target/ListProcessor-local-state-" + UUID.randomUUID().toString() + ".json"); + return persistenceFile; } public void addEntity(final String name, final String identifier, final long timestamp) {