NIFI-1664 Preferring System.nanoTime to System.currentTimeMillis and providing explicit handling of timestamps for files in those tests that are testing other attributes of the ListFile process besides timestamp which could lead to erroneous transmissions depending on exactly when files were created.

Adding unsalted_128_raw.enc and salted_128_raw.enc to the list of exclusions for the RAT plugin which caused issues in the Windows environment for contrib-checks.

This closes #297.
This commit is contained in:
Aldrin Piri 2016-03-22 11:10:08 -04:00
parent 736896246c
commit 4babd067c1
4 changed files with 88 additions and 40 deletions

View File

@ -325,7 +325,9 @@ language governing permissions and limitations under the License. -->
<exclude>src/test/resources/TestUnpackContent/data.zip</exclude>
<exclude>src/test/resources/TestEncryptContent/plain.txt</exclude>
<exclude>src/test/resources/TestEncryptContent/salted_raw.enc</exclude>
<exclude>src/test/resources/TestEncryptContent/salted_128_raw.enc</exclude>
<exclude>src/test/resources/TestEncryptContent/unsalted_raw.enc</exclude>
<exclude>src/test/resources/TestEncryptContent/unsalted_128_raw.enc</exclude>
<!-- This file is copied from https://github.com/jeremyh/jBCrypt because the binary is compiled for Java 8 and we must support Java 7 -->
<exclude>src/main/java/org/apache/nifi/processors/standard/util/crypto/bcrypt/BCrypt.java</exclude>
</excludes>

View File

