diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java index 18859128d4..9888d163e8 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java @@ -198,12 +198,17 @@ public abstract class AbstractListProcessor extends Ab " However additional DistributedMapCache controller service is required and more JVM heap memory is used." + " See the description of 'Entity Tracking Time Window' property for further details on how it works."); - public static final PropertyDescriptor LISTING_STRATEGY = new Builder() + public static final AllowableValue NO_TRACKING = new AllowableValue("none", "No Tracking", + "This strategy lists an entity without any tracking. The same entity will be listed each time" + + " on executing this processor. It is recommended to change the default run schedule value." + + " Any property that related to the persisting state will be disregarded."); + + public static final PropertyDescriptor LISTING_STRATEGY = new PropertyDescriptor.Builder() .name("listing-strategy") .displayName("Listing Strategy") .description("Specify how to determine new/updated entities. See each strategy descriptions for detail.") .required(true) - .allowableValues(BY_TIMESTAMPS, BY_ENTITIES) + .allowableValues(BY_TIMESTAMPS, BY_ENTITIES, NO_TRACKING) .defaultValue(BY_TIMESTAMPS.getValue()) .build(); @@ -232,7 +237,6 @@ public abstract class AbstractListProcessor extends Ab private volatile boolean resetState = false; private volatile boolean resetEntityTrackingState = false; private volatile List latestIdentifiersProcessed = new ArrayList<>(); - private volatile ListedEntityTracker listedEntityTracker; /* @@ -442,11 +446,62 @@ public abstract class AbstractListProcessor extends Ab } else if (BY_ENTITIES.equals(listingStrategy)) { listByTrackingEntities(context, session); + } else if (NO_TRACKING.equals(listingStrategy)) { + listByNoTracking(context, session); + } else { throw new ProcessException("Unknown listing strategy: " + listingStrategy); } } + public void listByNoTracking(final ProcessContext context, final ProcessSession session) { + final List entityList; + + try { + // Remove any previous state from the state manager before use a No Tracking Strategy. + context.getStateManager().clear(getStateScope(context)); + + } catch (final IOException re) { + getLogger().error("Failed to remove previous state from the State Manager.", new Object[]{re.getMessage()}, re); + context.yield(); + return; + } + + try { + // minTimestamp = 0L by default on this strategy to ignore any future + // comparision in lastModifiedMap to the same entity. + entityList = performListing(context, 0L); + } catch (final IOException pe) { + getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{pe.getMessage()}, pe); + context.yield(); + return; + } + + if (entityList == null || entityList.isEmpty()) { + context.yield(); + return; + } + + final TreeMap> orderedEntries = new TreeMap<>(); + for (final T entity : entityList) { + List entitiesForTimestamp = orderedEntries.computeIfAbsent(entity.getTimestamp(), k -> new ArrayList()); + entitiesForTimestamp.add(entity); + } + + if (orderedEntries.size() > 0) { + for (Map.Entry> timestampEntities : orderedEntries.entrySet()) { + List entities = timestampEntities.getValue(); + for (T entity : entities) { + // Create the FlowFile for this path. + final Map attributes = createAttributes(entity, context); + FlowFile flowFile = session.create(); + flowFile = session.putAllAttributes(flowFile, attributes); + session.transfer(flowFile, REL_SUCCESS); + } + } + } + } + public void listByTrackingTimestamps(final ProcessContext context, final ProcessSession session) throws ProcessException { Long minTimestampToListMillis = lastListedLatestEntryTimestampMillis; diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java index dd8fdaa85d..65afde4470 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java @@ -20,6 +20,7 @@ package org.apache.nifi.processor.util.list; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.Validator; import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.context.PropertyContext; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.distributed.cache.client.Deserializer; @@ -28,6 +29,7 @@ import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.SimpleRecordSchema; @@ -52,11 +54,11 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.List; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Comparator; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -208,6 +210,58 @@ public class TestAbstractListProcessor { runner.assertAllFlowFilesTransferred(AbstractListProcessor.REL_SUCCESS, 0); } + @Test + public void testNoTrackingEntityStrategy() throws IOException { + + // Firstly, choose Timestamp Strategy lists 2 entities and set state. + // After that choose No Tracking Strategy to test if this strategy remove the state. + ProcessSession session = runner.getProcessSessionFactory().createSession(); + ProcessContext context = runner.getProcessContext(); + + runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIMESTAMPS); + + // two entities listed + proc.addEntity("one","firstFile",1585344381476L); + proc.addEntity("two","secondFile",1585344381475L); + + runner.run(); + assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size()); + assertEquals(2, proc.entities.size()); + + final MockStateManager stateManager = runner.getStateManager(); + final Map expectedState = new HashMap<>(); + final Map realState = new HashMap<>(); + + realState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(proc.entities.get("firstFile").getTimestamp())); + realState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(proc.entities.get("secondFile").getTimestamp())); + realState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", proc.entities.get("firstFile").getIdentifier()); + realState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", proc.entities.get("secondFile").getIdentifier()); + + stateManager.setState(realState, Scope.CLUSTER); + + // Ensure timestamp and identifies are migrated + expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(proc.entities.get("firstFile").getTimestamp())); + expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(proc.entities.get("secondFile").getTimestamp())); + expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", proc.entities.get("firstFile").getIdentifier()); + expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", proc.entities.get("secondFile").getIdentifier()); + + runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); + + // Change listing strategy + runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.NO_TRACKING); + + // Clear any listed entities after choose No Tracking Strategy + proc.entities.clear(); + + // Add new entity + proc.addEntity("one","firstFile",1585344381476L); + proc.listByNoTracking(context, session); + + // Test if state cleared or not + runner.getStateManager().assertStateNotEquals(expectedState, Scope.CLUSTER); + assertEquals(1, proc.entities.size()); + } + @Test public void testEntityTrackingStrategy() throws InitializationException { runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_ENTITIES); @@ -487,5 +541,18 @@ public class TestAbstractListProcessor { fields.add(new RecordField("size", RecordFieldType.LONG.getDataType())); return new SimpleRecordSchema(fields); } + + private void persist(final long latestListedEntryTimestampThisCycleMillis, + final long lastProcessedLatestEntryTimestampMillis, + final List processedIdentifiesWithLatestTimestamp, + final StateManager stateManager, final Scope scope) throws IOException { + final Map updatedState = new HashMap<>(processedIdentifiesWithLatestTimestamp.size() + 2); + updatedState.put(LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(latestListedEntryTimestampThisCycleMillis)); + updatedState.put(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(lastProcessedLatestEntryTimestampMillis)); + for (int i = 0; i < processedIdentifiesWithLatestTimestamp.size(); i++) { + updatedState.put(IDENTIFIER_PREFIX + "." + i, processedIdentifiesWithLatestTimestamp.get(i)); + } + stateManager.setState(updatedState, scope); + } } }