From 4babd067c17bee335667eb81df8fb922e1fe4893 Mon Sep 17 00:00:00 2001 From: Aldrin Piri Date: Tue, 22 Mar 2016 11:10:08 -0400 Subject: [PATCH] NIFI-1664 Preferring System.nanoTime to System.currentTimeMillis and providing explicit handling of timestamps for files in those tests that are testing other attributes of the ListFile process besides timestamp which could lead to erroneous transmissions depending on exactly when files were created. Adding unsalted_128_raw.enc and salted_128_raw.enc to the list of exclusions for the RAT plugin which caused issues in the Windows environment for contrib-checks. This closes #297. --- .../nifi-standard-processors/pom.xml | 2 + .../standard/AbstractListProcessor.java | 9 +- .../standard/TestAbstractListProcessor.java | 33 ++++---- .../processors/standard/TestListFile.java | 84 ++++++++++++++----- 4 files changed, 88 insertions(+), 40 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index dac907c536..4a5ea167fe 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -325,7 +325,9 @@ language governing permissions and limitations under the License. --> src/test/resources/TestUnpackContent/data.zip src/test/resources/TestEncryptContent/plain.txt src/test/resources/TestEncryptContent/salted_raw.enc + src/test/resources/TestEncryptContent/salted_128_raw.enc src/test/resources/TestEncryptContent/unsalted_raw.enc + src/test/resources/TestEncryptContent/unsalted_128_raw.enc src/main/java/org/apache/nifi/processors/standard/util/crypto/bcrypt/BCrypt.java diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java index 08b8b28397..9e1e1aa394 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.TriggerSerially; @@ -155,7 +156,7 @@ public abstract class AbstractListProcessor extends Ab * files according to timestamp, it is ensured that at least the specified millis has been eclipsed to avoid getting scheduled * near instantaneously after the prior iteration effectively voiding the built in buffer */ - static final long LISTING_LAG_MILLIS = 100L; + static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L); static final String LISTING_TIMESTAMP_KEY = "listing.timestamp"; static final String PROCESSED_TIMESTAMP_KEY = "processed.timestamp"; @@ -336,7 +337,7 @@ public abstract class AbstractListProcessor extends Ab final List entityList; try { - // track of when this last executed for consideration of the lag millis + // track of when this last executed for consideration of the lag nanos entityList = performListing(context, minTimestamp); } catch (final IOException e) { getLogger().error("Failed to perform listing on remote host due to {}", e); @@ -380,7 +381,7 @@ public abstract class AbstractListProcessor extends Ab * - the latest listing timestamp is If we have not eclipsed the minimal listing lag needed due to being triggered too soon after the last run * - the latest listing timestamp is equal to the last processed time, meaning we handled those items originally passed over */ - if (System.currentTimeMillis() - lastRunTime < LISTING_LAG_MILLIS || latestListingTimestamp.equals(lastProcessedTime)) { + if (System.nanoTime() - lastRunTime < LISTING_LAG_NANOS || latestListingTimestamp.equals(lastProcessedTime)) { context.yield(); return; } @@ -411,7 +412,7 @@ public abstract class AbstractListProcessor extends Ab session.commit(); } - lastRunTime = System.currentTimeMillis(); + lastRunTime = System.nanoTime(); if (!latestListingTimestamp.equals(lastListingTime) || processedNewFiles) { // We have performed a listing and pushed any FlowFiles out that may have been generated diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java index 68ed22fb91..f8b59f5754 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.apache.commons.io.Charsets; import org.apache.nifi.components.PropertyDescriptor; @@ -50,6 +51,8 @@ import org.junit.rules.TemporaryFolder; public class TestAbstractListProcessor { + static final long DEFAULT_SLEEP_MILLIS = TimeUnit.NANOSECONDS.toMillis(AbstractListProcessor.LISTING_LAG_NANOS * 2); + @Rule public final TemporaryFolder testFolder = new TemporaryFolder(); @@ -59,7 +62,7 @@ public class TestAbstractListProcessor { final TestRunner runner = TestRunners.newTestRunner(proc); runner.run(); - final long initialTimestamp = System.currentTimeMillis(); + final long initialTimestamp = System.nanoTime(); runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); proc.addEntity("name", "id", initialTimestamp); @@ -70,7 +73,7 @@ public class TestAbstractListProcessor { runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); // Run again without introducing any new entries runner.run(); @@ -83,7 +86,7 @@ public class TestAbstractListProcessor { final TestRunner runner = TestRunners.newTestRunner(proc); runner.run(); - final long initialTimestamp = System.currentTimeMillis(); + final long initialTimestamp = System.nanoTime(); runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); proc.addEntity("name", "id", initialTimestamp); @@ -95,7 +98,7 @@ public class TestAbstractListProcessor { runner.clearTransferState(); // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); // Running again, our two previously seen files are now cleared to be released runner.run(); @@ -126,7 +129,7 @@ public class TestAbstractListProcessor { runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); runner.clearTransferState(); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); // But it should now show up that the appropriate pause has been eclipsed runner.run(); @@ -139,7 +142,7 @@ public class TestAbstractListProcessor { final ConcreteListProcessor proc = new ConcreteListProcessor(); final TestRunner runner = TestRunners.newTestRunner(proc); - final long initialTimestamp = System.currentTimeMillis(); + final long initialTimestamp = System.nanoTime(); proc.addEntity("name", "id", initialTimestamp); proc.addEntity("name", "id2", initialTimestamp); @@ -158,7 +161,7 @@ public class TestAbstractListProcessor { runner.clearTransferState(); // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); // Running again, these files should be eligible for transfer and again skipped runner.run(); @@ -189,7 +192,7 @@ public class TestAbstractListProcessor { runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); runner.clearTransferState(); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); // But it should now show up that the appropriate pause has been eclipsed runner.run(); @@ -217,7 +220,7 @@ public class TestAbstractListProcessor { expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "0"); runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); // Ensure only timestamp is migrated @@ -301,7 +304,7 @@ public class TestAbstractListProcessor { runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - final long initialEventTimestamp = System.currentTimeMillis(); + final long initialEventTimestamp = System.nanoTime(); proc.addEntity("name", "id", initialEventTimestamp); proc.addEntity("name", "id2", initialEventTimestamp); @@ -310,7 +313,7 @@ public class TestAbstractListProcessor { runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); // after providing a pause in listings, the files should now transfer - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); @@ -331,7 +334,7 @@ public class TestAbstractListProcessor { runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); runner.clearTransferState(); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); // Ensure the original files are now transferred again. runner.run(); @@ -359,7 +362,7 @@ public class TestAbstractListProcessor { final TestRunner runner = TestRunners.newTestRunner(proc); runner.run(); - final long initialTimestamp = System.currentTimeMillis(); + final long initialTimestamp = System.nanoTime(); runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); proc.addEntity("name", "id", initialTimestamp); @@ -369,7 +372,7 @@ public class TestAbstractListProcessor { runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); runner.clearTransferState(); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); @@ -399,7 +402,7 @@ public class TestAbstractListProcessor { // Processed timestamp is lagging behind currently assertEquals(Long.toString(initialTimestamp), updatedStateMap.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY)); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java index 441a5615dd..5b6ea04293 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java @@ -33,6 +33,7 @@ import java.text.SimpleDateFormat; import java.util.List; import java.util.Locale; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -62,6 +63,8 @@ public class TestListFile { Long age0millis, age1millis, age2millis, age3millis, age4millis, age5millis; String age0, age1, age2, age3, age4, age5; + static final long DEFAULT_SLEEP_MILLIS = TimeUnit.NANOSECONDS.toMillis(AbstractListProcessor.LISTING_LAG_NANOS * 2); + @Before public void setUp() throws Exception { processor = new ListFile(); @@ -116,7 +119,7 @@ public class TestListFile { runner.assertTransferCount(ListFile.REL_SUCCESS, 0); runner.clearTransferState(); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); @@ -134,7 +137,7 @@ public class TestListFile { runner.assertTransferCount(ListFile.REL_SUCCESS, 0); runner.clearTransferState(); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); @@ -164,7 +167,7 @@ public class TestListFile { runner.clearTransferState(); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); runner.assertTransferCount(ListFile.REL_SUCCESS, 1); @@ -190,7 +193,7 @@ public class TestListFile { runner.assertTransferCount(ListFile.REL_SUCCESS, 2); runner.clearTransferState(); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); @@ -207,7 +210,7 @@ public class TestListFile { assertEquals(1, successFiles2.size()); runner.clearTransferState(); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); runner.assertTransferCount(ListFile.REL_SUCCESS, 1); @@ -231,7 +234,7 @@ public class TestListFile { assertEquals(0, successFiles4.size()); runner.clearTransferState(); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); @@ -263,13 +266,18 @@ public class TestListFile { fos.write(bytes1000); fos.close(); + final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS); + assertTrue(file1.setLastModified(now)); + assertTrue(file2.setLastModified(now)); + assertTrue(file3.setLastModified(now)); + // check all files runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runner.run(); runner.assertTransferCount(ListFile.REL_SUCCESS, 0); runner.clearTransferState(); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); @@ -288,7 +296,7 @@ public class TestListFile { runner.assertTransferCount(ListFile.REL_SUCCESS, 0); runner.clearTransferState(); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); @@ -306,7 +314,7 @@ public class TestListFile { runner.assertTransferCount(ListFile.REL_SUCCESS, 0); runner.clearTransferState(); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); @@ -323,7 +331,7 @@ public class TestListFile { runner.assertTransferCount(ListFile.REL_SUCCESS, 0); runner.clearTransferState(); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); @@ -334,6 +342,8 @@ public class TestListFile { @Test public void testFilterHidden() throws Exception { + final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS); + FileOutputStream fos; final File file1 = new File(TESTDIR + "/hidden1.txt"); @@ -350,6 +360,9 @@ public class TestListFile { Files.setAttribute(file2.toPath(), "dos:hidden", true); } + assertTrue(file1.setLastModified(now)); + assertTrue(file2.setLastModified(now)); + // check all files runner.clearTransferState(); runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); @@ -362,7 +375,7 @@ public class TestListFile { runner.run(); runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); @@ -375,7 +388,7 @@ public class TestListFile { runner.run(); runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); @@ -385,17 +398,30 @@ public class TestListFile { @Test public void testFilterFilePattern() throws Exception { + + final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS); + final File file1 = new File(TESTDIR + "/file1-abc-apple.txt"); assertTrue(file1.createNewFile()); + assertTrue(file1.setLastModified(now)); final File file2 = new File(TESTDIR + "/file2-xyz-apple.txt"); assertTrue(file2.createNewFile()); + assertTrue(file2.setLastModified(now)); final File file3 = new File(TESTDIR + "/file3-xyz-banana.txt"); assertTrue(file3.createNewFile()); + assertTrue(file3.setLastModified(now)); final File file4 = new File(TESTDIR + "/file4-pdq-banana.txt"); assertTrue(file4.createNewFile()); + assertTrue(file4.setLastModified(now)); + + System.out.println(file1.lastModified()); + System.out.println(file2.lastModified()); + System.out.println(file3.lastModified()); + System.out.println(file4.lastModified()); + // check all files runner.clearTransferState(); @@ -404,7 +430,7 @@ public class TestListFile { runner.run(); runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); @@ -417,7 +443,7 @@ public class TestListFile { runner.run(); runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); @@ -427,6 +453,8 @@ public class TestListFile { @Test public void testFilterPathPattern() throws Exception { + final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS); + final File subdir1 = new File(TESTDIR + "/subdir1"); assertTrue(subdir1.mkdirs()); @@ -435,15 +463,19 @@ public class TestListFile { final File file1 = new File(TESTDIR + "/file1.txt"); assertTrue(file1.createNewFile()); + assertTrue(file1.setLastModified(now)); final File file2 = new File(TESTDIR + "/subdir1/file2.txt"); assertTrue(file2.createNewFile()); + assertTrue(file2.setLastModified(now)); final File file3 = new File(TESTDIR + "/subdir1/subdir2/file3.txt"); assertTrue(file3.createNewFile()); + assertTrue(file3.setLastModified(now)); final File file4 = new File(TESTDIR + "/subdir1/file4.txt"); assertTrue(file4.createNewFile()); + assertTrue(file4.setLastModified(now)); // check all files runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); @@ -452,7 +484,7 @@ public class TestListFile { runner.run(); runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); @@ -466,7 +498,7 @@ public class TestListFile { runner.run(); runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); @@ -480,7 +512,7 @@ public class TestListFile { runner.run(); runner.assertTransferCount(ListFile.REL_SUCCESS, 0); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); @@ -490,6 +522,8 @@ public class TestListFile { @Test public void testRecurse() throws Exception { + final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS); + final File subdir1 = new File(TESTDIR + "/subdir1"); assertTrue(subdir1.mkdirs()); @@ -498,12 +532,15 @@ public class TestListFile { final File file1 = new File(TESTDIR + "/file1.txt"); assertTrue(file1.createNewFile()); + assertTrue(file1.setLastModified(now)); final File file2 = new File(TESTDIR + "/subdir1/file2.txt"); assertTrue(file2.createNewFile()); + assertTrue(file2.setLastModified(now)); final File file3 = new File(TESTDIR + "/subdir1/subdir2/file3.txt"); assertTrue(file3.createNewFile()); + assertTrue(file3.setLastModified(now)); // check all files runner.clearTransferState(); @@ -513,7 +550,7 @@ public class TestListFile { runner.assertTransferCount(ListFile.REL_SUCCESS, 0); runner.clearTransferState(); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 3); @@ -547,7 +584,7 @@ public class TestListFile { runner.assertTransferCount(ListFile.REL_SUCCESS, 0); runner.clearTransferState(); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); @@ -558,14 +595,19 @@ public class TestListFile { @Test public void testReadable() throws Exception { + final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS); + final File file1 = new File(TESTDIR + "/file1.txt"); assertTrue(file1.createNewFile()); + assertTrue(file1.setLastModified(now)); final File file2 = new File(TESTDIR + "/file2.txt"); assertTrue(file2.createNewFile()); + assertTrue(file2.setLastModified(now)); final File file3 = new File(TESTDIR + "/file3.txt"); assertTrue(file3.createNewFile()); + assertTrue(file3.setLastModified(now)); // check all files runner.clearTransferState(); @@ -575,7 +617,7 @@ public class TestListFile { runner.assertTransferCount(ListFile.REL_SUCCESS, 0); runner.clearTransferState(); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); @@ -601,7 +643,7 @@ public class TestListFile { runner.assertTransferCount(ListFile.REL_SUCCESS, 0); runner.clearTransferState(); - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + Thread.sleep(DEFAULT_SLEEP_MILLIS); runner.run();