From 8b9d4461185848fd552a639ac14b7926164d5426 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Tue, 10 Jul 2018 17:00:38 +0900 Subject: [PATCH] NIFI-54096: Consolidating new model into existing List processors. Adding followings: - Use separate DistributedMapCache for tracking entities to avoid conflict with existing code - Added more validation - Delete listed entities from cache if reset is needed - Support Local scope - Added Initial Listing Target This closes #2876. Signed-off-by: Mark Payne --- .../azure/storage/ListAzureBlobStorage.java | 15 +- .../azure/storage/utils/BlobInfo.java | 5 + .../util/list/AbstractListProcessor.java | 141 +++++++- .../processor/util/list/ListableEntity.java | 5 + .../processor/util/list/ListedEntity.java | 45 +++ .../util/list/ListedEntityTracker.java | 342 ++++++++++++++++++ .../util/list/ITAbstractListProcessor.java | 2 +- .../util/list/TestAbstractListProcessor.java | 158 +++++++- .../nifi/processors/standard/ListFTP.java | 12 +- .../nifi/processors/standard/ListFile.java | 9 +- .../nifi/processors/standard/ListSFTP.java | 12 +- 11 files changed, 705 insertions(+), 41 deletions(-) create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntity.java create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java index 9f178faf7f..bf82029986 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java @@ -40,10 +40,12 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processor.util.list.ListedEntityTracker; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.processors.azure.storage.utils.BlobInfo; import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder; @@ -92,12 +94,17 @@ public class ListAzureBlobStorage extends AbstractListProcessor { .build(); private static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( + LISTING_STRATEGY, AzureStorageUtils.CONTAINER, AzureStorageUtils.PROP_SAS_TOKEN, AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, PROP_PREFIX, - AzureStorageUtils.PROXY_CONFIGURATION_SERVICE)); + AzureStorageUtils.PROXY_CONFIGURATION_SERVICE, + ListedEntityTracker.TRACKING_STATE_CACHE, + ListedEntityTracker.TRACKING_TIME_WINDOW, + ListedEntityTracker.INITIAL_LISTING_TARGET + )); @Override protected List getSupportedPropertyDescriptors() { @@ -105,10 +112,8 @@ public class ListAzureBlobStorage extends AbstractListProcessor { } @Override - protected Collection customValidate(ValidationContext validationContext) { - final Collection results = AzureStorageUtils.validateCredentialProperties(validationContext); + protected void customValidate(ValidationContext validationContext, Collection results) { AzureStorageUtils.validateProxySpec(validationContext, results); - return results; } @Override @@ -144,7 +149,7 @@ public class ListAzureBlobStorage extends AbstractListProcessor { } @Override - protected Scope getStateScope(final ProcessContext context) { + protected Scope getStateScope(final PropertyContext context) { return Scope.CLUSTER; } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java index 6343dc8126..15b0fe0a84 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java @@ -209,4 +209,9 @@ public class BlobInfo implements Comparable, Serializable, ListableEnt public long getTimestamp() { return getLastModifiedTime(); } + + @Override + public long getSize() { + return length; + } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java index 5c42f813d8..04e444c972 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java @@ -27,9 +27,12 @@ import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.context.PropertyContext; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.Serializer; @@ -49,6 +52,7 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -62,8 +66,8 @@ import java.util.stream.Collectors; /** *

- * An Abstract Processor that is intended to simplify the coding required in order to perform Listing operations of remote resources. - * Those remote resources may be files, "objects", "messages", or any other sort of entity that may need to be listed in such a way that + * An Abstract Processor that is intended to simplify the coding required in order to perform Listing operations of remote or local resources. + * Those resources may be files, "objects", "messages", or any other sort of entity that may need to be listed in such a way that * we identity the entity only once. Each of these objects, messages, etc. is referred to as an "entity" for the scope of this Processor. *

*

