diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
index dd38e100aa..a86dda1282 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
@@ -68,9 +68,10 @@
test
+
junit
junit
- compile
+ provided
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
index 8d93a65ffe..52049eda4c 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
@@ -32,6 +32,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
@@ -178,6 +179,7 @@ public abstract class AbstractListProcessor extends Ab
private volatile Long lastRunTimeNanos = 0L;
private volatile boolean justElectedPrimaryNode = false;
private volatile boolean resetState = false;
+ private volatile List latestIdentifiersProcessed = new ArrayList<>();
/*
* A constant used in determining an internal "yield" of processing files. Given the logic to provide a pause on the newest
@@ -194,6 +196,7 @@ public abstract class AbstractListProcessor extends Ab
}
static final String LATEST_LISTED_ENTRY_TIMESTAMP_KEY = "listing.timestamp";
static final String LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY = "processed.timestamp";
+ static final String IDENTIFIER_PREFIX = "id";
public File getPersistenceFile() {
return new File("conf/state/" + getIdentifier());
@@ -307,6 +310,8 @@ public abstract class AbstractListProcessor extends Ab
// if the local file's latest timestamp is beyond that of the value provided from the cache, replace
if (minTimestamp == null || localTimestamp > minTimestamp) {
minTimestamp = localTimestamp;
+ latestIdentifiersProcessed.clear();
+ latestIdentifiersProcessed.addAll(listing.getMatchingIdentifiers());
}
}
@@ -317,16 +322,20 @@ public abstract class AbstractListProcessor extends Ab
}
if (minTimestamp != null) {
- persist(minTimestamp, minTimestamp, stateManager, scope);
+ persist(minTimestamp, minTimestamp, latestIdentifiersProcessed, stateManager, scope);
}
}
+    /**
+     * Stores the listing state in the given state manager.
+     * <p>
+     * The stored map holds the two timestamp entries plus one entry per processed identifier,
+     * keyed {@code IDENTIFIER_PREFIX + "." + index} in list order.
+     *
+     * @param latestListedEntryTimestampThisCycleMillis value stored under LATEST_LISTED_ENTRY_TIMESTAMP_KEY
+     * @param lastProcessedLatestEntryTimestampMillis value stored under LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY
+     * @param processedIdentifiersWithLatestTimestamp identifiers to store, one state entry each
+     * @param stateManager state manager that receives the new state map
+     * @param scope scope passed through to the state manager
+     * @throws IOException presumably when the state manager fails to store the state (confirm against StateManager)
+     */
     private void persist(final long latestListedEntryTimestampThisCycleMillis,
                          final long lastProcessedLatestEntryTimestampMillis,
+                         final List processedIdentifiersWithLatestTimestamp,
                          final StateManager stateManager, final Scope scope) throws IOException {
-        final Map updatedState = new HashMap<>(1);
+        // Two timestamp entries plus one entry per identifier.
+        final Map updatedState = new HashMap<>(processedIdentifiersWithLatestTimestamp.size() + 2);
         updatedState.put(LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(latestListedEntryTimestampThisCycleMillis));
         updatedState.put(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(lastProcessedLatestEntryTimestampMillis));
+        for (int i = 0; i < processedIdentifiersWithLatestTimestamp.size(); i++) {
+            updatedState.put(IDENTIFIER_PREFIX + "." + i, processedIdentifiersWithLatestTimestamp.get(i));
+        }
         stateManager.setState(updatedState, scope);
     }
