NIFI-3213: ListFile do not skip obviously old files

Before this fix, files with the latest timestamp within a listing
iteration are always be held back one cycle no matter how old it is.

Signed-off-by: Andre F de Miranda <trixpan@users.noreply.github.com>
This commit is contained in:
Koji Kawamura 2016-12-16 17:48:06 +09:00 committed by Andre F de Miranda
parent e4eda188b8
commit 095c04eda0
3 changed files with 110 additions and 226 deletions

View File

@ -336,6 +336,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
} }
final List<T> entityList; final List<T> entityList;
final long currentListingTimestamp = System.nanoTime();
try { try {
// track of when this last executed for consideration of the lag nanos // track of when this last executed for consideration of the lag nanos
entityList = performListing(context, minTimestamp); entityList = performListing(context, minTimestamp);
@ -385,7 +386,8 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
context.yield(); context.yield();
return; return;
} }
} else {
} else if (latestListingTimestamp >= currentListingTimestamp - LISTING_LAG_NANOS) {
// Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
orderedEntries.remove(latestListingTimestamp); orderedEntries.remove(latestListingTimestamp);
} }

View File

@ -56,6 +56,30 @@ public class TestAbstractListProcessor {
@Rule @Rule
public final TemporaryFolder testFolder = new TemporaryFolder(); public final TemporaryFolder testFolder = new TemporaryFolder();
@Test
public void testAllExistingEntriesEmittedOnFirstIteration() throws Exception {
final long oldTimestamp = System.nanoTime() - (AbstractListProcessor.LISTING_LAG_NANOS * 2);
// These entries have existed before the processor runs at the first time.
final ConcreteListProcessor proc = new ConcreteListProcessor();
proc.addEntity("name", "id", oldTimestamp);
proc.addEntity("name", "id2", oldTimestamp);
// First run, the above listed entries should be emitted since it has existed.
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
runner.clearTransferState();
// Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
Thread.sleep(DEFAULT_SLEEP_MILLIS);
// Run again without introducing any new entries
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
}
@Test @Test
public void testPreviouslySkippedEntriesEmittedOnNextIteration() throws Exception { public void testPreviouslySkippedEntriesEmittedOnNextIteration() throws Exception {
final ConcreteListProcessor proc = new ConcreteListProcessor(); final ConcreteListProcessor proc = new ConcreteListProcessor();
@ -71,6 +95,7 @@ public class TestAbstractListProcessor {
// First run, the above listed entries would be skipped to avoid write synchronization issues // First run, the above listed entries would be skipped to avoid write synchronization issues
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
runner.clearTransferState();
// Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
Thread.sleep(DEFAULT_SLEEP_MILLIS); Thread.sleep(DEFAULT_SLEEP_MILLIS);
@ -124,14 +149,7 @@ public class TestAbstractListProcessor {
// Now a new file beyond the current time enters // Now a new file beyond the current time enters
proc.addEntity("name", "id2", initialTimestamp + 1); proc.addEntity("name", "id2", initialTimestamp + 1);
// Nothing occurs for the first iteration as it is withheld // It should show up
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(DEFAULT_SLEEP_MILLIS);
// But it should now show up that the appropriate pause has been eclipsed
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
runner.clearTransferState(); runner.clearTransferState();
@ -187,14 +205,7 @@ public class TestAbstractListProcessor {
// Now a new file beyond the current time enters // Now a new file beyond the current time enters
proc.addEntity("name", "id2", initialTimestamp + 1); proc.addEntity("name", "id2", initialTimestamp + 1);
// Nothing occurs for the first iteration as it is withheld // It should now show up
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(DEFAULT_SLEEP_MILLIS);
// But it should now show up that the appropriate pause has been eclipsed
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
runner.clearTransferState(); runner.clearTransferState();
@ -209,14 +220,14 @@ public class TestAbstractListProcessor {
runner.enableControllerService(cache); runner.enableControllerService(cache);
runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache"); runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache");
runner.run(); final long initialTimestamp = System.nanoTime();
proc.addEntity("name", "id", 1492L); proc.addEntity("name", "id", initialTimestamp);
runner.run(); runner.run();
final Map<String, String> expectedState = new HashMap<>(); final Map<String, String> expectedState = new HashMap<>();
// Ensure only timestamp is migrated // Ensure only timestamp is migrated
expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492"); expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "0"); expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "0");
runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
@ -224,8 +235,8 @@ public class TestAbstractListProcessor {
runner.run(); runner.run();
// Ensure only timestamp is migrated // Ensure only timestamp is migrated
expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492"); expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "1492"); expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
} }
@ -328,14 +339,6 @@ public class TestAbstractListProcessor {
runner.getStateManager().clear(Scope.CLUSTER); runner.getStateManager().clear(Scope.CLUSTER);
Assert.assertEquals("State is not empty for this component after clearing", 0, runner.getStateManager().getState(Scope.CLUSTER).toMap().size()); Assert.assertEquals("State is not empty for this component after clearing", 0, runner.getStateManager().getState(Scope.CLUSTER).toMap().size());
// As before, we are unsure of when these files were delivered relative to system time, and additional cycle(s) need to occur before transfer
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(DEFAULT_SLEEP_MILLIS);
// Ensure the original files are now transferred again. // Ensure the original files are now transferred again.
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
@ -390,27 +393,12 @@ public class TestAbstractListProcessor {
proc.addEntity("new name", "new id", initialTimestamp + 1); proc.addEntity("new name", "new id", initialTimestamp + 1);
runner.run(); runner.run();
// Verify that the new entry has not been emitted but it has triggered an updated state runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
runner.clearTransferState(); runner.clearTransferState();
StateMap updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER); StateMap updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER);
assertEquals(3, updatedStateMap.getVersion()); assertEquals(3, updatedStateMap.getVersion());
assertEquals(2, updatedStateMap.toMap().size());
assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY));
// Processed timestamp is lagging behind currently
assertEquals(Long.toString(initialTimestamp), updatedStateMap.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY));
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
runner.clearTransferState();
updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER);
assertEquals(4, updatedStateMap.getVersion());
assertEquals(2, updatedStateMap.toMap().size()); assertEquals(2, updatedStateMap.toMap().size());
assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY)); assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY));
// Processed timestamp is now caught up // Processed timestamp is now caught up

