From 095c04eda0c604a02c51df085ba67847448224c0 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Fri, 16 Dec 2016 17:48:06 +0900 Subject: [PATCH] NIFI-3213: ListFile do not skip obviously old files Before this fix, files with the latest timestamp within a listing iteration are always be held back one cycle no matter how old it is. Signed-off-by: Andre F de Miranda --- .../standard/AbstractListProcessor.java | 4 +- .../standard/TestAbstractListProcessor.java | 78 +++--- .../processors/standard/TestListFile.java | 254 +++++------------- 3 files changed, 110 insertions(+), 226 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java index 9e1e1aa394..eceee1d6c3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java @@ -336,6 +336,7 @@ public abstract class AbstractListProcessor extends Ab } final List entityList; + final long currentListingTimestamp = System.nanoTime(); try { // track of when this last executed for consideration of the lag nanos entityList = performListing(context, minTimestamp); @@ -385,7 +386,8 @@ public abstract class AbstractListProcessor extends Ab context.yield(); return; } - } else { + + } else if (latestListingTimestamp >= currentListingTimestamp - LISTING_LAG_NANOS) { // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data orderedEntries.remove(latestListingTimestamp); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java index f8b59f5754..9896396245 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java @@ -56,6 +56,30 @@ public class TestAbstractListProcessor { @Rule public final TemporaryFolder testFolder = new TemporaryFolder(); + @Test + public void testAllExistingEntriesEmittedOnFirstIteration() throws Exception { + final long oldTimestamp = System.nanoTime() - (AbstractListProcessor.LISTING_LAG_NANOS * 2); + + // These entries have existed before the processor runs at the first time. + final ConcreteListProcessor proc = new ConcreteListProcessor(); + proc.addEntity("name", "id", oldTimestamp); + proc.addEntity("name", "id2", oldTimestamp); + + // First run, the above listed entries should be emitted since it has existed. + final TestRunner runner = TestRunners.newTestRunner(proc); + + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); + runner.clearTransferState(); + + // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again + Thread.sleep(DEFAULT_SLEEP_MILLIS); + + // Run again without introducing any new entries + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + } + @Test public void testPreviouslySkippedEntriesEmittedOnNextIteration() throws Exception { final ConcreteListProcessor proc = new ConcreteListProcessor(); @@ -71,6 +95,7 @@ public class TestAbstractListProcessor { // First run, the above listed entries would be skipped to avoid write synchronization issues runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again Thread.sleep(DEFAULT_SLEEP_MILLIS); @@ -124,14 +149,7 @@ public class TestAbstractListProcessor { // Now a new file beyond the current time enters proc.addEntity("name", "id2", initialTimestamp + 1); - // Nothing occurs for the first iteration as it is withheld - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - // But it should now show up that the appropriate pause has been eclipsed + // It should show up runner.run(); runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); runner.clearTransferState(); @@ -187,14 +205,7 @@ public class TestAbstractListProcessor { // Now a new file beyond the current time enters proc.addEntity("name", "id2", initialTimestamp + 1); - // Nothing occurs for the first iteration as it is withheld - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - // But it should now show up that the appropriate pause has been eclipsed + // It should now show up runner.run(); runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); runner.clearTransferState(); @@ -209,14 +220,14 @@ public class TestAbstractListProcessor { runner.enableControllerService(cache); runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache"); - runner.run(); + final long initialTimestamp = System.nanoTime(); - proc.addEntity("name", "id", 1492L); + proc.addEntity("name", "id", initialTimestamp); runner.run(); final Map expectedState = new HashMap<>(); // Ensure only timestamp is migrated - expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492"); + expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, String.valueOf(initialTimestamp)); expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "0"); runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); @@ -224,8 +235,8 @@ public class TestAbstractListProcessor { runner.run(); // Ensure only timestamp is migrated - expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492"); - expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "1492"); + expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, String.valueOf(initialTimestamp)); + expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, String.valueOf(initialTimestamp)); runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); } @@ -328,14 +339,6 @@ public class TestAbstractListProcessor { runner.getStateManager().clear(Scope.CLUSTER); Assert.assertEquals("State is not empty for this component after clearing", 0, runner.getStateManager().getState(Scope.CLUSTER).toMap().size()); - - // As before, we are unsure of when these files were delivered relative to system time, and additional cycle(s) need to occur before transfer - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - // Ensure the original files are now transferred again. runner.run(); runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); @@ -390,27 +393,12 @@ public class TestAbstractListProcessor { proc.addEntity("new name", "new id", initialTimestamp + 1); runner.run(); - // Verify that the new entry has not been emitted but it has triggered an updated state - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); runner.clearTransferState(); StateMap updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER); assertEquals(3, updatedStateMap.getVersion()); - assertEquals(2, updatedStateMap.toMap().size()); - assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY)); - // Processed timestamp is lagging behind currently - assertEquals(Long.toString(initialTimestamp), updatedStateMap.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY)); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); - runner.clearTransferState(); - - updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER); - assertEquals(4, updatedStateMap.getVersion()); - assertEquals(2, updatedStateMap.toMap().size()); assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY)); // Processed timestamp is now caught up diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java index a3e5a94402..951aab0510 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Locale; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -89,6 +90,15 @@ public class TestListFile { } } + /** + * This method ensures runner clears transfer state, + * and sleeps the current thread for DEFAULT_SLEEP_MILLIS before executing runner.run(). + */ + private void runNext() throws InterruptedException { + runner.clearTransferState(); + Thread.sleep(DEFAULT_SLEEP_MILLIS); + runner.run(); + } @Test public void testGetRelationships() throws Exception { @@ -107,21 +117,18 @@ public class TestListFile { @Test public void testPerformListing() throws Exception { + + runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); + runNext(); + runner.assertTransferCount(ListFile.REL_SUCCESS, 0); + // create first file final File file1 = new File(TESTDIR + "/listing1.txt"); assertTrue(file1.createNewFile()); assertTrue(file1.setLastModified(time4millis)); // process first file and set new timestamp - runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); - runner.run(); - - runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - runner.clearTransferState(); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); + runNext(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(1, successFiles1.size()); @@ -132,14 +139,7 @@ public class TestListFile { assertTrue(file2.setLastModified(time2millis)); // process second file after timestamp - runner.clearTransferState(); - runner.run(); - runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - runner.clearTransferState(); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); + runNext(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(1, successFiles2.size()); @@ -150,95 +150,88 @@ public class TestListFile { assertTrue(file3.setLastModified(time4millis)); // process third file before timestamp - runner.clearTransferState(); - runner.run(); + runNext(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(0, successFiles3.size()); // force state to reset and process all files - runner.clearTransferState(); runner.removeProperty(ListFile.DIRECTORY); runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); - runner.run(); + runNext(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); - assertEquals(2, successFiles4.size()); + assertEquals(3, successFiles4.size()); - runner.clearTransferState(); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); - runner.assertTransferCount(ListFile.REL_SUCCESS, 1); + runNext(); + runner.assertTransferCount(ListFile.REL_SUCCESS, 0); } @Test public void testFilterAge() throws Exception { + final File file1 = new File(TESTDIR + "/age1.txt"); assertTrue(file1.createNewFile()); - assertTrue(file1.setLastModified(time0millis)); final File file2 = new File(TESTDIR + "/age2.txt"); assertTrue(file2.createNewFile()); - assertTrue(file2.setLastModified(time2millis)); final File file3 = new File(TESTDIR + "/age3.txt"); assertTrue(file3.createNewFile()); - assertTrue(file3.setLastModified(time4millis)); + + final Function runNext = resetAges -> { + if (resetAges) { + resetAges(); + assertTrue(file1.setLastModified(time0millis)); + assertTrue(file2.setLastModified(time2millis)); + assertTrue(file3.setLastModified(time4millis)); + } + try { + runNext(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return null; + }; // check all files runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); - runner.run(); - runner.assertTransferCount(ListFile.REL_SUCCESS, 2); - runner.clearTransferState(); + runNext.apply(true); + runner.assertTransferCount(ListFile.REL_SUCCESS, 3); - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); - runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); - final List successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); - assertEquals(1, successFiles1.size()); + // processor updates internal state, it shouldn't pick the same ones. + runNext.apply(false); + runner.assertTransferCount(ListFile.REL_SUCCESS, 0); // exclude oldest - runner.clearTransferState(); runner.setProperty(ListFile.MIN_AGE, age0); runner.setProperty(ListFile.MAX_AGE, age3); - runner.run(); + runNext.apply(true); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); - assertEquals(1, successFiles2.size()); - runner.clearTransferState(); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); - runner.assertTransferCount(ListFile.REL_SUCCESS, 1); + assertEquals(2, successFiles2.size()); + assertEquals(file2.getName(), successFiles2.get(0).getAttribute("filename")); + assertEquals(file1.getName(), successFiles2.get(1).getAttribute("filename")); // exclude newest - runner.clearTransferState(); runner.setProperty(ListFile.MIN_AGE, age1); runner.setProperty(ListFile.MAX_AGE, age5); - runner.run(); + runNext.apply(true); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); - assertEquals(1, successFiles3.size()); + assertEquals(2, successFiles3.size()); + assertEquals(file3.getName(), successFiles3.get(0).getAttribute("filename")); + assertEquals(file2.getName(), successFiles3.get(1).getAttribute("filename")); // exclude oldest and newest - runner.clearTransferState(); runner.setProperty(ListFile.MIN_AGE, age1); runner.setProperty(ListFile.MAX_AGE, age3); - runner.run(); + runNext.apply(true); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); - assertEquals(0, successFiles4.size()); - runner.clearTransferState(); + assertEquals(1, successFiles4.size()); + assertEquals(file2.getName(), successFiles4.get(0).getAttribute("filename")); - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); - runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); - runner.assertTransferCount(ListFile.REL_SUCCESS, 1); } @Test @@ -273,68 +266,37 @@ public class TestListFile { // check all files runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); - runner.run(); - runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - runner.clearTransferState(); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); - + runNext(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(3, successFiles1.size()); // exclude largest - runner.clearTransferState(); runner.removeProperty(ListFile.MIN_AGE); runner.removeProperty(ListFile.MAX_AGE); runner.setProperty(ListFile.MIN_SIZE, "0 b"); runner.setProperty(ListFile.MAX_SIZE, "7500 b"); - runner.run(); - - runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - runner.clearTransferState(); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); - + runNext(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(2, successFiles2.size()); // exclude smallest - runner.clearTransferState(); runner.removeProperty(ListFile.MIN_AGE); runner.removeProperty(ListFile.MAX_AGE); runner.setProperty(ListFile.MIN_SIZE, "2500 b"); runner.removeProperty(ListFile.MAX_SIZE); - runner.run(); - runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - runner.clearTransferState(); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); + runNext(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(2, successFiles3.size()); // exclude oldest and newest - runner.clearTransferState(); runner.removeProperty(ListFile.MIN_AGE); runner.removeProperty(ListFile.MAX_AGE); runner.setProperty(ListFile.MIN_SIZE, "2500 b"); runner.setProperty(ListFile.MAX_SIZE, "7500 b"); - runner.run(); - runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - runner.clearTransferState(); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); - + runNext(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(1, successFiles4.size()); @@ -364,7 +326,6 @@ public class TestListFile { assertTrue(file2.setLastModified(now)); // check all files - runner.clearTransferState(); runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runner.setProperty(ListFile.FILE_FILTER, ".*"); runner.removeProperty(ListFile.MIN_AGE); @@ -372,25 +333,14 @@ public class TestListFile { runner.removeProperty(ListFile.MIN_SIZE); runner.removeProperty(ListFile.MAX_SIZE); runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "false"); - runner.run(); - runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); + runNext(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(2, successFiles1.size()); // exclude hidden - runner.clearTransferState(); runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "true"); - runner.run(); - runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); + runNext(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(1, successFiles2.size()); @@ -398,6 +348,7 @@ public class TestListFile { @Test public void testFilterFilePattern() throws Exception { + final long now = getTestModifiedTime(); final File file1 = new File(TESTDIR + "/file1-abc-apple.txt"); @@ -417,31 +368,21 @@ public class TestListFile { assertTrue(file4.setLastModified(now)); // check all files - runner.clearTransferState(); runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue()); - runner.run(); - runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); + runNext(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(4, successFiles1.size()); // filter file on pattern - runner.clearTransferState(); + // Modifying FILE_FILTER property reset listing status, so these files will be listed again. runner.setProperty(ListFile.FILE_FILTER, ".*-xyz-.*"); - runner.run(); + runNext(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 2); + + runNext(); runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); - runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); - final List successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); - assertEquals(2, successFiles2.size()); } @Test @@ -474,40 +415,24 @@ public class TestListFile { runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue()); runner.setProperty(ListFile.RECURSE, "true"); - runner.run(); - runner.assertTransferCount(ListFile.REL_SUCCESS, 0); + runNext(); - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(4, successFiles1.size()); // filter path on pattern subdir1 - runner.clearTransferState(); runner.setProperty(ListFile.PATH_FILTER, "subdir1"); runner.setProperty(ListFile.RECURSE, "true"); - runner.run(); - runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); + runNext(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(3, successFiles2.size()); // filter path on pattern subdir2 - runner.clearTransferState(); runner.setProperty(ListFile.PATH_FILTER, "subdir2"); runner.setProperty(ListFile.RECURSE, "true"); - runner.run(); - runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); + runNext(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(1, successFiles3.size()); @@ -536,16 +461,9 @@ public class TestListFile { assertTrue(file3.setLastModified(now)); // check all files - runner.clearTransferState(); runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runner.setProperty(ListFile.RECURSE, "true"); - runner.run(); - runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - runner.clearTransferState(); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); + runNext(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 3); final List successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); for (final MockFlowFile mff : successFiles1) { @@ -570,17 +488,8 @@ public class TestListFile { assertEquals(3, successFiles1.size()); // exclude hidden - runner.clearTransferState(); runner.setProperty(ListFile.RECURSE, "false"); - runner.run(); - - runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - runner.clearTransferState(); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); - + runNext(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(1, successFiles2.size()); @@ -603,16 +512,9 @@ public class TestListFile { assertTrue(file3.setLastModified(now)); // check all files - runner.clearTransferState(); runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runner.setProperty(ListFile.RECURSE, "true"); - runner.run(); - runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - runner.clearTransferState(); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); + runNext(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); runner.assertTransferCount(ListFile.REL_SUCCESS, 3); } @@ -630,16 +532,8 @@ public class TestListFile { String userName = System.getProperty("user.name"); // validate the file transferred - runner.clearTransferState(); runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); - runner.run(); - runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - runner.clearTransferState(); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); - + runNext(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(1, successFiles1.size());