@@ -350,19 +359,27 @@ public abstract class AbstractListProcessor extends Ab
// Attempt to retrieve state from the state manager if a last listing was not yet established or
// if just elected the primary node
final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
- final String latestListedEntryTimestampString = stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY);
- final String lastProcessedLatestEntryTimestampString= stateMap.get(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY);
- if (lastProcessedLatestEntryTimestampString != null) {
- this.lastProcessedLatestEntryTimestampMillis = Long.parseLong(lastProcessedLatestEntryTimestampString);
- }
- if (latestListedEntryTimestampString != null) {
- minTimestampToListMillis = Long.parseLong(latestListedEntryTimestampString);
- // If our determined timestamp is the same as that of our last listing, skip this execution as there are no updates
- if (minTimestampToListMillis == this.lastListedLatestEntryTimestampMillis) {
- context.yield();
- return;
- } else {
- this.lastListedLatestEntryTimestampMillis = minTimestampToListMillis;
+ latestIdentifiersProcessed.clear();
+ for (Map.Entry state : stateMap.toMap().entrySet()) {
+ final String k = state.getKey();
+ final String v = state.getValue();
+ if (v == null || v.isEmpty()) {
+ continue;
+ }
+
+ if (LATEST_LISTED_ENTRY_TIMESTAMP_KEY.equals(k)) {
+ minTimestampToListMillis = Long.parseLong(v);
+ // If our determined timestamp is the same as that of our last listing, skip this execution as there are no updates
+ if (minTimestampToListMillis.equals(this.lastListedLatestEntryTimestampMillis)) {
+ context.yield();
+ return;
+ } else {
+ this.lastListedLatestEntryTimestampMillis = minTimestampToListMillis;
+ }
+ } else if (LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY.equals(k)) {
+ this.lastProcessedLatestEntryTimestampMillis = Long.parseLong(v);
+ } else if (k.startsWith(IDENTIFIER_PREFIX)) {
+ latestIdentifiersProcessed.add(v);
}
}
justElectedPrimaryNode = false;
@@ -405,7 +422,7 @@ public abstract class AbstractListProcessor extends Ab
targetSystemHasSeconds = entityTimestampMillis % 60_000 > 0;
}
// New entries are all those that occur at or after the associated timestamp
- final boolean newEntry = minTimestampToListMillis == null || entityTimestampMillis >= minTimestampToListMillis && entityTimestampMillis > lastProcessedLatestEntryTimestampMillis;
+ final boolean newEntry = minTimestampToListMillis == null || entityTimestampMillis >= minTimestampToListMillis && entityTimestampMillis >= lastProcessedLatestEntryTimestampMillis;
if (newEntry) {
List entitiesForTimestamp = orderedEntries.get(entity.getTimestamp());
@@ -439,7 +456,10 @@ public abstract class AbstractListProcessor extends Ab
* - The latest listed entity timestamp is equal to the last processed time, meaning we handled those items originally passed over. No need to process it again.
*/
final long listingLagNanos = TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
- if (currentRunTimeNanos - lastRunTimeNanos < listingLagNanos || latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis)) {
+ if (currentRunTimeNanos - lastRunTimeNanos < listingLagNanos
+ || (latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis)
+ && orderedEntries.get(latestListedEntryTimestampThisCycleMillis).stream()
+ .allMatch(entity -> latestIdentifiersProcessed.contains(entity.getIdentifier())))) {
context.yield();
return;
}
@@ -456,9 +476,14 @@ public abstract class AbstractListProcessor extends Ab
}
}
+ for (Map.Entry> timestampEntities : orderedEntries.entrySet()) {
+ List entities = timestampEntities.getValue();
+ if (timestampEntities.getKey().equals(lastProcessedLatestEntryTimestampMillis)) {
+ // Filter out previously processed entities.
+ entities = entities.stream().filter(entity -> !latestIdentifiersProcessed.contains(entity.getIdentifier())).collect(Collectors.toList());
+ }
- for (List timestampEntities : orderedEntries.values()) {
- for (T entity : timestampEntities) {
+ for (T entity : entities) {
// Create the FlowFile for this path.
final Map attributes = createAttributes(entity, context);
FlowFile flowFile = session.create();
@@ -476,6 +501,13 @@ public abstract class AbstractListProcessor extends Ab
// If there have been files created, update the last timestamp we processed.
// Retrieving lastKey instead of using latestListedEntryTimestampThisCycleMillis is intentional here,
// because latestListedEntryTimestampThisCycleMillis might be removed if it's not old enough.
+ if (!orderedEntries.lastKey().equals(lastProcessedLatestEntryTimestampMillis)) {
+ // If the latest timestamp of this cycle differs from the previously processed one, the stored identifiers are stale and must be cleared.
+ // If it is unchanged, the identifiers from this cycle are appended to the existing ones.
+ latestIdentifiersProcessed.clear();
+ }
+ // Record the identifiers of the entities listed at the latest timestamp in latestIdentifiersProcessed.
+ latestIdentifiersProcessed.addAll(orderedEntries.lastEntry().getValue().stream().map(T::getIdentifier).collect(Collectors.toList()));
lastProcessedLatestEntryTimestampMillis = orderedEntries.lastKey();
getLogger().info("Successfully created listing with {} new objects", new Object[]{flowfilesCreated});
session.commit();
@@ -494,7 +526,7 @@ public abstract class AbstractListProcessor extends Ab
// the distributed state cache, the node can continue to run (if it is primary node).
try {
lastListedLatestEntryTimestampMillis = latestListedEntryTimestampThisCycleMillis;
- persist(latestListedEntryTimestampThisCycleMillis, lastProcessedLatestEntryTimestampMillis, context.getStateManager(), getStateScope(context));
+ persist(latestListedEntryTimestampThisCycleMillis, lastProcessedLatestEntryTimestampMillis, latestIdentifiersProcessed, context.getStateManager(), getStateScope(context));
} catch (final IOException ioe) {
getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
+ "if another node begins executing this Processor, data duplication may occur.", ioe);
@@ -518,6 +550,7 @@ public abstract class AbstractListProcessor extends Ab
lastListedLatestEntryTimestampMillis = null;
lastProcessedLatestEntryTimestampMillis = 0L;
lastRunTimeNanos = 0L;
+ latestIdentifiersProcessed.clear();
}
/**
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
index 69705f2fb8..1ecbce7744 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
@@ -17,6 +17,10 @@
package org.apache.nifi.processor.util.list;
+import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_MILLIS;
+import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_MINUTES;
+import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_SECONDS;
+import static org.apache.nifi.processor.util.list.AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION;
import static org.junit.Assert.assertEquals;
import java.io.File;
@@ -130,6 +134,8 @@ public class TestAbstractListProcessor {
final long initialTimestamp = getCurrentTimestampMillis(targetPrecision);
+ setTargetSystemTimestampPrecision(targetPrecision);
+
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
proc.addEntity("name", "id", initialTimestamp);
proc.addEntity("name", "id2", initialTimestamp);
@@ -182,6 +188,8 @@ public class TestAbstractListProcessor {
final long initialTimestamp = getCurrentTimestampMillis(targetPrecision);
+ setTargetSystemTimestampPrecision(targetPrecision);
+
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
proc.addEntity("name", "id", initialTimestamp);
proc.addEntity("name", "id2", initialTimestamp);
@@ -226,6 +234,20 @@ public class TestAbstractListProcessor {
runner.clearTransferState();
}
+    /**
+     * Sets the TARGET_SYSTEM_TIMESTAMP_PRECISION property on the test runner to the
+     * precision constant that corresponds to the given time unit.
+     *
+     * @param targetPrecision one of MINUTES, SECONDS or MILLISECONDS
+     * @throws IllegalArgumentException if the unit has no corresponding precision constant
+     */
+    private void setTargetSystemTimestampPrecision(TimeUnit targetPrecision) {
+        switch (targetPrecision) {
+            case MINUTES:
+                runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_MINUTES);
+                break;
+            case SECONDS:
+                runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_SECONDS);
+                break;
+            case MILLISECONDS:
+                runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_MILLIS);
+                break;
+            default:
+                // Fail fast instead of silently leaving the property untouched for an unsupported unit.
+                throw new IllegalArgumentException("Unsupported target precision: " + targetPrecision);
+        }
+    }
+
@Test
public void testOnlyNewEntriesEmittedMillisPrecision() throws Exception {
testOnlyNewEntriesEmitted(TimeUnit.MILLISECONDS);
@@ -257,6 +279,8 @@ public class TestAbstractListProcessor {
final Map preexistingState = new HashMap<>();
preexistingState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, Long.toString(initialTimestamp));
preexistingState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, Long.toString(initialTimestamp));
+ preexistingState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", "id");
+ preexistingState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", "id2");
runner.getStateManager().setState(preexistingState, Scope.CLUSTER);
// run for the first time
@@ -324,6 +348,7 @@ public class TestAbstractListProcessor {
- // Ensure only timestamp is migrated
+ // Ensure timestamp and identifiers are migrated
expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
+ expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", "id");
runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
}
@@ -383,9 +408,10 @@ public class TestAbstractListProcessor {
// Verify the state manager now maintains the associated state
final Map expectedState = new HashMap<>();
- // Ensure only timestamp is migrated
+ // Ensure timestamp and identifiers are migrated
expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, "1492");
expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, "1492");
+ expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", "id");
runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
}
@@ -462,10 +488,12 @@ public class TestAbstractListProcessor {
assertEquals(2, stateMap.getVersion());
final Map map = stateMap.toMap();
- // Ensure only timestamp is migrated
- assertEquals(2, map.size());
+ // Ensure timestamp and identifiers are migrated
+ assertEquals(4, map.size());
assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
+ assertEquals("id", map.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".0"));
+ assertEquals("id2", map.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".1"));
proc.addEntity("new name", "new id", initialTimestamp + 1);
runner.run();
@@ -476,10 +504,11 @@ public class TestAbstractListProcessor {
StateMap updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER);
assertEquals(3, updatedStateMap.getVersion());
- assertEquals(2, updatedStateMap.toMap().size());
+ assertEquals(3, updatedStateMap.toMap().size());
assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
// Processed timestamp is now caught up
assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
+ assertEquals("new id", updatedStateMap.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".0"));
}
private static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
index d8797dc8a9..96a423615f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
@@ -41,6 +42,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
@@ -201,7 +203,7 @@ public class TestFTP {
}
@Test
- public void basicFileList() throws IOException {
+ public void basicFileList() throws IOException, InterruptedException {
FileSystem results = fakeFtpServer.getFileSystem();
FileEntry sampleFile = new FileEntry("c:\\data\\randombytes-2");
@@ -217,10 +219,16 @@ public class TestFTP {
runner.setProperty(FTPTransfer.PASSWORD, password);
runner.setProperty(FTPTransfer.PORT, Integer.toString(ftpPort));
runner.setProperty(ListFTP.REMOTE_PATH, "/");
+ // FakeFTPServer has timestamp precision in minutes.
+ // Specify milliseconds precision so that test does not need to wait for minutes.
+ runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS);
runner.assertValid();
+ // Wait for twice the listing lag before running the processor.
+ Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS) * 2);
runner.run();
+ runner.assertTransferCount(FetchFTP.REL_SUCCESS, 1);
final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(FetchFTP.REL_SUCCESS).get(0);
runner.assertAllFlowFilesContainAttribute("ftp.remote.host");
runner.assertAllFlowFilesContainAttribute("ftp.remote.port");
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
index 1b5b2a4c6e..bf2755b9bc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
@@ -33,8 +33,10 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -646,6 +648,44 @@ public class TestListFile {
assertEquals(false, processor.isListingResetNecessary(new PropertyDescriptor.Builder().name("x").build()));
}
+    /**
+     * Creates a new file at {@code TESTDIR + name}, sets its last-modified time and records
+     * the resulting timestamp in the given map under the file's name.
+     * <p>
+     * The recorded value is read back from the file rather than taken from {@code millis};
+     * presumably the file system may store a coarser timestamp than requested (confirm).
+     *
+     * @param name path suffix appended to TESTDIR; callers in this test pass a leading "/"
+     * @param millis last-modified time to apply, in milliseconds
+     * @param fileTimes map that receives file name to last-modified time
+     * @throws IOException if the file cannot be created
+     */
+    private void makeTestFile(final String name, final long millis, final Map fileTimes) throws IOException {
+        final File created = new File(TESTDIR + name);
+
+        // Both steps must succeed for the test fixture to be valid.
+        final boolean wasCreated = created.createNewFile();
+        assertTrue(wasCreated);
+        final boolean timestampApplied = created.setLastModified(millis);
+        assertTrue(timestampApplied);
+
+        final String recordedName = created.getName();
+        fileTimes.put(recordedName, created.lastModified());
+    }
+
+    /**
+     * Lists a first batch of three files, then adds a second batch between runs and checks
+     * that the second run emits exactly two FlowFiles.
+     */
+    @Test
+    public void testFilterRunMidFileWrites() throws Exception {
+        final String listedDirectory = testDir.getAbsolutePath();
+        runner.setProperty(ListFile.DIRECTORY, listedDirectory);
+
+        final Map recordedTimes = new HashMap<>();
+
+        // First batch, present before the first listing.
+        makeTestFile("/batch1-age3.txt", time3millis, recordedTimes);
+        makeTestFile("/batch1-age4.txt", time4millis, recordedTimes);
+        makeTestFile("/batch1-age5.txt", time5millis, recordedTimes);
+
+        // First listing: all three files are expected on the success relationship.
+        runNext();
+
+        final int expectedAfterFirstRun = 3;
+        runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
+        runner.assertTransferCount(ListFile.REL_SUCCESS, expectedAfterFirstRun);
+        assertEquals(expectedAfterFirstRun, runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).size());
+
+        // Second batch, written after the first listing:
+        // - age2 is newer than age3, so it should be picked up.
+        makeTestFile("/batch2-age2.txt", time2millis, recordedTimes);
+        // - age3 shares the latest timestamp of the first run but was not there then, so it should be picked up too.
+        makeTestFile("/batch2-age3.txt", time3millis, recordedTimes);
+        // - age4 is older than age3, so it should be ignored.
+        makeTestFile("/batch2-age4.txt", time4millis, recordedTimes);
+
+        runNext();
+
+        final int expectedAfterSecondRun = 2;
+        runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
+        runner.assertTransferCount(ListFile.REL_SUCCESS, expectedAfterSecondRun);
+        assertEquals(expectedAfterSecondRun, runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).size());
+    }
+
/*
* HFS+, default for OS X, only has granularity to one second, accordingly, we go back in time to establish consistent test cases
*