@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
@ -155,7 +156,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
* files according to timestamp, it is ensured that at least the specified millis has been eclipsed to avoid getting scheduled
* near instantaneously after the prior iteration effectively voiding the built in buffer
*/
static final long LISTING_LAG_MILLIS = 100L;
static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
static final String PROCESSED_TIMESTAMP_KEY = "processed.timestamp";
@ -336,7 +337,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
final List<T> entityList;
try {
// track of when this last executed for consideration of the lag millis
// track of when this last executed for consideration of the lag nanos
entityList = performListing(context, minTimestamp);
} catch (final IOException e) {
getLogger().error("Failed to perform listing on remote host due to {}", e);
@ -380,7 +381,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
* - the latest listing timestamp is If we have not eclipsed the minimal listing lag needed due to being triggered too soon after the last run
* - the latest listing timestamp is equal to the last processed time, meaning we handled those items originally passed over
*/
if (System.currentTimeMillis() - lastRunTime < LISTING_LAG_MILLIS || latestListingTimestamp.equals(lastProcessedTime)) {
if (System.nanoTime() - lastRunTime < LISTING_LAG_NANOS || latestListingTimestamp.equals(lastProcessedTime)) {
context.yield();
return;
}
@ -411,7 +412,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
session.commit();
}
lastRunTime = System.currentTimeMillis();
lastRunTime = System.nanoTime();
if (!latestListingTimestamp.equals(lastListingTime) || processedNewFiles) {
// We have performed a listing and pushed any FlowFiles out that may have been generated

View File

@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.Charsets;
import org.apache.nifi.components.PropertyDescriptor;
@ -50,6 +51,8 @@ import org.junit.rules.TemporaryFolder;
public class TestAbstractListProcessor {
static final long DEFAULT_SLEEP_MILLIS = TimeUnit.NANOSECONDS.toMillis(AbstractListProcessor.LISTING_LAG_NANOS * 2);
@Rule
public final TemporaryFolder testFolder = new TemporaryFolder();
@ -59,7 +62,7 @@ public class TestAbstractListProcessor {
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.run();
final long initialTimestamp = System.currentTimeMillis();
final long initialTimestamp = System.nanoTime();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
proc.addEntity("name", "id", initialTimestamp);
@ -70,7 +73,7 @@ public class TestAbstractListProcessor {
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
// Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
// Run again without introducing any new entries
runner.run();
@ -83,7 +86,7 @@ public class TestAbstractListProcessor {
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.run();
final long initialTimestamp = System.currentTimeMillis();
final long initialTimestamp = System.nanoTime();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
proc.addEntity("name", "id", initialTimestamp);
@ -95,7 +98,7 @@ public class TestAbstractListProcessor {
runner.clearTransferState();
// Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
// Running again, our two previously seen files are now cleared to be released
runner.run();
@ -126,7 +129,7 @@ public class TestAbstractListProcessor {
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
// But it should now show up that the appropriate pause has been eclipsed
runner.run();
@ -139,7 +142,7 @@ public class TestAbstractListProcessor {
final ConcreteListProcessor proc = new ConcreteListProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
final long initialTimestamp = System.currentTimeMillis();
final long initialTimestamp = System.nanoTime();
proc.addEntity("name", "id", initialTimestamp);
proc.addEntity("name", "id2", initialTimestamp);
@ -158,7 +161,7 @@ public class TestAbstractListProcessor {
runner.clearTransferState();
// Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
// Running again, these files should be eligible for transfer and again skipped
runner.run();
@ -189,7 +192,7 @@ public class TestAbstractListProcessor {
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
// But it should now show up that the appropriate pause has been eclipsed
runner.run();
@ -217,7 +220,7 @@ public class TestAbstractListProcessor {
expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "0");
runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
// Ensure only timestamp is migrated
@ -301,7 +304,7 @@ public class TestAbstractListProcessor {
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
final long initialEventTimestamp = System.currentTimeMillis();
final long initialEventTimestamp = System.nanoTime();
proc.addEntity("name", "id", initialEventTimestamp);
proc.addEntity("name", "id2", initialEventTimestamp);
@ -310,7 +313,7 @@ public class TestAbstractListProcessor {
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
// after providing a pause in listings, the files should now transfer
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
@ -331,7 +334,7 @@ public class TestAbstractListProcessor {
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
// Ensure the original files are now transferred again.
runner.run();
@ -359,7 +362,7 @@ public class TestAbstractListProcessor {
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.run();
final long initialTimestamp = System.currentTimeMillis();
final long initialTimestamp = System.nanoTime();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
proc.addEntity("name", "id", initialTimestamp);
@ -369,7 +372,7 @@ public class TestAbstractListProcessor {
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
@ -399,7 +402,7 @@ public class TestAbstractListProcessor {
// Processed timestamp is lagging behind currently
assertEquals(Long.toString(initialTimestamp), updatedStateMap.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY));
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);

View File

@ -33,6 +33,7 @@ import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -62,6 +63,8 @@ public class TestListFile {
Long age0millis, age1millis, age2millis, age3millis, age4millis, age5millis;
String age0, age1, age2, age3, age4, age5;
static final long DEFAULT_SLEEP_MILLIS = TimeUnit.NANOSECONDS.toMillis(AbstractListProcessor.LISTING_LAG_NANOS * 2);
@Before
public void setUp() throws Exception {
processor = new ListFile();
@ -116,7 +119,7 @@ public class TestListFile {
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
@ -134,7 +137,7 @@ public class TestListFile {
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
@ -164,7 +167,7 @@ public class TestListFile {
runner.clearTransferState();
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertTransferCount(ListFile.REL_SUCCESS, 1);
@ -190,7 +193,7 @@ public class TestListFile {
runner.assertTransferCount(ListFile.REL_SUCCESS, 2);
runner.clearTransferState();
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
@ -207,7 +210,7 @@ public class TestListFile {
assertEquals(1, successFiles2.size());
runner.clearTransferState();
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertTransferCount(ListFile.REL_SUCCESS, 1);
@ -231,7 +234,7 @@ public class TestListFile {
assertEquals(0, successFiles4.size());
runner.clearTransferState();
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
@ -263,13 +266,18 @@ public class TestListFile {
fos.write(bytes1000);
fos.close();
final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
assertTrue(file1.setLastModified(now));
assertTrue(file2.setLastModified(now));
assertTrue(file3.setLastModified(now));
// check all files
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.run();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
@ -288,7 +296,7 @@ public class TestListFile {
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
@ -306,7 +314,7 @@ public class TestListFile {
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
@ -323,7 +331,7 @@ public class TestListFile {
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
@ -334,6 +342,8 @@ public class TestListFile {
@Test
public void testFilterHidden() throws Exception {
final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
FileOutputStream fos;
final File file1 = new File(TESTDIR + "/hidden1.txt");
@ -350,6 +360,9 @@ public class TestListFile {
Files.setAttribute(file2.toPath(), "dos:hidden", true);
}
assertTrue(file1.setLastModified(now));
assertTrue(file2.setLastModified(now));
// check all files
runner.clearTransferState();
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
@ -362,7 +375,7 @@ public class TestListFile {
runner.run();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
@ -375,7 +388,7 @@ public class TestListFile {
runner.run();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
@ -385,17 +398,30 @@ public class TestListFile {
@Test
public void testFilterFilePattern() throws Exception {
final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
final File file1 = new File(TESTDIR + "/file1-abc-apple.txt");
assertTrue(file1.createNewFile());
assertTrue(file1.setLastModified(now));
final File file2 = new File(TESTDIR + "/file2-xyz-apple.txt");
assertTrue(file2.createNewFile());
assertTrue(file2.setLastModified(now));
final File file3 = new File(TESTDIR + "/file3-xyz-banana.txt");
assertTrue(file3.createNewFile());
assertTrue(file3.setLastModified(now));
final File file4 = new File(TESTDIR + "/file4-pdq-banana.txt");
assertTrue(file4.createNewFile());
assertTrue(file4.setLastModified(now));
System.out.println(file1.lastModified());
System.out.println(file2.lastModified());
System.out.println(file3.lastModified());
System.out.println(file4.lastModified());
// check all files
runner.clearTransferState();
@ -404,7 +430,7 @@ public class TestListFile {
runner.run();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
@ -417,7 +443,7 @@ public class TestListFile {
runner.run();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
@ -427,6 +453,8 @@ public class TestListFile {
@Test
public void testFilterPathPattern() throws Exception {
final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
final File subdir1 = new File(TESTDIR + "/subdir1");
assertTrue(subdir1.mkdirs());
@ -435,15 +463,19 @@ public class TestListFile {
final File file1 = new File(TESTDIR + "/file1.txt");
assertTrue(file1.createNewFile());
assertTrue(file1.setLastModified(now));
final File file2 = new File(TESTDIR + "/subdir1/file2.txt");
assertTrue(file2.createNewFile());
assertTrue(file2.setLastModified(now));
final File file3 = new File(TESTDIR + "/subdir1/subdir2/file3.txt");
assertTrue(file3.createNewFile());
assertTrue(file3.setLastModified(now));
final File file4 = new File(TESTDIR + "/subdir1/file4.txt");
assertTrue(file4.createNewFile());
assertTrue(file4.setLastModified(now));
// check all files
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
@ -452,7 +484,7 @@ public class TestListFile {
runner.run();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
@ -466,7 +498,7 @@ public class TestListFile {
runner.run();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
@ -480,7 +512,7 @@ public class TestListFile {
runner.run();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
@ -490,6 +522,8 @@ public class TestListFile {
@Test
public void testRecurse() throws Exception {
final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
final File subdir1 = new File(TESTDIR + "/subdir1");
assertTrue(subdir1.mkdirs());
@ -498,12 +532,15 @@ public class TestListFile {
final File file1 = new File(TESTDIR + "/file1.txt");
assertTrue(file1.createNewFile());
assertTrue(file1.setLastModified(now));
final File file2 = new File(TESTDIR + "/subdir1/file2.txt");
assertTrue(file2.createNewFile());
assertTrue(file2.setLastModified(now));
final File file3 = new File(TESTDIR + "/subdir1/subdir2/file3.txt");
assertTrue(file3.createNewFile());
assertTrue(file3.setLastModified(now));
// check all files
runner.clearTransferState();
@ -513,7 +550,7 @@ public class TestListFile {
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 3);
@ -547,7 +584,7 @@ public class TestListFile {
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
@ -558,14 +595,19 @@ public class TestListFile {
@Test
public void testReadable() throws Exception {
final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
final File file1 = new File(TESTDIR + "/file1.txt");
assertTrue(file1.createNewFile());
assertTrue(file1.setLastModified(now));
final File file2 = new File(TESTDIR + "/file2.txt");
assertTrue(file2.createNewFile());
assertTrue(file2.setLastModified(now));
final File file3 = new File(TESTDIR + "/file3.txt");
assertTrue(file3.createNewFile());
assertTrue(file3.setLastModified(now));
// check all files
runner.clearTransferState();
@ -575,7 +617,7 @@ public class TestListFile {
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
@ -601,7 +643,7 @@ public class TestListFile {
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
runner.clearTransferState();
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();