diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
index 8c82dbec98..eb439b4126 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
@@ -35,7 +35,6 @@ import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.ListBlobItem;
-
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -62,6 +61,8 @@ import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
import org.apache.nifi.serialization.record.RecordSchema;
+import java.util.Optional;
+
@PrimaryNodeOnly
@TriggerSerially
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@@ -140,6 +141,11 @@ public class ListAzureBlobStorage extends AbstractListProcessor {
return attributes;
}
+ @Override
+ protected String getListingContainerName(final ProcessContext context) {
+ return String.format("Azure Blob Storage Container [%s]", getPath(context));
+ }
+
@Override
protected String getPath(final ProcessContext context) {
return context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
@@ -173,27 +179,24 @@ public class ListAzureBlobStorage extends AbstractListProcessor {
}
@Override
- protected List performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
- String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
- String prefix = context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue();
- if (prefix == null) {
- prefix = "";
- }
+ protected List performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) throws IOException {
+ final String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
+ final String prefix = Optional.ofNullable(context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue()).orElse("");
final List listing = new ArrayList<>();
try {
- CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
- CloudBlobContainer container = blobClient.getContainerReference(containerName);
+ final CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
+ final CloudBlobContainer container = blobClient.getContainerReference(containerName);
final OperationContext operationContext = new OperationContext();
AzureStorageUtils.setProxy(operationContext, context);
- for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) {
+ for (final ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) {
if (blob instanceof CloudBlob) {
- CloudBlob cloudBlob = (CloudBlob) blob;
- BlobProperties properties = cloudBlob.getProperties();
- StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri();
+ final CloudBlob cloudBlob = (CloudBlob) blob;
+ final BlobProperties properties = cloudBlob.getProperties();
+ final StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri();
- Builder builder = new BlobInfo.Builder()
+ final Builder builder = new BlobInfo.Builder()
.primaryUri(uri.getPrimaryUri().toString())
.blobName(cloudBlob.getName())
.containerName(containerName)
@@ -215,12 +218,15 @@ public class ListAzureBlobStorage extends AbstractListProcessor {
listing.add(builder.build());
}
}
- } catch (Throwable t) {
+ } catch (final Throwable t) {
throw new IOException(ExceptionUtils.getRootCause(t));
}
return listing;
}
-
-
+ // Unfiltered listing is not supported - must provide a prefix
+ @Override
+ protected Integer countUnfilteredListing(final ProcessContext context) {
+ return null;
+ }
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
index c726e81e31..268b8c168e 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
@@ -160,12 +160,9 @@ public class ListAzureDataLakeStorage extends AbstractListProcessor results) {
+ protected void customValidate(final ValidationContext context, final Collection results) {
if (context.getProperty(PATH_FILTER).isSet() && !context.getProperty(RECURSE_SUBDIRECTORIES).asBoolean()) {
results.add(new ValidationResult.Builder()
.subject(PATH_FILTER.getDisplayName())
@@ -191,7 +188,7 @@ public class ListAzureDataLakeStorage extends AbstractListProcessor performListing(ProcessContext context, Long minTimestamp) throws IOException {
- try {
- String fileSystem = evaluateFileSystemProperty(context, null);
- String baseDirectory = evaluateDirectoryProperty(context, null);
- boolean recurseSubdirectories = context.getProperty(RECURSE_SUBDIRECTORIES).asBoolean();
-
- DataLakeServiceClient storageClient = getStorageClient(context, null);
- DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
-
- ListPathsOptions options = new ListPathsOptions();
- options.setPath(baseDirectory);
- options.setRecursive(recurseSubdirectories);
-
- Pattern baseDirectoryPattern = Pattern.compile("^" + baseDirectory + "/?");
-
- List listing = fileSystemClient.listPaths(options, null).stream()
- .filter(pathItem -> !pathItem.isDirectory())
- .map(pathItem -> new ADLSFileInfo.Builder()
- .fileSystem(fileSystem)
- .filePath(pathItem.getName())
- .length(pathItem.getContentLength())
- .lastModified(pathItem.getLastModified().toInstant().toEpochMilli())
- .etag(pathItem.getETag())
- .build())
- .filter(fileInfo -> filePattern == null || filePattern.matcher(fileInfo.getFilename()).matches())
- .filter(fileInfo -> pathPattern == null || pathPattern.matcher(RegExUtils.removeFirst(fileInfo.getDirectory(), baseDirectoryPattern)).matches())
- .collect(Collectors.toList());
-
- return listing;
- } catch (Exception e) {
- getLogger().error("Failed to list directory on Azure Data Lake Storage", e);
- throw new IOException(ExceptionUtils.getRootCause(e));
- }
+ protected List performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) throws IOException {
+ return performListing(context, listingMode, true);
}
@Override
- protected Map createAttributes(ADLSFileInfo fileInfo, ProcessContext context) {
- Map attributes = new HashMap<>();
+ protected Integer countUnfilteredListing(final ProcessContext context) throws IOException {
+ return performListing(context, ListingMode.CONFIGURATION_VERIFICATION, false).size();
+ }
+
+ @Override
+ protected String getListingContainerName(final ProcessContext context) {
+ return String.format("Azure Data Lake Directory [%s]", getPath(context));
+ }
+
+ @Override
+ protected Map createAttributes(final ADLSFileInfo fileInfo, final ProcessContext context) {
+ final Map attributes = new HashMap<>();
attributes.put(ATTR_NAME_FILESYSTEM, fileInfo.getFileSystem());
attributes.put(ATTR_NAME_FILE_PATH, fileInfo.getFilePath());
@@ -261,4 +237,48 @@ public class ListAzureDataLakeStorage extends AbstractListProcessor performListing(final ProcessContext context, final ListingMode listingMode,
+ final boolean applyFilters) throws IOException {
+ try {
+ final String fileSystem = evaluateFileSystemProperty(context, null);
+ final String baseDirectory = evaluateDirectoryProperty(context, null);
+ final boolean recurseSubdirectories = context.getProperty(RECURSE_SUBDIRECTORIES).asBoolean();
+
+ final Pattern filePattern = listingMode == ListingMode.EXECUTION ? this.filePattern : getPattern(context, FILE_FILTER);
+ final Pattern pathPattern = listingMode == ListingMode.EXECUTION ? this.pathPattern : getPattern(context, PATH_FILTER);
+
+ final DataLakeServiceClient storageClient = getStorageClient(context, null);
+ final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
+
+ final ListPathsOptions options = new ListPathsOptions();
+ options.setPath(baseDirectory);
+ options.setRecursive(recurseSubdirectories);
+
+ final Pattern baseDirectoryPattern = Pattern.compile("^" + baseDirectory + "/?");
+
+ final List listing = fileSystemClient.listPaths(options, null).stream()
+ .filter(pathItem -> !pathItem.isDirectory())
+ .map(pathItem -> new ADLSFileInfo.Builder()
+ .fileSystem(fileSystem)
+ .filePath(pathItem.getName())
+ .length(pathItem.getContentLength())
+ .lastModified(pathItem.getLastModified().toInstant().toEpochMilli())
+ .etag(pathItem.getETag())
+ .build())
+ .filter(fileInfo -> applyFilters && (filePattern == null || filePattern.matcher(fileInfo.getFilename()).matches()))
+ .filter(fileInfo -> applyFilters && (pathPattern == null || pathPattern.matcher(RegExUtils.removeFirst(fileInfo.getDirectory(), baseDirectoryPattern)).matches()))
+ .collect(Collectors.toList());
+
+ return listing;
+ } catch (final Exception e) {
+ getLogger().error("Failed to list directory on Azure Data Lake Storage", e);
+ throw new IOException(ExceptionUtils.getRootCause(e));
+ }
+ }
+
+ private Pattern getPattern(final ProcessContext context, final PropertyDescriptor filterDescriptor) {
+ String value = context.getProperty(filterDescriptor).evaluateAttributeExpressions().getValue();
+ return value != null ? Pattern.compile(value) : null;
+ }
}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
index e494b8c706..4fcb862d28 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
@@ -26,6 +26,8 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.ValidationContext;
@@ -40,10 +42,12 @@ import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
@@ -121,7 +125,7 @@ import java.util.stream.Collectors;
*
*
* -
- * Perform a listing of resources. The subclass will implement the {@link #performListing(ProcessContext, Long)} method, which creates a listing of all
+ * Perform a listing of resources. The subclass will implement the {@link #performListing(ProcessContext, Long, ListingMode)} method, which creates a listing of all
* entities on the target system that have timestamps later than the provided timestamp. If the entities returned have a timestamp before the provided one, those
* entities will be filtered out. It is therefore not necessary to perform the filtering of timestamps but is provided in order to give the implementation the ability
* to filter those resources on the server side rather than pulling back all of the information, if it makes sense to do so in the concrete implementation.
@@ -149,42 +153,61 @@ import java.util.stream.Collectors;
@TriggerSerially
@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After a listing of resources is performed, the latest timestamp of any of the resources is stored in the component's state. "
+ "The scope used depends on the implementation.")
-public abstract class AbstractListProcessor extends AbstractProcessor {
+public abstract class AbstractListProcessor extends AbstractProcessor implements VerifiableProcessor {
+
+ /**
+ * Indicates the mode when performing a listing.
+ */
+ protected enum ListingMode {
+ /**
+ * Indicates the listing is being performed during normal processor execution. May use configuration cached in the Processor object.
+ */
+ EXECUTION,
+ /**
+ * Indicates the listing is being performed during configuration verification. Only use configuration provided in the ProcessContext argument, since the configuration may not
+ * have been applied to the processor yet.
+ */
+ CONFIGURATION_VERIFICATION
+ }
+
+ private static final Long IGNORE_MIN_TIMESTAMP_VALUE = 0L;
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new Builder()
- .name("Distributed Cache Service")
- .description("NOTE: This property is used merely for migration from old NiFi version before state management was introduced at version 0.5.0. "
- + "The stored value in the cache service will be migrated into the state when this processor is started at the first time. "
- + "The specified Controller Service was used to maintain state about what had been pulled from the remote server so that if a new node "
- + "begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information was not shared across the cluster. "
- + "This property did not need to be set for standalone instances of NiFi but was supposed to be configured if NiFi had been running within a cluster.")
- .required(false)
- .identifiesControllerService(DistributedMapCacheClient.class)
- .build();
+ .name("Distributed Cache Service")
+ .description("NOTE: This property is used merely for migration from old NiFi version before state management was introduced at version 0.5.0. "
+ + "The stored value in the cache service will be migrated into the state when this processor is started at the first time. "
+ + "The specified Controller Service was used to maintain state about what had been pulled from the remote server so that if a new node "
+ + "begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information was not shared across the cluster. "
+ + "This property did not need to be set for standalone instances of NiFi but was supposed to be configured if NiFi had been running within a cluster.")
+ .required(false)
+ .identifiesControllerService(DistributedMapCacheClient.class)
+ .build();
public static final AllowableValue PRECISION_AUTO_DETECT = new AllowableValue("auto-detect", "Auto Detect",
- "Automatically detect time unit deterministically based on candidate entries timestamp."
- + " Please note that this option may take longer to list entities unnecessarily, if none of entries has a precise precision timestamp."
- + " E.g. even if a target system supports millis, if all entries only have timestamps without millis, such as '2017-06-16 09:06:34.000', then its precision is determined as 'seconds'.");
+ "Automatically detect time unit deterministically based on candidate entries timestamp."
+ + " Please note that this option may take longer to list entities unnecessarily, if none of entries has a precise precision timestamp."
+ + " E.g. even if a target system supports millis, if all entries only have timestamps without millis, such as '2017-06-16 09:06:34.000', "
+ + "then its precision is determined as 'seconds'.");
public static final AllowableValue PRECISION_MILLIS = new AllowableValue("millis", "Milliseconds",
"This option provides the minimum latency for an entry from being available to being listed if target system supports millis, if not, use other options.");
- public static final AllowableValue PRECISION_SECONDS = new AllowableValue("seconds", "Seconds","For a target system that does not have millis precision, but has in seconds.");
+ public static final AllowableValue PRECISION_SECONDS = new AllowableValue("seconds", "Seconds",
+ "For a target system that does not have millis precision, but has in seconds.");
public static final AllowableValue PRECISION_MINUTES = new AllowableValue("minutes", "Minutes", "For a target system that only supports precision in minutes.");
public static final PropertyDescriptor TARGET_SYSTEM_TIMESTAMP_PRECISION = new Builder()
- .name("target-system-timestamp-precision")
- .displayName("Target System Timestamp Precision")
- .description("Specify timestamp precision at the target system."
- + " Since this processor uses timestamp of entities to decide which should be listed, it is crucial to use the right timestamp precision.")
- .required(true)
- .allowableValues(PRECISION_AUTO_DETECT, PRECISION_MILLIS, PRECISION_SECONDS, PRECISION_MINUTES)
- .defaultValue(PRECISION_AUTO_DETECT.getValue())
- .build();
+ .name("target-system-timestamp-precision")
+ .displayName("Target System Timestamp Precision")
+ .description("Specify timestamp precision at the target system."
+ + " Since this processor uses timestamp of entities to decide which should be listed, it is crucial to use the right timestamp precision.")
+ .required(true)
+ .allowableValues(PRECISION_AUTO_DETECT, PRECISION_MILLIS, PRECISION_SECONDS, PRECISION_MINUTES)
+ .defaultValue(PRECISION_AUTO_DETECT.getValue())
+ .build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("All FlowFiles that are received are routed to success")
- .build();
+ .name("success")
+ .description("All FlowFiles that are received are routed to success")
+ .build();
public static final AllowableValue BY_TIMESTAMPS = new AllowableValue("timestamps", "Tracking Timestamps",
"This strategy tracks the latest timestamp of listed entity to determine new/updated entities." +
@@ -214,22 +237,22 @@ public abstract class AbstractListProcessor extends Ab
" are accurate.");
public static final PropertyDescriptor LISTING_STRATEGY = new Builder()
- .name("listing-strategy")
- .displayName("Listing Strategy")
- .description("Specify how to determine new/updated entities. See each strategy descriptions for detail.")
- .required(true)
- .allowableValues(BY_TIMESTAMPS, BY_ENTITIES, NO_TRACKING)
- .defaultValue(BY_TIMESTAMPS.getValue())
- .build();
+ .name("listing-strategy")
+ .displayName("Listing Strategy")
+ .description("Specify how to determine new/updated entities. See each strategy descriptions for detail.")
+ .required(true)
+ .allowableValues(BY_TIMESTAMPS, BY_ENTITIES, NO_TRACKING)
+ .defaultValue(BY_TIMESTAMPS.getValue())
+ .build();
public static final PropertyDescriptor RECORD_WRITER = new Builder()
- .name("record-writer")
- .displayName("Record Writer")
- .description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. If the Record Writer is specified, " +
- "all entities will be written to a single FlowFile instead of adding attributes to individual FlowFiles.")
- .required(false)
- .identifiesControllerService(RecordSetWriterFactory.class)
- .build();
+ .name("record-writer")
+ .displayName("Record Writer")
+ .description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. " +
+ "If the Record Writer is specified, all entities will be written to a single FlowFile instead of adding attributes to individual FlowFiles.")
+ .required(false)
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .build();
/**
* Represents the timestamp of an entity which was the latest one within those listed at the previous cycle.
@@ -255,6 +278,7 @@ public abstract class AbstractListProcessor extends Ab
* near instantaneously after the prior iteration effectively voiding the built in buffer
*/
public static final Map LISTING_LAG_MILLIS;
+
static {
final Map nanos = new HashMap<>();
nanos.put(TimeUnit.MILLISECONDS, 100L);
@@ -262,6 +286,7 @@ public abstract class AbstractListProcessor extends Ab
nanos.put(TimeUnit.MINUTES, 60_000L);
LISTING_LAG_MILLIS = Collections.unmodifiableMap(nanos);
}
+
static final String LATEST_LISTED_ENTRY_TIMESTAMP_KEY = "listing.timestamp";
static final String LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY = "processed.timestamp";
static final String IDENTIFIER_PREFIX = "id";
@@ -279,7 +304,6 @@ public abstract class AbstractListProcessor extends Ab
}
}
-
@Override
public Set getRelationships() {
final Set relationships = new HashSet<>();
@@ -291,7 +315,7 @@ public abstract class AbstractListProcessor extends Ab
* In order to add custom validation at sub-classes, implement {@link #customValidate(ValidationContext, Collection)} method.
*/
@Override
- protected final Collection customValidate(ValidationContext context) {
+ protected final Collection customValidate(final ValidationContext context) {
final Collection results = new ArrayList<>();
final String listingStrategy = context.getProperty(LISTING_STRATEGY).getValue();
@@ -303,9 +327,9 @@ public abstract class AbstractListProcessor extends Ab
return results;
}
-
/**
* Sub-classes can add custom validation by implementing this method.
+ *
* @param validationContext the validation context
* @param validationResults add custom validation result to this collection
*/
@@ -313,6 +337,54 @@ public abstract class AbstractListProcessor extends Ab
}
+ @Override
+ public List verify(final ProcessContext context, final ComponentLog logger, final Map attributes) {
+
+ final List results = new ArrayList<>();
+
+ final String containerName = getListingContainerName(context);
+ try {
+ final Integer unfilteredListingCount = countUnfilteredListing(context);
+ final int matchingCount = performListing(context, IGNORE_MIN_TIMESTAMP_VALUE, ListingMode.CONFIGURATION_VERIFICATION).size();
+
+ final String countExplanation;
+ if (unfilteredListingCount == null) {
+ if (matchingCount == 0) {
+ countExplanation = "Found no objects matching the filter.";
+ } else {
+ final String matchingCountText = matchingCount == 1 ? matchingCount + " object" : matchingCount + " objects";
+ countExplanation = String.format("Found %s matching the filter.", matchingCountText);
+ }
+ } else if (unfilteredListingCount == 0) {
+ countExplanation = "Found no objects.";
+ } else {
+ final String unfilteredListingCountText = unfilteredListingCount == 1 ? unfilteredListingCount + " object" : unfilteredListingCount + " objects";
+ final String unfilteredDemonstrativePronoun = unfilteredListingCount == 1 ? "that" : "those";
+ final String matchingCountText = matchingCount == 1 ? matchingCount + " matches" : matchingCount + " match";
+ countExplanation = String.format("Found %s. Of %s, %s the filter.",
+ unfilteredListingCountText, unfilteredDemonstrativePronoun, matchingCountText);
+ }
+
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Perform Listing")
+ .outcome(Outcome.SUCCESSFUL)
+ .explanation(String.format("Successfully listed contents of %s. %s", containerName, countExplanation))
+ .build());
+
+ logger.info("Successfully verified configuration");
+ } catch (final IOException e) {
+ logger.warn("Failed to verify configuration. Could not list contents of {}", containerName, e);
+
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Perform Listing")
+ .outcome(Outcome.FAILED)
+ .explanation(String.format("Failed to list contents of %s: %s", containerName, e.getMessage()))
+ .build());
+ }
+
+ return results;
+ }
+
@OnPrimaryNodeStateChange
public void onPrimaryNodeChange(final PrimaryNodeState newState) {
justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE);
@@ -376,7 +448,7 @@ public abstract class AbstractListProcessor extends Ab
client.remove(path, new StringSerDe());
} catch (final IOException ioe) {
getLogger().warn("Failed to remove entry from Distributed Cache Service. However, the state has already been migrated to use the new "
- + "State Management service, so the Distributed Cache Service is no longer needed.");
+ + "State Management service, so the Distributed Cache Service is no longer needed.");
}
}
}
@@ -415,8 +487,8 @@ public abstract class AbstractListProcessor extends Ab
}
private Map createStateMap(final long latestListedEntryTimestampThisCycleMillis,
- final long lastProcessedLatestEntryTimestampMillis,
- final List processedIdentifiesWithLatestTimestamp) throws IOException {
+ final long lastProcessedLatestEntryTimestampMillis,
+ final List processedIdentifiesWithLatestTimestamp) throws IOException {
final Map updatedState = new HashMap<>(processedIdentifiesWithLatestTimestamp.size() + 2);
updatedState.put(LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(latestListedEntryTimestampThisCycleMillis));
@@ -428,7 +500,6 @@ public abstract class AbstractListProcessor extends Ab
return updatedState;
}
-
private void persist(final long latestListedEntryTimestampThisCycleMillis,
final long lastProcessedLatestEntryTimestampMillis,
final List processedIdentifiesWithLatestTimestamp,
@@ -467,6 +538,10 @@ public abstract class AbstractListProcessor extends Ab
}
}
+ protected long getCurrentTime() {
+ return System.currentTimeMillis();
+ }
+
public void listByNoTracking(final ProcessContext context, final ProcessSession session) {
final List entityList;
@@ -483,7 +558,7 @@ public abstract class AbstractListProcessor extends Ab
try {
// minTimestamp = 0L by default on this strategy to ignore any future
// comparision in lastModifiedMap to the same entity.
- entityList = performListing(context, 0L);
+ entityList = performListing(context, IGNORE_MIN_TIMESTAMP_VALUE, ListingMode.EXECUTION);
} catch (final IOException pe) {
getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{pe.getMessage()}, pe);
context.yield();
@@ -516,12 +591,12 @@ public abstract class AbstractListProcessor extends Ab
}
public void listByTimeWindow(final ProcessContext context, final ProcessSession session) throws ProcessException {
- if (this.lastListedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
+ if (lastListedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
try {
final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
Optional.ofNullable(stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY))
- .map(Long::parseLong)
- .ifPresent(lastTimestamp -> this.lastListedLatestEntryTimestampMillis = lastTimestamp);
+ .map(Long::parseLong)
+ .ifPresent(lastTimestamp -> lastListedLatestEntryTimestampMillis = lastTimestamp);
justElectedPrimaryNode = false;
} catch (final IOException ioe) {
@@ -531,14 +606,14 @@ public abstract class AbstractListProcessor extends Ab
}
}
- long lowerBoundInclusiveTimestamp = Optional.ofNullable(this.lastListedLatestEntryTimestampMillis).orElse(0L);
+ long lowerBoundInclusiveTimestamp = Optional.ofNullable(lastListedLatestEntryTimestampMillis).orElse(IGNORE_MIN_TIMESTAMP_VALUE);
long upperBoundExclusiveTimestamp;
long currentTime = getCurrentTime();
final TreeMap> orderedEntries = new TreeMap<>();
try {
- List entityList = performListing(context, lowerBoundInclusiveTimestamp);
+ List entityList = performListing(context, lowerBoundInclusiveTimestamp, ListingMode.EXECUTION);
boolean targetSystemHasMilliseconds = false;
boolean targetSystemHasSeconds = false;
@@ -560,7 +635,7 @@ public abstract class AbstractListProcessor extends Ab
}
final TimeUnit targetSystemTimePrecision
= PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision)
- ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES
+ ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES
: PRECISION_MILLIS.getValue().equals(specifiedPrecision) ? TimeUnit.MILLISECONDS
: PRECISION_SECONDS.getValue().equals(specifiedPrecision) ? TimeUnit.SECONDS : TimeUnit.MINUTES;
final Long listingLagMillis = LISTING_LAG_MILLIS.get(targetSystemTimePrecision);
@@ -572,19 +647,19 @@ public abstract class AbstractListProcessor extends Ab
getLogger().trace("entityList: " + entityList.stream().map(entity -> entity.getName() + "_" + entity.getTimestamp()).collect(Collectors.joining(", ")));
}
entityList
- .stream()
- .filter(entity -> entity.getTimestamp() >= lowerBoundInclusiveTimestamp)
- .filter(entity -> entity.getTimestamp() < upperBoundExclusiveTimestamp)
- .forEach(entity -> orderedEntries
- .computeIfAbsent(entity.getTimestamp(), __ -> new ArrayList<>())
- .add(entity)
- );
+ .stream()
+ .filter(entity -> entity.getTimestamp() >= lowerBoundInclusiveTimestamp)
+ .filter(entity -> entity.getTimestamp() < upperBoundExclusiveTimestamp)
+ .forEach(entity -> orderedEntries
+ .computeIfAbsent(entity.getTimestamp(), __ -> new ArrayList<>())
+ .add(entity)
+ );
if (getLogger().isTraceEnabled()) {
getLogger().trace("orderedEntries: " +
- orderedEntries.values().stream()
- .flatMap(List::stream)
- .map(entity -> entity.getName() + "_" + entity.getTimestamp())
- .collect(Collectors.joining(", "))
+ orderedEntries.values().stream()
+ .flatMap(List::stream)
+ .map(entity -> entity.getName() + "_" + entity.getTimestamp())
+ .collect(Collectors.joining(", "))
);
}
} catch (final IOException e) {
@@ -614,31 +689,27 @@ public abstract class AbstractListProcessor extends Ab
try {
if (getLogger().isTraceEnabled()) {
- getLogger().info("this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp: " + this.lastListedLatestEntryTimestampMillis + " = " + upperBoundExclusiveTimestamp);
+ getLogger().info("this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp: " + lastListedLatestEntryTimestampMillis + " = " + upperBoundExclusiveTimestamp);
}
- this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp;
+ lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp;
persist(upperBoundExclusiveTimestamp, upperBoundExclusiveTimestamp, latestIdentifiersProcessed, session, getStateScope(context));
} catch (final IOException ioe) {
getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
- + "if another node begins executing this Processor, data duplication may occur.", ioe);
+ + "if another node begins executing this Processor, data duplication may occur.", ioe);
}
}
- protected long getCurrentTime() {
- return System.currentTimeMillis();
- }
-
public void listByTrackingTimestamps(final ProcessContext context, final ProcessSession session) throws ProcessException {
Long minTimestampToListMillis = lastListedLatestEntryTimestampMillis;
- if (this.lastListedLatestEntryTimestampMillis == null || this.lastProcessedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
+ if (lastListedLatestEntryTimestampMillis == null || lastProcessedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
try {
boolean noUpdateRequired = false;
// Attempt to retrieve state from the state manager if a last listing was not yet established or
// if just elected the primary node
final StateMap stateMap = session.getState(getStateScope(context));
latestIdentifiersProcessed.clear();
- for (Map.Entry state : stateMap.toMap().entrySet()) {
+ for (final Map.Entry state : stateMap.toMap().entrySet()) {
final String k = state.getKey();
final String v = state.getValue();
if (v == null || v.isEmpty()) {
@@ -648,13 +719,13 @@ public abstract class AbstractListProcessor extends Ab
if (LATEST_LISTED_ENTRY_TIMESTAMP_KEY.equals(k)) {
minTimestampToListMillis = Long.parseLong(v);
// If our determined timestamp is the same as that of our last listing, skip this execution as there are no updates
- if (minTimestampToListMillis.equals(this.lastListedLatestEntryTimestampMillis)) {
+ if (minTimestampToListMillis.equals(lastListedLatestEntryTimestampMillis)) {
noUpdateRequired = true;
} else {
- this.lastListedLatestEntryTimestampMillis = minTimestampToListMillis;
+ lastListedLatestEntryTimestampMillis = minTimestampToListMillis;
}
} else if (LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY.equals(k)) {
- this.lastProcessedLatestEntryTimestampMillis = Long.parseLong(v);
+ lastProcessedLatestEntryTimestampMillis = Long.parseLong(v);
} else if (k.startsWith(IDENTIFIER_PREFIX)) {
latestIdentifiersProcessed.add(v);
}
@@ -676,7 +747,7 @@ public abstract class AbstractListProcessor extends Ab
final long currentRunTimeMillis = System.currentTimeMillis();
try {
// track of when this last executed for consideration of the lag nanos
- entityList = performListing(context, minTimestampToListMillis);
+ entityList = performListing(context, minTimestampToListMillis, ListingMode.EXECUTION);
} catch (final IOException e) {
getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e);
context.yield();
@@ -728,7 +799,7 @@ public abstract class AbstractListProcessor extends Ab
}
final TimeUnit targetSystemTimePrecision
= PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision)
- ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES
+ ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES
: PRECISION_MILLIS.getValue().equals(specifiedPrecision) ? TimeUnit.MILLISECONDS
: PRECISION_SECONDS.getValue().equals(specifiedPrecision) ? TimeUnit.SECONDS : TimeUnit.MINUTES;
final Long listingLagMillis = LISTING_LAG_MILLIS.get(targetSystemTimePrecision);
@@ -743,8 +814,8 @@ public abstract class AbstractListProcessor extends Ab
final long listingLagNanos = TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
if (currentRunTimeNanos - lastRunTimeNanos < listingLagNanos
|| (latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis)
- && orderedEntries.get(latestListedEntryTimestampThisCycleMillis).stream()
- .allMatch(entity -> latestIdentifiersProcessed.contains(entity.getIdentifier())))) {
+ && orderedEntries.get(latestListedEntryTimestampThisCycleMillis).stream()
+ .allMatch(entity -> latestIdentifiersProcessed.contains(entity.getIdentifier())))) {
context.yield();
return;
}
@@ -778,7 +849,7 @@ public abstract class AbstractListProcessor extends Ab
// As long as we have a listing timestamp, there is meaningful state to capture regardless of any outputs generated
if (latestListedEntryTimestampThisCycleMillis != null) {
- boolean processedNewFiles = entitiesListed > 0;
+ final boolean processedNewFiles = entitiesListed > 0;
if (!latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis) || processedNewFiles) {
// We have performed a listing and pushed any FlowFiles out that may have been generated
@@ -794,7 +865,7 @@ public abstract class AbstractListProcessor extends Ab
persist(latestListedEntryTimestampThisCycleMillis, lastProcessedLatestEntryTimestampMillis, latestIdentifiersProcessed, session, getStateScope(context));
} catch (final IOException ioe) {
getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
- + "if another node begins executing this Processor, data duplication may occur.", ioe);
+ + "if another node begins executing this Processor, data duplication may occur.", ioe);
}
}
@@ -834,17 +905,17 @@ public abstract class AbstractListProcessor extends Ab
final WriteResult writeResult;
try (final OutputStream out = session.write(flowFile);
- final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), getRecordSchema(), out, Collections.emptyMap())) {
+ final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), getRecordSchema(), out, Collections.emptyMap())) {
recordSetWriter.beginRecordSet();
- for (Map.Entry> timestampEntities : orderedEntries.entrySet()) {
+ for (final Map.Entry> timestampEntities : orderedEntries.entrySet()) {
List entities = timestampEntities.getValue();
if (timestampEntities.getKey().equals(lastProcessedLatestEntryTimestampMillis)) {
// Filter out previously processed entities.
entities = entities.stream().filter(entity -> !latestIdentifiersProcessed.contains(entity.getIdentifier())).collect(Collectors.toList());
}
- for (T entity : entities) {
+ for (final T entity : entities) {
entitiesListed++;
recordSetWriter.write(entity.toRecord());
}
@@ -868,14 +939,14 @@ public abstract class AbstractListProcessor extends Ab
private int createFlowFilesForEntities(final ProcessContext context, final ProcessSession session, final Map> orderedEntries) {
int entitiesListed = 0;
- for (Map.Entry> timestampEntities : orderedEntries.entrySet()) {
+ for (final Map.Entry> timestampEntities : orderedEntries.entrySet()) {
List entities = timestampEntities.getValue();
if (timestampEntities.getKey().equals(lastProcessedLatestEntryTimestampMillis)) {
// Filter out previously processed entities.
entities = entities.stream().filter(entity -> !latestIdentifiersProcessed.contains(entity.getIdentifier())).collect(Collectors.toList());
}
- for (T entity : entities) {
+ for (final T entity : entities) {
entitiesListed++;
// Create the FlowFile for this path.
@@ -894,6 +965,7 @@ public abstract class AbstractListProcessor extends Ab
* So that it use return different precisions than PRECISION_AUTO_DETECT.
* If TARGET_SYSTEM_TIMESTAMP_PRECISION is supported as a valid Processor property,
* then PRECISION_AUTO_DETECT will be the default value when not specified by a user.
+ *
* @return
*/
protected String getDefaultTimePrecision() {
@@ -936,11 +1008,13 @@ public abstract class AbstractListProcessor extends Ab
* will be filtered out by the Processor. Therefore, it is not necessary that implementations perform this filtering but can be more efficient
* if the filtering can be performed on the server side prior to retrieving the information.
*
- * @param context the ProcessContex to use in order to pull the appropriate entities
- * @param minTimestamp the minimum timestamp of entities that should be returned.
+ * @param context the ProcessContext to use in order to pull the appropriate entities
+ * @param minTimestamp the minimum timestamp of entities that should be returned
+ * @param listingMode the listing mode, indicating whether the listing is being performed during configuration verification or normal processor execution
* @return a Listing of entities that have a timestamp >= minTimestamp
*/
- protected abstract List performListing(final ProcessContext context, final Long minTimestamp) throws IOException;
+ protected abstract List performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode)
+ throws IOException;
/**
* Determines whether or not the listing must be reset if the value of the given property is changed
@@ -963,7 +1037,25 @@ public abstract class AbstractListProcessor extends Ab
*/
protected abstract RecordSchema getRecordSchema();
+ /**
+ * Performs an unfiltered listing and returns the count, or null if this operation is not supported.
+ *
+ * @param context the ProcessContext to use in order to pull the appropriate entities
+ * @return The number of unfiltered entities in the listing, or null if this processor does not support an unfiltered listing
+ */
+ protected abstract Integer countUnfilteredListing(final ProcessContext context)
+ throws IOException;
+
+ /**
+ * Provides a human-readable name for the container being listed, for the purpose of displaying readable verification messages during processor configuration verification.
+ *
+ * @param context The process context
+ * @return The user-friendly name for the container
+ */
+ protected abstract String getListingContainerName(final ProcessContext context);
+
private static class StringSerDe implements Serializer, Deserializer {
+
@Override
public String deserialize(final byte[] value) throws DeserializationException, IOException {
if (value == null) {
@@ -972,11 +1064,11 @@ public abstract class AbstractListProcessor extends Ab
return new String(value, StandardCharsets.UTF_8);
}
-
@Override
public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
out.write(value.getBytes(StandardCharsets.UTF_8));
}
+
}
@OnScheduled
@@ -1007,7 +1099,7 @@ public abstract class AbstractListProcessor extends Ab
private void listByTrackingEntities(ProcessContext context, ProcessSession session) throws ProcessException {
listedEntityTracker.trackEntities(context, session, justElectedPrimaryNode, getStateScope(context), minTimestampToList -> {
try {
- return performListing(context, minTimestampToList);
+ return performListing(context, minTimestampToList, ListingMode.EXECUTION);
} catch (final IOException e) {
getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e);
return Collections.emptyList();
@@ -1015,5 +1107,4 @@ public abstract class AbstractListProcessor extends Ab
}, entity -> createAttributes(entity, context));
justElectedPrimaryNode = false;
}
-
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
index 7565f1821e..1a70ecb7a0 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
@@ -257,7 +257,7 @@ public class ListedEntityTracker {
} else {
this.alreadyListedEntities = new ConcurrentHashMap<>(fetchedListedEntities);
}
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new ProcessException("Failed to restore already-listed entities due to " + e, e);
}
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java
index e59d65c3e0..ec64785cb4 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processor.util.list;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.FlowFile;
@@ -30,7 +31,9 @@ import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestWatcher;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -225,6 +228,10 @@ public class ITAbstractListProcessor {
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
runner.clearTransferState();
+
+ final List results = proc.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
+ assertEquals(1, results.size());
+ assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, results.get(0).getOutcome());
}
private void setTargetSystemTimestampPrecision(TimeUnit targetPrecision) {
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
index 440a08d968..4f78e8c1fc 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
@@ -17,7 +17,10 @@
package org.apache.nifi.processor.util.list;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
@@ -30,6 +33,7 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -43,6 +47,7 @@ import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
+import org.glassfish.jersey.internal.guava.Predicates;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -55,6 +60,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -62,11 +68,13 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class TestAbstractListProcessor {
@@ -224,9 +232,12 @@ public class TestAbstractListProcessor {
proc.addEntity("one","firstFile",1585344381476L);
proc.addEntity("two","secondFile",1585344381475L);
+ assertVerificationOutcome(Outcome.SUCCESSFUL, ".* Found 2 objects. Of those, 2 match the filter.");
+
runner.run();
assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
assertEquals(2, proc.entities.size());
+ assertVerificationOutcome(Outcome.SUCCESSFUL, ".* Found 2 objects. Of those, 2 match the filter.");
final MockStateManager stateManager = runner.getStateManager();
final Map expectedState = new HashMap<>();
@@ -252,14 +263,16 @@ public class TestAbstractListProcessor {
// Clear any listed entities after choose No Tracking Strategy
proc.entities.clear();
+ assertVerificationOutcome(Outcome.SUCCESSFUL, ".* Found no objects.");
// Add new entity
proc.addEntity("one","firstFile",1585344381476L);
- proc.listByNoTracking(context, session);
+ proc.listByTrackingTimestamps(context, session);
// Test if state cleared or not
runner.getStateManager().assertStateNotEquals(expectedState, Scope.CLUSTER);
assertEquals(1, proc.entities.size());
+ assertVerificationOutcome(Outcome.SUCCESSFUL, ".* Found 1 object. Of that, 1 matches the filter.");
}
@Test
@@ -285,14 +298,22 @@ public class TestAbstractListProcessor {
proc.addEntity("one", "one", 1, 1);
proc.currentTimestamp.set(1L);
runner.clearTransferState();
+ // Prior to running the processor, we should expect 3 objects during verification
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
+ "Found 1 object. Of that, 1 matches the filter.");
runner.run();
assertEquals(1, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0)
.assertAttributeEquals(CoreAttributes.FILENAME.key(), "one");
+ // The object is now tracked, so it's no longer considered new
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
+ "Found 1 object. Of that, 1 matches the filter.");
// Should not list any entity.
proc.currentTimestamp.set(2L);
runner.clearTransferState();
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
+ "Found 1 object. Of that, 1 matches the filter.");
runner.run();
assertEquals(0, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
@@ -301,6 +322,8 @@ public class TestAbstractListProcessor {
proc.addEntity("five", "five", 5, 5);
proc.addEntity("six", "six", 6, 6);
runner.clearTransferState();
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
+ "Found 3 objects. Of those, 3 match the filter.");
runner.run();
assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0)
@@ -316,6 +339,8 @@ public class TestAbstractListProcessor {
proc.addEntity("three", "three", 3, 3);
proc.addEntity("four", "four", 4, 4);
runner.clearTransferState();
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
+ "Found 6 objects. Of those, 6 match the filter.");
runner.run();
assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0)
@@ -329,6 +354,8 @@ public class TestAbstractListProcessor {
proc.addEntity("five", "five", 7, 5);
proc.addEntity("six", "six", 6, 16);
runner.clearTransferState();
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
+ "Found 6 objects. Of those, 6 match the filter.");
runner.run();
assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0)
@@ -344,7 +371,12 @@ public class TestAbstractListProcessor {
runner.setProperty(ConcreteListProcessor.RESET_STATE, "1");
runner.setProperty(ListedEntityTracker.INITIAL_LISTING_TARGET, "window");
runner.clearTransferState();
+
+ // Prior to running the processor, we should expect 3 objects during verification
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
+ "Found 6 objects. Of those, 6 match the filter.");
runner.run();
+
assertEquals(3, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0)
.assertAttributeEquals(CoreAttributes.FILENAME.key(), "four");
@@ -353,16 +385,44 @@ public class TestAbstractListProcessor {
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(2)
.assertAttributeEquals(CoreAttributes.FILENAME.key(), "five");
-
// Reset state again.
proc.currentTimestamp.set(20L);
// ConcreteListProcessor can reset state with any property.
runner.setProperty(ListedEntityTracker.INITIAL_LISTING_TARGET, "all");
runner.setProperty(ConcreteListProcessor.RESET_STATE, "2");
runner.clearTransferState();
+
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
+ "Found 6 objects. Of those, 6 match the filter.");
+
runner.run();
// All entities should be picked, one to six.
assertEquals(6, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
+ // Now all are tracked, so none are new
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
+ "Found 6 objects. Of those, 6 match the filter.");
+
+ // Reset state again.
+ proc.currentTimestamp.set(25L);
+ runner.setProperty(ListedEntityTracker.INITIAL_LISTING_TARGET, "window");
+ runner.setProperty(ListedEntityTracker.TRACKING_TIME_WINDOW, "20ms");
+ runner.setProperty(ConcreteListProcessor.LISTING_FILTER, "f[a-z]+"); // Match only four and five
+ runner.setProperty(ConcreteListProcessor.RESET_STATE, "3");
+ runner.clearTransferState();
+
+ // Time window is now 5ms - 25ms, so only 5 and 6 fall in the window, so only 1 of the 2 filtered entities are considered 'new'
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
+ "Found 6 objects. Of those, 2 match the filter.");
+ }
+
+ private void assertVerificationOutcome(final Outcome expectedOutcome, final String expectedExplanationRegex) {
+ final List results = proc.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
+
+ assertEquals(1, results.size());
+ final ConfigVerificationResult result = results.get(0);
+ assertEquals(expectedOutcome, result.getOutcome());
+ assertTrue(String.format("Expected verification result to match pattern [%s]. Actual explanation was: %s", expectedExplanationRegex, result.getExplanation()),
+ result.getExplanation().matches(expectedExplanationRegex));
}
static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient {
@@ -434,6 +494,12 @@ public class TestAbstractListProcessor {
.name("reset-state")
.addValidator(Validator.VALID)
.build();
+ private static final PropertyDescriptor LISTING_FILTER = new PropertyDescriptor.Builder()
+ .name("listing-filter")
+ .displayName("Listing Filter")
+ .description("Filters listed entities by name.")
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .build();
final AtomicReference currentTimestamp = new AtomicReference<>();
@@ -453,6 +519,7 @@ public class TestAbstractListProcessor {
properties.add(ListedEntityTracker.TRACKING_TIME_WINDOW);
properties.add(ListedEntityTracker.INITIAL_LISTING_TARGET);
properties.add(RESET_STATE);
+ properties.add(LISTING_FILTER);
return properties;
}
@@ -514,8 +581,17 @@ public class TestAbstractListProcessor {
}
@Override
- protected List performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
- return getEntityList();
+ protected List performListing(final ProcessContext context, final Long minTimestamp, ListingMode listingMode) throws IOException {
+ final PropertyValue listingFilter = context.getProperty(LISTING_FILTER);
+ Predicate filter = listingFilter.isSet()
+ ? entity -> entity.getName().matches(listingFilter.getValue())
+ : Predicates.alwaysTrue();
+ return getEntityList().stream().filter(filter).collect(Collectors.toList());
+ }
+
+ @Override
+ protected Integer countUnfilteredListing(final ProcessContext context) throws IOException {
+ return entities.size();
}
List getEntityList() {
@@ -527,6 +603,11 @@ public class TestAbstractListProcessor {
return RESET_STATE.equals(property);
}
+ @Override
+ protected String getListingContainerName(final ProcessContext context) {
+ return persistenceFilename;
+ }
+
@Override
protected Scope getStateScope(final PropertyContext context) {
return Scope.CLUSTER;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
index bd07aba79c..da362366ab 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
@@ -296,7 +296,7 @@ public abstract class GetFileTransfer extends AbstractProcessor {
}
final StopWatch stopWatch = new StopWatch(true);
- final List listing = transfer.getListing();
+ final List listing = transfer.getListing(true);
final long millis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
int newItems = 0;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
index ad9601ec3e..fd6032aa4f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
@@ -126,4 +126,5 @@ public class ListFTP extends ListFileTransfer {
protected void customValidate(ValidationContext validationContext, Collection results) {
FTPTransfer.validateProxySpec(validationContext, results);
}
+
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
index 8d06dff3f6..c0af210070 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
@@ -338,7 +338,6 @@ public class ListFile extends AbstractListProcessor {
@OnScheduled
public void onScheduled(final ProcessContext context) {
- fileFilterRef.set(createFileFilter(context));
includeFileAttributes = context.getProperty(INCLUDE_FILE_ATTRIBUTES).asBoolean();
final long maxDiskOperationMillis = context.getProperty(MAX_DISK_OPERATION_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
@@ -351,6 +350,7 @@ public class ListFile extends AbstractListProcessor {
} else {
performanceTracker = new UntrackedPerformanceTracker(getLogger(), maxDiskOperationMillis);
}
+ fileFilterRef.set(createFileFilter(context, performanceTracker, true));
final long millisToKeepStats = TimeUnit.MINUTES.toMillis(15);
final MonitorActiveTasks monitorTask = new MonitorActiveTasks(performanceTracker, getLogger(), maxDiskOperationMillis, maxListingMillis, millisToKeepStats);
@@ -502,12 +502,31 @@ public class ListFile extends AbstractListProcessor {
}
@Override
- protected List performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
+ protected Integer countUnfilteredListing(final ProcessContext context) throws IOException {
+ return performListing(context, 0L, ListingMode.CONFIGURATION_VERIFICATION, false).size();
+ }
+ @Override
+ protected List performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode)
+ throws IOException {
+ return performListing(context, minTimestamp, listingMode, true);
+ }
+
+ private List performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode, final boolean applyFilters)
+ throws IOException {
final Path basePath = new File(getPath(context)).toPath();
final Boolean recurse = context.getProperty(RECURSE).asBoolean();
final Map lastModifiedMap = new HashMap<>();
- final BiPredicate fileFilter = fileFilterRef.get();
+ final BiPredicate fileFilter;
+ final PerformanceTracker performanceTracker;
+ if (listingMode == ListingMode.EXECUTION) {
+ fileFilter = fileFilterRef.get();
+ performanceTracker = this.performanceTracker;
+ } else {
+ final long maxDiskOperationMillis = context.getProperty(MAX_DISK_OPERATION_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+ performanceTracker = new UntrackedPerformanceTracker(getLogger(), maxDiskOperationMillis);
+ fileFilter = createFileFilter(context, performanceTracker, applyFilters);
+ }
int maxDepth = recurse ? Integer.MAX_VALUE : 1;
final BiPredicate matcher = new BiPredicate() {
@@ -515,7 +534,7 @@ public class ListFile extends AbstractListProcessor {
@Override
public boolean test(final Path path, final BasicFileAttributes attributes) {
- if (!isScheduled()) {
+ if (!isScheduled() && listingMode == ListingMode.EXECUTION) {
throw new ProcessorStoppedException();
}
@@ -536,10 +555,11 @@ public class ListFile extends AbstractListProcessor {
final TimedOperationKey operationKey = performanceTracker.beginOperation(DiskOperation.FILTER, relativePath, filename);
try {
- if (!isDirectory && (minTimestamp == null || attributes.lastModifiedTime().toMillis() >= minTimestamp)
- && fileFilter.test(path, attributes)) {
- // We store the attributes for each Path we are returning in order to avoid to
- // retrieve them again later when creating the FileInfo
+ final boolean matchesFilters = (minTimestamp == null || attributes.lastModifiedTime().toMillis() >= minTimestamp)
+ && fileFilter.test(path, attributes);
+ if (!isDirectory && (!applyFilters || matchesFilters)) {
+ // We store the attributes for each Path we are returning in order to avoid
+ // retrieving them again later when creating the FileInfo
lastModifiedMap.put(path, attributes);
return true;
@@ -562,17 +582,17 @@ public class ListFile extends AbstractListProcessor {
Files.walkFileTree(basePath, Collections.singleton(FileVisitOption.FOLLOW_LINKS), maxDepth, new FileVisitor() {
@Override
- public FileVisitResult preVisitDirectory(final Path dir, final BasicFileAttributes attributes) throws IOException {
+ public FileVisitResult preVisitDirectory(final Path dir, final BasicFileAttributes attributes) {
if (Files.isReadable(dir)) {
return FileVisitResult.CONTINUE;
} else {
- getLogger().debug("The following directory is not readable: {}", new Object[] {dir.toString()});
+ getLogger().debug("The following directory is not readable: {}", new Object[]{dir.toString()});
return FileVisitResult.SKIP_SUBTREE;
}
}
@Override
- public FileVisitResult visitFile(final Path path, final BasicFileAttributes attributes) throws IOException {
+ public FileVisitResult visitFile(final Path path, final BasicFileAttributes attributes) {
if (matcher.test(path, attributes)) {
final File file = path.toFile();
final BasicFileAttributes fileAttributes = lastModifiedMap.get(path);
@@ -591,20 +611,20 @@ public class ListFile extends AbstractListProcessor {
}
@Override
- public FileVisitResult visitFileFailed(final Path path, final IOException e) throws IOException {
+ public FileVisitResult visitFileFailed(final Path path, final IOException e) {
if (e instanceof AccessDeniedException) {
- getLogger().debug("The following file is not readable: {}", new Object[] {path.toString()});
+ getLogger().debug("The following file is not readable: {}", new Object[]{path.toString()});
return FileVisitResult.SKIP_SUBTREE;
} else {
- getLogger().error("Error during visiting file {}: {}", new Object[] {path.toString(), e.getMessage()}, e);
+ getLogger().error("Error during visiting file {}: {}", new Object[]{path.toString(), e.getMessage()}, e);
return FileVisitResult.TERMINATE;
}
}
@Override
- public FileVisitResult postVisitDirectory(final Path dir, final IOException e) throws IOException {
+ public FileVisitResult postVisitDirectory(final Path dir, final IOException e) {
if (e != null) {
- getLogger().error("Error during visiting directory {}: {}", new Object[] {dir.toString(), e.getMessage()}, e);
+ getLogger().error("Error during visiting directory {}: {}", new Object[]{dir.toString(), e.getMessage()}, e);
}
return FileVisitResult.CONTINUE;
@@ -619,10 +639,17 @@ public class ListFile extends AbstractListProcessor {
getLogger().info("Processor was stopped so will not complete listing of Files");
return Collections.emptyList();
} finally {
- performanceTracker.completeActiveDirectory();
+ if (performanceTracker != null) {
+ performanceTracker.completeActiveDirectory();
+ }
}
}
+ @Override
+ protected String getListingContainerName(final ProcessContext context) {
+ return String.format("%s Directory [%s]", context.getProperty(DIRECTORY_LOCATION).getValue(), getPath(context));
+ }
+
@Override
protected boolean isListingResetNecessary(final PropertyDescriptor property) {
return DIRECTORY.equals(property)
@@ -636,7 +663,8 @@ public class ListFile extends AbstractListProcessor {
|| IGNORE_HIDDEN_FILES.equals(property);
}
- private BiPredicate createFileFilter(final ProcessContext context) {
+ private BiPredicate createFileFilter(final ProcessContext context, final PerformanceTracker performanceTracker,
+ final boolean applyFilters) {
final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
@@ -652,6 +680,10 @@ public class ListFile extends AbstractListProcessor {
final Path basePath = Paths.get(indir);
return (path, attributes) -> {
+ if (!applyFilters) {
+ return true;
+ }
+
if (minSize > attributes.size()) {
return false;
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
index 95d475c088..c2f3f11493 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
@@ -17,10 +17,6 @@
package org.apache.nifi.processors.standard;
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-
import org.apache.commons.io.IOUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -32,12 +28,15 @@ import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.serialization.record.RecordSchema;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Iterator;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
import java.util.Locale;
+import java.util.Map;
public abstract class ListFileTransfer extends AbstractListProcessor {
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
@@ -104,11 +103,21 @@ public abstract class ListFileTransfer extends AbstractListProcessor {
}
@Override
- protected List performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
+ protected Integer countUnfilteredListing(final ProcessContext context) throws IOException {
+ return performListing(context, 0L, ListingMode.CONFIGURATION_VERIFICATION, false).size();
+ }
+
+ @Override
+ protected List performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) throws IOException {
+ return performListing(context, minTimestamp, listingMode, true);
+ }
+
+ protected List performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode,
+ final boolean applyFilters) throws IOException {
final FileTransfer transfer = getFileTransfer(context);
final List listing;
try {
- listing = transfer.getListing();
+ listing = transfer.getListing(applyFilters);
} finally {
IOUtils.closeQuietly(transfer);
}
@@ -128,6 +137,12 @@ public abstract class ListFileTransfer extends AbstractListProcessor {
return listing;
}
+ @Override
+ protected String getListingContainerName(final ProcessContext context) {
+ return String.format("Remote Directory [%s] on [%s:%s]", getPath(context), context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue(),
+ context.getProperty(UNDEFAULTED_PORT).evaluateAttributeExpressions().getValue());
+ }
+
@Override
protected RecordSchema getRecordSchema() {
return FileInfo.getRecordSchema();
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
index 704cebce1a..c2c0aeb5f6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
@@ -146,11 +146,17 @@ public class ListSFTP extends ListFileTransfer {
}
@Override
- protected List performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
- final List listing = super.performListing(context, minTimestamp);
+ protected List performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode,
+ final boolean applyFilters) throws IOException {
+ final List listing = super.performListing(context, minTimestamp, listingMode, applyFilters);
+ if (!applyFilters) {
+ return listing;
+ }
+
+ final Predicate filePredicate = listingMode == ListingMode.EXECUTION ? this.fileFilter : createFileFilter(context);
return listing.stream()
- .filter(fileFilter)
+ .filter(filePredicate)
.collect(Collectors.toList());
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
index b7ac6a9d41..c3a35fa688 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
@@ -190,14 +190,14 @@ public class FTPTransfer implements FileTransfer {
}
@Override
- public List getListing() throws IOException {
+ public List getListing(final boolean applyFilters) throws IOException {
final String path = ctx.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions().getValue();
final int depth = 0;
final int maxResults = ctx.getProperty(FileTransfer.REMOTE_POLL_BATCH_SIZE).asInteger();
- return getListing(path, depth, maxResults);
+ return getListing(path, depth, maxResults, applyFilters);
}
- private List getListing(final String path, final int depth, final int maxResults) throws IOException {
+ private List getListing(final String path, final int depth, final int maxResults, final boolean applyFilters) throws IOException {
final List listing = new ArrayList<>();
if (maxResults < 1) {
return listing;
@@ -266,7 +266,7 @@ public class FTPTransfer implements FileTransfer {
// OR if is a link and we're supposed to follow symlink
if ((recurse && file.isDirectory()) || (symlink && file.isSymbolicLink())) {
try {
- listing.addAll(getListing(newFullForwardPath, depth + 1, maxResults - count));
+ listing.addAll(getListing(newFullForwardPath, depth + 1, maxResults - count, applyFilters));
} catch (final IOException e) {
logger.error("Unable to get listing from " + newFullForwardPath + "; skipping", e);
}
@@ -274,8 +274,8 @@ public class FTPTransfer implements FileTransfer {
// if is not a directory and is not a link and it matches
// FILE_FILTER_REGEX - then let's add it
- if (!file.isDirectory() && !file.isSymbolicLink() && pathFilterMatches) {
- if (pattern == null || pattern.matcher(filename).matches()) {
+ if (!file.isDirectory() && !file.isSymbolicLink() && (pathFilterMatches || !applyFilters)) {
+ if (pattern == null || !applyFilters || pattern.matcher(filename).matches()) {
listing.add(newFileInfo(file, path));
count++;
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
index 64bb1306fe..e109714eb4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
@@ -34,7 +34,7 @@ public interface FileTransfer extends Closeable {
String getHomeDirectory(FlowFile flowFile) throws IOException;
- List getListing() throws IOException;
+ List getListing(boolean applyFilters) throws IOException;
FlowFile getRemoteFile(String remoteFileName, FlowFile flowFile, ProcessSession session) throws ProcessException, IOException;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
index fd77991c1a..1082dd7ef3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
@@ -263,7 +263,7 @@ public class SFTPTransfer implements FileTransfer {
}
@Override
- public List getListing() throws IOException {
+ public List getListing(final boolean applyFilters) throws IOException {
final String path = ctx.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions().getValue();
final int depth = 0;
@@ -277,11 +277,12 @@ public class SFTPTransfer implements FileTransfer {
}
final List listing = new ArrayList<>(1000);
- getListing(path, depth, maxResults, listing);
+ getListing(path, depth, maxResults, listing, applyFilters);
return listing;
}
- protected void getListing(final String path, final int depth, final int maxResults, final List listing) throws IOException {
+ protected void getListing(final String path, final int depth, final int maxResults, final List listing,
+ final boolean applyFilters) throws IOException {
if (maxResults < 1 || listing.size() >= maxResults) {
return;
}
@@ -346,8 +347,8 @@ public class SFTPTransfer implements FileTransfer {
}
// if is not a directory and is not a link and it matches FILE_FILTER_REGEX - then let's add it
- if (!entry.isDirectory() && !(entry.getAttributes().getType() == FileMode.Type.SYMLINK) && isPathMatch) {
- if (pattern == null || pattern.matcher(entryFilename).matches()) {
+ if (!entry.isDirectory() && !(entry.getAttributes().getType() == FileMode.Type.SYMLINK) && (!applyFilters || isPathMatch)) {
+ if (pattern == null || !applyFilters || pattern.matcher(entryFilename).matches()) {
listing.add(newFileInfo(entry, path));
}
}
@@ -379,7 +380,7 @@ public class SFTPTransfer implements FileTransfer {
final String newFullForwardPath = newFullPath.getPath().replace("\\", "/");
try {
- getListing(newFullForwardPath, depth + 1, maxResults, listing);
+ getListing(newFullForwardPath, depth + 1, maxResults, listing, applyFilters);
} catch (final IOException e) {
logger.error("Unable to get listing from " + newFullForwardPath + "; skipping", e);
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
index b6af93849b..7c3caa2778 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
@@ -18,6 +18,8 @@
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.SystemUtils;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.flowfile.FlowFile;
@@ -233,27 +235,34 @@ public class TestListFile {
assertTrue(file1.createNewFile());
assertTrue(file1.setLastModified(time4millis));
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 1 object. Of that, 1 matches the filter.");
+
// process first file and set new timestamp
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(1, successFiles1.size());
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 1 object. Of that, 1 matches the filter.");
// create second file
final File file2 = new File(TESTDIR + "/listing2.txt");
assertTrue(file2.createNewFile());
assertTrue(file2.setLastModified(time2millis));
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 2 objects. Of those, 2 match the filter.");
// process second file after timestamp
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(1, successFiles2.size());
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 2 objects. Of those, 2 match the filter.");
// create third file
final File file3 = new File(TESTDIR + "/listing3.txt");
assertTrue(file3.createNewFile());
assertTrue(file3.setLastModified(time4millis));
+ // 0 are new because the timestamp is before the min listed timestamp
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 3 match the filter.");
// process third file before timestamp
runNext();
@@ -264,6 +273,7 @@ public class TestListFile {
// force state to reset and process all files
runner.removeProperty(ListFile.DIRECTORY);
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 3 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -271,6 +281,7 @@ public class TestListFile {
runNext();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 3 match the filter.");
}
@Test
@@ -309,6 +320,7 @@ public class TestListFile {
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runNext.apply(true);
runner.assertTransferCount(ListFile.REL_SUCCESS, 3);
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 3 match the filter.");
// processor updates internal state, it shouldn't pick the same ones.
runNext.apply(false);
@@ -323,6 +335,7 @@ public class TestListFile {
assertEquals(2, successFiles2.size());
assertEquals(file2.getName(), successFiles2.get(0).getAttribute("filename"));
assertEquals(file1.getName(), successFiles2.get(1).getAttribute("filename"));
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 2 match the filter.");
// exclude newest
runner.setProperty(ListFile.MIN_AGE, age1);
@@ -333,6 +346,7 @@ public class TestListFile {
assertEquals(2, successFiles3.size());
assertEquals(file3.getName(), successFiles3.get(0).getAttribute("filename"));
assertEquals(file2.getName(), successFiles3.get(1).getAttribute("filename"));
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 2 match the filter.");
// exclude oldest and newest
runner.setProperty(ListFile.MIN_AGE, age1);
@@ -342,6 +356,7 @@ public class TestListFile {
final List successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(1, successFiles4.size());
assertEquals(file2.getName(), successFiles4.get(0).getAttribute("filename"));
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 1 matches the filter.");
}
@@ -377,26 +392,31 @@ public class TestListFile {
// check all files
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 3 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(3, successFiles1.size());
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 3 match the filter.");
// exclude largest
runner.removeProperty(ListFile.MIN_AGE);
runner.removeProperty(ListFile.MAX_AGE);
runner.setProperty(ListFile.MIN_SIZE, "0 b");
runner.setProperty(ListFile.MAX_SIZE, "7500 b");
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 2 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(2, successFiles2.size());
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 2 match the filter.");
// exclude smallest
runner.removeProperty(ListFile.MIN_AGE);
runner.removeProperty(ListFile.MAX_AGE);
runner.setProperty(ListFile.MIN_SIZE, "2500 b");
runner.removeProperty(ListFile.MAX_SIZE);
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 2 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -407,6 +427,7 @@ public class TestListFile {
runner.removeProperty(ListFile.MAX_AGE);
runner.setProperty(ListFile.MIN_SIZE, "2500 b");
runner.setProperty(ListFile.MAX_SIZE, "7500 b");
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 1 matches the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -444,6 +465,7 @@ public class TestListFile {
runner.removeProperty(ListFile.MIN_SIZE);
runner.removeProperty(ListFile.MAX_SIZE);
runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "false");
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 2 objects. Of those, 2 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -451,6 +473,7 @@ public class TestListFile {
// exclude hidden
runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "true");
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 2 objects. Of those, 1 matches the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -472,6 +495,7 @@ public class TestListFile {
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.setProperty(ListFile.FILE_FILTER, ".*");
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 2 objects. Of those, 1 matches the filter.");
runNext();
final List successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -484,30 +508,33 @@ public class TestListFile {
assertTrue(subdir.mkdir());
assertTrue(subdir.setReadable(false));
- final File file1 = new File(TESTDIR + "/subdir/unreadable.txt");
- assertTrue(file1.createNewFile());
- assertTrue(file1.setReadable(false));
+ try {
+ final File file1 = new File(TESTDIR + "/subdir/unreadable.txt");
+ assertTrue(file1.createNewFile());
+ assertTrue(file1.setReadable(false));
- final File file2 = new File(TESTDIR + "/subdir/readable.txt");
- assertTrue(file2.createNewFile());
+ final File file2 = new File(TESTDIR + "/subdir/readable.txt");
+ assertTrue(file2.createNewFile());
- final File file3 = new File(TESTDIR + "/secondReadable.txt");
- assertTrue(file3.createNewFile());
+ final File file3 = new File(TESTDIR + "/secondReadable.txt");
+ assertTrue(file3.createNewFile());
- final long now = getTestModifiedTime();
- assertTrue(file1.setLastModified(now));
- assertTrue(file2.setLastModified(now));
- assertTrue(file3.setLastModified(now));
+ final long now = getTestModifiedTime();
+ assertTrue(file1.setLastModified(now));
+ assertTrue(file2.setLastModified(now));
+ assertTrue(file3.setLastModified(now));
- runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
- runner.setProperty(ListFile.FILE_FILTER, ".*");
- runNext();
+ runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
+ runner.setProperty(ListFile.FILE_FILTER, ".*");
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 1 object. Of that, 1 matches the filter.");
+ runNext();
- final List successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
- assertEquals(1, successFiles.size());
- assertEquals("secondReadable.txt", successFiles.get(0).getAttribute("filename"));
-
- subdir.setReadable(true);
+ final List successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
+ assertEquals(1, successFiles.size());
+ assertEquals("secondReadable.txt", successFiles.get(0).getAttribute("filename"));
+ } finally {
+ subdir.setReadable(true);
+ }
}
@Test
@@ -527,6 +554,7 @@ public class TestListFile {
// Run with privileges and with fitting filter
runner.setProperty(ListFile.FILE_FILTER, "file.*");
assertTrue(file.setLastModified(getTestModifiedTime()));
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 1 object. Of that, 1 matches the filter.");
runNext();
final List successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -535,6 +563,7 @@ public class TestListFile {
// Run without privileges and with fitting filter
assertTrue(file.setReadable(false));
assertTrue(file.setLastModified(getTestModifiedTime()));
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 1 object. Of that, 0 match the filter.");
runNext();
final List successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -566,6 +595,7 @@ public class TestListFile {
// check all files
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue());
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 4 objects. Of those, 4 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -574,9 +604,11 @@ public class TestListFile {
// filter file on pattern
// Modifying FILE_FILTER property reset listing status, so these files will be listed again.
runner.setProperty(ListFile.FILE_FILTER, ".*-xyz-.*");
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 4 objects. Of those, 2 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 2);
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 4 objects. Of those, 2 match the filter.");
runNext();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
}
@@ -611,6 +643,7 @@ public class TestListFile {
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue());
runner.setProperty(ListFile.RECURSE, "true");
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 4 objects. Of those, 4 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
@@ -620,6 +653,7 @@ public class TestListFile {
// filter path on pattern subdir1
runner.setProperty(ListFile.PATH_FILTER, "subdir1");
runner.setProperty(ListFile.RECURSE, "true");
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 4 objects. Of those, 3 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -628,6 +662,7 @@ public class TestListFile {
// filter path on pattern subdir2
runner.setProperty(ListFile.PATH_FILTER, "subdir2");
runner.setProperty(ListFile.RECURSE, "true");
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 4 objects. Of those, 1 matches the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -659,6 +694,7 @@ public class TestListFile {
// check all files
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.setProperty(ListFile.RECURSE, "true");
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 3 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 3);
final List successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -683,8 +719,9 @@ public class TestListFile {
}
assertEquals(3, successFiles1.size());
- // exclude hidden
+ // don't recurse
runner.setProperty(ListFile.RECURSE, "false");
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 1 object. Of that, 1 matches the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -710,6 +747,7 @@ public class TestListFile {
// check all files
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.setProperty(ListFile.RECURSE, "true");
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 3 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
runner.assertTransferCount(ListFile.REL_SUCCESS, 3);
@@ -802,6 +840,7 @@ public class TestListFile {
makeTestFile("/batch1-age5.txt", time5millis, fileTimes);
// check files
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 3 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
@@ -815,6 +854,7 @@ public class TestListFile {
// should be ignored since it's older than age3
makeTestFile("/batch2-age4.txt", time4millis, fileTimes);
+ assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 6 objects. Of those, 6 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
@@ -880,4 +920,14 @@ public class TestListFile {
}
}
}
+
+ private void assertVerificationOutcome(final Outcome expectedOutcome, final String expectedExplanationRegex) {
+ final List results = processor.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
+
+ assertEquals(1, results.size());
+ final ConfigVerificationResult result = results.get(0);
+ assertEquals(expectedOutcome, result.getOutcome());
+ assertTrue(String.format("Expected verification result to match pattern [%s]. Actual explanation was: %s", expectedExplanationRegex, result.getExplanation()),
+ result.getExplanation().matches(expectedExplanationRegex));
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
index f864e12db1..4027dddaa6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
@@ -29,7 +29,10 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.FileInfo;
@@ -47,6 +50,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class TestListSFTP {
@Rule
@@ -95,7 +99,7 @@ public class TestListSFTP {
protected FileTransfer getFileTransfer(ProcessContext context) {
return new SFTPTransfer(context, getLogger()){
@Override
- protected void getListing(String path, int depth, int maxResults, List listing) throws IOException {
+ protected void getListing(String path, int depth, int maxResults, List listing, boolean applyFilters) throws IOException {
if (path.contains("subdir")) {
reachScanningSubDir.countDown();
try {
@@ -105,7 +109,7 @@ public class TestListSFTP {
}
}
- super.getListing(path, depth, maxResults, listing);
+ super.getListing(path, depth, maxResults, listing, applyFilters);
}
};
}
@@ -193,6 +197,7 @@ public class TestListSFTP {
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS) * 2);
runner.run();
+ assertVerificationOutcome(runner, Outcome.SUCCESSFUL, ".* Found 3 objects. Of those, 3 match the filter.");
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 3);
@@ -231,10 +236,22 @@ public class TestListSFTP {
runner.run();
+ assertVerificationOutcome(runner, Outcome.SUCCESSFUL, ".* Found 3 objects. Of those, 1 matches the filter.");
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0);
//the only file between the limits
retrievedFile.assertAttributeEquals("filename", "file.txt");
}
+
+ private void assertVerificationOutcome(final TestRunner runner, final Outcome expectedOutcome, final String expectedExplanationRegex) {
+ final List results = ((VerifiableProcessor) runner.getProcessor())
+ .verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
+
+ assertEquals(1, results.size());
+ final ConfigVerificationResult result = results.get(0);
+ assertEquals(expectedOutcome, result.getOutcome());
+ assertTrue(String.format("Expected verification result to match pattern [%s]. Actual explanation was: %s", expectedExplanationRegex, result.getExplanation()),
+ result.getExplanation().matches(expectedExplanationRegex));
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/ITestSFTPTransferWithSSHTestServer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/ITestSFTPTransferWithSSHTestServer.java
index de8c8b6b07..50813c9e7c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/ITestSFTPTransferWithSSHTestServer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/ITestSFTPTransferWithSSHTestServer.java
@@ -144,7 +144,7 @@ public class ITestSFTPTransferWithSSHTestServer {
properties.put(SFTPTransfer.REMOTE_PATH, DIR_2);
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
- final List listing = transfer.getListing();
+ final List listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(2, listing.size());
@@ -167,7 +167,7 @@ public class ITestSFTPTransferWithSSHTestServer {
properties.put(SFTPTransfer.IGNORE_DOTTED_FILES, "false");
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
- final List listing = transfer.getListing();
+ final List listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(3, listing.size());
@@ -183,7 +183,7 @@ public class ITestSFTPTransferWithSSHTestServer {
properties.put(SFTPTransfer.RECURSIVE_SEARCH, "false");
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
- final List listing = transfer.getListing();
+ final List listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(0, listing.size());
}
@@ -196,7 +196,7 @@ public class ITestSFTPTransferWithSSHTestServer {
properties.put(SFTPTransfer.RECURSIVE_SEARCH, "true");
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
- final List listing = transfer.getListing();
+ final List listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(4, listing.size());
}
@@ -210,7 +210,7 @@ public class ITestSFTPTransferWithSSHTestServer {
properties.put(SFTPTransfer.FOLLOW_SYMLINK, "false");
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
- final List listing = transfer.getListing();
+ final List listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(0, listing.size());
}
@@ -224,7 +224,7 @@ public class ITestSFTPTransferWithSSHTestServer {
properties.put(SFTPTransfer.FOLLOW_SYMLINK, "true");
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
- final List listing = transfer.getListing();
+ final List listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(4, listing.size());
}
@@ -238,7 +238,7 @@ public class ITestSFTPTransferWithSSHTestServer {
// first listing is without batch size and shows 4 results
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
- final List listing = transfer.getListing();
+ final List listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(4, listing.size());
}
@@ -247,7 +247,7 @@ public class ITestSFTPTransferWithSSHTestServer {
properties.put(SFTPTransfer.REMOTE_POLL_BATCH_SIZE, "2");
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
- final List listing = transfer.getListing();
+ final List listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(2, listing.size());
}
@@ -263,7 +263,7 @@ public class ITestSFTPTransferWithSSHTestServer {
properties.put(SFTPTransfer.FILE_FILTER_REGEX, fileFilterRegex);
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
- final List listing = transfer.getListing();
+ final List listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(2, listing.size());
@@ -282,7 +282,7 @@ public class ITestSFTPTransferWithSSHTestServer {
properties.put(SFTPTransfer.PATH_FILTER_REGEX, pathFilterRegex);
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
- final List listing = transfer.getListing();
+ final List listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(2, listing.size());
@@ -306,7 +306,7 @@ public class ITestSFTPTransferWithSSHTestServer {
properties.put(SFTPTransfer.RECURSIVE_SEARCH, "true");
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
- transfer.getListing();
+ transfer.getListing(true);
}
}
@@ -317,7 +317,7 @@ public class ITestSFTPTransferWithSSHTestServer {
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
// verify the directory has two files
- final List listing = transfer.getListing();
+ final List listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(2, listing.size());
@@ -327,7 +327,7 @@ public class ITestSFTPTransferWithSSHTestServer {
}
// verify there are now zero files
- final List listingAfterDelete = transfer.getListing();
+ final List listingAfterDelete = transfer.getListing(true);
assertNotNull(listingAfterDelete);
assertEquals(0, listingAfterDelete.size());
}
@@ -340,7 +340,7 @@ public class ITestSFTPTransferWithSSHTestServer {
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
// verify the directory has two files
- final List listing = transfer.getListing();
+ final List listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(2, listing.size());
@@ -352,7 +352,7 @@ public class ITestSFTPTransferWithSSHTestServer {
}
// verify there are now zero files
- final List listingAfterDelete = transfer.getListing();
+ final List listingAfterDelete = transfer.getListing(true);
assertNotNull(listingAfterDelete);
assertEquals(0, listingAfterDelete.size());
}
@@ -374,7 +374,7 @@ public class ITestSFTPTransferWithSSHTestServer {
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
// verify the directory exists
- final List listing = transfer.getListing();
+ final List listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(0, listing.size());
@@ -382,7 +382,7 @@ public class ITestSFTPTransferWithSSHTestServer {
// verify the directory no longer exists
try {
- transfer.getListing();
+ transfer.getListing(true);
Assert.fail("Should have thrown exception");
} catch (FileNotFoundException e) {
// nothing to do, expected
@@ -408,7 +408,7 @@ public class ITestSFTPTransferWithSSHTestServer {
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
// verify the directory does not exist
try {
- transfer.getListing();
+ transfer.getListing(true);
Assert.fail("Should have failed");
} catch (FileNotFoundException e) {
// Nothing to do, expected
@@ -418,7 +418,7 @@ public class ITestSFTPTransferWithSSHTestServer {
transfer.ensureDirectoryExists(null, new File(absolutePath));
// verify the directory now exists
- final List listing = transfer.getListing();
+ final List listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(0, listing.size());
}
@@ -433,7 +433,7 @@ public class ITestSFTPTransferWithSSHTestServer {
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
// verify the directory does not exist
try {
- transfer.getListing();
+ transfer.getListing(true);
Assert.fail("Should have failed");
} catch (FileNotFoundException e) {
// Nothing to do, expected
@@ -443,7 +443,7 @@ public class ITestSFTPTransferWithSSHTestServer {
transfer.ensureDirectoryExists(null, new File(absolutePath));
// verify the directory now exists
- final List listing = transfer.getListing();
+ final List listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(0, listing.size());
}
@@ -456,7 +456,7 @@ public class ITestSFTPTransferWithSSHTestServer {
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
// verify the directory already exists
- final List listing = transfer.getListing();
+ final List listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(2, listing.size());
@@ -475,7 +475,7 @@ public class ITestSFTPTransferWithSSHTestServer {
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
// verify the directory does not exist
try {
- transfer.getListing();
+ transfer.getListing(true);
Assert.fail("Should have failed");
} catch (FileNotFoundException e) {
// Nothing to do, expected
@@ -485,7 +485,7 @@ public class ITestSFTPTransferWithSSHTestServer {
transfer.ensureDirectoryExists(null, new File(absolutePath));
// verify the directory now exists
- final List listing = transfer.getListing();
+ final List listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(0, listing.size());
}
@@ -499,7 +499,7 @@ public class ITestSFTPTransferWithSSHTestServer {
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
// verify the directory already exists
- final List listing = transfer.getListing();
+ final List listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(2, listing.size());
@@ -519,7 +519,7 @@ public class ITestSFTPTransferWithSSHTestServer {
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
// verify the directory does not exist
try {
- transfer.getListing();
+ transfer.getListing(true);
Assert.fail("Should have failed");
} catch (FileNotFoundException e) {
// Nothing to do, expected