diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
index 3f5e60cfd8..dd38e100aa 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
@@ -70,7 +70,7 @@
junit
junit
- test
+ compile
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
index 2666e2c084..8d93a65ffe 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -37,6 +38,7 @@ import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
@@ -138,14 +140,42 @@ public abstract class AbstractListProcessor extends Ab
.identifiesControllerService(DistributedMapCacheClient.class)
.build();
+ public static final AllowableValue PRECISION_AUTO_DETECT = new AllowableValue("auto-detect", "Auto Detect",
+ "Automatically detect time unit deterministically based on candidate entries timestamp."
+ + " Please note that this option may take longer to list entities unnecessarily, if none of entries has a precise precision timestamp."
+ + " E.g. even if a target system supports millis, if all entries only have timestamps without millis, such as '2017-06-16 09:06:34.000', then its precision is determined as 'seconds'.");
+ public static final AllowableValue PRECISION_MILLIS = new AllowableValue("millis", "Milliseconds",
+ "This option provides the minimum latency for an entry from being available to being listed if target system supports millis, if not, use other options.");
+ public static final AllowableValue PRECISION_SECONDS = new AllowableValue("seconds", "Seconds","For a target system that does not have millis precision, but has in seconds.");
+ public static final AllowableValue PRECISION_MINUTES = new AllowableValue("minutes", "Minutes", "For a target system that only supports precision in minutes.");
+
+ public static final PropertyDescriptor TARGET_SYSTEM_TIMESTAMP_PRECISION = new PropertyDescriptor.Builder()
+ .name("target-system-timestamp-precision")
+ .displayName("Target System Timestamp Precision")
+ .description("Specify timestamp precision at the target system."
+ + " Since this processor uses timestamp of entities to decide which should be listed, it is crucial to use the right timestamp precision.")
+ .required(true)
+ .allowableValues(PRECISION_AUTO_DETECT, PRECISION_MILLIS, PRECISION_SECONDS, PRECISION_MINUTES)
+ .defaultValue(PRECISION_AUTO_DETECT.getValue())
+ .build();
+
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles that are received are routed to success")
.build();
- private volatile Long lastListingTime = null;
- private volatile Long lastProcessedTime = 0L;
- private volatile Long lastRunTime = 0L;
+ /**
+ * Represents the timestamp of an entity which was the latest one within those listed at the previous cycle.
+ * It does not necessarily mean it has been processed as well.
+ * Whether it was processed or not depends on target system time precision and how old the entity timestamp was.
+ */
+ private volatile Long lastListedLatestEntryTimestampMillis = null;
+ /**
+ * Represents the timestamp of an entity which was the latest one
+ * within those picked up and written to the output relationship at the previous cycle.
+ */
+ private volatile Long lastProcessedLatestEntryTimestampMillis = 0L;
+ private volatile Long lastRunTimeNanos = 0L;
private volatile boolean justElectedPrimaryNode = false;
private volatile boolean resetState = false;
@@ -154,9 +184,16 @@ public abstract class AbstractListProcessor extends Ab
* files according to timestamp, it is ensured that at least the specified millis has been eclipsed to avoid getting scheduled
* near instantaneously after the prior iteration effectively voiding the built in buffer
*/
- public static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
- static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
- static final String PROCESSED_TIMESTAMP_KEY = "processed.timestamp";
+ // Minimum listing lag, expressed in milliseconds, keyed by the timestamp precision of the target system.
+ public static final Map<TimeUnit, Long> LISTING_LAG_MILLIS;
+ static {
+ // Values are milliseconds (not nanoseconds); callers convert to nanos where needed.
+ final Map<TimeUnit, Long> lagMillisByPrecision = new HashMap<>();
+ lagMillisByPrecision.put(TimeUnit.MILLISECONDS, 100L);
+ lagMillisByPrecision.put(TimeUnit.SECONDS, 1_000L);
+ lagMillisByPrecision.put(TimeUnit.MINUTES, 60_000L);
+ LISTING_LAG_MILLIS = Collections.unmodifiableMap(lagMillisByPrecision);
+ }
+ static final String LATEST_LISTED_ENTRY_TIMESTAMP_KEY = "listing.timestamp";
+ static final String LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY = "processed.timestamp";
public File getPersistenceFile() {
return new File("conf/state/" + getIdentifier());
@@ -166,6 +203,7 @@ public abstract class AbstractListProcessor extends Ab
protected List getSupportedPropertyDescriptors() {
final List properties = new ArrayList<>();
properties.add(DISTRIBUTED_CACHE_SERVICE);
+ properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
return properties;
}
@@ -208,7 +246,7 @@ public abstract class AbstractListProcessor extends Ab
}
// When scheduled to run, check if the associated timestamp is null, signifying a clearing of state and reset the internal timestamp
- if (lastListingTime != null && stateMap.get(LISTING_TIMESTAMP_KEY) == null) {
+ if (lastListedLatestEntryTimestampMillis != null && stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY) == null) {
getLogger().info("Detected that state was cleared for this component. Resetting internal values.");
resetTimeStates();
}
@@ -283,10 +321,12 @@ public abstract class AbstractListProcessor extends Ab
}
}
- private void persist(final long listingTimestamp, final long processedTimestamp, final StateManager stateManager, final Scope scope) throws IOException {
+ private void persist(final long latestListedEntryTimestampThisCycleMillis,
+ final long lastProcessedLatestEntryTimestampMillis,
+ final StateManager stateManager, final Scope scope) throws IOException {
final Map updatedState = new HashMap<>(1);
- updatedState.put(LISTING_TIMESTAMP_KEY, String.valueOf(listingTimestamp));
- updatedState.put(PROCESSED_TIMESTAMP_KEY, String.valueOf(processedTimestamp));
+ updatedState.put(LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(latestListedEntryTimestampThisCycleMillis));
+ updatedState.put(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(lastProcessedLatestEntryTimestampMillis));
stateManager.setState(updatedState, scope);
}
@@ -303,26 +343,26 @@ public abstract class AbstractListProcessor extends Ab
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- Long minTimestamp = lastListingTime;
+ Long minTimestampToListMillis = lastListedLatestEntryTimestampMillis;
- if (this.lastListingTime == null || this.lastProcessedTime == null || justElectedPrimaryNode) {
+ if (this.lastListedLatestEntryTimestampMillis == null || this.lastProcessedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
try {
// Attempt to retrieve state from the state manager if a last listing was not yet established or
// if just elected the primary node
final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
- final String listingTimestampString = stateMap.get(LISTING_TIMESTAMP_KEY);
- final String lastProcessedString= stateMap.get(PROCESSED_TIMESTAMP_KEY);
- if (lastProcessedString != null) {
- this.lastProcessedTime = Long.parseLong(lastProcessedString);
+ final String latestListedEntryTimestampString = stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY);
+ final String lastProcessedLatestEntryTimestampString= stateMap.get(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY);
+ if (lastProcessedLatestEntryTimestampString != null) {
+ this.lastProcessedLatestEntryTimestampMillis = Long.parseLong(lastProcessedLatestEntryTimestampString);
}
- if (listingTimestampString != null) {
- minTimestamp = Long.parseLong(listingTimestampString);
+ if (latestListedEntryTimestampString != null) {
+ minTimestampToListMillis = Long.parseLong(latestListedEntryTimestampString);
// If our determined timestamp is the same as that of our last listing, skip this execution as there are no updates
- if (minTimestamp == this.lastListingTime) {
+ // Both operands are boxed Longs, so compare by value with equals(); '==' would compare object identity.
+ if (minTimestampToListMillis.equals(this.lastListedLatestEntryTimestampMillis)) {
+ // The state has been consulted, so clear the election flag before returning early; otherwise,
+ // as long as the stored timestamp is unchanged, every later trigger would re-enter this branch and yield again.
+ justElectedPrimaryNode = false;
context.yield();
return;
} else {
- this.lastListingTime = minTimestamp;
+ this.lastListedLatestEntryTimestampMillis = minTimestampToListMillis;
}
}
justElectedPrimaryNode = false;
@@ -334,10 +374,11 @@ public abstract class AbstractListProcessor extends Ab
}
final List entityList;
- final long currentListingTimestamp = System.nanoTime();
+ final long currentRunTimeNanos = System.nanoTime();
+ final long currentRunTimeMillis = System.currentTimeMillis();
try {
// track of when this last executed for consideration of the lag nanos
- entityList = performListing(context, minTimestamp);
+ entityList = performListing(context, minTimestampToListMillis);
} catch (final IOException e) {
getLogger().error("Failed to perform listing on remote host due to {}", e);
context.yield();
@@ -349,14 +390,22 @@ public abstract class AbstractListProcessor extends Ab
return;
}
- Long latestListingTimestamp = null;
+ Long latestListedEntryTimestampThisCycleMillis = null;
final TreeMap> orderedEntries = new TreeMap<>();
// Build a sorted map to determine the latest possible entries
+ boolean targetSystemHasMilliseconds = false;
+ boolean targetSystemHasSeconds = false;
for (final T entity : entityList) {
- final long entityTimestamp = entity.getTimestamp();
+ final long entityTimestampMillis = entity.getTimestamp();
+ if (!targetSystemHasMilliseconds) {
+ targetSystemHasMilliseconds = entityTimestampMillis % 1000 > 0;
+ }
+ if (!targetSystemHasSeconds) {
+ targetSystemHasSeconds = entityTimestampMillis % 60_000 > 0;
+ }
// New entries are all those that occur at or after the associated timestamp
- final boolean newEntry = minTimestamp == null || entityTimestamp >= minTimestamp && entityTimestamp > lastProcessedTime;
+ final boolean newEntry = minTimestampToListMillis == null || entityTimestampMillis >= minTimestampToListMillis && entityTimestampMillis > lastProcessedLatestEntryTimestampMillis;
if (newEntry) {
List entitiesForTimestamp = orderedEntries.get(entity.getTimestamp());
@@ -371,25 +420,43 @@ public abstract class AbstractListProcessor extends Ab
int flowfilesCreated = 0;
if (orderedEntries.size() > 0) {
- latestListingTimestamp = orderedEntries.lastKey();
+ latestListedEntryTimestampThisCycleMillis = orderedEntries.lastKey();
+
+ // Determine target system time precision.
+ final String specifiedPrecision = context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue();
+ final TimeUnit targetSystemTimePrecision
+ = PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision)
+ ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES
+ : PRECISION_MILLIS.getValue().equals(specifiedPrecision) ? TimeUnit.MILLISECONDS
+ : PRECISION_SECONDS.getValue().equals(specifiedPrecision) ? TimeUnit.SECONDS : TimeUnit.MINUTES;
+ final Long listingLagMillis = LISTING_LAG_MILLIS.get(targetSystemTimePrecision);
// If the last listing time is equal to the newest entries previously seen,
// another iteration has occurred without new files and special handling is needed to avoid starvation
- if (latestListingTimestamp.equals(lastListingTime)) {
- /* We are done when either:
- * - the latest listing timestamp is If we have not eclipsed the minimal listing lag needed due to being triggered too soon after the last run
- * - the latest listing timestamp is equal to the last processed time, meaning we handled those items originally passed over
+ if (latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis)) {
+ /* We need to wait for another cycle when either:
+ * - The minimal listing lag has not yet elapsed because we were triggered too soon after the last run
+ * - The latest listed entity timestamp is equal to the last processed time, meaning we handled those items originally passed over. No need to process it again.
*/
- if (System.nanoTime() - lastRunTime < LISTING_LAG_NANOS || latestListingTimestamp.equals(lastProcessedTime)) {
+ final long listingLagNanos = TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
+ if (currentRunTimeNanos - lastRunTimeNanos < listingLagNanos || latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis)) {
context.yield();
return;
}
- } else if (latestListingTimestamp >= currentListingTimestamp - LISTING_LAG_NANOS) {
- // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
- orderedEntries.remove(latestListingTimestamp);
+ } else {
+ // Convert minimum reliable timestamp into target system time unit, in order to truncate unreliable digits.
+ final long minimumReliableTimestampInFilesystemTimeUnit = targetSystemTimePrecision.convert(currentRunTimeMillis - listingLagMillis, TimeUnit.MILLISECONDS);
+ final long minimumReliableTimestampMillis = targetSystemTimePrecision.toMillis(minimumReliableTimestampInFilesystemTimeUnit);
+ // If the latest listed entity is not old enough, compared with the minimum timestamp, then wait for another cycle.
+ // The minimum timestamp should be reliable to determine that no further entries will be added with the same timestamp based on the target system time precision.
+ if (minimumReliableTimestampMillis < latestListedEntryTimestampThisCycleMillis) {
+ // Newest entries are held back one cycle so that writes occurring exactly while the listing is being performed are not missed
+ orderedEntries.remove(latestListedEntryTimestampThisCycleMillis);
+ }
}
+
for (List timestampEntities : orderedEntries.values()) {
for (T entity : timestampEntities) {
// Create the FlowFile for this path.
@@ -403,18 +470,20 @@ public abstract class AbstractListProcessor extends Ab
}
// As long as we have a listing timestamp, there is meaningful state to capture regardless of any outputs generated
- if (latestListingTimestamp != null) {
+ if (latestListedEntryTimestampThisCycleMillis != null) {
boolean processedNewFiles = flowfilesCreated > 0;
if (processedNewFiles) {
- // If there have been files created, update the last timestamp we processed
- lastProcessedTime = orderedEntries.lastKey();
+ // If there have been files created, update the last timestamp we processed.
+ // Retrieving lastKey instead of using latestListedEntryTimestampThisCycleMillis is intentional here,
+ // because latestListedEntryTimestampThisCycleMillis might be removed if it's not old enough.
+ lastProcessedLatestEntryTimestampMillis = orderedEntries.lastKey();
getLogger().info("Successfully created listing with {} new objects", new Object[]{flowfilesCreated});
session.commit();
}
- lastRunTime = System.nanoTime();
+ lastRunTimeNanos = currentRunTimeNanos;
- if (!latestListingTimestamp.equals(lastListingTime) || processedNewFiles) {
+ if (!latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis) || processedNewFiles) {
// We have performed a listing and pushed any FlowFiles out that may have been generated
// Now, we need to persist state about the Last Modified timestamp of the newest file
// that we evaluated. We do this in order to avoid pulling in the same file twice.
@@ -424,8 +493,8 @@ public abstract class AbstractListProcessor extends Ab
// We also store the state locally so that if the node is restarted, and the node cannot contact
// the distributed state cache, the node can continue to run (if it is primary node).
try {
- lastListingTime = latestListingTimestamp;
- persist(latestListingTimestamp, lastProcessedTime, context.getStateManager(), getStateScope(context));
+ lastListedLatestEntryTimestampMillis = latestListedEntryTimestampThisCycleMillis;
+ persist(latestListedEntryTimestampThisCycleMillis, lastProcessedLatestEntryTimestampMillis, context.getStateManager(), getStateScope(context));
} catch (final IOException ioe) {
getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
+ "if another node begins executing this Processor, data duplication may occur.", ioe);
@@ -437,8 +506,8 @@ public abstract class AbstractListProcessor extends Ab
context.yield();
// lastListingTime = 0 so that we don't continually poll the distributed cache / local file system
- if (lastListingTime == null) {
- lastListingTime = 0L;
+ if (lastListedLatestEntryTimestampMillis == null) {
+ lastListedLatestEntryTimestampMillis = 0L;
}
return;
@@ -446,9 +515,9 @@ public abstract class AbstractListProcessor extends Ab
}
private void resetTimeStates() {
- lastListingTime = null;
- lastProcessedTime = 0L;
- lastRunTime = 0L;
+ lastListedLatestEntryTimestampMillis = null;
+ lastProcessedLatestEntryTimestampMillis = 0L;
+ lastRunTimeNanos = 0L;
}
/**
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListProcessorTestWatcher.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListProcessorTestWatcher.java
new file mode 100644
index 0000000000..dcb53e0f42
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListProcessorTestWatcher.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.SimpleDateFormat;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class provides a way to dump list-able entities, processor state and transferred FlowFiles into 'success' relationship,
+ * which is useful to debug test issues especially at automation test environment such as Travis that is difficult to debug.
+ */
+public class ListProcessorTestWatcher extends TestWatcher {
+
+ private static final Logger logger = LoggerFactory.getLogger(ListProcessorTestWatcher.class);
+ private static final Consumer logStateDump = logger::info;
+
+ @FunctionalInterface
+ public interface Provider {
+ T provide();
+ }
+
+ private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
+ private final Provider
+ */
@Test
public void testAllExistingEntriesEmittedOnFirstIteration() throws Exception {
- final long oldTimestamp = System.nanoTime() - (AbstractListProcessor.LISTING_LAG_NANOS * 2);
+ final long oldTimestamp = System.currentTimeMillis() - getSleepMillis(TimeUnit.MILLISECONDS);
// These entries have existed before the processor runs at the first time.
- final ConcreteListProcessor proc = new ConcreteListProcessor();
proc.addEntity("name", "id", oldTimestamp);
proc.addEntity("name", "id2", oldTimestamp);
// First run, the above listed entries should be emitted since it has existed.
- final TestRunner runner = TestRunners.newTestRunner(proc);
-
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
runner.clearTransferState();
@@ -83,13 +125,10 @@ public class TestAbstractListProcessor {
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
}
- @Test
- public void testPreviouslySkippedEntriesEmittedOnNextIteration() throws Exception {
- final ConcreteListProcessor proc = new ConcreteListProcessor();
- final TestRunner runner = TestRunners.newTestRunner(proc);
+ private void testPreviouslySkippedEntriesEmmitedOnNextIteration(final TimeUnit targetPrecision) throws InterruptedException {
runner.run();
- final long initialTimestamp = System.nanoTime();
+ final long initialTimestamp = getCurrentTimestampMillis(targetPrecision);
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
proc.addEntity("name", "id", initialTimestamp);
@@ -101,20 +140,47 @@ public class TestAbstractListProcessor {
runner.clearTransferState();
// Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
- Thread.sleep(DEFAULT_SLEEP_MILLIS);
+ Thread.sleep(getSleepMillis(targetPrecision));
// Run again without introducing any new entries
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
}
+ /**
+ * Ensures that newly created files wait until it is confirmed that no more files are created with the same timestamp:
+ * <ul>
+ * <li>If files have the latest modified timestamp at an iteration, then those should be postponed to be listed</li>
+ * <li>If those files still are the latest files at the next iteration, then those should be listed</li>
+ * </ul>
+ */
@Test
- public void testOnlyNewEntriesEmitted() throws Exception {
- final ConcreteListProcessor proc = new ConcreteListProcessor();
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.run();
+ public void testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision() throws Exception {
+ testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.MILLISECONDS);
+ }
- final long initialTimestamp = System.nanoTime();
+ /**
+ * Same as {@link #testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()} but simulates that the target
+ * filesystem only provide timestamp precision in Seconds.
+ */
+ @Test
+ public void testPreviouslySkippedEntriesEmittedOnNextIterationSecondPrecision() throws Exception {
+ testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.SECONDS);
+ }
+
+ /**
+ * Same as {@link #testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()} but simulates that the target
+ * filesystem only provide timestamp precision in Minutes.
+ * This test is ignored because it needs to wait two minutes. Not good for automated unit testing, but still valuable when executed manually.
+ */
+ @Ignore
+ @Test
+ public void testPreviouslySkippedEntriesEmittedOnNextIterationMinutesPrecision() throws Exception {
+ testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.MINUTES);
+ }
+
+ private void testOnlyNewEntriesEmitted(final TimeUnit targetPrecision) throws InterruptedException {
+
+ final long initialTimestamp = getCurrentTimestampMillis(targetPrecision);
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
proc.addEntity("name", "id", initialTimestamp);
@@ -126,7 +192,7 @@ public class TestAbstractListProcessor {
runner.clearTransferState();
// Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
- Thread.sleep(DEFAULT_SLEEP_MILLIS);
+ Thread.sleep(getSleepMillis(targetPrecision));
// Running again, our two previously seen files are now cleared to be released
runner.run();
@@ -139,18 +205,20 @@ public class TestAbstractListProcessor {
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
runner.clearTransferState();
- proc.addEntity("name", "id3", initialTimestamp - 1);
+ // An entry that is older than already processed entry should not be listed.
+ proc.addEntity("name", "id3", initialTimestamp - targetPrecision.toMillis(1));
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
runner.clearTransferState();
+ // An entry whose timestamp is the same as the last processed timestamp should not be listed.
proc.addEntity("name", "id2", initialTimestamp);
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
runner.clearTransferState();
// Now a new file beyond the current time enters
- proc.addEntity("name", "id2", initialTimestamp + 1);
+ proc.addEntity("name", "id2", initialTimestamp + targetPrecision.toMillis(1));
// It should show up
runner.run();
@@ -159,19 +227,36 @@ public class TestAbstractListProcessor {
}
@Test
- public void testHandleRestartWithEntriesAlreadyTransferredAndNoneNew() throws Exception {
- final ConcreteListProcessor proc = new ConcreteListProcessor();
- final TestRunner runner = TestRunners.newTestRunner(proc);
+ public void testOnlyNewEntriesEmittedMillisPrecision() throws Exception {
+ testOnlyNewEntriesEmitted(TimeUnit.MILLISECONDS);
+ }
- final long initialTimestamp = System.nanoTime();
+ @Test
+ public void testOnlyNewEntriesEmittedSecondPrecision() throws Exception {
+ testOnlyNewEntriesEmitted(TimeUnit.SECONDS);
+ }
+
+ /**
+ * This test is ignored because it needs to wait two minutes. Not good for automated unit testing, but still valuable when executed manually.
+ */
+ @Ignore
+ @Test
+ public void testOnlyNewEntriesEmittedMinutesPrecision() throws Exception {
+ testOnlyNewEntriesEmitted(TimeUnit.MINUTES);
+ }
+
+ @Test
+ public void testHandleRestartWithEntriesAlreadyTransferredAndNoneNew() throws Exception {
+
+ final long initialTimestamp = System.currentTimeMillis();
proc.addEntity("name", "id", initialTimestamp);
proc.addEntity("name", "id2", initialTimestamp);
// Emulate having state but not having had the processor run such as in a restart
final Map preexistingState = new HashMap<>();
- preexistingState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, Long.toString(initialTimestamp));
- preexistingState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, Long.toString(initialTimestamp));
+ preexistingState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, Long.toString(initialTimestamp));
+ preexistingState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, Long.toString(initialTimestamp));
runner.getStateManager().setState(preexistingState, Scope.CLUSTER);
// run for the first time
@@ -216,37 +301,35 @@ public class TestAbstractListProcessor {
@Test
public void testStateStoredInClusterStateManagement() throws Exception {
- final ConcreteListProcessor proc = new ConcreteListProcessor();
- final TestRunner runner = TestRunners.newTestRunner(proc);
+
final DistributedCache cache = new DistributedCache();
runner.addControllerService("cache", cache);
runner.enableControllerService(cache);
runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache");
- final long initialTimestamp = System.nanoTime();
+ final long initialTimestamp = System.currentTimeMillis();
proc.addEntity("name", "id", initialTimestamp);
runner.run();
final Map expectedState = new HashMap<>();
// Ensure only timestamp is migrated
- expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
- expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "0");
+ expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
+ expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, "0");
runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
// Ensure only timestamp is migrated
- expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
- expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
+ expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
+ expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
}
@Test
public void testStateMigratedFromCacheService() throws InitializationException {
- final ConcreteListProcessor proc = new ConcreteListProcessor();
- final TestRunner runner = TestRunners.newTestRunner(proc);
+
final DistributedCache cache = new DistributedCache();
runner.addControllerService("cache", cache);
runner.enableControllerService(cache);
@@ -261,15 +344,13 @@ public class TestAbstractListProcessor {
final MockStateManager stateManager = runner.getStateManager();
final Map expectedState = new HashMap<>();
// Ensure only timestamp is migrated
- expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
- expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "1492");
+ expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, "1492");
+ expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, "1492");
stateManager.assertStateEquals(expectedState, Scope.CLUSTER);
}
@Test
public void testNoStateToMigrate() throws Exception {
- final ConcreteListProcessor proc = new ConcreteListProcessor();
- final TestRunner runner = TestRunners.newTestRunner(proc);
runner.run();
@@ -280,8 +361,6 @@ public class TestAbstractListProcessor {
@Test
public void testStateMigratedFromLocalFile() throws Exception {
- final ConcreteListProcessor proc = new ConcreteListProcessor();
- final TestRunner runner = TestRunners.newTestRunner(proc);
// Create a file that we will populate with the desired state
File persistenceFile = testFolder.newFile(proc.persistenceFilename);
@@ -305,20 +384,17 @@ public class TestAbstractListProcessor {
// Verify the state manager now maintains the associated state
final Map expectedState = new HashMap<>();
// Ensure only timestamp is migrated
- expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
- expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "1492");
+ expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, "1492");
+ expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, "1492");
runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
}
@Test
public void testResumeListingAfterClearingState() throws Exception {
- final ConcreteListProcessor proc = new ConcreteListProcessor();
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
- final long initialEventTimestamp = System.nanoTime();
+ final long initialEventTimestamp = System.currentTimeMillis();
proc.addEntity("name", "id", initialEventTimestamp);
proc.addEntity("name", "id2", initialEventTimestamp);
@@ -350,8 +426,7 @@ public class TestAbstractListProcessor {
@Test
public void testFetchOnStart() throws InitializationException {
- final ConcreteListProcessor proc = new ConcreteListProcessor();
- final TestRunner runner = TestRunners.newTestRunner(proc);
+
final DistributedCache cache = new DistributedCache();
runner.addControllerService("cache", cache);
runner.enableControllerService(cache);
@@ -364,11 +439,10 @@ public class TestAbstractListProcessor {
@Test
public void testOnlyNewStateStored() throws Exception {
- final ConcreteListProcessor proc = new ConcreteListProcessor();
- final TestRunner runner = TestRunners.newTestRunner(proc);
+
runner.run();
- final long initialTimestamp = System.nanoTime();
+ final long initialTimestamp = System.currentTimeMillis();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
proc.addEntity("name", "id", initialTimestamp);
@@ -390,8 +464,8 @@ public class TestAbstractListProcessor {
final Map map = stateMap.toMap();
// Ensure only timestamp is migrated
assertEquals(2, map.size());
- assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY));
- assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY));
+ assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
+ assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
proc.addEntity("new name", "new id", initialTimestamp + 1);
runner.run();
@@ -403,9 +477,9 @@ public class TestAbstractListProcessor {
assertEquals(3, updatedStateMap.getVersion());
assertEquals(2, updatedStateMap.toMap().size());
- assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY));
+ assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
// Processed timestamp is now caught up
- assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY));
+ assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
}
private static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient {
@@ -502,7 +576,9 @@ public class TestAbstractListProcessor {
@Override
protected Map createAttributes(final ListableEntity entity, final ProcessContext context) {
- return Collections.emptyMap();
+ final Map attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), entity.getIdentifier());
+ return attributes;
}
@Override
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
index f4455885ac..8204830744 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
@@ -84,6 +84,7 @@ public class ListFTP extends ListFileTransfer {
properties.add(FTPTransfer.PROXY_PORT);
properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
+ properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
return properties;
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
index 33d7867134..5f2e2d2ab4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
@@ -213,6 +213,7 @@ public class ListFile extends AbstractListProcessor {
properties.add(MIN_SIZE);
properties.add(MAX_SIZE);
properties.add(IGNORE_HIDDEN_FILES);
+ properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
this.properties = Collections.unmodifiableList(properties);
final Set relationships = new HashSet<>();
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
index cb5a7e75e7..b7805e9e88 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
@@ -82,6 +82,7 @@ public class ListSFTP extends ListFileTransfer {
properties.add(SFTPTransfer.CONNECTION_TIMEOUT);
properties.add(SFTPTransfer.DATA_TIMEOUT);
properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
+ properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
return properties;
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
index 26dcbbf3b5..1b5b2a4c6e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
@@ -30,54 +30,87 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Locale;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
+import org.apache.nifi.processor.util.list.ListProcessorTestWatcher;
+import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.runner.Description;
public class TestListFile {
- final String TESTDIR = "target/test/data/in";
- final File testDir = new File(TESTDIR);
- ListFile processor;
- TestRunner runner;
- ProcessContext context;
+ private final String TESTDIR = "target/test/data/in";
+ private final File testDir = new File(TESTDIR);
+ private ListFile processor;
+ private TestRunner runner;
+ private ProcessContext context;
// Testing factors in milliseconds for file ages that are configured on each run by resetAges()
// age#millis are relative time references
// time#millis are absolute time references
// age#filter are filter label strings for the filter properties
- Long syncTime = System.currentTimeMillis();
- Long time0millis, time1millis, time2millis, time3millis, time4millis, time5millis;
- Long age0millis, age1millis, age2millis, age3millis, age4millis, age5millis;
- String age0, age1, age2, age3, age4, age5;
+ private Long syncTime = getTestModifiedTime();
+ private Long time0millis, time1millis, time2millis, time3millis, time4millis, time5millis;
+ private Long age0millis, age1millis, age2millis, age3millis, age4millis, age5millis;
+ private String age0, age1, age2, age3, age4, age5;
- static final long DEFAULT_SLEEP_MILLIS = TimeUnit.NANOSECONDS.toMillis(AbstractListProcessor.LISTING_LAG_NANOS * 2);
+ @Rule
+ public ListProcessorTestWatcher dumpState = new ListProcessorTestWatcher(
+ () -> {
+ try {
+ return runner.getStateManager().getState(Scope.LOCAL).toMap();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to retrieve state", e);
+ }
+ },
+ () -> listFiles(testDir).stream()
+ .map(f -> new FileInfo.Builder().filename(f.getName()).lastModifiedTime(f.lastModified()).build()).collect(Collectors.toList()),
+ () -> runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).stream().map(m -> (FlowFile) m).collect(Collectors.toList())
+ ) {
+ @Override
+ protected void finished(Description description) {
+ try {
+ // This rule reads the files in testDir, so it has to run before tearDown deletes them.
+ // Since @After methods always run before a @Rule completes, tearDown is invoked here rather than via @After.
+ tearDown();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to tearDown.", e);
+ }
+ }
+ };
@Before
public void setUp() throws Exception {
processor = new ListFile();
runner = TestRunners.newTestRunner(processor);
+ runner.setProperty(AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION, AbstractListProcessor.PRECISION_SECONDS.getValue());
context = runner.getProcessContext();
deleteDirectory(testDir);
assertTrue("Unable to create test data directory " + testDir.getAbsolutePath(), testDir.exists() || testDir.mkdirs());
resetAges();
}
- @After
public void tearDown() throws Exception {
deleteDirectory(testDir);
File tempFile = processor.getPersistenceFile();
@@ -91,14 +124,38 @@ public class TestListFile {
}
}
+ private List listFiles(final File file) {
+ if (file.isDirectory()) {
+ final List result = new ArrayList<>();
+ Optional.ofNullable(file.listFiles()).ifPresent(files -> Arrays.stream(files).forEach(f -> result.addAll(listFiles(f))));
+ return result;
+ } else {
+ return Collections.singletonList(file);
+ }
+ }
+
/**
* This method ensures runner clears transfer state,
- * and sleeps the current thread for DEFAULT_SLEEP_MILLIS before executing runner.run().
+ * and sleeps the current thread for twice the lag defined in {@link AbstractListProcessor#LISTING_LAG_MILLIS}
+ * for the local filesystem's timestamp precision before executing runner.run().
*/
private void runNext() throws InterruptedException {
runner.clearTransferState();
- Thread.sleep(DEFAULT_SLEEP_MILLIS);
+
+ final List files = listFiles(testDir);
+ final boolean isMillisecondSupported = files.stream().anyMatch(file -> file.lastModified() % 1_000 > 0);
+ final Long lagMillis;
+ if (isMillisecondSupported) {
+ lagMillis = AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS);
+ } else {
+ // Filesystems such as Mac OS X HFS (Hierarchical File System) or EXT3 are known to support timestamps only at seconds precision.
+ lagMillis = AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS);
+ }
+ Thread.sleep(lagMillis * 2);
+
+ final long startedAtMillis = System.currentTimeMillis();
runner.run();
+ dumpState.dumpState(startedAtMillis);
}
@Test
@@ -305,7 +362,7 @@ public class TestListFile {
@Test
public void testFilterHidden() throws Exception {
- final long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+ final long now = getTestModifiedTime();
FileOutputStream fos;
@@ -388,7 +445,7 @@ public class TestListFile {
@Test
public void testFilterPathPattern() throws Exception {
- final long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+ final long now = getTestModifiedTime();
final File subdir1 = new File(TESTDIR + "/subdir1");
assertTrue(subdir1.mkdirs());
@@ -595,20 +652,20 @@ public class TestListFile {
* Provides "now" minus 1 second in millis
*/
private static long getTestModifiedTime() {
- final long nowNanos = System.nanoTime();
+ final long nowMillis = System.currentTimeMillis();
// Subtract a second to avoid possible rounding issues
- final long nowSeconds = TimeUnit.SECONDS.convert(nowNanos, TimeUnit.NANOSECONDS) - 1;
+ final long nowSeconds = TimeUnit.SECONDS.convert(nowMillis, TimeUnit.MILLISECONDS) - 1;
return TimeUnit.MILLISECONDS.convert(nowSeconds, TimeUnit.SECONDS);
}
- public void resetAges() {
- syncTime = System.currentTimeMillis();
+ private void resetAges() {
+ syncTime = getTestModifiedTime();
age0millis = 0L;
- age1millis = 2000L;
- age2millis = 5000L;
- age3millis = 7000L;
- age4millis = 10000L;
+ age1millis = 5000L;
+ age2millis = 10000L;
+ age3millis = 15000L;
+ age4millis = 20000L;
age5millis = 100000L;
time0millis = syncTime - age0millis;