View File

@ -34,6 +34,7 @@ import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -89,6 +90,15 @@ public class TestListFile {
} }
} }
/**
* This method ensures runner clears transfer state,
* and sleeps the current thread for DEFAULT_SLEEP_MILLIS before executing runner.run().
*/
private void runNext() throws InterruptedException {
runner.clearTransferState();
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
}
@Test @Test
public void testGetRelationships() throws Exception { public void testGetRelationships() throws Exception {
@ -107,21 +117,18 @@ public class TestListFile {
@Test @Test
public void testPerformListing() throws Exception { public void testPerformListing() throws Exception {
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runNext();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
// create first file // create first file
final File file1 = new File(TESTDIR + "/listing1.txt"); final File file1 = new File(TESTDIR + "/listing1.txt");
assertTrue(file1.createNewFile()); assertTrue(file1.createNewFile());
assertTrue(file1.setLastModified(time4millis)); assertTrue(file1.setLastModified(time4millis));
// process first file and set new timestamp // process first file and set new timestamp
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runNext();
runner.run();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(1, successFiles1.size()); assertEquals(1, successFiles1.size());
@ -132,14 +139,7 @@ public class TestListFile {
assertTrue(file2.setLastModified(time2millis)); assertTrue(file2.setLastModified(time2millis));
// process second file after timestamp // process second file after timestamp
runner.clearTransferState(); runNext();
runner.run();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(1, successFiles2.size()); assertEquals(1, successFiles2.size());
@ -150,95 +150,88 @@ public class TestListFile {
assertTrue(file3.setLastModified(time4millis)); assertTrue(file3.setLastModified(time4millis));
// process third file before timestamp // process third file before timestamp
runner.clearTransferState(); runNext();
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(0, successFiles3.size()); assertEquals(0, successFiles3.size());
// force state to reset and process all files // force state to reset and process all files
runner.clearTransferState();
runner.removeProperty(ListFile.DIRECTORY); runner.removeProperty(ListFile.DIRECTORY);
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.run(); runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(2, successFiles4.size()); assertEquals(3, successFiles4.size());
runner.clearTransferState(); runNext();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertTransferCount(ListFile.REL_SUCCESS, 1);
} }
@Test @Test
public void testFilterAge() throws Exception { public void testFilterAge() throws Exception {
final File file1 = new File(TESTDIR + "/age1.txt"); final File file1 = new File(TESTDIR + "/age1.txt");
assertTrue(file1.createNewFile()); assertTrue(file1.createNewFile());
assertTrue(file1.setLastModified(time0millis));
final File file2 = new File(TESTDIR + "/age2.txt"); final File file2 = new File(TESTDIR + "/age2.txt");
assertTrue(file2.createNewFile()); assertTrue(file2.createNewFile());
assertTrue(file2.setLastModified(time2millis));
final File file3 = new File(TESTDIR + "/age3.txt"); final File file3 = new File(TESTDIR + "/age3.txt");
assertTrue(file3.createNewFile()); assertTrue(file3.createNewFile());
assertTrue(file3.setLastModified(time4millis));
final Function<Boolean, Object> runNext = resetAges -> {
if (resetAges) {
resetAges();
assertTrue(file1.setLastModified(time0millis));
assertTrue(file2.setLastModified(time2millis));
assertTrue(file3.setLastModified(time4millis));
}
try {
runNext();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return null;
};
// check all files // check all files
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.run(); runNext.apply(true);
runner.assertTransferCount(ListFile.REL_SUCCESS, 2); runner.assertTransferCount(ListFile.REL_SUCCESS, 3);
runner.clearTransferState();
Thread.sleep(DEFAULT_SLEEP_MILLIS); // processor updates internal state, it shouldn't pick the same ones.
runNext.apply(false);
runner.run(); runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(1, successFiles1.size());
// exclude oldest // exclude oldest
runner.clearTransferState();
runner.setProperty(ListFile.MIN_AGE, age0); runner.setProperty(ListFile.MIN_AGE, age0);
runner.setProperty(ListFile.MAX_AGE, age3); runner.setProperty(ListFile.MAX_AGE, age3);
runner.run(); runNext.apply(true);
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(1, successFiles2.size()); assertEquals(2, successFiles2.size());
runner.clearTransferState(); assertEquals(file2.getName(), successFiles2.get(0).getAttribute("filename"));
assertEquals(file1.getName(), successFiles2.get(1).getAttribute("filename"));
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertTransferCount(ListFile.REL_SUCCESS, 1);
// exclude newest // exclude newest
runner.clearTransferState();
runner.setProperty(ListFile.MIN_AGE, age1); runner.setProperty(ListFile.MIN_AGE, age1);
runner.setProperty(ListFile.MAX_AGE, age5); runner.setProperty(ListFile.MAX_AGE, age5);
runner.run(); runNext.apply(true);
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(1, successFiles3.size()); assertEquals(2, successFiles3.size());
assertEquals(file3.getName(), successFiles3.get(0).getAttribute("filename"));
assertEquals(file2.getName(), successFiles3.get(1).getAttribute("filename"));
// exclude oldest and newest // exclude oldest and newest
runner.clearTransferState();
runner.setProperty(ListFile.MIN_AGE, age1); runner.setProperty(ListFile.MIN_AGE, age1);
runner.setProperty(ListFile.MAX_AGE, age3); runner.setProperty(ListFile.MAX_AGE, age3);
runner.run(); runNext.apply(true);
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(0, successFiles4.size()); assertEquals(1, successFiles4.size());
runner.clearTransferState(); assertEquals(file2.getName(), successFiles4.get(0).getAttribute("filename"));
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
runner.assertTransferCount(ListFile.REL_SUCCESS, 1);
} }
@Test @Test
@ -273,68 +266,37 @@ public class TestListFile {
// check all files // check all files
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.run(); runNext();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(3, successFiles1.size()); assertEquals(3, successFiles1.size());
// exclude largest // exclude largest
runner.clearTransferState();
runner.removeProperty(ListFile.MIN_AGE); runner.removeProperty(ListFile.MIN_AGE);
runner.removeProperty(ListFile.MAX_AGE); runner.removeProperty(ListFile.MAX_AGE);
runner.setProperty(ListFile.MIN_SIZE, "0 b"); runner.setProperty(ListFile.MIN_SIZE, "0 b");
runner.setProperty(ListFile.MAX_SIZE, "7500 b"); runner.setProperty(ListFile.MAX_SIZE, "7500 b");
runner.run(); runNext();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(2, successFiles2.size()); assertEquals(2, successFiles2.size());
// exclude smallest // exclude smallest
runner.clearTransferState();
runner.removeProperty(ListFile.MIN_AGE); runner.removeProperty(ListFile.MIN_AGE);
runner.removeProperty(ListFile.MAX_AGE); runner.removeProperty(ListFile.MAX_AGE);
runner.setProperty(ListFile.MIN_SIZE, "2500 b"); runner.setProperty(ListFile.MIN_SIZE, "2500 b");
runner.removeProperty(ListFile.MAX_SIZE); runner.removeProperty(ListFile.MAX_SIZE);
runner.run(); runNext();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(2, successFiles3.size()); assertEquals(2, successFiles3.size());
// exclude oldest and newest // exclude oldest and newest
runner.clearTransferState();
runner.removeProperty(ListFile.MIN_AGE); runner.removeProperty(ListFile.MIN_AGE);
runner.removeProperty(ListFile.MAX_AGE); runner.removeProperty(ListFile.MAX_AGE);
runner.setProperty(ListFile.MIN_SIZE, "2500 b"); runner.setProperty(ListFile.MIN_SIZE, "2500 b");
runner.setProperty(ListFile.MAX_SIZE, "7500 b"); runner.setProperty(ListFile.MAX_SIZE, "7500 b");
runner.run(); runNext();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(1, successFiles4.size()); assertEquals(1, successFiles4.size());
@ -364,7 +326,6 @@ public class TestListFile {
assertTrue(file2.setLastModified(now)); assertTrue(file2.setLastModified(now));
// check all files // check all files
runner.clearTransferState();
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.setProperty(ListFile.FILE_FILTER, ".*"); runner.setProperty(ListFile.FILE_FILTER, ".*");
runner.removeProperty(ListFile.MIN_AGE); runner.removeProperty(ListFile.MIN_AGE);
@ -372,25 +333,14 @@ public class TestListFile {
runner.removeProperty(ListFile.MIN_SIZE); runner.removeProperty(ListFile.MIN_SIZE);
runner.removeProperty(ListFile.MAX_SIZE); runner.removeProperty(ListFile.MAX_SIZE);
runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "false"); runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "false");
runner.run(); runNext();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(2, successFiles1.size()); assertEquals(2, successFiles1.size());
// exclude hidden // exclude hidden
runner.clearTransferState();
runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "true"); runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "true");
runner.run(); runNext();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(1, successFiles2.size()); assertEquals(1, successFiles2.size());
@ -398,6 +348,7 @@ public class TestListFile {
@Test @Test
public void testFilterFilePattern() throws Exception { public void testFilterFilePattern() throws Exception {
final long now = getTestModifiedTime(); final long now = getTestModifiedTime();
final File file1 = new File(TESTDIR + "/file1-abc-apple.txt"); final File file1 = new File(TESTDIR + "/file1-abc-apple.txt");
@ -417,31 +368,21 @@ public class TestListFile {
assertTrue(file4.setLastModified(now)); assertTrue(file4.setLastModified(now));
// check all files // check all files
runner.clearTransferState();
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue()); runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue());
runner.run(); runNext();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(4, successFiles1.size()); assertEquals(4, successFiles1.size());
// filter file on pattern // filter file on pattern
runner.clearTransferState(); // Modifying FILE_FILTER property reset listing status, so these files will be listed again.
runner.setProperty(ListFile.FILE_FILTER, ".*-xyz-.*"); runner.setProperty(ListFile.FILE_FILTER, ".*-xyz-.*");
runner.run(); runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 2);
runNext();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0); runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(2, successFiles2.size());
} }
@Test @Test
@ -474,40 +415,24 @@ public class TestListFile {
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue()); runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue());
runner.setProperty(ListFile.RECURSE, "true"); runner.setProperty(ListFile.RECURSE, "true");
runner.run(); runNext();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(4, successFiles1.size()); assertEquals(4, successFiles1.size());
// filter path on pattern subdir1 // filter path on pattern subdir1
runner.clearTransferState();
runner.setProperty(ListFile.PATH_FILTER, "subdir1"); runner.setProperty(ListFile.PATH_FILTER, "subdir1");
runner.setProperty(ListFile.RECURSE, "true"); runner.setProperty(ListFile.RECURSE, "true");
runner.run(); runNext();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(3, successFiles2.size()); assertEquals(3, successFiles2.size());
// filter path on pattern subdir2 // filter path on pattern subdir2
runner.clearTransferState();
runner.setProperty(ListFile.PATH_FILTER, "subdir2"); runner.setProperty(ListFile.PATH_FILTER, "subdir2");
runner.setProperty(ListFile.RECURSE, "true"); runner.setProperty(ListFile.RECURSE, "true");
runner.run(); runNext();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(1, successFiles3.size()); assertEquals(1, successFiles3.size());
@ -536,16 +461,9 @@ public class TestListFile {
assertTrue(file3.setLastModified(now)); assertTrue(file3.setLastModified(now));
// check all files // check all files
runner.clearTransferState();
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.setProperty(ListFile.RECURSE, "true"); runner.setProperty(ListFile.RECURSE, "true");
runner.run(); runNext();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 3); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 3);
final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
for (final MockFlowFile mff : successFiles1) { for (final MockFlowFile mff : successFiles1) {
@ -570,17 +488,8 @@ public class TestListFile {
assertEquals(3, successFiles1.size()); assertEquals(3, successFiles1.size());
// exclude hidden // exclude hidden
runner.clearTransferState();
runner.setProperty(ListFile.RECURSE, "false"); runner.setProperty(ListFile.RECURSE, "false");
runner.run(); runNext();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(1, successFiles2.size()); assertEquals(1, successFiles2.size());
@ -603,16 +512,9 @@ public class TestListFile {
assertTrue(file3.setLastModified(now)); assertTrue(file3.setLastModified(now));
// check all files // check all files
runner.clearTransferState();
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.setProperty(ListFile.RECURSE, "true"); runner.setProperty(ListFile.RECURSE, "true");
runner.run(); runNext();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
runner.assertTransferCount(ListFile.REL_SUCCESS, 3); runner.assertTransferCount(ListFile.REL_SUCCESS, 3);
} }
@ -630,16 +532,8 @@ public class TestListFile {
String userName = System.getProperty("user.name"); String userName = System.getProperty("user.name");
// validate the file transferred // validate the file transferred
runner.clearTransferState();
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.run(); runNext();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(1, successFiles1.size()); assertEquals(1, successFiles1.size());