NIFI-9276: Adding config verification to AbstractListProcessor subclasses (#5453)

This commit is contained in:
Joe Gresock 2021-10-21 12:22:50 -04:00 committed by GitHub
parent 1bec905890
commit d2f8f97b10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 582 additions and 255 deletions

View File

@ -35,7 +35,6 @@ import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@ -62,6 +61,8 @@ import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
import org.apache.nifi.serialization.record.RecordSchema;
import java.util.Optional;
@PrimaryNodeOnly
@TriggerSerially
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@ -140,6 +141,11 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
return attributes;
}
@Override
protected String getListingContainerName(final ProcessContext context) {
    // The container being listed is identified by the value getPath(context) resolves to.
    final String container = getPath(context);
    return String.format("Azure Blob Storage Container [%s]", container);
}
@Override
protected String getPath(final ProcessContext context) {
return context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
@ -173,27 +179,24 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
}
@Override
protected List<BlobInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
String prefix = context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue();
if (prefix == null) {
prefix = "";
}
protected List<BlobInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) throws IOException {
final String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
final String prefix = Optional.ofNullable(context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue()).orElse("");
final List<BlobInfo> listing = new ArrayList<>();
try {
CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
CloudBlobContainer container = blobClient.getContainerReference(containerName);
final CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
final CloudBlobContainer container = blobClient.getContainerReference(containerName);
final OperationContext operationContext = new OperationContext();
AzureStorageUtils.setProxy(operationContext, context);
for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) {
for (final ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) {
if (blob instanceof CloudBlob) {
CloudBlob cloudBlob = (CloudBlob) blob;
BlobProperties properties = cloudBlob.getProperties();
StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri();
final CloudBlob cloudBlob = (CloudBlob) blob;
final BlobProperties properties = cloudBlob.getProperties();
final StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri();
Builder builder = new BlobInfo.Builder()
final Builder builder = new BlobInfo.Builder()
.primaryUri(uri.getPrimaryUri().toString())
.blobName(cloudBlob.getName())
.containerName(containerName)
@ -215,12 +218,15 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
listing.add(builder.build());
}
}
} catch (Throwable t) {
} catch (final Throwable t) {
throw new IOException(ExceptionUtils.getRootCause(t));
}
return listing;
}
/**
 * Unfiltered listing is not supported by this processor - a prefix must be provided -
 * so no unfiltered count is reported.
 *
 * @param context the ProcessContext (unused)
 * @return always {@code null}
 */
@Override
protected Integer countUnfilteredListing(final ProcessContext context) {
    return null;
}
}

View File

