From 28ee70222b892fb799f5f74a31a9de678d9fb629 Mon Sep 17 00:00:00 2001
From: Koji Kawamura
Date: Wed, 14 Jun 2017 15:21:01 +0900
Subject: [PATCH] NIFI-4069: Make ListXXX work with timestamp precision in
seconds or minutes
- Refactored variable names to better represents what those are meant for.
- Added deterministic logic which detects target filesystem timestamp precision and adjust lag time based on it.
- Changed from using System.nanoTime() to System.currentTimeMillis in test because Java File API reports timestamp in milliseconds at the best granularity. Also, System.nanoTime should not be used in mix with epoch milliseconds because it uses arbitrary origin and measured differently.
- Changed TestListFile to use more longer interval between file timestamps those are used by testFilterAge to provide more consistent test result because sleep time can be longer with filesystems whose timestamp in seconds precision.
- Added logging at TestListFile.
- Added TestWatcher to dump state in case assertion fails for further investigation.
- Added Timestamp Precision property so that user can set if auto-detect is not enough
- Adjust timestamps for ages test
This closes #1915.
Signed-off-by: Bryan Bende
---
.../nifi-processor-utils/pom.xml | 2 +-
.../util/list/AbstractListProcessor.java | 161 ++++++++++-----
.../util/list/ListProcessorTestWatcher.java | 128 ++++++++++++
.../processor/util/list/ListableEntity.java | 2 +-
.../util/list/TestAbstractListProcessor.java | 192 ++++++++++++------
.../nifi/processors/standard/ListFTP.java | 1 +
.../nifi/processors/standard/ListFile.java | 1 +
.../nifi/processors/standard/ListSFTP.java | 1 +
.../processors/standard/TestListFile.java | 105 +++++++---
9 files changed, 463 insertions(+), 130 deletions(-)
create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListProcessorTestWatcher.java
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
index 3f5e60cfd8..dd38e100aa 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
@@ -70,7 +70,7 @@
junit
junit
- test
+ compile
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
index 2666e2c084..8d93a65ffe 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -37,6 +38,7 @@ import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
@@ -138,14 +140,42 @@ public abstract class AbstractListProcessor extends Ab
.identifiesControllerService(DistributedMapCacheClient.class)
.build();
+ public static final AllowableValue PRECISION_AUTO_DETECT = new AllowableValue("auto-detect", "Auto Detect",
+ "Automatically detect time unit deterministically based on candidate entries timestamp."
+ + " Please note that this option may take longer to list entities unnecessarily, if none of entries has a precise precision timestamp."
+ + " E.g. even if a target system supports millis, if all entries only have timestamps without millis, such as '2017-06-16 09:06:34.000', then its precision is determined as 'seconds'.");
+ public static final AllowableValue PRECISION_MILLIS = new AllowableValue("millis", "Milliseconds",
+ "This option provides the minimum latency for an entry from being available to being listed if target system supports millis, if not, use other options.");
+ public static final AllowableValue PRECISION_SECONDS = new AllowableValue("seconds", "Seconds","For a target system that does not have millis precision, but has in seconds.");
+ public static final AllowableValue PRECISION_MINUTES = new AllowableValue("minutes", "Minutes", "For a target system that only supports precision in minutes.");
+
+ public static final PropertyDescriptor TARGET_SYSTEM_TIMESTAMP_PRECISION = new PropertyDescriptor.Builder()
+ .name("target-system-timestamp-precision")
+ .displayName("Target System Timestamp Precision")
+ .description("Specify timestamp precision at the target system."
+ + " Since this processor uses timestamp of entities to decide which should be listed, it is crucial to use the right timestamp precision.")
+ .required(true)
+ .allowableValues(PRECISION_AUTO_DETECT, PRECISION_MILLIS, PRECISION_SECONDS, PRECISION_MINUTES)
+ .defaultValue(PRECISION_AUTO_DETECT.getValue())
+ .build();
+
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles that are received are routed to success")
.build();
- private volatile Long lastListingTime = null;
- private volatile Long lastProcessedTime = 0L;
- private volatile Long lastRunTime = 0L;
+ /**
+ * Represents the timestamp of an entity which was the latest one within those listed at the previous cycle.
+ * It does not necessary mean it has been processed as well.
+ * Whether it was processed or not depends on target system time precision and how old the entity timestamp was.
+ */
+ private volatile Long lastListedLatestEntryTimestampMillis = null;
+ /**
+ * Represents the timestamp of an entity which was the latest one
+ * within those picked up and written to the output relationship at the previous cycle.
+ */
+ private volatile Long lastProcessedLatestEntryTimestampMillis = 0L;
+ private volatile Long lastRunTimeNanos = 0L;
private volatile boolean justElectedPrimaryNode = false;
private volatile boolean resetState = false;
@@ -154,9 +184,16 @@ public abstract class AbstractListProcessor extends Ab
* files according to timestamp, it is ensured that at least the specified millis has been eclipsed to avoid getting scheduled
* near instantaneously after the prior iteration effectively voiding the built in buffer
*/
- public static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
- static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
- static final String PROCESSED_TIMESTAMP_KEY = "processed.timestamp";
+ public static final Map LISTING_LAG_MILLIS;
+ static {
+ final Map nanos = new HashMap<>();
+ nanos.put(TimeUnit.MILLISECONDS, 100L);
+ nanos.put(TimeUnit.SECONDS, 1_000L);
+ nanos.put(TimeUnit.MINUTES, 60_000L);
+ LISTING_LAG_MILLIS = Collections.unmodifiableMap(nanos);
+ }
+ static final String LATEST_LISTED_ENTRY_TIMESTAMP_KEY = "listing.timestamp";
+ static final String LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY = "processed.timestamp";
public File getPersistenceFile() {
return new File("conf/state/" + getIdentifier());
@@ -166,6 +203,7 @@ public abstract class AbstractListProcessor extends Ab
protected List getSupportedPropertyDescriptors() {
final List properties = new ArrayList<>();
properties.add(DISTRIBUTED_CACHE_SERVICE);
+ properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
return properties;
}
@@ -208,7 +246,7 @@ public abstract class AbstractListProcessor extends Ab
}
// When scheduled to run, check if the associated timestamp is null, signifying a clearing of state and reset the internal timestamp
- if (lastListingTime != null && stateMap.get(LISTING_TIMESTAMP_KEY) == null) {
+ if (lastListedLatestEntryTimestampMillis != null && stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY) == null) {
getLogger().info("Detected that state was cleared for this component. Resetting internal values.");
resetTimeStates();
}
@@ -283,10 +321,12 @@ public abstract class AbstractListProcessor extends Ab
}
}
- private void persist(final long listingTimestamp, final long processedTimestamp, final StateManager stateManager, final Scope scope) throws IOException {
+ private void persist(final long latestListedEntryTimestampThisCycleMillis,
+ final long lastProcessedLatestEntryTimestampMillis,
+ final StateManager stateManager, final Scope scope) throws IOException {
final Map updatedState = new HashMap<>(1);
- updatedState.put(LISTING_TIMESTAMP_KEY, String.valueOf(listingTimestamp));
- updatedState.put(PROCESSED_TIMESTAMP_KEY, String.valueOf(processedTimestamp));
+ updatedState.put(LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(latestListedEntryTimestampThisCycleMillis));
+ updatedState.put(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(lastProcessedLatestEntryTimestampMillis));
stateManager.setState(updatedState, scope);
}
@@ -303,26 +343,26 @@ public abstract class AbstractListProcessor extends Ab
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- Long minTimestamp = lastListingTime;
+ Long minTimestampToListMillis = lastListedLatestEntryTimestampMillis;
- if (this.lastListingTime == null || this.lastProcessedTime == null || justElectedPrimaryNode) {
+ if (this.lastListedLatestEntryTimestampMillis == null || this.lastProcessedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
try {
// Attempt to retrieve state from the state manager if a last listing was not yet established or
// if just elected the primary node
final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
- final String listingTimestampString = stateMap.get(LISTING_TIMESTAMP_KEY);
- final String lastProcessedString= stateMap.get(PROCESSED_TIMESTAMP_KEY);
- if (lastProcessedString != null) {
- this.lastProcessedTime = Long.parseLong(lastProcessedString);
+ final String latestListedEntryTimestampString = stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY);
+ final String lastProcessedLatestEntryTimestampString= stateMap.get(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY);
+ if (lastProcessedLatestEntryTimestampString != null) {
+ this.lastProcessedLatestEntryTimestampMillis = Long.parseLong(lastProcessedLatestEntryTimestampString);
}
- if (listingTimestampString != null) {
- minTimestamp = Long.parseLong(listingTimestampString);
+ if (latestListedEntryTimestampString != null) {
+ minTimestampToListMillis = Long.parseLong(latestListedEntryTimestampString);
// If our determined timestamp is the same as that of our last listing, skip this execution as there are no updates
- if (minTimestamp == this.lastListingTime) {
+ if (minTimestampToListMillis == this.lastListedLatestEntryTimestampMillis) {
context.yield();
return;
} else {
- this.lastListingTime = minTimestamp;
+ this.lastListedLatestEntryTimestampMillis = minTimestampToListMillis;
}
}
justElectedPrimaryNode = false;
@@ -334,10 +374,11 @@ public abstract class AbstractListProcessor extends Ab
}
final List entityList;
- final long currentListingTimestamp = System.nanoTime();
+ final long currentRunTimeNanos = System.nanoTime();
+ final long currentRunTimeMillis = System.currentTimeMillis();
try {
// track of when this last executed for consideration of the lag nanos
- entityList = performListing(context, minTimestamp);
+ entityList = performListing(context, minTimestampToListMillis);
} catch (final IOException e) {
getLogger().error("Failed to perform listing on remote host due to {}", e);
context.yield();
@@ -349,14 +390,22 @@ public abstract class AbstractListProcessor extends Ab
return;
}
- Long latestListingTimestamp = null;
+ Long latestListedEntryTimestampThisCycleMillis = null;
final TreeMap> orderedEntries = new TreeMap<>();
// Build a sorted map to determine the latest possible entries
+ boolean targetSystemHasMilliseconds = false;
+ boolean targetSystemHasSeconds = false;
for (final T entity : entityList) {
- final long entityTimestamp = entity.getTimestamp();
+ final long entityTimestampMillis = entity.getTimestamp();
+ if (!targetSystemHasMilliseconds) {
+ targetSystemHasMilliseconds = entityTimestampMillis % 1000 > 0;
+ }
+ if (!targetSystemHasSeconds) {
+ targetSystemHasSeconds = entityTimestampMillis % 60_000 > 0;
+ }
// New entries are all those that occur at or after the associated timestamp
- final boolean newEntry = minTimestamp == null || entityTimestamp >= minTimestamp && entityTimestamp > lastProcessedTime;
+ final boolean newEntry = minTimestampToListMillis == null || entityTimestampMillis >= minTimestampToListMillis && entityTimestampMillis > lastProcessedLatestEntryTimestampMillis;
if (newEntry) {
List entitiesForTimestamp = orderedEntries.get(entity.getTimestamp());
@@ -371,25 +420,43 @@ public abstract class AbstractListProcessor extends Ab
int flowfilesCreated = 0;
if (orderedEntries.size() > 0) {
- latestListingTimestamp = orderedEntries.lastKey();
+ latestListedEntryTimestampThisCycleMillis = orderedEntries.lastKey();
+
+ // Determine target system time precision.
+ final String specifiedPrecision = context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue();
+ final TimeUnit targetSystemTimePrecision
+ = PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision)
+ ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES
+ : PRECISION_MILLIS.getValue().equals(specifiedPrecision) ? TimeUnit.MILLISECONDS
+ : PRECISION_SECONDS.getValue().equals(specifiedPrecision) ? TimeUnit.SECONDS : TimeUnit.MINUTES;
+ final Long listingLagMillis = LISTING_LAG_MILLIS.get(targetSystemTimePrecision);
// If the last listing time is equal to the newest entries previously seen,
// another iteration has occurred without new files and special handling is needed to avoid starvation
- if (latestListingTimestamp.equals(lastListingTime)) {
- /* We are done when either:
- * - the latest listing timestamp is If we have not eclipsed the minimal listing lag needed due to being triggered too soon after the last run
- * - the latest listing timestamp is equal to the last processed time, meaning we handled those items originally passed over
+ if (latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis)) {
+ /* We need to wait for another cycle when either:
+ * - If we have not eclipsed the minimal listing lag needed due to being triggered too soon after the last run
+ * - The latest listed entity timestamp is equal to the last processed time, meaning we handled those items originally passed over. No need to process it again.
*/
- if (System.nanoTime() - lastRunTime < LISTING_LAG_NANOS || latestListingTimestamp.equals(lastProcessedTime)) {
+ final long listingLagNanos = TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
+ if (currentRunTimeNanos - lastRunTimeNanos < listingLagNanos || latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis)) {
context.yield();
return;
}
- } else if (latestListingTimestamp >= currentListingTimestamp - LISTING_LAG_NANOS) {
- // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
- orderedEntries.remove(latestListingTimestamp);
+ } else {
+ // Convert minimum reliable timestamp into target system time unit, in order to truncate unreliable digits.
+ final long minimumReliableTimestampInFilesystemTimeUnit = targetSystemTimePrecision.convert(currentRunTimeMillis - listingLagMillis, TimeUnit.MILLISECONDS);
+ final long minimumReliableTimestampMillis = targetSystemTimePrecision.toMillis(minimumReliableTimestampInFilesystemTimeUnit);
+ // If the latest listed entity is not old enough, compared with the minimum timestamp, then wait for another cycle.
+ // The minimum timestamp should be reliable to determine that no further entries will be added with the same timestamp based on the target system time precision.
+ if (minimumReliableTimestampMillis < latestListedEntryTimestampThisCycleMillis) {
+ // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
+ orderedEntries.remove(latestListedEntryTimestampThisCycleMillis);
+ }
}
+
for (List timestampEntities : orderedEntries.values()) {
for (T entity : timestampEntities) {
// Create the FlowFile for this path.
@@ -403,18 +470,20 @@ public abstract class AbstractListProcessor extends Ab
}
// As long as we have a listing timestamp, there is meaningful state to capture regardless of any outputs generated
- if (latestListingTimestamp != null) {
+ if (latestListedEntryTimestampThisCycleMillis != null) {
boolean processedNewFiles = flowfilesCreated > 0;
if (processedNewFiles) {
- // If there have been files created, update the last timestamp we processed
- lastProcessedTime = orderedEntries.lastKey();
+ // If there have been files created, update the last timestamp we processed.
+ // Retrieving lastKey instead of using latestListedEntryTimestampThisCycleMillis is intentional here,
+ // because latestListedEntryTimestampThisCycleMillis might be removed if it's not old enough.
+ lastProcessedLatestEntryTimestampMillis = orderedEntries.lastKey();
getLogger().info("Successfully created listing with {} new objects", new Object[]{flowfilesCreated});
session.commit();
}
- lastRunTime = System.nanoTime();
+ lastRunTimeNanos = currentRunTimeNanos;
- if (!latestListingTimestamp.equals(lastListingTime) || processedNewFiles) {
+ if (!latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis) || processedNewFiles) {
// We have performed a listing and pushed any FlowFiles out that may have been generated
// Now, we need to persist state about the Last Modified timestamp of the newest file
// that we evaluated. We do this in order to avoid pulling in the same file twice.
@@ -424,8 +493,8 @@ public abstract class AbstractListProcessor extends Ab
// We also store the state locally so that if the node is restarted, and the node cannot contact
// the distributed state cache, the node can continue to run (if it is primary node).
try {
- lastListingTime = latestListingTimestamp;
- persist(latestListingTimestamp, lastProcessedTime, context.getStateManager(), getStateScope(context));
+ lastListedLatestEntryTimestampMillis = latestListedEntryTimestampThisCycleMillis;
+ persist(latestListedEntryTimestampThisCycleMillis, lastProcessedLatestEntryTimestampMillis, context.getStateManager(), getStateScope(context));
} catch (final IOException ioe) {
getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
+ "if another node begins executing this Processor, data duplication may occur.", ioe);
@@ -437,8 +506,8 @@ public abstract class AbstractListProcessor extends Ab
context.yield();
// lastListingTime = 0 so that we don't continually poll the distributed cache / local file system
- if (lastListingTime == null) {
- lastListingTime = 0L;
+ if (lastListedLatestEntryTimestampMillis == null) {
+ lastListedLatestEntryTimestampMillis = 0L;
}
return;
@@ -446,9 +515,9 @@ public abstract class AbstractListProcessor extends Ab
}
private void resetTimeStates() {
- lastListingTime = null;
- lastProcessedTime = 0L;
- lastRunTime = 0L;
+ lastListedLatestEntryTimestampMillis = null;
+ lastProcessedLatestEntryTimestampMillis = 0L;
+ lastRunTimeNanos = 0L;
}
/**
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListProcessorTestWatcher.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListProcessorTestWatcher.java
new file mode 100644
index 0000000000..dcb53e0f42
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListProcessorTestWatcher.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.SimpleDateFormat;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class provides a way to dump list-able entities, processor state and transferred FlowFiles into 'success' relationship,
+ * which is useful to debug test issues especially at automation test environment such as Travis that is difficult to debug.
+ */
+public class ListProcessorTestWatcher extends TestWatcher {
+
+ private static final Logger logger = LoggerFactory.getLogger(ListProcessorTestWatcher.class);
+ private static final Consumer logStateDump = logger::info;
+
+ @FunctionalInterface
+ public interface Provider {
+ T provide();
+ }
+
+ private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
+ private final Provider
+ */
@Test
public void testAllExistingEntriesEmittedOnFirstIteration() throws Exception {
- final long oldTimestamp = System.nanoTime() - (AbstractListProcessor.LISTING_LAG_NANOS * 2);
+ final long oldTimestamp = System.currentTimeMillis() - getSleepMillis(TimeUnit.MILLISECONDS);
// These entries have existed before the processor runs at the first time.
- final ConcreteListProcessor proc = new ConcreteListProcessor();
proc.addEntity("name", "id", oldTimestamp);
proc.addEntity("name", "id2", oldTimestamp);
// First run, the above listed entries should be emitted since it has existed.
- final TestRunner runner = TestRunners.newTestRunner(proc);
-
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
runner.clearTransferState();
@@ -83,13 +125,10 @@ public class TestAbstractListProcessor {
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
}
- @Test
- public void testPreviouslySkippedEntriesEmittedOnNextIteration() throws Exception {
- final ConcreteListProcessor proc = new ConcreteListProcessor();
- final TestRunner runner = TestRunners.newTestRunner(proc);
+ private void testPreviouslySkippedEntriesEmmitedOnNextIteration(final TimeUnit targetPrecision) throws InterruptedException {
runner.run();
- final long initialTimestamp = System.nanoTime();
+ final long initialTimestamp = getCurrentTimestampMillis(targetPrecision);
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
proc.addEntity("name", "id", initialTimestamp);
@@ -101,20 +140,47 @@ public class TestAbstractListProcessor {
runner.clearTransferState();
// Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
- Thread.sleep(DEFAULT_SLEEP_MILLIS);
+ Thread.sleep(getSleepMillis(targetPrecision));
// Run again without introducing any new entries
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
}
+ /**
+ * Ensures that newly created files should wait to confirm there is no more files created with the same timestamp:
+ *
If files have the latest modified timestamp at an iteration, then those should be postponed to be listed
+ * If those files still are the latest files at the next iteration, then those should be listed
+ *
+ */
@Test
- public void testOnlyNewEntriesEmitted() throws Exception {
- final ConcreteListProcessor proc = new ConcreteListProcessor();
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.run();
+ public void testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision() throws Exception {
+ testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.MILLISECONDS);
+ }
- final long initialTimestamp = System.nanoTime();
+ /**
+ * Same as {@link #testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()} but simulates that the target
+ * filesystem only provide timestamp precision in Seconds.
+ */
+ @Test
+ public void testPreviouslySkippedEntriesEmittedOnNextIterationSecondPrecision() throws Exception {
+ testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.SECONDS);
+ }
+
+ /**
+ * Same as {@link #testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()} but simulates that the target
+ * filesystem only provide timestamp precision in Minutes.
+ * This test is ignored because it needs to wait two minutes. Not good for automated unit testing, but still valuable when executed manually.
+ */
+ @Ignore
+ @Test
+ public void testPreviouslySkippedEntriesEmittedOnNextIterationMinutesPrecision() throws Exception {
+ testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.MINUTES);
+ }
+
+ private void testOnlyNewEntriesEmitted(final TimeUnit targetPrecision) throws InterruptedException {
+
+ final long initialTimestamp = getCurrentTimestampMillis(targetPrecision);
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
proc.addEntity("name", "id", initialTimestamp);
@@ -126,7 +192,7 @@ public class TestAbstractListProcessor {
runner.clearTransferState();
// Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
- Thread.sleep(DEFAULT_SLEEP_MILLIS);
+ Thread.sleep(getSleepMillis(targetPrecision));
// Running again, our two previously seen files are now cleared to be released
runner.run();
@@ -139,18 +205,20 @@ public class TestAbstractListProcessor {
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
runner.clearTransferState();
- proc.addEntity("name", "id3", initialTimestamp - 1);
+ // An entry that is older than already processed entry should not be listed.
+ proc.addEntity("name", "id3", initialTimestamp - targetPrecision.toMillis(1));
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
runner.clearTransferState();
+ // If an entry whose timestamp is the same with the last processed timestamp should not be listed.
proc.addEntity("name", "id2", initialTimestamp);
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
runner.clearTransferState();
// Now a new file beyond the current time enters
- proc.addEntity("name", "id2", initialTimestamp + 1);
+ proc.addEntity("name", "id2", initialTimestamp + targetPrecision.toMillis(1));
// It should show up
runner.run();
@@ -159,19 +227,36 @@ public class TestAbstractListProcessor {
}
@Test
- public void testHandleRestartWithEntriesAlreadyTransferredAndNoneNew() throws Exception {
- final ConcreteListProcessor proc = new ConcreteListProcessor();
- final TestRunner runner = TestRunners.newTestRunner(proc);
+ public void testOnlyNewEntriesEmittedMillisPrecision() throws Exception {
+ testOnlyNewEntriesEmitted(TimeUnit.MILLISECONDS);
+ }
- final long initialTimestamp = System.nanoTime();
+ @Test
+ public void testOnlyNewEntriesEmittedSecondPrecision() throws Exception {
+ testOnlyNewEntriesEmitted(TimeUnit.SECONDS);
+ }
+
+ /**
+ * This test is ignored because it needs to wait two minutes. Not good for automated unit testing, but still valuable when executed manually.
+ */
+ @Ignore
+ @Test
+ public void testOnlyNewEntriesEmittedMinutesPrecision() throws Exception {
+ testOnlyNewEntriesEmitted(TimeUnit.MINUTES);
+ }
+
+ @Test
+ public void testHandleRestartWithEntriesAlreadyTransferredAndNoneNew() throws Exception {
+
+ final long initialTimestamp = System.currentTimeMillis();
proc.addEntity("name", "id", initialTimestamp);
proc.addEntity("name", "id2", initialTimestamp);
// Emulate having state but not having had the processor run such as in a restart
final Map preexistingState = new HashMap<>();
- preexistingState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, Long.toString(initialTimestamp));
- preexistingState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, Long.toString(initialTimestamp));
+ preexistingState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, Long.toString(initialTimestamp));
+ preexistingState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, Long.toString(initialTimestamp));
runner.getStateManager().setState(preexistingState, Scope.CLUSTER);
// run for the first time
@@ -216,37 +301,35 @@ public class TestAbstractListProcessor {
@Test
public void testStateStoredInClusterStateManagement() throws Exception {
- final ConcreteListProcessor proc = new ConcreteListProcessor();
- final TestRunner runner = TestRunners.newTestRunner(proc);
+
final DistributedCache cache = new DistributedCache();
runner.addControllerService("cache", cache);
runner.enableControllerService(cache);
runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache");
- final long initialTimestamp = System.nanoTime();
+ final long initialTimestamp = System.currentTimeMillis();
proc.addEntity("name", "id", initialTimestamp);
runner.run();
final Map expectedState = new HashMap<>();
// Ensure only timestamp is migrated
- expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
- expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "0");
+ expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
+ expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, "0");
runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
// Ensure only timestamp is migrated
- expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
- expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
+ expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
+ expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
}
@Test
public void testStateMigratedFromCacheService() throws InitializationException {
- final ConcreteListProcessor proc = new ConcreteListProcessor();
- final TestRunner runner = TestRunners.newTestRunner(proc);
+
final DistributedCache cache = new DistributedCache();
runner.addControllerService("cache", cache);
runner.enableControllerService(cache);
@@ -261,15 +344,13 @@ public class TestAbstractListProcessor {
final MockStateManager stateManager = runner.getStateManager();
final Map expectedState = new HashMap<>();
// Ensure only timestamp is migrated
- expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
- expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "1492");
+ expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, "1492");
+ expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, "1492");
stateManager.assertStateEquals(expectedState, Scope.CLUSTER);
}
@Test
public void testNoStateToMigrate() throws Exception {
- final ConcreteListProcessor proc = new ConcreteListProcessor();
- final TestRunner runner = TestRunners.newTestRunner(proc);
runner.run();
@@ -280,8 +361,6 @@ public class TestAbstractListProcessor {
@Test
public void testStateMigratedFromLocalFile() throws Exception {
- final ConcreteListProcessor proc = new ConcreteListProcessor();
- final TestRunner runner = TestRunners.newTestRunner(proc);
// Create a file that we will populate with the desired state
File persistenceFile = testFolder.newFile(proc.persistenceFilename);
@@ -305,20 +384,17 @@ public class TestAbstractListProcessor {
// Verify the state manager now maintains the associated state
final Map expectedState = new HashMap<>();
// Ensure only timestamp is migrated
- expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
- expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "1492");
+ expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, "1492");
+ expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, "1492");
runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
}
@Test
public void testResumeListingAfterClearingState() throws Exception {
- final ConcreteListProcessor proc = new ConcreteListProcessor();
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
- final long initialEventTimestamp = System.nanoTime();
+ final long initialEventTimestamp = System.currentTimeMillis();
proc.addEntity("name", "id", initialEventTimestamp);
proc.addEntity("name", "id2", initialEventTimestamp);
@@ -350,8 +426,7 @@ public class TestAbstractListProcessor {
@Test
public void testFetchOnStart() throws InitializationException {
- final ConcreteListProcessor proc = new ConcreteListProcessor();
- final TestRunner runner = TestRunners.newTestRunner(proc);
+
final DistributedCache cache = new DistributedCache();
runner.addControllerService("cache", cache);
runner.enableControllerService(cache);
@@ -364,11 +439,10 @@ public class TestAbstractListProcessor {
@Test
public void testOnlyNewStateStored() throws Exception {
- final ConcreteListProcessor proc = new ConcreteListProcessor();
- final TestRunner runner = TestRunners.newTestRunner(proc);
+
runner.run();
- final long initialTimestamp = System.nanoTime();
+ final long initialTimestamp = System.currentTimeMillis();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
proc.addEntity("name", "id", initialTimestamp);
@@ -390,8 +464,8 @@ public class TestAbstractListProcessor {
final Map map = stateMap.toMap();
// Ensure only timestamp is migrated
assertEquals(2, map.size());
- assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY));
- assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY));
+ assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
+ assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
proc.addEntity("new name", "new id", initialTimestamp + 1);
runner.run();
@@ -403,9 +477,9 @@ public class TestAbstractListProcessor {
assertEquals(3, updatedStateMap.getVersion());
assertEquals(2, updatedStateMap.toMap().size());
- assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY));
+ assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
// Processed timestamp is now caught up
- assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY));
+ assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
}
private static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient {
@@ -502,7 +576,9 @@ public class TestAbstractListProcessor {
@Override
protected Map createAttributes(final ListableEntity entity, final ProcessContext context) {
- return Collections.emptyMap();
+ final Map attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), entity.getIdentifier());
+ return attributes;
}
@Override
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
index f4455885ac..8204830744 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
@@ -84,6 +84,7 @@ public class ListFTP extends ListFileTransfer {
properties.add(FTPTransfer.PROXY_PORT);
properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
+ properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
return properties;
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
index 33d7867134..5f2e2d2ab4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
@@ -213,6 +213,7 @@ public class ListFile extends AbstractListProcessor {
properties.add(MIN_SIZE);
properties.add(MAX_SIZE);
properties.add(IGNORE_HIDDEN_FILES);
+ properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
this.properties = Collections.unmodifiableList(properties);
final Set relationships = new HashSet<>();
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
index cb5a7e75e7..b7805e9e88 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
@@ -82,6 +82,7 @@ public class ListSFTP extends ListFileTransfer {
properties.add(SFTPTransfer.CONNECTION_TIMEOUT);
properties.add(SFTPTransfer.DATA_TIMEOUT);
properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
+ properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
return properties;
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
index 26dcbbf3b5..1b5b2a4c6e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
@@ -30,54 +30,87 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Locale;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
+import org.apache.nifi.processor.util.list.ListProcessorTestWatcher;
+import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.runner.Description;
public class TestListFile {
- final String TESTDIR = "target/test/data/in";
- final File testDir = new File(TESTDIR);
- ListFile processor;
- TestRunner runner;
- ProcessContext context;
+ private final String TESTDIR = "target/test/data/in";
+ private final File testDir = new File(TESTDIR);
+ private ListFile processor;
+ private TestRunner runner;
+ private ProcessContext context;
// Testing factors in milliseconds for file ages that are configured on each run by resetAges()
// age#millis are relative time references
// time#millis are absolute time references
// age#filter are filter label strings for the filter properties
- Long syncTime = System.currentTimeMillis();
- Long time0millis, time1millis, time2millis, time3millis, time4millis, time5millis;
- Long age0millis, age1millis, age2millis, age3millis, age4millis, age5millis;
- String age0, age1, age2, age3, age4, age5;
+ private Long syncTime = getTestModifiedTime();
+ private Long time0millis, time1millis, time2millis, time3millis, time4millis, time5millis;
+ private Long age0millis, age1millis, age2millis, age3millis, age4millis, age5millis;
+ private String age0, age1, age2, age3, age4, age5;
- static final long DEFAULT_SLEEP_MILLIS = TimeUnit.NANOSECONDS.toMillis(AbstractListProcessor.LISTING_LAG_NANOS * 2);
+ @Rule
+ public ListProcessorTestWatcher dumpState = new ListProcessorTestWatcher(
+ () -> {
+ try {
+ return runner.getStateManager().getState(Scope.LOCAL).toMap();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to retrieve state", e);
+ }
+ },
+ () -> listFiles(testDir).stream()
+ .map(f -> new FileInfo.Builder().filename(f.getName()).lastModifiedTime(f.lastModified()).build()).collect(Collectors.toList()),
+ () -> runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).stream().map(m -> (FlowFile) m).collect(Collectors.toList())
+ ) {
+ @Override
+ protected void finished(Description description) {
+ try {
+ // In order to refer files in testDir, we want to execute this rule before tearDown, because tearDown removes files.
+ // And @After is always executed before @Rule.
+ tearDown();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to tearDown.", e);
+ }
+ }
+ };
@Before
public void setUp() throws Exception {
processor = new ListFile();
runner = TestRunners.newTestRunner(processor);
+ runner.setProperty(AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION, AbstractListProcessor.PRECISION_SECONDS.getValue());
context = runner.getProcessContext();
deleteDirectory(testDir);
assertTrue("Unable to create test data directory " + testDir.getAbsolutePath(), testDir.exists() || testDir.mkdirs());
resetAges();
}
- @After
public void tearDown() throws Exception {
deleteDirectory(testDir);
File tempFile = processor.getPersistenceFile();
@@ -91,14 +124,38 @@ public class TestListFile {
}
}
+ private List listFiles(final File file) {
+ if (file.isDirectory()) {
+ final List result = new ArrayList<>();
+ Optional.ofNullable(file.listFiles()).ifPresent(files -> Arrays.stream(files).forEach(f -> result.addAll(listFiles(f))));
+ return result;
+ } else {
+ return Collections.singletonList(file);
+ }
+ }
+
/**
* This method ensures runner clears transfer state,
- * and sleeps the current thread for DEFAULT_SLEEP_MILLIS before executing runner.run().
+ * and sleeps the current thread for specific period defined at {@link AbstractListProcessor#LISTING_LAG_MILLIS}
+ * based on local filesystem timestamp precision before executing runner.run().
*/
private void runNext() throws InterruptedException {
runner.clearTransferState();
- Thread.sleep(DEFAULT_SLEEP_MILLIS);
+
+ final List files = listFiles(testDir);
+ final boolean isMillisecondSupported = files.stream().anyMatch(file -> file.lastModified() % 1_000 > 0);
+ final Long lagMillis;
+ if (isMillisecondSupported) {
+ lagMillis = AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS);
+ } else {
+ // Filesystems such as Mac OS X HFS (Hierarchical File System) or EXT3 are known that only support timestamp in seconds precision.
+ lagMillis = AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS);
+ }
+ Thread.sleep(lagMillis * 2);
+
+ final long startedAtMillis = System.currentTimeMillis();
runner.run();
+ dumpState.dumpState(startedAtMillis);
}
@Test
@@ -305,7 +362,7 @@ public class TestListFile {
@Test
public void testFilterHidden() throws Exception {
- final long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+ final long now = getTestModifiedTime();
FileOutputStream fos;
@@ -388,7 +445,7 @@ public class TestListFile {
@Test
public void testFilterPathPattern() throws Exception {
- final long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+ final long now = getTestModifiedTime();
final File subdir1 = new File(TESTDIR + "/subdir1");
assertTrue(subdir1.mkdirs());
@@ -595,20 +652,20 @@ public class TestListFile {
* Provides "now" minus 1 second in millis
*/
private static long getTestModifiedTime() {
- final long nowNanos = System.nanoTime();
+ final long nowMillis = System.currentTimeMillis();
// Subtract a second to avoid possible rounding issues
- final long nowSeconds = TimeUnit.SECONDS.convert(nowNanos, TimeUnit.NANOSECONDS) - 1;
+ final long nowSeconds = TimeUnit.SECONDS.convert(nowMillis, TimeUnit.MILLISECONDS) - 1;
return TimeUnit.MILLISECONDS.convert(nowSeconds, TimeUnit.SECONDS);
}
- public void resetAges() {
- syncTime = System.currentTimeMillis();
+ private void resetAges() {
+ syncTime = getTestModifiedTime();
age0millis = 0L;
- age1millis = 2000L;
- age2millis = 5000L;
- age3millis = 7000L;
- age4millis = 10000L;
+ age1millis = 5000L;
+ age2millis = 10000L;
+ age3millis = 15000L;
+ age4millis = 20000L;
age5millis = 100000L;
time0millis = syncTime - age0millis;