@@ -83,6 +87,9 @@ import java.util.stream.Collectors; * than the last timestamp pulled, then the entity is considered new. * *

  • + * With 'Tracking Entities' strategy, the size of entity content is also used to determine if an entity is "new". If the size changes the entity is considered "new". + *
  • + *
  • * Entity must have a user-readable name that can be used for logging purposes. *
  • * @@ -96,9 +103,10 @@ import java.util.stream.Collectors; * NOTE: This processor performs migrations of legacy state mechanisms inclusive of locally stored, file-based state and the optional utilization of the Distributed Cache * Service property to the new {@link StateManager} functionality. Upon successful migration, the associated data from one or both of the legacy mechanisms is purged. *

    + * *

    * For each new entity that is listed, the Processor will send a FlowFile to the 'success' relationship. The FlowFile will have no content but will have some set - * of attributes (defined by the concrete implementation) that can be used to fetch those remote resources or interact with them in whatever way makes sense for + * of attributes (defined by the concrete implementation) that can be used to fetch those resources or interact with them in whatever way makes sense for * the configured dataflow. *

    *

    @@ -106,8 +114,8 @@ import java.util.stream.Collectors; *

    *
      *
    • - * Perform a listing of remote resources. The subclass will implement the {@link #performListing(ProcessContext, Long)} method, which creates a listing of all - * entities on the remote system that have timestamps later than the provided timestamp. If the entities returned have a timestamp before the provided one, those + * Perform a listing of resources. The subclass will implement the {@link #performListing(ProcessContext, Long)} method, which creates a listing of all + * entities on the target system that have timestamps later than the provided timestamp. If the entities returned have a timestamp before the provided one, those * entities will be filtered out. It is therefore not necessary to perform the filtering of timestamps but is provided in order to give the implementation the ability * to filter those resources on the server side rather than pulling back all of the information, if it makes sense to do so in the concrete implementation. *
    • @@ -138,9 +146,11 @@ public abstract class AbstractListProcessor extends Ab public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() .name("Distributed Cache Service") - .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from the remote server so that if a new node " - + "begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information will not be shared across the cluster. " - + "This property does not need to be set for standalone instances of NiFi but should be configured if NiFi is run within a cluster.") + .description("NOTE: This property is used merely for migration from old NiFi version before state management was introduced at version 0.5.0. " + + "The stored value in the cache service will be migrated into the state when this processor is started at the first time. " + + "The specified Controller Service was used to maintain state about what had been pulled from the remote server so that if a new node " + + "begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information was not shared across the cluster. " + + "This property did not need to be set for standalone instances of NiFi but was supposed to be configured if NiFi had been running within a cluster.") .required(false) .identifiesControllerService(DistributedMapCacheClient.class) .build(); @@ -169,6 +179,28 @@ public abstract class AbstractListProcessor extends Ab .description("All FlowFiles that are received are routed to success") .build(); + public static final AllowableValue BY_TIMESTAMPS = new AllowableValue("timestamps", "Tracking Timestamps", + "This strategy tracks the latest timestamp of listed entity to determine new/updated entities." + + " Since it only tracks few timestamps, it can manage listing state efficiently." + + " However, any newly added, or updated entity having timestamp older than the tracked latest timestamp can not be picked by this strategy." + + " For example, such situation can happen in a file system if a file with old timestamp" + + " is copied or moved into the target directory without its last modified timestamp being updated."); + + public static final AllowableValue BY_ENTITIES = new AllowableValue("entities", "Tracking Entities", + "This strategy tracks information of all the listed entities within the latest 'Entity Tracking Time Window' to determine new/updated entities." + + " This strategy can pick entities having old timestamp that can be missed with 'Tracing Timestamps'." + + " However additional DistributedMapCache controller service is required and more JVM heap memory is used." + + " See the description of 'Entity Tracking Time Window' property for further details on how it works."); + + public static final PropertyDescriptor LISTING_STRATEGY = new PropertyDescriptor.Builder() + .name("listing-strategy") + .displayName("Listing Strategy") + .description("Specify how to determine new/updated entities. See each strategy descriptions for detail.") + .required(true) + .allowableValues(BY_TIMESTAMPS, BY_ENTITIES) + .defaultValue(BY_TIMESTAMPS.getValue()) + .build(); + /** * Represents the timestamp of an entity which was the latest one within those listed at the previous cycle. * It does not necessary mean it has been processed as well. @@ -185,6 +217,8 @@ public abstract class AbstractListProcessor extends Ab private volatile boolean resetState = false; private volatile List latestIdentifiersProcessed = new ArrayList<>(); + private volatile ListedEntityTracker listedEntityTracker; + /* * A constant used in determining an internal "yield" of processing files. Given the logic to provide a pause on the newest * files according to timestamp, it is ensured that at least the specified millis has been eclipsed to avoid getting scheduled @@ -206,14 +240,6 @@ public abstract class AbstractListProcessor extends Ab return new File("conf/state/" + getIdentifier()); } - @Override - protected List getSupportedPropertyDescriptors() { - final List properties = new ArrayList<>(); - properties.add(DISTRIBUTED_CACHE_SERVICE); - properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION); - return properties; - } - @Override public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { if (isConfigurationRestored() && isListingResetNecessary(descriptor)) { @@ -230,6 +256,32 @@ public abstract class AbstractListProcessor extends Ab return relationships; } + /** + * In order to add custom validation at sub-classes, implement {@link #customValidate(ValidationContext, Collection)} method. + */ + @Override + protected final Collection customValidate(ValidationContext context) { + final Collection results = new ArrayList<>(); + + final String listingStrategy = context.getProperty(LISTING_STRATEGY).getValue(); + if (BY_ENTITIES.equals(listingStrategy)) { + ListedEntityTracker.validateProperties(context, results, getStateScope(context)); + } + + customValidate(context, results); + return results; + } + + + /** + * Sub-classes can add custom validation by implementing this method. + * @param validationContext the validation context + * @param validationResults add custom validation result to this collection + */ + protected void customValidate(ValidationContext validationContext, Collection validationResults) { + + } + @OnPrimaryNodeStateChange public void onPrimaryNodeChange(final PrimaryNodeState newState) { justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE); @@ -260,7 +312,6 @@ public abstract class AbstractListProcessor extends Ab if (resetState) { context.getStateManager().clear(getStateScope(context)); - resetState = false; } } @@ -352,9 +403,24 @@ public abstract class AbstractListProcessor extends Ab return mapper.readValue(serializedState, EntityListing.class); } - @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + + resetState = false; + + final String listingStrategy = context.getProperty(LISTING_STRATEGY).getValue(); + if (BY_TIMESTAMPS.equals(listingStrategy)) { + listByTrackingTimestamps(context, session); + + } else if (BY_ENTITIES.equals(listingStrategy)) { + listByTrackingEntities(context, session); + + } else { + throw new ProcessException("Unknown listing strategy: " + listingStrategy); + } + } + + public void listByTrackingTimestamps(final ProcessContext context, final ProcessSession session) throws ProcessException { Long minTimestampToListMillis = lastListedLatestEntryTimestampMillis; if (this.lastListedLatestEntryTimestampMillis == null || this.lastProcessedLatestEntryTimestampMillis == null || justElectedPrimaryNode) { @@ -624,7 +690,7 @@ public abstract class AbstractListProcessor extends Ab * @param context the ProcessContext to use in order to make a determination * @return a Scope that specifies where the state should be managed for this Processor */ - protected abstract Scope getStateScope(final ProcessContext context); + protected abstract Scope getStateScope(final PropertyContext context); private static class StringSerDe implements Serializer, Deserializer { @@ -642,4 +708,41 @@ public abstract class AbstractListProcessor extends Ab out.write(value.getBytes(StandardCharsets.UTF_8)); } } + + @OnScheduled + public void initListedEntityTracker(ProcessContext context) { + final boolean isTrackingEntityStrategy = BY_ENTITIES.getValue().equals(context.getProperty(LISTING_STRATEGY).getValue()); + if (listedEntityTracker != null && (resetState || !isTrackingEntityStrategy)) { + try { + listedEntityTracker.clearListedEntities(); + } catch (IOException e) { + throw new RuntimeException("Failed to reset previously listed entities due to " + e, e); + } + } + + if (isTrackingEntityStrategy) { + if (listedEntityTracker == null) { + listedEntityTracker = createListedEntityTracker(); + } + } else { + listedEntityTracker = null; + } + } + + protected ListedEntityTracker createListedEntityTracker() { + return new ListedEntityTracker<>(getIdentifier(), getLogger()); + } + + private void listByTrackingEntities(ProcessContext context, ProcessSession session) throws ProcessException { + listedEntityTracker.trackEntities(context, session, justElectedPrimaryNode, getStateScope(context), minTimestampToList -> { + try { + return performListing(context, minTimestampToList); + } catch (final IOException e) { + getLogger().error("Failed to perform listing on remote host due to {}", e); + return Collections.emptyList(); + } + }, entity -> createAttributes(entity, context)); + justElectedPrimaryNode = false; + } + } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntity.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntity.java index 01837cb174..f769e78014 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntity.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntity.java @@ -37,4 +37,9 @@ public interface ListableEntity { */ long getTimestamp(); + /** + * @return the size of the entity content. + */ + long getSize(); + } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntity.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntity.java new file mode 100644 index 0000000000..1ae646f3ff --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntity.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.list; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class ListedEntity { + /** + * Milliseconds. + */ + private final long timestamp; + /** + * Bytes. + */ + private final long size; + + @JsonCreator + public ListedEntity(@JsonProperty("timestamp") long timestamp, @JsonProperty("size") long size) { + this.timestamp = timestamp; + this.size = size; + } + + public long getTimestamp() { + return timestamp; + } + + public long getSize() { + return size; + } +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java new file mode 100644 index 0000000000..6c36ba95e5 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.list; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.GZIPOutputStream; +import org.apache.nifi.util.StringUtils; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; + +import static java.lang.String.format; +import static org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS; + +public class ListedEntityTracker { + + private final ObjectMapper objectMapper = new ObjectMapper(); + private volatile Map alreadyListedEntities; + + private static final String NOTE = "Used by 'Tracking Entities' strategy."; + public static final PropertyDescriptor TRACKING_STATE_CACHE = new PropertyDescriptor.Builder() + .name("et-state-cache") + .displayName("Entity Tracking State Cache") + .description(format("Listed entities are stored in the specified cache storage" + + " so that this processor can resume listing across NiFi restart or in case of primary node change." + + " 'Tracking Entities' strategy require tracking information of all listed entities within the last 'Tracking Time Window'." + + " To support large number of entities, the strategy uses DistributedMapCache instead of managed state." + + " Cache key format is 'ListedEntities::{processorId}(::{nodeId})'." + + " If it tracks per node listed entities, then the optional '::{nodeId}' part is added to manage state separately." + + " E.g. cluster wide cache key = 'ListedEntities::8dda2321-0164-1000-50fa-3042fe7d6a7b'," + + " per node cache key = 'ListedEntities::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'" + + " The stored cache content is Gzipped JSON string." + + " The cache key will be deleted when target listing configuration is changed." + + " %s", NOTE)) + .identifiesControllerService(DistributedMapCacheClient.class) + .build(); + + public static final PropertyDescriptor TRACKING_TIME_WINDOW = new PropertyDescriptor.Builder() + .name("et-time-window") + .displayName("Entity Tracking Time Window") + .description(format("Specify how long this processor should track already-listed entities." + + " 'Tracking Entities' strategy can pick any entity whose timestamp is inside the specified time window." + + " For example, if set to '30 minutes', any entity having timestamp in recent 30 minutes will be the listing target when this processor runs." + + " A listed entity is considered 'new/updated' and a FlowFile is emitted if one of following condition meets:" + + " 1. does not exist in the already-listed entities," + + " 2. has newer timestamp than the cached entity," + + " 3. has different size than the cached entity." + + " If a cached entity's timestamp becomes older than specified time window, that entity will be removed from the cached already-listed entities." + + " %s", NOTE)) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .defaultValue("3 hours") + .build(); + + private static final AllowableValue INITIAL_LISTING_TARGET_ALL = new AllowableValue("all", "All Available", + "Regardless of entities timestamp, all existing entities will be listed at the initial listing activity."); + private static final AllowableValue INITIAL_LISTING_TARGET_WINDOW = new AllowableValue("window", "Tracking Time Window", + "Ignore entities having timestamp older than the specified 'Tracking Time Window' at the initial listing activity."); + + public static final PropertyDescriptor INITIAL_LISTING_TARGET = new PropertyDescriptor.Builder() + .name("et-initial-listing-target") + .displayName("Entity Tracking Initial Listing Target") + .description(format("Specify how initial listing should be handled." + + " %s", NOTE)) + .allowableValues(INITIAL_LISTING_TARGET_WINDOW, INITIAL_LISTING_TARGET_ALL) + .defaultValue(INITIAL_LISTING_TARGET_ALL.getValue()) + .build(); + + public static final PropertyDescriptor NODE_IDENTIFIER = new PropertyDescriptor.Builder() + .name("et-node-identifier") + .displayName("Entity Tracking Node Identifier") + .description(format("The configured value will be appended to the cache key" + + " so that listing state can be tracked per NiFi node rather than cluster wide" + + " when tracking state is scoped to LOCAL. %s", NOTE)) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .defaultValue("${hostname()}") + .build(); + + static final Supplier DEFAULT_CURRENT_TIMESTAMP_SUPPLIER = System::currentTimeMillis; + private final Supplier currentTimestampSupplier; + + private final Serializer stringSerializer = (v, o) -> o.write(v.getBytes(StandardCharsets.UTF_8)); + + private final Serializer> listedEntitiesSerializer = (v, o) -> { + final GZIPOutputStream gzipOutputStream = new GZIPOutputStream(o); + objectMapper.writeValue(gzipOutputStream, v); + // Finish writing gzip data without closing the underlying stream. + gzipOutputStream.finish(); + }; + + private final Deserializer> listedEntitiesDeserializer = v -> { + if (v == null || v.length == 0) { + return null; + } + try (final GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(v))) { + return objectMapper.readValue(in, new TypeReference>() {}); + } + }; + + private final String componentId; + private final ComponentLog logger; + + /* + * The scope, nodeId and mapCacheClient being used at the previous trackEntities method execution is captured, + * so that it can be used when resetListedEntities is called. + */ + private Scope scope; + private String nodeId; + private DistributedMapCacheClient mapCacheClient; + + ListedEntityTracker(String componentId, ComponentLog logger) { + this(componentId, logger, DEFAULT_CURRENT_TIMESTAMP_SUPPLIER); + } + + /** + * This constructor is used by unit test code so that it can produce the consistent result by controlling current timestamp. + * @param currentTimestampSupplier a function to return current timestamp. + */ + ListedEntityTracker(String componentId, ComponentLog logger, Supplier currentTimestampSupplier) { + this.componentId = componentId; + this.logger = logger; + this.currentTimestampSupplier = currentTimestampSupplier; + } + + static void validateProperties(ValidationContext context, Collection results, Scope scope) { + validateRequiredProperty(context, results, ListedEntityTracker.TRACKING_STATE_CACHE); + validateRequiredProperty(context, results, ListedEntityTracker.TRACKING_TIME_WINDOW); + + if (Scope.LOCAL.equals(scope) + && StringUtils.isEmpty(context.getProperty(NODE_IDENTIFIER).evaluateAttributeExpressions().getValue())) { + results.add(new ValidationResult.Builder() + .subject(NODE_IDENTIFIER.getDisplayName()) + .explanation(format("'%s' is required to use local scope with '%s' listing strategy", + NODE_IDENTIFIER.getDisplayName(), AbstractListProcessor.BY_ENTITIES.getDisplayName())) + .build()); + } + } + + private static void validateRequiredProperty(ValidationContext context, Collection results, PropertyDescriptor property) { + if (!context.getProperty(property).isSet()) { + final String displayName = property.getDisplayName(); + results.add(new ValidationResult.Builder() + .subject(displayName) + .explanation(format("'%s' is required to use '%s' listing strategy", displayName, AbstractListProcessor.BY_ENTITIES.getDisplayName())) + .valid(false) + .build()); + } + } + + private static final String CACHE_KEY_PREFIX = "ListedEntities"; + private String getCacheKey() { + switch (scope) { + case LOCAL: + return format("%s::%s::%s", CACHE_KEY_PREFIX, componentId, nodeId); + case CLUSTER: + return format("%s::%s", CACHE_KEY_PREFIX, componentId); + } + throw new IllegalArgumentException("Unknown scope: " + scope); + } + + private void persistListedEntities(Map listedEntities) throws IOException { + final String cacheKey = getCacheKey(); + logger.debug("Persisting listed entities: {}={}", new Object[]{cacheKey, listedEntities}); + mapCacheClient.put(cacheKey, listedEntities, stringSerializer, listedEntitiesSerializer); + } + + private Map fetchListedEntities() throws IOException { + final String cacheKey = getCacheKey(); + final Map listedEntities = mapCacheClient.get(cacheKey, stringSerializer, listedEntitiesDeserializer); + logger.debug("Fetched listed entities: {}={}", new Object[]{cacheKey, listedEntities}); + return listedEntities; + } + + void clearListedEntities() throws IOException { + alreadyListedEntities = null; + if (mapCacheClient != null) { + final String cacheKey = getCacheKey(); + logger.debug("Removing listed entities from cache storage: {}", new Object[]{cacheKey}); + mapCacheClient.remove(cacheKey, stringSerializer); + } + } + + public void trackEntities(ProcessContext context, ProcessSession session, + boolean justElectedPrimaryNode, + Scope scope, + Function> listEntities, + Function> createAttributes) throws ProcessException { + + boolean initialListing = false; + mapCacheClient = context.getProperty(TRACKING_STATE_CACHE).asControllerService(DistributedMapCacheClient.class); + this.scope = scope; + if (Scope.LOCAL.equals(scope)) { + nodeId = context.getProperty(ListedEntityTracker.NODE_IDENTIFIER).evaluateAttributeExpressions().getValue(); + } else { + nodeId = null; + } + + if (alreadyListedEntities == null || justElectedPrimaryNode) { + logger.info(justElectedPrimaryNode + ? "Just elected as Primary node, restoring already-listed entities." + : "At the first onTrigger, restoring already-listed entities."); + try { + final Map fetchedListedEntities = fetchListedEntities(); + if (fetchedListedEntities == null) { + this.alreadyListedEntities = new ConcurrentHashMap<>(); + initialListing = true; + } else { + this.alreadyListedEntities = new ConcurrentHashMap<>(fetchedListedEntities); + } + } catch (IOException e) { + throw new ProcessException("Failed to restore already-listed entities due to " + e, e); + } + } + + final long currentTimeMillis = currentTimestampSupplier.get(); + final long watchWindowMillis = context.getProperty(TRACKING_TIME_WINDOW).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS); + + final String initialListingTarget = context.getProperty(INITIAL_LISTING_TARGET).getValue(); + final long minTimestampToList = (initialListing && INITIAL_LISTING_TARGET_ALL.getValue().equals(initialListingTarget)) + ? -1 : currentTimeMillis - watchWindowMillis; + + final Collection listedEntities = listEntities.apply(minTimestampToList); + + if (listedEntities.size() == 0) { + logger.debug("No entity is listed. Yielding."); + context.yield(); + return; + } + + final List updatedEntities = listedEntities.stream().filter(entity -> { + final String identifier = entity.getIdentifier(); + + if (entity.getTimestamp() < minTimestampToList) { + logger.trace("Skipped {} having older timestamp than the minTimestampToList {}.", new Object[]{identifier, entity.getTimestamp(), minTimestampToList}); + return false; + } + + final ListedEntity alreadyListedEntity = alreadyListedEntities.get(identifier); + if (alreadyListedEntity == null) { + logger.trace("Picked {} being newly found.", new Object[]{identifier}); + return true; + } + + if (entity.getTimestamp() > alreadyListedEntity.getTimestamp()) { + logger.trace("Picked {} having newer timestamp {} than {}.", + new Object[]{identifier, entity.getTimestamp(), alreadyListedEntity.getTimestamp()}); + return true; + } + + if (entity.getSize() != alreadyListedEntity.getSize()) { + logger.trace("Picked {} having different size {} than {}.", + new Object[]{identifier, entity.getSize(), alreadyListedEntity.getSize()}); + return true; + } + + logger.trace("Skipped {}, not changed.", new Object[]{identifier, entity.getTimestamp(), minTimestampToList}); + return false; + }).collect(Collectors.toList()); + + // Find old enough entries. + final List oldEntityIds = alreadyListedEntities.entrySet().stream() + .filter(entry -> entry.getValue().getTimestamp() < minTimestampToList).map(Map.Entry::getKey) + .collect(Collectors.toList()); + + if (updatedEntities.isEmpty() && oldEntityIds.isEmpty()) { + logger.debug("None of updated or old entity was found. Yielding."); + context.yield(); + return; + } + + // Remove old entries. + oldEntityIds.forEach(oldEntityId -> alreadyListedEntities.remove(oldEntityId)); + + // Emit updated entities. + for (T updatedEntity : updatedEntities) { + FlowFile flowFile = session.create(); + flowFile = session.putAllAttributes(flowFile, createAttributes.apply(updatedEntity)); + session.transfer(flowFile, REL_SUCCESS); + // In order to reduce object size, discard meta data captured at the sub-classes. + final ListedEntity listedEntity = new ListedEntity(updatedEntity.getTimestamp(), updatedEntity.getSize()); + alreadyListedEntities.put(updatedEntity.getIdentifier(), listedEntity); + } + + // Commit ProcessSession before persisting listed entities. + // In case persisting listed entities failure, same entities may be listed again, but better than not listing. + session.commit(); + try { + logger.debug("Removed old entities count: {}, Updated entities count: {}", + new Object[]{oldEntityIds.size(), updatedEntities.size()}); + if (logger.isTraceEnabled()) { + logger.trace("Removed old entities: {}, Updated entities: {}", + new Object[]{oldEntityIds, updatedEntities}); + } + persistListedEntities(alreadyListedEntities); + } catch (IOException e) { + throw new ProcessException("Failed to persist already-listed entities due to " + e, e); + } + + } + +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java index 4082ce3342..e59d65c3e0 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java @@ -72,7 +72,7 @@ public class ITAbstractListProcessor { throw new RuntimeException("Failed to retrieve state", e); } }, - () -> proc.entities, + () -> proc.getEntityList(), () -> runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).stream().map(m -> (FlowFile) m).collect(Collectors.toList()) ); diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java index f5eae46f97..e893332ee4 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java @@ -18,7 +18,9 @@ package org.apache.nifi.processor.util.list; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; @@ -42,12 +44,13 @@ import java.io.FileOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -82,7 +85,7 @@ public class TestAbstractListProcessor { throw new RuntimeException("Failed to retrieve state", e); } }, - () -> proc.entities, + () -> proc.getEntityList(), () -> runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).stream().map(m -> (FlowFile) m).collect(Collectors.toList()) ); @@ -171,6 +174,109 @@ public class TestAbstractListProcessor { assertEquals(1, cache.fetchCount); } + @Test + public void testEntityTrackingStrategy() throws InitializationException { + runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_ENTITIES); + // Require a cache service. + runner.assertNotValid(); + + final DistributedCache trackingCache = new DistributedCache(); + runner.addControllerService("tracking-cache", trackingCache); + runner.enableControllerService(trackingCache); + + runner.setProperty(ListedEntityTracker.TRACKING_STATE_CACHE, "tracking-cache"); + runner.setProperty(ListedEntityTracker.TRACKING_TIME_WINDOW, "10ms"); + + runner.assertValid(); + + proc.currentTimestamp.set(0L); + runner.run(); + assertEquals(0, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size()); + + // Should list one entity. + proc.addEntity("one", "one", 1, 1); + proc.currentTimestamp.set(1L); + runner.clearTransferState(); + runner.run(); + assertEquals(1, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size()); + runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0) + .assertAttributeEquals(CoreAttributes.FILENAME.key(), "one"); + + // Should not list any entity. + proc.currentTimestamp.set(2L); + runner.clearTransferState(); + runner.run(); + assertEquals(0, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size()); + + // Should list added entities. + proc.currentTimestamp.set(10L); + proc.addEntity("five", "five", 5, 5); + proc.addEntity("six", "six", 6, 6); + runner.clearTransferState(); + runner.run(); + assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size()); + runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0) + .assertAttributeEquals(CoreAttributes.FILENAME.key(), "five"); + runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(1) + .assertAttributeEquals(CoreAttributes.FILENAME.key(), "six"); + + // Should be able to list entities having older timestamp than the previously listed entity. + // But if its timestamp is out of tracking window, then it won't be picked. + // Current timestamp = 13, and window = 10ms, meaning it can pick entities having timestamp 3 to 13. + proc.currentTimestamp.set(13L); + proc.addEntity("two", "two", 2, 2); + proc.addEntity("three", "three", 3, 3); + proc.addEntity("four", "four", 4, 4); + runner.clearTransferState(); + runner.run(); + assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size()); + runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0) + .assertAttributeEquals(CoreAttributes.FILENAME.key(), "three"); + runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(1) + .assertAttributeEquals(CoreAttributes.FILENAME.key(), "four"); + + // Can pick entity that has newer timestamp. + // Can pick entity that has different size. + proc.currentTimestamp.set(14L); + proc.addEntity("five", "five", 7, 5); + proc.addEntity("six", "six", 6, 16); + runner.clearTransferState(); + runner.run(); + assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size()); + runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0) + .assertAttributeEquals(CoreAttributes.FILENAME.key(), "six"); + runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(1) + .assertAttributeEquals(CoreAttributes.FILENAME.key(), "five"); + + // Reset state. + // Current timestamp = 15, and window = 11ms, meaning it can pick entities having timestamp 4 to 15. + proc.currentTimestamp.set(15L); + // ConcreteListProcessor can reset state with any property. + runner.setProperty(ListedEntityTracker.TRACKING_TIME_WINDOW, "11ms"); + runner.setProperty(ConcreteListProcessor.RESET_STATE, "1"); + runner.setProperty(ListedEntityTracker.INITIAL_LISTING_TARGET, "window"); + runner.clearTransferState(); + runner.run(); + assertEquals(3, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size()); + runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0) + .assertAttributeEquals(CoreAttributes.FILENAME.key(), "four"); + runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(1) + .assertAttributeEquals(CoreAttributes.FILENAME.key(), "six"); + runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(2) + .assertAttributeEquals(CoreAttributes.FILENAME.key(), "five"); + + + // Reset state again. + proc.currentTimestamp.set(20L); + // ConcreteListProcessor can reset state with any property. + runner.setProperty(ListedEntityTracker.INITIAL_LISTING_TARGET, "all"); + runner.setProperty(ConcreteListProcessor.RESET_STATE, "2"); + runner.clearTransferState(); + runner.run(); + // All entities should be picked, one to six. + assertEquals(6, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size()); + } + static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient { private final Map stored = new HashMap<>(); private int fetchCount = 0; @@ -230,18 +336,47 @@ public class TestAbstractListProcessor { } static class ConcreteListProcessor extends AbstractListProcessor { - final List entities = new ArrayList<>(); + final Map entities = new HashMap<>(); final String persistenceFilename = "ListProcessor-local-state-" + UUID.randomUUID().toString() + ".json"; String persistenceFolder = "target/"; File persistenceFile = new File(persistenceFolder + persistenceFilename); + private static PropertyDescriptor RESET_STATE = new PropertyDescriptor.Builder() + .name("reset-state") + .addValidator(Validator.VALID) + .build(); + + final AtomicReference currentTimestamp = new AtomicReference<>(); + + @Override + protected ListedEntityTracker createListedEntityTracker() { + return new ListedEntityTracker<>(getIdentifier(), getLogger(), () -> currentTimestamp.get()); + } + + @Override + protected List getSupportedPropertyDescriptors() { + final List properties = new ArrayList<>(); + properties.add(LISTING_STRATEGY); + properties.add(DISTRIBUTED_CACHE_SERVICE); + properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION); + properties.add(ListedEntityTracker.TRACKING_STATE_CACHE); + properties.add(ListedEntityTracker.TRACKING_TIME_WINDOW); + properties.add(ListedEntityTracker.INITIAL_LISTING_TARGET); + properties.add(RESET_STATE); + return properties; + } + @Override public File getPersistenceFile() { return persistenceFile; } public void addEntity(final String name, final String identifier, final long timestamp) { + addEntity(name, identifier, timestamp, 0); + } + + public void addEntity(final String name, final String identifier, final long timestamp, long size) { final ListableEntity entity = new ListableEntity() { @Override public String getName() { @@ -257,9 +392,14 @@ public class TestAbstractListProcessor { public long getTimestamp() { return timestamp; } + + @Override + public long getSize() { + return size; + } }; - entities.add(entity); + entities.put(entity.getIdentifier(), entity); } @Override @@ -276,16 +416,20 @@ public class TestAbstractListProcessor { @Override protected List performListing(final ProcessContext context, final Long minTimestamp) throws IOException { - return Collections.unmodifiableList(entities); + return getEntityList(); + } + + List getEntityList() { + return entities.values().stream().sorted(Comparator.comparing(ListableEntity::getTimestamp)).collect(Collectors.toList()); } @Override protected boolean isListingResetNecessary(PropertyDescriptor property) { - return false; + return RESET_STATE.equals(property); } @Override - protected Scope getStateScope(final ProcessContext context) { + protected Scope getStateScope(final PropertyContext context) { return Scope.CLUSTER; } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java index 6b6a9a7a4e..2968c856cf 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java @@ -34,7 +34,9 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.list.ListedEntityTracker; import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.FTPTransfer; @@ -69,6 +71,7 @@ public class ListFTP extends ListFileTransfer { final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("21").build(); final List properties = new ArrayList<>(); + properties.add(LISTING_STRATEGY); properties.add(HOSTNAME); properties.add(port); properties.add(USERNAME); @@ -92,6 +95,9 @@ public class ListFTP extends ListFileTransfer { properties.add(FTPTransfer.HTTP_PROXY_PASSWORD); properties.add(FTPTransfer.BUFFER_SIZE); properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION); + properties.add(ListedEntityTracker.TRACKING_STATE_CACHE); + properties.add(ListedEntityTracker.TRACKING_TIME_WINDOW); + properties.add(ListedEntityTracker.INITIAL_LISTING_TARGET); return properties; } @@ -106,16 +112,14 @@ public class ListFTP extends ListFileTransfer { } @Override - protected Scope getStateScope(final ProcessContext context) { + protected Scope getStateScope(final PropertyContext context) { // Use cluster scope so that component can be run on Primary Node Only and can still // pick up where it left off, even if the Primary Node changes. return Scope.CLUSTER; } @Override - protected Collection customValidate(ValidationContext validationContext) { - final List results = new ArrayList<>(); + protected void customValidate(ValidationContext validationContext, Collection results) { FTPTransfer.validateProxySpec(validationContext, results); - return results; } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java index 3923350b1e..8b73b976f8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java @@ -30,6 +30,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.DataUnit; @@ -38,6 +39,7 @@ import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processor.util.list.ListedEntityTracker; import org.apache.nifi.processors.standard.util.FileInfo; import java.io.File; @@ -217,6 +219,7 @@ public class ListFile extends AbstractListProcessor { @Override protected void init(final ProcessorInitializationContext context) { final List properties = new ArrayList<>(); + properties.add(LISTING_STRATEGY); properties.add(DIRECTORY); properties.add(RECURSE); properties.add(DIRECTORY_LOCATION); @@ -229,6 +232,10 @@ public class ListFile extends AbstractListProcessor { properties.add(MAX_SIZE); properties.add(IGNORE_HIDDEN_FILES); properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION); + properties.add(ListedEntityTracker.TRACKING_STATE_CACHE); + properties.add(ListedEntityTracker.TRACKING_TIME_WINDOW); + properties.add(ListedEntityTracker.INITIAL_LISTING_TARGET); + properties.add(ListedEntityTracker.NODE_IDENTIFIER); this.properties = Collections.unmodifiableList(properties); final Set relationships = new HashSet<>(); @@ -320,7 +327,7 @@ public class ListFile extends AbstractListProcessor { } @Override - protected Scope getStateScope(final ProcessContext context) { + protected Scope getStateScope(final PropertyContext context) { final String location = context.getProperty(DIRECTORY_LOCATION).getValue(); if (LOCATION_REMOTE.getValue().equalsIgnoreCase(location)) { return Scope.CLUSTER; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java index 7af8766efc..2b5d47aead 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java @@ -35,7 +35,9 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.list.ListedEntityTracker; import org.apache.nifi.processors.standard.util.FTPTransfer; import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.SFTPTransfer; @@ -71,6 +73,7 @@ public class ListSFTP extends ListFileTransfer { final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("22").build(); final List properties = new ArrayList<>(); + properties.add(LISTING_STRATEGY); properties.add(HOSTNAME); properties.add(port); properties.add(USERNAME); @@ -95,6 +98,9 @@ public class ListSFTP extends ListFileTransfer { properties.add(FTPTransfer.PROXY_PORT); properties.add(FTPTransfer.HTTP_PROXY_USERNAME); properties.add(FTPTransfer.HTTP_PROXY_PASSWORD); + properties.add(ListedEntityTracker.TRACKING_STATE_CACHE); + properties.add(ListedEntityTracker.TRACKING_TIME_WINDOW); + properties.add(ListedEntityTracker.INITIAL_LISTING_TARGET); return properties; } @@ -109,16 +115,14 @@ public class ListSFTP extends ListFileTransfer { } @Override - protected Scope getStateScope(final ProcessContext context) { + protected Scope getStateScope(final PropertyContext context) { // Use cluster scope so that component can be run on Primary Node Only and can still // pick up where it left off, even if the Primary Node changes. return Scope.CLUSTER; } @Override - protected Collection customValidate(ValidationContext validationContext) { - final Collection results = new ArrayList<>(); + protected void customValidate(ValidationContext validationContext, Collection results) { SFTPTransfer.validateProxySpec(validationContext, results); - return results; } }