@ -160,12 +160,9 @@ public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo
}
@OnScheduled
public void onScheduled(ProcessContext context) {
String fileFilter = context.getProperty(FILE_FILTER).evaluateAttributeExpressions().getValue();
filePattern = fileFilter != null ? Pattern.compile(fileFilter) : null;
String pathFilter = context.getProperty(PATH_FILTER).evaluateAttributeExpressions().getValue();
pathPattern = pathFilter != null ? Pattern.compile(pathFilter) : null;
public void onScheduled(final ProcessContext context) {
filePattern = getPattern(context, FILE_FILTER);
pathPattern = getPattern(context, PATH_FILTER);
}
@OnStopped
@ -175,7 +172,7 @@ public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo
}
@Override
protected void customValidate(ValidationContext context, Collection<ValidationResult> results) {
protected void customValidate(final ValidationContext context, final Collection<ValidationResult> results) {
if (context.getProperty(PATH_FILTER).isSet() && !context.getProperty(RECURSE_SUBDIRECTORIES).asBoolean()) {
results.add(new ValidationResult.Builder()
.subject(PATH_FILTER.getDisplayName())
@ -191,7 +188,7 @@ public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo
}
@Override
protected Scope getStateScope(PropertyContext context) {
protected Scope getStateScope(final PropertyContext context) {
return Scope.CLUSTER;
}
@ -201,55 +198,34 @@ public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo
}
@Override
protected boolean isListingResetNecessary(PropertyDescriptor property) {
protected boolean isListingResetNecessary(final PropertyDescriptor property) {
return LISTING_RESET_PROPERTIES.contains(property);
}
@Override
protected String getPath(ProcessContext context) {
String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
protected String getPath(final ProcessContext context) {
final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
return directory != null ? directory : ".";
}
@Override
protected List<ADLSFileInfo> performListing(ProcessContext context, Long minTimestamp) throws IOException {
try {
String fileSystem = evaluateFileSystemProperty(context, null);
String baseDirectory = evaluateDirectoryProperty(context, null);
boolean recurseSubdirectories = context.getProperty(RECURSE_SUBDIRECTORIES).asBoolean();
DataLakeServiceClient storageClient = getStorageClient(context, null);
DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
ListPathsOptions options = new ListPathsOptions();
options.setPath(baseDirectory);
options.setRecursive(recurseSubdirectories);
Pattern baseDirectoryPattern = Pattern.compile("^" + baseDirectory + "/?");
List<ADLSFileInfo> listing = fileSystemClient.listPaths(options, null).stream()
.filter(pathItem -> !pathItem.isDirectory())
.map(pathItem -> new ADLSFileInfo.Builder()
.fileSystem(fileSystem)
.filePath(pathItem.getName())
.length(pathItem.getContentLength())
.lastModified(pathItem.getLastModified().toInstant().toEpochMilli())
.etag(pathItem.getETag())
.build())
.filter(fileInfo -> filePattern == null || filePattern.matcher(fileInfo.getFilename()).matches())
.filter(fileInfo -> pathPattern == null || pathPattern.matcher(RegExUtils.removeFirst(fileInfo.getDirectory(), baseDirectoryPattern)).matches())
.collect(Collectors.toList());
return listing;
} catch (Exception e) {
getLogger().error("Failed to list directory on Azure Data Lake Storage", e);
throw new IOException(ExceptionUtils.getRootCause(e));
}
protected List<ADLSFileInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) throws IOException {
    // minTimestamp is not forwarded; the private overload is asked for a listing with filters applied.
    final List<ADLSFileInfo> filteredListing = performListing(context, listingMode, true);
    return filteredListing;
}
@Override
protected Map<String, String> createAttributes(ADLSFileInfo fileInfo, ProcessContext context) {
Map<String, String> attributes = new HashMap<>();
protected Integer countUnfilteredListing(final ProcessContext context) throws IOException {
    // Always resolved in verification mode, with filters switched off.
    final List<ADLSFileInfo> unfilteredListing = performListing(context, ListingMode.CONFIGURATION_VERIFICATION, false);
    return unfilteredListing.size();
}
@Override
protected String getListingContainerName(final ProcessContext context) {
    // getPath(context) yields the configured directory, or "." when none is set.
    final String directory = getPath(context);
    return String.format("Azure Data Lake Directory [%s]", directory);
}
@Override
protected Map<String, String> createAttributes(final ADLSFileInfo fileInfo, final ProcessContext context) {
final Map<String, String> attributes = new HashMap<>();
attributes.put(ATTR_NAME_FILESYSTEM, fileInfo.getFileSystem());
attributes.put(ATTR_NAME_FILE_PATH, fileInfo.getFilePath());
@ -261,4 +237,48 @@ public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo
return attributes;
}
/**
 * Lists the non-directory paths found under the configured directory of the configured file system.
 *
 * @param context      the ProcessContext supplying the file system, directory, recursion and filter properties
 * @param listingMode  {@link ListingMode#EXECUTION} to use the filter patterns held in this processor's fields;
 *                     any other mode compiles the patterns from the given context
 * @param applyFilters {@code true} to keep only entries accepted by the FILE_FILTER and PATH_FILTER patterns;
 *                     {@code false} to return every non-directory entry regardless of the filters
 * @return the listed files
 * @throws IOException wrapping the root cause of any exception raised while listing
 */
private List<ADLSFileInfo> performListing(final ProcessContext context, final ListingMode listingMode,
                                          final boolean applyFilters) throws IOException {
    try {
        final String fileSystem = evaluateFileSystemProperty(context, null);
        final String baseDirectory = evaluateDirectoryProperty(context, null);
        final boolean recurseSubdirectories = context.getProperty(RECURSE_SUBDIRECTORIES).asBoolean();

        // In EXECUTION mode reuse the patterns stored by onScheduled; otherwise the configuration may not
        // have been applied to this processor yet, so the patterns are compiled from the supplied context.
        final Pattern filePattern = listingMode == ListingMode.EXECUTION ? this.filePattern : getPattern(context, FILE_FILTER);
        final Pattern pathPattern = listingMode == ListingMode.EXECUTION ? this.pathPattern : getPattern(context, PATH_FILTER);

        final DataLakeServiceClient storageClient = getStorageClient(context, null);
        final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);

        final ListPathsOptions options = new ListPathsOptions();
        options.setPath(baseDirectory);
        options.setRecursive(recurseSubdirectories);

        // NOTE(review): baseDirectory is embedded in the regex unquoted, so regex metacharacters in a
        // directory name are interpreted as pattern syntax - confirm whether Pattern.quote is appropriate.
        final Pattern baseDirectoryPattern = Pattern.compile("^" + baseDirectory + "/?");

        return fileSystemClient.listPaths(options, null).stream()
                .filter(pathItem -> !pathItem.isDirectory())
                .map(pathItem -> new ADLSFileInfo.Builder()
                        .fileSystem(fileSystem)
                        .filePath(pathItem.getName())
                        .length(pathItem.getContentLength())
                        .lastModified(pathItem.getLastModified().toInstant().toEpochMilli())
                        .etag(pathItem.getETag())
                        .build())
                // An entry passes when filtering is disabled, when no pattern is configured, or when it matches.
                // (Previously "applyFilters && ..." rejected every entry whenever applyFilters was false.)
                .filter(fileInfo -> !applyFilters || filePattern == null || filePattern.matcher(fileInfo.getFilename()).matches())
                .filter(fileInfo -> !applyFilters || pathPattern == null
                        || pathPattern.matcher(RegExUtils.removeFirst(fileInfo.getDirectory(), baseDirectoryPattern)).matches())
                .collect(Collectors.toList());
    } catch (final Exception e) {
        getLogger().error("Failed to list directory on Azure Data Lake Storage", e);
        throw new IOException(ExceptionUtils.getRootCause(e));
    }
}
/**
 * Compiles the regular expression configured for the given filter property.
 *
 * @param context          the ProcessContext to read the property from
 * @param filterDescriptor the property holding the regular expression
 * @return the compiled pattern, or {@code null} when the property evaluates to no value
 */
private Pattern getPattern(final ProcessContext context, final PropertyDescriptor filterDescriptor) {
    final String regex = context.getProperty(filterDescriptor).evaluateAttributeExpressions().getValue();
    if (regex == null) {
        return null;
    }
    return Pattern.compile(regex);
}
}

View File

@ -26,6 +26,8 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.ValidationContext;
@ -40,10 +42,12 @@ import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
@ -121,7 +125,7 @@ import java.util.stream.Collectors;
* </p>
* <ul>
* <li>
* Perform a listing of resources. The subclass will implement the {@link #performListing(ProcessContext, Long)} method, which creates a listing of all
* Perform a listing of resources. The subclass will implement the {@link #performListing(ProcessContext, Long, ListingMode)} method, which creates a listing of all
* entities on the target system that have timestamps later than the provided timestamp. If the entities returned have a timestamp before the provided one, those
* entities will be filtered out. It is therefore not necessary to perform the filtering of timestamps but is provided in order to give the implementation the ability
* to filter those resources on the server side rather than pulling back all of the information, if it makes sense to do so in the concrete implementation.
@ -149,7 +153,24 @@ import java.util.stream.Collectors;
@TriggerSerially
@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After a listing of resources is performed, the latest timestamp of any of the resources is stored in the component's state. "
+ "The scope used depends on the implementation.")
public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor {
public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor implements VerifiableProcessor {
/**
 * The circumstances under which a listing is requested.
 */
protected enum ListingMode {

    /**
     * The listing is part of normal processor execution. Implementations may rely on configuration
     * that has been cached in the Processor object.
     */
    EXECUTION,

    /**
     * The listing is part of configuration verification. Implementations should rely only on the
     * configuration available through the ProcessContext argument, because that configuration may
     * not have been applied to the processor yet.
     */
    CONFIGURATION_VERIFICATION
}
private static final Long IGNORE_MIN_TIMESTAMP_VALUE = 0L;
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new Builder()
.name("Distributed Cache Service")
@ -165,10 +186,12 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
public static final AllowableValue PRECISION_AUTO_DETECT = new AllowableValue("auto-detect", "Auto Detect",
"Automatically detect time unit deterministically based on candidate entries timestamp."
+ " Please note that this option may take longer to list entities unnecessarily, if none of entries has a precise precision timestamp."
+ " E.g. even if a target system supports millis, if all entries only have timestamps without millis, such as '2017-06-16 09:06:34.000', then its precision is determined as 'seconds'.");
+ " E.g. even if a target system supports millis, if all entries only have timestamps without millis, such as '2017-06-16 09:06:34.000', "
+ "then its precision is determined as 'seconds'.");
public static final AllowableValue PRECISION_MILLIS = new AllowableValue("millis", "Milliseconds",
"This option provides the minimum latency for an entry from being available to being listed if target system supports millis, if not, use other options.");
public static final AllowableValue PRECISION_SECONDS = new AllowableValue("seconds", "Seconds","For a target system that does not have millis precision, but has in seconds.");
public static final AllowableValue PRECISION_SECONDS = new AllowableValue("seconds", "Seconds",
"For a target system that does not have millis precision, but has in seconds.");
public static final AllowableValue PRECISION_MINUTES = new AllowableValue("minutes", "Minutes", "For a target system that only supports precision in minutes.");
public static final PropertyDescriptor TARGET_SYSTEM_TIMESTAMP_PRECISION = new Builder()
@ -225,8 +248,8 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
public static final PropertyDescriptor RECORD_WRITER = new Builder()
.name("record-writer")
.displayName("Record Writer")
.description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. If the Record Writer is specified, " +
"all entities will be written to a single FlowFile instead of adding attributes to individual FlowFiles.")
.description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. " +
"If the Record Writer is specified, all entities will be written to a single FlowFile instead of adding attributes to individual FlowFiles.")
.required(false)
.identifiesControllerService(RecordSetWriterFactory.class)
.build();
@ -255,6 +278,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
* near instantaneously after the prior iteration effectively voiding the built in buffer
*/
public static final Map<TimeUnit, Long> LISTING_LAG_MILLIS;
static {
final Map<TimeUnit, Long> nanos = new HashMap<>();
nanos.put(TimeUnit.MILLISECONDS, 100L);
@ -262,6 +286,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
nanos.put(TimeUnit.MINUTES, 60_000L);
LISTING_LAG_MILLIS = Collections.unmodifiableMap(nanos);
}
static final String LATEST_LISTED_ENTRY_TIMESTAMP_KEY = "listing.timestamp";
static final String LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY = "processed.timestamp";
static final String IDENTIFIER_PREFIX = "id";
@ -279,7 +304,6 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
@ -291,7 +315,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
* In order to add custom validation at sub-classes, implement {@link #customValidate(ValidationContext, Collection)} method.
*/
@Override
protected final Collection<ValidationResult> customValidate(ValidationContext context) {
protected final Collection<ValidationResult> customValidate(final ValidationContext context) {
final Collection<ValidationResult> results = new ArrayList<>();
final String listingStrategy = context.getProperty(LISTING_STRATEGY).getValue();
@ -303,9 +327,9 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
return results;
}
/**
* Sub-classes can add custom validation by implementing this method.
*
* @param validationContext the validation context
* @param validationResults add custom validation result to this collection
*/
@ -313,6 +337,54 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
/**
 * Verifies the configuration by performing a listing with the supplied context and reporting a single
 * "Perform Listing" step describing how many objects were found and how many match the filter.
 *
 * @param context    the ProcessContext holding the configuration to verify
 * @param logger     the logger that receives the verification outcome
 * @param attributes not used by this implementation
 * @return a list containing the result of the "Perform Listing" step
 */
@Override
public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog logger, final Map<String, String> attributes) {
    final String listingTarget = getListingContainerName(context);
    final ConfigVerificationResult stepResult = verifyListing(context, logger, listingTarget);

    final List<ConfigVerificationResult> verificationResults = new ArrayList<>();
    verificationResults.add(stepResult);
    return verificationResults;
}

// Runs the unfiltered count and the filtered listing, turning the outcome into a "Perform Listing" result.
private ConfigVerificationResult verifyListing(final ProcessContext context, final ComponentLog logger, final String listingTarget) {
    try {
        final Integer totalCount = countUnfilteredListing(context);
        final int filteredCount = performListing(context, IGNORE_MIN_TIMESTAMP_VALUE, ListingMode.CONFIGURATION_VERIFICATION).size();

        final String summary;
        if (totalCount != null && totalCount == 0) {
            summary = "Found no objects.";
        } else if (totalCount != null) {
            final boolean singleObject = totalCount == 1;
            final String totalText = totalCount + (singleObject ? " object" : " objects");
            final String pronoun = singleObject ? "that" : "those";
            final String matchText = filteredCount + (filteredCount == 1 ? " matches" : " match");
            summary = String.format("Found %s. Of %s, %s the filter.", totalText, pronoun, matchText);
        } else if (filteredCount == 0) {
            // No unfiltered count is available, so only the filtered result can be described.
            summary = "Found no objects matching the filter.";
        } else {
            final String filteredText = filteredCount + (filteredCount == 1 ? " object" : " objects");
            summary = String.format("Found %s matching the filter.", filteredText);
        }

        final ConfigVerificationResult success = new ConfigVerificationResult.Builder()
                .verificationStepName("Perform Listing")
                .outcome(Outcome.SUCCESSFUL)
                .explanation(String.format("Successfully listed contents of %s. %s", listingTarget, summary))
                .build();
        logger.info("Successfully verified configuration");
        return success;
    } catch (final IOException e) {
        logger.warn("Failed to verify configuration. Could not list contents of {}", listingTarget, e);
        return new ConfigVerificationResult.Builder()
                .verificationStepName("Perform Listing")
                .outcome(Outcome.FAILED)
                .explanation(String.format("Failed to list contents of %s: %s", listingTarget, e.getMessage()))
                .build();
    }
}
@OnPrimaryNodeStateChange
public void onPrimaryNodeChange(final PrimaryNodeState newState) {
justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE);
@ -428,7 +500,6 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
return updatedState;
}
private void persist(final long latestListedEntryTimestampThisCycleMillis,
final long lastProcessedLatestEntryTimestampMillis,
final List<String> processedIdentifiesWithLatestTimestamp,
@ -467,6 +538,10 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
}
/**
 * Supplies the current time as reported by {@link System#currentTimeMillis()}.
 *
 * @return the current time in milliseconds
 */
protected long getCurrentTime() {
    final long nowMillis = System.currentTimeMillis();
    return nowMillis;
}
public void listByNoTracking(final ProcessContext context, final ProcessSession session) {
final List<T> entityList;
@ -483,7 +558,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
try {
// minTimestamp = 0L by default on this strategy to ignore any future
// comparison in lastModifiedMap to the same entity.
entityList = performListing(context, 0L);
entityList = performListing(context, IGNORE_MIN_TIMESTAMP_VALUE, ListingMode.EXECUTION);
} catch (final IOException pe) {
getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{pe.getMessage()}, pe);
context.yield();
@ -516,12 +591,12 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
public void listByTimeWindow(final ProcessContext context, final ProcessSession session) throws ProcessException {
if (this.lastListedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
if (lastListedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
try {
final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
Optional.ofNullable(stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY))
.map(Long::parseLong)
.ifPresent(lastTimestamp -> this.lastListedLatestEntryTimestampMillis = lastTimestamp);
.ifPresent(lastTimestamp -> lastListedLatestEntryTimestampMillis = lastTimestamp);
justElectedPrimaryNode = false;
} catch (final IOException ioe) {
@ -531,14 +606,14 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
}
long lowerBoundInclusiveTimestamp = Optional.ofNullable(this.lastListedLatestEntryTimestampMillis).orElse(0L);
long lowerBoundInclusiveTimestamp = Optional.ofNullable(lastListedLatestEntryTimestampMillis).orElse(IGNORE_MIN_TIMESTAMP_VALUE);
long upperBoundExclusiveTimestamp;
long currentTime = getCurrentTime();
final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();
try {
List<T> entityList = performListing(context, lowerBoundInclusiveTimestamp);
List<T> entityList = performListing(context, lowerBoundInclusiveTimestamp, ListingMode.EXECUTION);
boolean targetSystemHasMilliseconds = false;
boolean targetSystemHasSeconds = false;
@ -614,9 +689,9 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
try {
if (getLogger().isTraceEnabled()) {
getLogger().info("this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp: " + this.lastListedLatestEntryTimestampMillis + " = " + upperBoundExclusiveTimestamp);
getLogger().info("this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp: " + lastListedLatestEntryTimestampMillis + " = " + upperBoundExclusiveTimestamp);
}
this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp;
lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp;
persist(upperBoundExclusiveTimestamp, upperBoundExclusiveTimestamp, latestIdentifiersProcessed, session, getStateScope(context));
} catch (final IOException ioe) {
getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
@ -624,21 +699,17 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
}
protected long getCurrentTime() {
return System.currentTimeMillis();
}
public void listByTrackingTimestamps(final ProcessContext context, final ProcessSession session) throws ProcessException {
Long minTimestampToListMillis = lastListedLatestEntryTimestampMillis;
if (this.lastListedLatestEntryTimestampMillis == null || this.lastProcessedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
if (lastListedLatestEntryTimestampMillis == null || lastProcessedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
try {
boolean noUpdateRequired = false;
// Attempt to retrieve state from the state manager if a last listing was not yet established or
// if just elected the primary node
final StateMap stateMap = session.getState(getStateScope(context));
latestIdentifiersProcessed.clear();
for (Map.Entry<String, String> state : stateMap.toMap().entrySet()) {
for (final Map.Entry<String, String> state : stateMap.toMap().entrySet()) {
final String k = state.getKey();
final String v = state.getValue();
if (v == null || v.isEmpty()) {
@ -648,13 +719,13 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
if (LATEST_LISTED_ENTRY_TIMESTAMP_KEY.equals(k)) {
minTimestampToListMillis = Long.parseLong(v);
// If our determined timestamp is the same as that of our last listing, skip this execution as there are no updates
if (minTimestampToListMillis.equals(this.lastListedLatestEntryTimestampMillis)) {
if (minTimestampToListMillis.equals(lastListedLatestEntryTimestampMillis)) {
noUpdateRequired = true;
} else {
this.lastListedLatestEntryTimestampMillis = minTimestampToListMillis;
lastListedLatestEntryTimestampMillis = minTimestampToListMillis;
}
} else if (LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY.equals(k)) {
this.lastProcessedLatestEntryTimestampMillis = Long.parseLong(v);
lastProcessedLatestEntryTimestampMillis = Long.parseLong(v);
} else if (k.startsWith(IDENTIFIER_PREFIX)) {
latestIdentifiersProcessed.add(v);
}
@ -676,7 +747,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
final long currentRunTimeMillis = System.currentTimeMillis();
try {
// track of when this last executed for consideration of the lag nanos
entityList = performListing(context, minTimestampToListMillis);
entityList = performListing(context, minTimestampToListMillis, ListingMode.EXECUTION);
} catch (final IOException e) {
getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e);
context.yield();
@ -778,7 +849,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
// As long as we have a listing timestamp, there is meaningful state to capture regardless of any outputs generated
if (latestListedEntryTimestampThisCycleMillis != null) {
boolean processedNewFiles = entitiesListed > 0;
final boolean processedNewFiles = entitiesListed > 0;
if (!latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis) || processedNewFiles) {
// We have performed a listing and pushed any FlowFiles out that may have been generated
@ -837,14 +908,14 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), getRecordSchema(), out, Collections.emptyMap())) {
recordSetWriter.beginRecordSet();
for (Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) {
for (final Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) {
List<T> entities = timestampEntities.getValue();
if (timestampEntities.getKey().equals(lastProcessedLatestEntryTimestampMillis)) {
// Filter out previously processed entities.
entities = entities.stream().filter(entity -> !latestIdentifiersProcessed.contains(entity.getIdentifier())).collect(Collectors.toList());
}
for (T entity : entities) {
for (final T entity : entities) {
entitiesListed++;
recordSetWriter.write(entity.toRecord());
}
@ -868,14 +939,14 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
private int createFlowFilesForEntities(final ProcessContext context, final ProcessSession session, final Map<Long, List<T>> orderedEntries) {
int entitiesListed = 0;
for (Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) {
for (final Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) {
List<T> entities = timestampEntities.getValue();
if (timestampEntities.getKey().equals(lastProcessedLatestEntryTimestampMillis)) {
// Filter out previously processed entities.
entities = entities.stream().filter(entity -> !latestIdentifiersProcessed.contains(entity.getIdentifier())).collect(Collectors.toList());
}
for (T entity : entities) {
for (final T entity : entities) {
entitiesListed++;
// Create the FlowFile for this path.
@ -894,6 +965,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
* So that it can return a precision other than PRECISION_AUTO_DETECT.
* If TARGET_SYSTEM_TIMESTAMP_PRECISION is supported as a valid Processor property,
* then PRECISION_AUTO_DETECT will be the default value when not specified by a user.
*
* @return
*/
protected String getDefaultTimePrecision() {
@ -936,11 +1008,13 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
* will be filtered out by the Processor. Therefore, it is not necessary that implementations perform this filtering but can be more efficient
* if the filtering can be performed on the server side prior to retrieving the information.
*
* @param context the ProcessContex to use in order to pull the appropriate entities
* @param minTimestamp the minimum timestamp of entities that should be returned.
* @param context the ProcessContext to use in order to pull the appropriate entities
* @param minTimestamp the minimum timestamp of entities that should be returned
* @param listingMode the listing mode, indicating whether the listing is being performed during configuration verification or normal processor execution
* @return a Listing of entities that have a timestamp >= minTimestamp
*/
protected abstract List<T> performListing(final ProcessContext context, final Long minTimestamp) throws IOException;
protected abstract List<T> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode)
throws IOException;
/**
* Determines whether or not the listing must be reset if the value of the given property is changed
@ -963,7 +1037,25 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
*/
protected abstract RecordSchema getRecordSchema();
/**
 * Performs an unfiltered listing and returns the count, or null if this operation is not supported.
 * <p>
 * This is called by {@code verify} before the filtered listing is performed. When a count is returned, the
 * verification explanation reports both the total number of objects and how many of them match the filter;
 * when null is returned, only the number of matching objects is reported.
 *
 * @param context the ProcessContext to use in order to pull the appropriate entities
 * @return The number of unfiltered entities in the listing, or null if this processor does not support an unfiltered listing
 * @throws IOException if the listing cannot be performed; {@code verify} reports this as a failed "Perform Listing" step
 */
protected abstract Integer countUnfilteredListing(final ProcessContext context)
throws IOException;
/**
* Provides a human-readable name for the container being listed, for the purpose of displaying
* readable verification messages during processor configuration verification.
*
* @param context The process context, from which an implementation can read the properties that identify the container
* @return The user-friendly name for the container
*/
protected abstract String getListingContainerName(final ProcessContext context);
private static class StringSerDe implements Serializer<String>, Deserializer<String> {
@Override
public String deserialize(final byte[] value) throws DeserializationException, IOException {
if (value == null) {
@ -972,11 +1064,11 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
return new String(value, StandardCharsets.UTF_8);
}
@Override
public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
    // Encode as UTF-8, the same charset deserialize(byte[]) decodes with
    final byte[] utf8Bytes = value.getBytes(StandardCharsets.UTF_8);
    out.write(utf8Bytes);
}
}
@OnScheduled
@ -1007,7 +1099,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
private void listByTrackingEntities(ProcessContext context, ProcessSession session) throws ProcessException {
listedEntityTracker.trackEntities(context, session, justElectedPrimaryNode, getStateScope(context), minTimestampToList -> {
try {
return performListing(context, minTimestampToList);
return performListing(context, minTimestampToList, ListingMode.EXECUTION);
} catch (final IOException e) {
getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e);
return Collections.emptyList();
@ -1015,5 +1107,4 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}, entity -> createAttributes(entity, context));
justElectedPrimaryNode = false;
}
}

View File

@ -257,7 +257,7 @@ public class ListedEntityTracker<T extends ListableEntity> {
} else {
this.alreadyListedEntities = new ConcurrentHashMap<>(fetchedListedEntities);
}
} catch (IOException e) {
} catch (final IOException e) {
throw new ProcessException("Failed to restore already-listed entities due to " + e, e);
}
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processor.util.list;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.FlowFile;
@ -30,7 +31,9 @@ import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestWatcher;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -225,6 +228,10 @@ public class ITAbstractListProcessor {
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
runner.clearTransferState();
final List<ConfigVerificationResult> results = proc.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
assertEquals(1, results.size());
assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, results.get(0).getOutcome());
}
private void setTargetSystemTimestampPrecision(TimeUnit targetPrecision) {

View File

@ -17,7 +17,10 @@
package org.apache.nifi.processor.util.list;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
@ -30,6 +33,7 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
@ -43,6 +47,7 @@ import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.glassfish.jersey.internal.guava.Predicates;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@ -55,6 +60,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@ -62,11 +68,13 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestAbstractListProcessor {
@ -224,9 +232,12 @@ public class TestAbstractListProcessor {
proc.addEntity("one","firstFile",1585344381476L);
proc.addEntity("two","secondFile",1585344381475L);
assertVerificationOutcome(Outcome.SUCCESSFUL, ".* Found 2 objects. Of those, 2 match the filter.");
runner.run();
assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
assertEquals(2, proc.entities.size());
assertVerificationOutcome(Outcome.SUCCESSFUL, ".* Found 2 objects. Of those, 2 match the filter.");
final MockStateManager stateManager = runner.getStateManager();
final Map<String, String> expectedState = new HashMap<>();
@ -252,14 +263,16 @@ public class TestAbstractListProcessor {
// Clear any listed entities after choose No Tracking Strategy
proc.entities.clear();
assertVerificationOutcome(Outcome.SUCCESSFUL, ".* Found no objects.");
// Add new entity
proc.addEntity("one","firstFile",1585344381476L);
proc.listByNoTracking(context, session);
proc.listByTrackingTimestamps(context, session);
// Test if state cleared or not
runner.getStateManager().assertStateNotEquals(expectedState, Scope.CLUSTER);
assertEquals(1, proc.entities.size());
assertVerificationOutcome(Outcome.SUCCESSFUL, ".* Found 1 object. Of that, 1 matches the filter.");
}
@Test
@ -285,14 +298,22 @@ public class TestAbstractListProcessor {
proc.addEntity("one", "one", 1, 1);
proc.currentTimestamp.set(1L);
runner.clearTransferState();
// Prior to running the processor, we should expect 1 object during verification
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
"Found 1 object. Of that, 1 matches the filter.");
runner.run();
assertEquals(1, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0)
.assertAttributeEquals(CoreAttributes.FILENAME.key(), "one");
// The object is now tracked, but verification still lists it and reports it as matching the filter
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
"Found 1 object. Of that, 1 matches the filter.");
// Should not list any entity.
proc.currentTimestamp.set(2L);
runner.clearTransferState();
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
"Found 1 object. Of that, 1 matches the filter.");
runner.run();
assertEquals(0, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
@ -301,6 +322,8 @@ public class TestAbstractListProcessor {
proc.addEntity("five", "five", 5, 5);
proc.addEntity("six", "six", 6, 6);
runner.clearTransferState();
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
"Found 3 objects. Of those, 3 match the filter.");
runner.run();
assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0)
@ -316,6 +339,8 @@ public class TestAbstractListProcessor {
proc.addEntity("three", "three", 3, 3);
proc.addEntity("four", "four", 4, 4);
runner.clearTransferState();
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
"Found 6 objects. Of those, 6 match the filter.");
runner.run();
assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0)
@ -329,6 +354,8 @@ public class TestAbstractListProcessor {
proc.addEntity("five", "five", 7, 5);
proc.addEntity("six", "six", 6, 16);
runner.clearTransferState();
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
"Found 6 objects. Of those, 6 match the filter.");
runner.run();
assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0)
@ -344,7 +371,12 @@ public class TestAbstractListProcessor {
runner.setProperty(ConcreteListProcessor.RESET_STATE, "1");
runner.setProperty(ListedEntityTracker.INITIAL_LISTING_TARGET, "window");
runner.clearTransferState();
// Prior to running the processor, verification should list all 6 objects (the run below emits only 3 of them)
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
"Found 6 objects. Of those, 6 match the filter.");
runner.run();
assertEquals(3, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0)
.assertAttributeEquals(CoreAttributes.FILENAME.key(), "four");
@ -353,16 +385,44 @@ public class TestAbstractListProcessor {
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(2)
.assertAttributeEquals(CoreAttributes.FILENAME.key(), "five");
// Reset state again.
proc.currentTimestamp.set(20L);
// ConcreteListProcessor can reset state with any property.
runner.setProperty(ListedEntityTracker.INITIAL_LISTING_TARGET, "all");
runner.setProperty(ConcreteListProcessor.RESET_STATE, "2");
runner.clearTransferState();
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
"Found 6 objects. Of those, 6 match the filter.");
runner.run();
// All entities should be picked, one to six.
assertEquals(6, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
// All entities are now tracked, but verification still lists all 6 objects
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
"Found 6 objects. Of those, 6 match the filter.");
// Reset state again.
proc.currentTimestamp.set(25L);
runner.setProperty(ListedEntityTracker.INITIAL_LISTING_TARGET, "window");
runner.setProperty(ListedEntityTracker.TRACKING_TIME_WINDOW, "20ms");
runner.setProperty(ConcreteListProcessor.LISTING_FILTER, "f[a-z]+"); // Match only four and five
runner.setProperty(ConcreteListProcessor.RESET_STATE, "3");
runner.clearTransferState();
// The listing filter matches only 'four' and 'five', so verification reports 2 of the 6 objects as matching the filter
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
"Found 6 objects. Of those, 2 match the filter.");
}
/**
 * Runs configuration verification on the processor under test and asserts that it yields exactly one
 * result with the expected outcome and an explanation matching the given regular expression.
 *
 * @param expectedOutcome the outcome the single verification result must have
 * @param expectedExplanationRegex regular expression that the whole explanation must match
 */
private void assertVerificationOutcome(final Outcome expectedOutcome, final String expectedExplanationRegex) {
    final List<ConfigVerificationResult> verificationResults = proc.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
    assertEquals(1, verificationResults.size());

    final ConfigVerificationResult onlyResult = verificationResults.get(0);
    assertEquals(expectedOutcome, onlyResult.getOutcome());

    final String explanation = onlyResult.getExplanation();
    final String failureMessage = String.format("Expected verification result to match pattern [%s]. Actual explanation was: %s", expectedExplanationRegex, explanation);
    assertTrue(failureMessage, explanation.matches(expectedExplanationRegex));
}
static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient {
@ -434,6 +494,12 @@ public class TestAbstractListProcessor {
.name("reset-state")
.addValidator(Validator.VALID)
.build();
// Test-only property: when set, performListing keeps only the entities whose name fully matches this regular expression.
private static final PropertyDescriptor LISTING_FILTER = new PropertyDescriptor.Builder()
.name("listing-filter")
.displayName("Listing Filter")
.description("Filters listed entities by name.")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
final AtomicReference<Long> currentTimestamp = new AtomicReference<>();
@ -453,6 +519,7 @@ public class TestAbstractListProcessor {
properties.add(ListedEntityTracker.TRACKING_TIME_WINDOW);
properties.add(ListedEntityTracker.INITIAL_LISTING_TARGET);
properties.add(RESET_STATE);
properties.add(LISTING_FILTER);
return properties;
}
@ -514,8 +581,17 @@ public class TestAbstractListProcessor {
}
@Override
protected List<ListableEntity> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
return getEntityList();
protected List<ListableEntity> performListing(final ProcessContext context, final Long minTimestamp, ListingMode listingMode) throws IOException {
    // Returns a fresh list of the known entities, restricted to those whose name fully matches
    // LISTING_FILTER when that property is set. minTimestamp and listingMode are not consulted here.
    final PropertyValue listingFilter = context.getProperty(LISTING_FILTER);

    final Predicate<ListableEntity> filter;
    if (listingFilter.isSet()) {
        // Compile the regex once instead of letting String#matches recompile it for every entity
        final Pattern namePattern = Pattern.compile(listingFilter.getValue());
        filter = entity -> namePattern.matcher(entity.getName()).matches();
    } else {
        filter = Predicates.alwaysTrue();
    }

    return getEntityList().stream().filter(filter).collect(Collectors.toList());
}
@Override
protected Integer countUnfilteredListing(final ProcessContext context) throws IOException {
// Counts every entity held by this test processor; LISTING_FILTER is deliberately not consulted here.
return entities.size();
}
List<ListableEntity> getEntityList() {
@ -527,6 +603,11 @@ public class TestAbstractListProcessor {
return RESET_STATE.equals(property);
}
@Override
protected String getListingContainerName(final ProcessContext context) {
// The persistence file name doubles as the container name in verification messages;
// the tests match it with the pattern "Successfully listed contents of .*.json.*".
return persistenceFilename;
}
@Override
protected Scope getStateScope(final PropertyContext context) {
return Scope.CLUSTER;

View File

@ -296,7 +296,7 @@ public abstract class GetFileTransfer extends AbstractProcessor {
}
final StopWatch stopWatch = new StopWatch(true);
final List<FileInfo> listing = transfer.getListing();
final List<FileInfo> listing = transfer.getListing(true);
final long millis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
int newItems = 0;

View File

@ -126,4 +126,5 @@ public class ListFTP extends ListFileTransfer {
protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
// Delegates to FTPTransfer.validateProxySpec, handing it the results collection
// (presumably to be populated with proxy configuration problems - confirm in FTPTransfer).
FTPTransfer.validateProxySpec(validationContext, results);
}
}

View File

@ -338,7 +338,6 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
@OnScheduled
public void onScheduled(final ProcessContext context) {
fileFilterRef.set(createFileFilter(context));
includeFileAttributes = context.getProperty(INCLUDE_FILE_ATTRIBUTES).asBoolean();
final long maxDiskOperationMillis = context.getProperty(MAX_DISK_OPERATION_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
@ -351,6 +350,7 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
} else {
performanceTracker = new UntrackedPerformanceTracker(getLogger(), maxDiskOperationMillis);
}
fileFilterRef.set(createFileFilter(context, performanceTracker, true));
final long millisToKeepStats = TimeUnit.MINUTES.toMillis(15);
final MonitorActiveTasks monitorTask = new MonitorActiveTasks(performanceTracker, getLogger(), maxDiskOperationMillis, maxListingMillis, millisToKeepStats);
@ -502,12 +502,31 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
}
@Override
protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
protected Integer countUnfilteredListing(final ProcessContext context) throws IOException {
    // List in verification mode from timestamp 0 with the filters switched off, then count the result
    final List<FileInfo> unfilteredListing = performListing(context, 0L, ListingMode.CONFIGURATION_VERIFICATION, false);
    return unfilteredListing.size();
}
@Override
protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode)
        throws IOException {
    // The three-argument listing always applies the filters; only the unfiltered count turns them off
    final boolean applyFilters = true;
    return performListing(context, minTimestamp, listingMode, applyFilters);
}
private List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode, final boolean applyFilters)
throws IOException {
final Path basePath = new File(getPath(context)).toPath();
final Boolean recurse = context.getProperty(RECURSE).asBoolean();
final Map<Path, BasicFileAttributes> lastModifiedMap = new HashMap<>();
final BiPredicate<Path, BasicFileAttributes> fileFilter = fileFilterRef.get();
final BiPredicate<Path, BasicFileAttributes> fileFilter;
final PerformanceTracker performanceTracker;
if (listingMode == ListingMode.EXECUTION) {
fileFilter = fileFilterRef.get();
performanceTracker = this.performanceTracker;
} else {
final long maxDiskOperationMillis = context.getProperty(MAX_DISK_OPERATION_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
performanceTracker = new UntrackedPerformanceTracker(getLogger(), maxDiskOperationMillis);
fileFilter = createFileFilter(context, performanceTracker, applyFilters);
}
int maxDepth = recurse ? Integer.MAX_VALUE : 1;
final BiPredicate<Path, BasicFileAttributes> matcher = new BiPredicate<Path, BasicFileAttributes>() {
@ -515,7 +534,7 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
@Override
public boolean test(final Path path, final BasicFileAttributes attributes) {
if (!isScheduled()) {
if (!isScheduled() && listingMode == ListingMode.EXECUTION) {
throw new ProcessorStoppedException();
}
@ -536,10 +555,11 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
final TimedOperationKey operationKey = performanceTracker.beginOperation(DiskOperation.FILTER, relativePath, filename);
try {
if (!isDirectory && (minTimestamp == null || attributes.lastModifiedTime().toMillis() >= minTimestamp)
&& fileFilter.test(path, attributes)) {
// We store the attributes for each Path we are returning in order to avoid to
// retrieve them again later when creating the FileInfo
final boolean matchesFilters = (minTimestamp == null || attributes.lastModifiedTime().toMillis() >= minTimestamp)
&& fileFilter.test(path, attributes);
if (!isDirectory && (!applyFilters || matchesFilters)) {
// We store the attributes for each Path we are returning in order to avoid
// retrieving them again later when creating the FileInfo
lastModifiedMap.put(path, attributes);
return true;
@ -562,17 +582,17 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
Files.walkFileTree(basePath, Collections.singleton(FileVisitOption.FOLLOW_LINKS), maxDepth, new FileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(final Path dir, final BasicFileAttributes attributes) throws IOException {
public FileVisitResult preVisitDirectory(final Path dir, final BasicFileAttributes attributes) {
if (Files.isReadable(dir)) {
return FileVisitResult.CONTINUE;
} else {
getLogger().debug("The following directory is not readable: {}", new Object[] {dir.toString()});
getLogger().debug("The following directory is not readable: {}", new Object[]{dir.toString()});
return FileVisitResult.SKIP_SUBTREE;
}
}
@Override
public FileVisitResult visitFile(final Path path, final BasicFileAttributes attributes) throws IOException {
public FileVisitResult visitFile(final Path path, final BasicFileAttributes attributes) {
if (matcher.test(path, attributes)) {
final File file = path.toFile();
final BasicFileAttributes fileAttributes = lastModifiedMap.get(path);
@ -591,20 +611,20 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
}
@Override
public FileVisitResult visitFileFailed(final Path path, final IOException e) throws IOException {
public FileVisitResult visitFileFailed(final Path path, final IOException e) {
if (e instanceof AccessDeniedException) {
getLogger().debug("The following file is not readable: {}", new Object[] {path.toString()});
getLogger().debug("The following file is not readable: {}", new Object[]{path.toString()});
return FileVisitResult.SKIP_SUBTREE;
} else {
getLogger().error("Error during visiting file {}: {}", new Object[] {path.toString(), e.getMessage()}, e);
getLogger().error("Error during visiting file {}: {}", new Object[]{path.toString(), e.getMessage()}, e);
return FileVisitResult.TERMINATE;
}
}
@Override
public FileVisitResult postVisitDirectory(final Path dir, final IOException e) throws IOException {
public FileVisitResult postVisitDirectory(final Path dir, final IOException e) {
if (e != null) {
getLogger().error("Error during visiting directory {}: {}", new Object[] {dir.toString(), e.getMessage()}, e);
getLogger().error("Error during visiting directory {}: {}", new Object[]{dir.toString(), e.getMessage()}, e);
}
return FileVisitResult.CONTINUE;
@ -619,9 +639,16 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
getLogger().info("Processor was stopped so will not complete listing of Files");
return Collections.emptyList();
} finally {
if (performanceTracker != null) {
performanceTracker.completeActiveDirectory();
}
}
}
@Override
protected String getListingContainerName(final ProcessContext context) {
    // Combine the configured directory location with the directory path for display in verification messages
    final String directoryLocation = context.getProperty(DIRECTORY_LOCATION).getValue();
    final String directoryPath = getPath(context);
    return String.format("%s Directory [%s]", directoryLocation, directoryPath);
}
@Override
protected boolean isListingResetNecessary(final PropertyDescriptor property) {
@ -636,7 +663,8 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
|| IGNORE_HIDDEN_FILES.equals(property);
}
private BiPredicate<Path, BasicFileAttributes> createFileFilter(final ProcessContext context) {
private BiPredicate<Path, BasicFileAttributes> createFileFilter(final ProcessContext context, final PerformanceTracker performanceTracker,
final boolean applyFilters) {
final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
@ -652,6 +680,10 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
final Path basePath = Paths.get(indir);
return (path, attributes) -> {
if (!applyFilters) {
return true;
}
if (minSize > attributes.size()) {
return false;
}

View File

@ -17,10 +17,6 @@
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
@ -32,12 +28,15 @@ import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.serialization.record.RecordSchema;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.Iterator;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
@ -104,11 +103,21 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
}
@Override
protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
protected Integer countUnfilteredListing(final ProcessContext context) throws IOException {
    // List in verification mode from timestamp 0 with the filters switched off, then count the result
    final List<FileInfo> unfilteredListing = performListing(context, 0L, ListingMode.CONFIGURATION_VERIFICATION, false);
    return unfilteredListing.size();
}
@Override
protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) throws IOException {
    // The three-argument listing always applies the filters; only the unfiltered count turns them off
    final boolean applyFilters = true;
    return performListing(context, minTimestamp, listingMode, applyFilters);
}
protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode,
final boolean applyFilters) throws IOException {
final FileTransfer transfer = getFileTransfer(context);
final List<FileInfo> listing;
try {
listing = transfer.getListing();
listing = transfer.getListing(applyFilters);
} finally {
IOUtils.closeQuietly(transfer);
}
@ -128,6 +137,12 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
return listing;
}
@Override
protected String getListingContainerName(final ProcessContext context) {
    // Describe the remote directory together with the host and port it is listed from
    final String remotePath = getPath(context);
    final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
    final String port = context.getProperty(UNDEFAULTED_PORT).evaluateAttributeExpressions().getValue();
    return String.format("Remote Directory [%s] on [%s:%s]", remotePath, hostname, port);
}
@Override
protected RecordSchema getRecordSchema() {
return FileInfo.getRecordSchema();

View File

@ -146,11 +146,17 @@ public class ListSFTP extends ListFileTransfer {
}
@Override
protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
final List<FileInfo> listing = super.performListing(context, minTimestamp);
protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode,
final boolean applyFilters) throws IOException {
final List<FileInfo> listing = super.performListing(context, minTimestamp, listingMode, applyFilters);
if (!applyFilters) {
return listing;
}
final Predicate<FileInfo> filePredicate = listingMode == ListingMode.EXECUTION ? this.fileFilter : createFileFilter(context);
return listing.stream()
.filter(fileFilter)
.filter(filePredicate)
.collect(Collectors.toList());
}

View File

@ -190,14 +190,14 @@ public class FTPTransfer implements FileTransfer {
}
@Override
public List<FileInfo> getListing() throws IOException {
public List<FileInfo> getListing(final boolean applyFilters) throws IOException {
final String path = ctx.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions().getValue();
final int depth = 0;
final int maxResults = ctx.getProperty(FileTransfer.REMOTE_POLL_BATCH_SIZE).asInteger();
return getListing(path, depth, maxResults);
return getListing(path, depth, maxResults, applyFilters);
}
private List<FileInfo> getListing(final String path, final int depth, final int maxResults) throws IOException {
private List<FileInfo> getListing(final String path, final int depth, final int maxResults, final boolean applyFilters) throws IOException {
final List<FileInfo> listing = new ArrayList<>();
if (maxResults < 1) {
return listing;
@ -266,7 +266,7 @@ public class FTPTransfer implements FileTransfer {
// OR if is a link and we're supposed to follow symlink
if ((recurse && file.isDirectory()) || (symlink && file.isSymbolicLink())) {
try {
listing.addAll(getListing(newFullForwardPath, depth + 1, maxResults - count));
listing.addAll(getListing(newFullForwardPath, depth + 1, maxResults - count, applyFilters));
} catch (final IOException e) {
logger.error("Unable to get listing from " + newFullForwardPath + "; skipping", e);
}
@ -274,8 +274,8 @@ public class FTPTransfer implements FileTransfer {
// if is not a directory and is not a link and it matches
// FILE_FILTER_REGEX - then let's add it
if (!file.isDirectory() && !file.isSymbolicLink() && pathFilterMatches) {
if (pattern == null || pattern.matcher(filename).matches()) {
if (!file.isDirectory() && !file.isSymbolicLink() && (pathFilterMatches || !applyFilters)) {
if (pattern == null || !applyFilters || pattern.matcher(filename).matches()) {
listing.add(newFileInfo(file, path));
count++;
}

View File

@ -34,7 +34,7 @@ public interface FileTransfer extends Closeable {
String getHomeDirectory(FlowFile flowFile) throws IOException;
List<FileInfo> getListing() throws IOException;
List<FileInfo> getListing(boolean applyFilters) throws IOException;
FlowFile getRemoteFile(String remoteFileName, FlowFile flowFile, ProcessSession session) throws ProcessException, IOException;

View File

@ -263,7 +263,7 @@ public class SFTPTransfer implements FileTransfer {
}
@Override
public List<FileInfo> getListing() throws IOException {
public List<FileInfo> getListing(final boolean applyFilters) throws IOException {
final String path = ctx.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions().getValue();
final int depth = 0;
@ -277,11 +277,12 @@ public class SFTPTransfer implements FileTransfer {
}
final List<FileInfo> listing = new ArrayList<>(1000);
getListing(path, depth, maxResults, listing);
getListing(path, depth, maxResults, listing, applyFilters);
return listing;
}
protected void getListing(final String path, final int depth, final int maxResults, final List<FileInfo> listing) throws IOException {
protected void getListing(final String path, final int depth, final int maxResults, final List<FileInfo> listing,
final boolean applyFilters) throws IOException {
if (maxResults < 1 || listing.size() >= maxResults) {
return;
}
@ -346,8 +347,8 @@ public class SFTPTransfer implements FileTransfer {
}
// if is not a directory and is not a link and it matches FILE_FILTER_REGEX - then let's add it
if (!entry.isDirectory() && !(entry.getAttributes().getType() == FileMode.Type.SYMLINK) && isPathMatch) {
if (pattern == null || pattern.matcher(entryFilename).matches()) {
if (!entry.isDirectory() && !(entry.getAttributes().getType() == FileMode.Type.SYMLINK) && (!applyFilters || isPathMatch)) {
if (pattern == null || !applyFilters || pattern.matcher(entryFilename).matches()) {
listing.add(newFileInfo(entry, path));
}
}
@ -379,7 +380,7 @@ public class SFTPTransfer implements FileTransfer {
final String newFullForwardPath = newFullPath.getPath().replace("\\", "/");
try {
getListing(newFullForwardPath, depth + 1, maxResults, listing);
getListing(newFullForwardPath, depth + 1, maxResults, listing, applyFilters);
} catch (final IOException e) {
logger.error("Unable to get listing from " + newFullForwardPath + "; skipping", e);
}

View File

@ -18,6 +18,8 @@
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.SystemUtils;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.flowfile.FlowFile;
@ -233,27 +235,34 @@ public class TestListFile {
assertTrue(file1.createNewFile());
assertTrue(file1.setLastModified(time4millis));
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 1 object. Of that, 1 matches the filter.");
// process first file and set new timestamp
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(1, successFiles1.size());
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 1 object. Of that, 1 matches the filter.");
// create second file
final File file2 = new File(TESTDIR + "/listing2.txt");
assertTrue(file2.createNewFile());
assertTrue(file2.setLastModified(time2millis));
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 2 objects. Of those, 2 match the filter.");
// process second file after timestamp
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(1, successFiles2.size());
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 2 objects. Of those, 2 match the filter.");
// create third file
final File file3 = new File(TESTDIR + "/listing3.txt");
assertTrue(file3.createNewFile());
assertTrue(file3.setLastModified(time4millis));
// verification still lists all 3 files, even though the third file's timestamp is before the min listed timestamp
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 3 match the filter.");
// process third file before timestamp
runNext();
@ -264,6 +273,7 @@ public class TestListFile {
// force state to reset and process all files
runner.removeProperty(ListFile.DIRECTORY);
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 3 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@ -271,6 +281,7 @@ public class TestListFile {
runNext();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 3 match the filter.");
}
@Test
@ -309,6 +320,7 @@ public class TestListFile {
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runNext.apply(true);
runner.assertTransferCount(ListFile.REL_SUCCESS, 3);
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 3 match the filter.");
// processor updates internal state, it shouldn't pick the same ones.
runNext.apply(false);
@ -323,6 +335,7 @@ public class TestListFile {
assertEquals(2, successFiles2.size());
assertEquals(file2.getName(), successFiles2.get(0).getAttribute("filename"));
assertEquals(file1.getName(), successFiles2.get(1).getAttribute("filename"));
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 2 match the filter.");
// exclude newest
runner.setProperty(ListFile.MIN_AGE, age1);
@ -333,6 +346,7 @@ public class TestListFile {
assertEquals(2, successFiles3.size());
assertEquals(file3.getName(), successFiles3.get(0).getAttribute("filename"));
assertEquals(file2.getName(), successFiles3.get(1).getAttribute("filename"));
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 2 match the filter.");
// exclude oldest and newest
runner.setProperty(ListFile.MIN_AGE, age1);
@ -342,6 +356,7 @@ public class TestListFile {
final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(1, successFiles4.size());
assertEquals(file2.getName(), successFiles4.get(0).getAttribute("filename"));
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 1 matches the filter.");
}
@ -377,26 +392,31 @@ public class TestListFile {
// check all files
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 3 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(3, successFiles1.size());
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 3 match the filter.");
// exclude largest
runner.removeProperty(ListFile.MIN_AGE);
runner.removeProperty(ListFile.MAX_AGE);
runner.setProperty(ListFile.MIN_SIZE, "0 b");
runner.setProperty(ListFile.MAX_SIZE, "7500 b");
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 2 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(2, successFiles2.size());
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 2 match the filter.");
// exclude smallest
runner.removeProperty(ListFile.MIN_AGE);
runner.removeProperty(ListFile.MAX_AGE);
runner.setProperty(ListFile.MIN_SIZE, "2500 b");
runner.removeProperty(ListFile.MAX_SIZE);
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 2 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@ -407,6 +427,7 @@ public class TestListFile {
runner.removeProperty(ListFile.MAX_AGE);
runner.setProperty(ListFile.MIN_SIZE, "2500 b");
runner.setProperty(ListFile.MAX_SIZE, "7500 b");
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 1 matches the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@ -444,6 +465,7 @@ public class TestListFile {
runner.removeProperty(ListFile.MIN_SIZE);
runner.removeProperty(ListFile.MAX_SIZE);
runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "false");
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 2 objects. Of those, 2 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@ -451,6 +473,7 @@ public class TestListFile {
// exclude hidden
runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "true");
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 2 objects. Of those, 1 matches the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@ -472,6 +495,7 @@ public class TestListFile {
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.setProperty(ListFile.FILE_FILTER, ".*");
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 2 objects. Of those, 1 matches the filter.");
runNext();
final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@ -484,6 +508,7 @@ public class TestListFile {
assertTrue(subdir.mkdir());
assertTrue(subdir.setReadable(false));
try {
final File file1 = new File(TESTDIR + "/subdir/unreadable.txt");
assertTrue(file1.createNewFile());
assertTrue(file1.setReadable(false));
@ -501,14 +526,16 @@ public class TestListFile {
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.setProperty(ListFile.FILE_FILTER, ".*");
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 1 object. Of that, 1 matches the filter.");
runNext();
final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(1, successFiles.size());
assertEquals("secondReadable.txt", successFiles.get(0).getAttribute("filename"));
} finally {
subdir.setReadable(true);
}
}
@Test
public void testListingNeedsSufficientPrivilegesAndFittingFilter() throws Exception {
@ -527,6 +554,7 @@ public class TestListFile {
// Run with privileges and with fitting filter
runner.setProperty(ListFile.FILE_FILTER, "file.*");
assertTrue(file.setLastModified(getTestModifiedTime()));
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 1 object. Of that, 1 matches the filter.");
runNext();
final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@ -535,6 +563,7 @@ public class TestListFile {
// Run without privileges and with fitting filter
assertTrue(file.setReadable(false));
assertTrue(file.setLastModified(getTestModifiedTime()));
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 1 object. Of that, 0 match the filter.");
runNext();
final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@ -566,6 +595,7 @@ public class TestListFile {
// check all files
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue());
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 4 objects. Of those, 4 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@ -574,9 +604,11 @@ public class TestListFile {
// filter file on pattern
// Modifying FILE_FILTER property reset listing status, so these files will be listed again.
runner.setProperty(ListFile.FILE_FILTER, ".*-xyz-.*");
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 4 objects. Of those, 2 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 2);
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 4 objects. Of those, 2 match the filter.");
runNext();
runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
}
@ -611,6 +643,7 @@ public class TestListFile {
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue());
runner.setProperty(ListFile.RECURSE, "true");
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 4 objects. Of those, 4 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
@ -620,6 +653,7 @@ public class TestListFile {
// filter path on pattern subdir1
runner.setProperty(ListFile.PATH_FILTER, "subdir1");
runner.setProperty(ListFile.RECURSE, "true");
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 4 objects. Of those, 3 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@ -628,6 +662,7 @@ public class TestListFile {
// filter path on pattern subdir2
runner.setProperty(ListFile.PATH_FILTER, "subdir2");
runner.setProperty(ListFile.RECURSE, "true");
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 4 objects. Of those, 1 matches the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@ -659,6 +694,7 @@ public class TestListFile {
// check all files
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.setProperty(ListFile.RECURSE, "true");
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 3 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 3);
final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@ -683,8 +719,9 @@ public class TestListFile {
}
assertEquals(3, successFiles1.size());
// exclude hidden
// don't recurse
runner.setProperty(ListFile.RECURSE, "false");
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 1 object. Of that, 1 matches the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@ -710,6 +747,7 @@ public class TestListFile {
// check all files
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.setProperty(ListFile.RECURSE, "true");
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 3 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
runner.assertTransferCount(ListFile.REL_SUCCESS, 3);
@ -802,6 +840,7 @@ public class TestListFile {
makeTestFile("/batch1-age5.txt", time5millis, fileTimes);
// check files
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects. Of those, 3 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
@ -815,6 +854,7 @@ public class TestListFile {
// should be ignored since it's older than age3
makeTestFile("/batch2-age4.txt", time4millis, fileTimes);
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 6 objects. Of those, 6 match the filter.");
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
@ -880,4 +920,14 @@ public class TestListFile {
}
}
}
/**
 * Calls {@code verify} on the {@code processor} field, passing the runner's process context,
 * the runner's logger and an empty map, then checks the single result that comes back.
 *
 * @param expectedOutcome the outcome the one returned verification result must carry
 * @param expectedExplanationRegex regular expression the entire explanation text must match
 */
private void assertVerificationOutcome(final Outcome expectedOutcome, final String expectedExplanationRegex) {
    final List<ConfigVerificationResult> verificationResults =
            processor.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());

    // Exactly one verification result is expected.
    assertEquals(1, verificationResults.size());

    final ConfigVerificationResult onlyResult = verificationResults.get(0);
    assertEquals(expectedOutcome, onlyResult.getOutcome());

    // Build the failure message up front so a mismatch reports both the pattern and the actual text.
    final String failureMessage = String.format(
            "Expected verification result to match pattern [%s]. Actual explanation was: %s",
            expectedExplanationRegex,
            onlyResult.getExplanation());
    final boolean explanationMatches = onlyResult.getExplanation().matches(expectedExplanationRegex);
    assertTrue(failureMessage, explanationMatches);
}
}

View File

@ -29,7 +29,10 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.FileInfo;
@ -47,6 +50,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestListSFTP {
@Rule
@ -95,7 +99,7 @@ public class TestListSFTP {
protected FileTransfer getFileTransfer(ProcessContext context) {
return new SFTPTransfer(context, getLogger()){
@Override
protected void getListing(String path, int depth, int maxResults, List<FileInfo> listing) throws IOException {
protected void getListing(String path, int depth, int maxResults, List<FileInfo> listing, boolean applyFilters) throws IOException {
if (path.contains("subdir")) {
reachScanningSubDir.countDown();
try {
@ -105,7 +109,7 @@ public class TestListSFTP {
}
}
super.getListing(path, depth, maxResults, listing);
super.getListing(path, depth, maxResults, listing, applyFilters);
}
};
}
@ -193,6 +197,7 @@ public class TestListSFTP {
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS) * 2);
runner.run();
assertVerificationOutcome(runner, Outcome.SUCCESSFUL, ".* Found 3 objects. Of those, 3 match the filter.");
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 3);
@ -231,10 +236,22 @@ public class TestListSFTP {
runner.run();
assertVerificationOutcome(runner, Outcome.SUCCESSFUL, ".* Found 3 objects. Of those, 1 matches the filter.");
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0);
//the only file between the limits
retrievedFile.assertAttributeEquals("filename", "file.txt");
}
/**
 * Casts the runner's processor to {@link VerifiableProcessor}, calls {@code verify} with the
 * runner's process context, the runner's logger and an empty map, then checks the single result.
 *
 * @param runner test runner supplying the processor, process context and logger
 * @param expectedOutcome the outcome the one returned verification result must carry
 * @param expectedExplanationRegex regular expression the entire explanation text must match
 */
private void assertVerificationOutcome(final TestRunner runner, final Outcome expectedOutcome, final String expectedExplanationRegex) {
    final VerifiableProcessor verifiable = (VerifiableProcessor) runner.getProcessor();
    final List<ConfigVerificationResult> verificationResults =
            verifiable.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());

    // Exactly one verification result is expected.
    assertEquals(1, verificationResults.size());

    final ConfigVerificationResult onlyResult = verificationResults.get(0);
    assertEquals(expectedOutcome, onlyResult.getOutcome());

    // Build the failure message up front so a mismatch reports both the pattern and the actual text.
    final String failureMessage = String.format(
            "Expected verification result to match pattern [%s]. Actual explanation was: %s",
            expectedExplanationRegex,
            onlyResult.getExplanation());
    final boolean explanationMatches = onlyResult.getExplanation().matches(expectedExplanationRegex);
    assertTrue(failureMessage, explanationMatches);
}
}

View File

@ -144,7 +144,7 @@ public class ITestSFTPTransferWithSSHTestServer {
properties.put(SFTPTransfer.REMOTE_PATH, DIR_2);
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
final List<FileInfo> listing = transfer.getListing();
final List<FileInfo> listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(2, listing.size());
@ -167,7 +167,7 @@ public class ITestSFTPTransferWithSSHTestServer {
properties.put(SFTPTransfer.IGNORE_DOTTED_FILES, "false");
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
final List<FileInfo> listing = transfer.getListing();
final List<FileInfo> listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(3, listing.size());
@ -183,7 +183,7 @@ public class ITestSFTPTransferWithSSHTestServer {
properties.put(SFTPTransfer.RECURSIVE_SEARCH, "false");
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
final List<FileInfo> listing = transfer.getListing();
final List<FileInfo> listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(0, listing.size());
}
@ -196,7 +196,7 @@ public class ITestSFTPTransferWithSSHTestServer {
properties.put(SFTPTransfer.RECURSIVE_SEARCH, "true");
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
final List<FileInfo> listing = transfer.getListing();
final List<FileInfo> listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(4, listing.size());
}
@ -210,7 +210,7 @@ public class ITestSFTPTransferWithSSHTestServer {
properties.put(SFTPTransfer.FOLLOW_SYMLINK, "false");
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
final List<FileInfo> listing = transfer.getListing();
final List<FileInfo> listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(0, listing.size());
}
@ -224,7 +224,7 @@ public class ITestSFTPTransferWithSSHTestServer {
properties.put(SFTPTransfer.FOLLOW_SYMLINK, "true");
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
final List<FileInfo> listing = transfer.getListing();
final List<FileInfo> listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(4, listing.size());
}
@ -238,7 +238,7 @@ public class ITestSFTPTransferWithSSHTestServer {
// first listing is without batch size and shows 4 results
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
final List<FileInfo> listing = transfer.getListing();
final List<FileInfo> listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(4, listing.size());
}
@ -247,7 +247,7 @@ public class ITestSFTPTransferWithSSHTestServer {
properties.put(SFTPTransfer.REMOTE_POLL_BATCH_SIZE, "2");
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
final List<FileInfo> listing = transfer.getListing();
final List<FileInfo> listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(2, listing.size());
}
@ -263,7 +263,7 @@ public class ITestSFTPTransferWithSSHTestServer {
properties.put(SFTPTransfer.FILE_FILTER_REGEX, fileFilterRegex);
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
final List<FileInfo> listing = transfer.getListing();
final List<FileInfo> listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(2, listing.size());
@ -282,7 +282,7 @@ public class ITestSFTPTransferWithSSHTestServer {
properties.put(SFTPTransfer.PATH_FILTER_REGEX, pathFilterRegex);
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
final List<FileInfo> listing = transfer.getListing();
final List<FileInfo> listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(2, listing.size());
@ -306,7 +306,7 @@ public class ITestSFTPTransferWithSSHTestServer {
properties.put(SFTPTransfer.RECURSIVE_SEARCH, "true");
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
transfer.getListing();
transfer.getListing(true);
}
}
@ -317,7 +317,7 @@ public class ITestSFTPTransferWithSSHTestServer {
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
// verify the directory has two files
final List<FileInfo> listing = transfer.getListing();
final List<FileInfo> listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(2, listing.size());
@ -327,7 +327,7 @@ public class ITestSFTPTransferWithSSHTestServer {
}
// verify there are now zero files
final List<FileInfo> listingAfterDelete = transfer.getListing();
final List<FileInfo> listingAfterDelete = transfer.getListing(true);
assertNotNull(listingAfterDelete);
assertEquals(0, listingAfterDelete.size());
}
@ -340,7 +340,7 @@ public class ITestSFTPTransferWithSSHTestServer {
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
// verify the directory has two files
final List<FileInfo> listing = transfer.getListing();
final List<FileInfo> listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(2, listing.size());
@ -352,7 +352,7 @@ public class ITestSFTPTransferWithSSHTestServer {
}
// verify there are now zero files
final List<FileInfo> listingAfterDelete = transfer.getListing();
final List<FileInfo> listingAfterDelete = transfer.getListing(true);
assertNotNull(listingAfterDelete);
assertEquals(0, listingAfterDelete.size());
}
@ -374,7 +374,7 @@ public class ITestSFTPTransferWithSSHTestServer {
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
// verify the directory exists
final List<FileInfo> listing = transfer.getListing();
final List<FileInfo> listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(0, listing.size());
@ -382,7 +382,7 @@ public class ITestSFTPTransferWithSSHTestServer {
// verify the directory no longer exists
try {
transfer.getListing();
transfer.getListing(true);
Assert.fail("Should have thrown exception");
} catch (FileNotFoundException e) {
// nothing to do, expected
@ -408,7 +408,7 @@ public class ITestSFTPTransferWithSSHTestServer {
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
// verify the directory does not exist
try {
transfer.getListing();
transfer.getListing(true);
Assert.fail("Should have failed");
} catch (FileNotFoundException e) {
// Nothing to do, expected
@ -418,7 +418,7 @@ public class ITestSFTPTransferWithSSHTestServer {
transfer.ensureDirectoryExists(null, new File(absolutePath));
// verify the directory now exists
final List<FileInfo> listing = transfer.getListing();
final List<FileInfo> listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(0, listing.size());
}
@ -433,7 +433,7 @@ public class ITestSFTPTransferWithSSHTestServer {
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
// verify the directory does not exist
try {
transfer.getListing();
transfer.getListing(true);
Assert.fail("Should have failed");
} catch (FileNotFoundException e) {
// Nothing to do, expected
@ -443,7 +443,7 @@ public class ITestSFTPTransferWithSSHTestServer {
transfer.ensureDirectoryExists(null, new File(absolutePath));
// verify the directory now exists
final List<FileInfo> listing = transfer.getListing();
final List<FileInfo> listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(0, listing.size());
}
@ -456,7 +456,7 @@ public class ITestSFTPTransferWithSSHTestServer {
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
// verify the directory already exists
final List<FileInfo> listing = transfer.getListing();
final List<FileInfo> listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(2, listing.size());
@ -475,7 +475,7 @@ public class ITestSFTPTransferWithSSHTestServer {
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
// verify the directory does not exist
try {
transfer.getListing();
transfer.getListing(true);
Assert.fail("Should have failed");
} catch (FileNotFoundException e) {
// Nothing to do, expected
@ -485,7 +485,7 @@ public class ITestSFTPTransferWithSSHTestServer {
transfer.ensureDirectoryExists(null, new File(absolutePath));
// verify the directory now exists
final List<FileInfo> listing = transfer.getListing();
final List<FileInfo> listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(0, listing.size());
}
@ -499,7 +499,7 @@ public class ITestSFTPTransferWithSSHTestServer {
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
// verify the directory already exists
final List<FileInfo> listing = transfer.getListing();
final List<FileInfo> listing = transfer.getListing(true);
assertNotNull(listing);
assertEquals(2, listing.size());
@ -519,7 +519,7 @@ public class ITestSFTPTransferWithSSHTestServer {
try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
// verify the directory does not exist
try {
transfer.getListing();
transfer.getListing(true);
Assert.fail("Should have failed");
} catch (FileNotFoundException e) {
// Nothing to do, expected