NIFI-309 - Adding doc annotations to std processors

This commit is contained in:
danbress 2015-03-05 12:23:25 -05:00
parent 9288f237e0
commit 376a9c6acf
29 changed files with 285 additions and 110 deletions

View File

@ -26,18 +26,34 @@ import java.lang.annotation.Target;
import org.apache.nifi.components.ConfigurableComponent;
/**
* Annotation that may be placed on a {@link org.apache.nifi.processor.Processor Processor},
* Annotation that may be placed on a
* {@link org.apache.nifi.processor.Processor Processor},
* {@link org.apache.nifi.controller.ControllerService ControllerService}, or
* {@link org.apache.nifi.reporting.ReportingTask ReportingTask} that indicates this component is related
* to the components listed.
* @author
* {@link org.apache.nifi.reporting.ReportingTask ReportingTask} that indicates
* this component is related to the components listed.
*
* @author
*
*/
@Documented
@Target({ElementType.TYPE})
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface SeeAlso {
public Class<? extends ConfigurableComponent>[] value();
/**
* Classes you want to link to.
*
* @return
*/
public Class<? extends ConfigurableComponent>[] value();
/**
* Fully qualified class names you want to link to. Use this when the class
* you want to link to is not in the class path of the component you are
* linking from.
*
* @return
*/
public String[] classNames() default "";
}

View File

@ -159,6 +159,20 @@ public class HtmlDocumentationWriter implements DocumentationWriter {
++index;
}
for (final String linkedComponent : seeAlso.classNames()) {
if (index != 0) {
xmlStreamWriter.writeCharacters(", ");
}
final String link = "../" + linkedComponent + "/index.html";
final int indexOfLastPeriod = Math.min(0, linkedComponent.lastIndexOf("."));
writeLink(xmlStreamWriter, linkedComponent.substring(indexOfLastPeriod), link);
++index;
}
xmlStreamWriter.writeEndElement();
}
}

View File

@ -40,7 +40,9 @@ import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.ReadsAttribute;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.WritesAttribute;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -66,6 +68,8 @@ import org.tukaani.xz.XZOutputStream;
@SupportsBatching
@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2"})
@CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type attribute as appropriate")
@ReadsAttribute(attribute="mime.type", description="If the Compression Format is set to use mime.type attribute, this attribute is used to determine the compression type. Otherwise, this attribute is ignored.")
@WritesAttribute(attribute="mime.type", description="If the Mode property is set to compress, the appropriate MIME Type is set. If the Mode property is set to decompress and the file is successfully decompressed, this attribute is removed, as the MIME Type is no longer known.")
public class CompressContent extends AbstractProcessor {
public static final String COMPRESSION_FORMAT_ATTRIBUTE = "use mime.type attribute";

View File

@ -26,6 +26,13 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.WritesAttribute;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
@ -39,15 +46,9 @@ import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.commons.lang3.StringUtils;
@EventDriven
@SupportsBatching
@Tags({"experimental", "hash", "dupe", "duplicate", "dedupe"})
@ -55,6 +56,8 @@ import org.apache.commons.lang3.StringUtils;
+ "If so, routes the FlowFile to 'duplicate' with an attribute named 'original.identifier' that specifies the original FlowFile's"
+ "\"description\", which is specified in the <FlowFile Description> property. If the FlowFile is not determined to be a duplicate, the Processor "
+ "routes the FlowFile to 'non-duplicate'")
@WritesAttribute(attribute="original.flowfile.description", description="All FlowFiles routed to the duplicate relationship will have an attribute added named original.flowfile.description. The value of this attribute is determined by the attributes of the original copy of the data and by the FlowFile Description property.")
@SeeAlso(value={DistributedMapCacheClient.class}, classNames={"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer"})
public class DetectDuplicate extends AbstractProcessor {
public static final String ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME = "original.flowfile.description";

View File

@ -66,6 +66,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.WritesAttribute;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@ -91,6 +92,7 @@ import org.xml.sax.InputSource;
+ "evaluate to a Node, the FlowFile will be routed to 'unmatched' without having its contents modified. If Destination is "
+ "flowfile-attribute and the expression matches nothing, attributes will be created with empty strings as the value, and the "
+ "FlowFile will always be routed to 'matched'")
@WritesAttribute(attribute="user-defined", description="This processor adds user-defined attributes if the <Destination> property is set to flowfile-attribute.")
public class EvaluateXPath extends AbstractProcessor {
public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";

View File

@ -49,30 +49,31 @@ import net.sf.saxon.s9api.XQueryExecutable;
import net.sf.saxon.s9api.XdmItem;
import net.sf.saxon.s9api.XdmNode;
import net.sf.saxon.s9api.XdmValue;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.WritesAttribute;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.util.ObjectHolder;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;
@ -92,7 +93,7 @@ import org.xml.sax.InputSource;
+ "'matched'. If no provided XQuery returns a result, the FlowFile will be routed to 'unmatched'. If the "
+ "Destination is 'flowfile-attribute' and the XQueries matche nothing, no attributes will be applied to the "
+ "FlowFile.")
@WritesAttribute(attribute="user-defined", description="This processor adds user-defined attributes if the <Destination> property is set to flowfile-attribute .")
public class EvaluateXQuery extends AbstractProcessor {
public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";

View File

@ -31,32 +31,33 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.WritesAttribute;
import org.apache.nifi.annotation.documentation.WritesAttributes;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
/**
* <p>
@ -119,6 +120,10 @@ import org.apache.commons.lang3.StringUtils;
@SupportsBatching
@Tags({"command execution", "command", "stream", "execute"})
@CapabilityDescription("Executes an external command on the contents of a flow file, and creates a new flow file with the results of the command.")
@WritesAttributes({ @WritesAttribute(attribute = "execution.command", description = "The name of the command executed to create the new FlowFile"),
@WritesAttribute(attribute = "execution.command.args", description = "The semi-colon delimited list of arguments"),
@WritesAttribute(attribute = "execution.status", description = "The exit status code returned from executing the command"),
@WritesAttribute(attribute = "execution.error", description = "Any error messages returned from executing the command") })
public class ExecuteStreamCommand extends AbstractProcessor {
public static final Relationship ORIGINAL_RELATIONSHIP = new Relationship.Builder()

View File

@ -24,6 +24,9 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.WritesAttribute;
import org.apache.nifi.annotation.documentation.WritesAttributes;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processors.standard.util.FTPTransfer;
@ -32,6 +35,15 @@ import org.apache.nifi.processors.standard.util.FileTransfer;
@SideEffectFree
@Tags({"FTP", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"})
@CapabilityDescription("Fetches files from an FTP Server and creates FlowFiles from them")
@WritesAttributes({ @WritesAttribute(attribute = "filename", description = "The filename is set to the name of the file on the remote server"),
@WritesAttribute(attribute = "path", description = "The path is set to the path of the file's directory on the remote server. For example, if the <Remote Path> property is set to /tmp, files picked up from /tmp will have the path attribute set to /tmp. If the <Search Recursively> property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to /tmp/abc/1/2/3"),
@WritesAttribute(attribute = "file.lastModifiedTime", description = "The date and time that the source file was last modified"),
@WritesAttribute(attribute = "file.lastAccessTime", description = "The date and time that the file was last accessed. May not work on all file systems"),
@WritesAttribute(attribute = "file.owner", description = "The numeric owner id of the source file"),
@WritesAttribute(attribute = "file.group", description = "The numeric group id of the source file"),
@WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the source file"),
@WritesAttribute(attribute = "absolute.path", description = "The full/absolute path from where a file was picked up. The current 'path' attribute is still populated, but may be a relative path")})
@SeeAlso(PutFTP.class)
public class GetFTP extends GetFileTransfer {
private List<PropertyDescriptor> properties;

View File

@ -49,6 +49,13 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.WritesAttribute;
import org.apache.nifi.annotation.documentation.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -59,16 +66,22 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
@TriggerWhenEmpty
@Tags({"local", "files", "filesystem", "ingest", "ingress", "get", "source", "input"})
@CapabilityDescription("Creates FlowFiles from files in a directory. NiFi will ignore files it doesn't have at least read permissions for.")
@WritesAttributes({ @WritesAttribute(attribute = "filename", description = "The filename is set to the name of the file on disk"),
@WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's directory on disk. For example, if the <Input Directory> property is set to /tmp, files picked up from /tmp will have the path attribute set to ./. If the <Recurse Subdirectories> property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to abc/1/2/3"),
@WritesAttribute(attribute = "file.creationTime", description = "The date and time that the file was created. May not work on all file systems"),
@WritesAttribute(attribute = "file.lastModifiedTime", description = "The date and time that the file was last modified. May not work on all file systems"),
@WritesAttribute(attribute = "file.lastAccessTime", description = "The date and time that the file was last accessed. May not work on all file systems"),
@WritesAttribute(attribute = "file.owner", description = "The owner of the file. May not work on all file systems"),
@WritesAttribute(attribute = "file.group", description = "The group owner of the file. May not work on all file systems"),
@WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the file. May not work on all file systems"),
@WritesAttribute(attribute = "absolute.path", description = "The full/absolute path from where a file was picked up. The current 'path' attribute is still populated, but may be a relative path")})
@SeeAlso(PutFile.class)
public class GetFile extends AbstractProcessor {
public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()

View File

@ -67,6 +67,10 @@ import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.BasicHttpClientConnectionManager;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.WritesAttribute;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@ -79,9 +83,6 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
@ -90,6 +91,7 @@ import org.apache.nifi.util.StopWatch;
@Tags({"get", "fetch", "poll", "http", "https", "ingest", "source", "input"})
@CapabilityDescription("Fetches a file via HTTP")
@WritesAttribute(attribute="filename", description="the filename is set to the name of the file on the remote server")
public class GetHTTP extends AbstractSessionFactoryProcessor {
static final int PERSISTENCE_INTERVAL_MSEC = 10000;

View File

@ -23,6 +23,7 @@ import javax.jms.JMSException;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.logging.ProcessorLog;
@ -35,6 +36,7 @@ import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
@TriggerWhenEmpty
@Tags({"jms", "queue", "listen", "get", "pull", "source", "consume", "consumer"})
@CapabilityDescription("Pulls messages from a JMS Queue, creating a FlowFile for each JMS Message or bundle of messages, as configured")
@SeeAlso(PutJMS.class)
public class GetJMSQueue extends JmsConsumer {
private final Queue<WrappedMessageConsumer> consumerQueue = new LinkedBlockingQueue<>();

View File

@ -44,6 +44,7 @@ import javax.jms.Session;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@ -61,6 +62,7 @@ import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
@TriggerWhenEmpty
@Tags({"jms", "topic", "subscription", "durable", "non-durable", "listen", "get", "pull", "source", "consume", "consumer"})
@CapabilityDescription("Pulls messages from a JMS Topic, creating a FlowFile for each JMS Message or bundle of messages, as configured")
@SeeAlso(PutJMS.class)
public class GetJMSTopic extends JmsConsumer {
public static final String SUBSCRIPTION_NAME_PROPERTY = "subscription.name";

View File

@ -21,20 +21,31 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.WritesAttribute;
import org.apache.nifi.annotation.documentation.WritesAttributes;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
@SideEffectFree
@Tags({"sftp", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"})
@CapabilityDescription("Fetches files from an SFTP Server and creates FlowFiles from them")
@WritesAttributes({ @WritesAttribute(attribute = "filename", description = "The filename is set to the name of the file on the remote server"),
@WritesAttribute(attribute = "path", description = "The path is set to the path of the file's directory on the remote server. For example, if the <Remote Path> property is set to /tmp, files picked up from /tmp will have the path attribute set to /tmp. If the <Search Recursively> property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to /tmp/abc/1/2/3"),
@WritesAttribute(attribute = "file.lastModifiedTime", description = "The date and time that the source file was last modified"),
@WritesAttribute(attribute = "file.owner", description = "The numeric owner id of the source file"),
@WritesAttribute(attribute = "file.group", description = "The numeric group id of the source file"),
@WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the source file"),
@WritesAttribute(attribute = "absolute.path", description = "The full/absolute path from where a file was picked up. The current 'path' attribute is still populated, but may be a relative path")})
@SeeAlso(PutSFTP.class)
public class GetSFTP extends GetFileTransfer {
private List<PropertyDescriptor> properties;

View File

@ -29,6 +29,14 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.WritesAttribute;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
@ -37,16 +45,8 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
/**
* <p>
* This processor identifies groups of user-specified flowfile attributes and
@ -114,6 +114,7 @@ import org.apache.commons.lang3.StringUtils;
+ "and the value of the property is a regular expression that, if matched by the attribute value, will cause that attribute "
+ "to be used as part of the hash. If the regular expression contains a capturing group, only the value of the capturing "
+ "group will be used.")
@WritesAttribute(attribute="<Hash Value Attribute Key>", description="This Processor adds an attribute whose value is the result of Hashing the existing FlowFile attributes. The name of this attribute is specified by the <Hash Value Attribute Key> property.")
public class HashAttribute extends AbstractProcessor {
public static final PropertyDescriptor HASH_VALUE_ATTRIBUTE = new PropertyDescriptor.Builder()

View File

@ -32,6 +32,7 @@ import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.WritesAttribute;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
@ -51,6 +52,7 @@ import org.apache.nifi.util.ObjectHolder;
@SupportsBatching
@Tags({"hash", "content", "MD5", "SHA-1", "SHA-256"})
@CapabilityDescription("Calculates a hash value for the Content of a FlowFile and puts that hash value on the FlowFile as an attribute whose name is determined by the <Hash Attribute Name> property")
@WritesAttribute(attribute="<Hash Attribute Name>", description="This Processor adds an attribute whose value is the result of Hashing the existing FlowFile attributes. The name of this attribute is specified by the <Hash Attribute Name> property")
public class HashContent extends AbstractProcessor {
public static final PropertyDescriptor ATTRIBUTE_NAME = new PropertyDescriptor.Builder()

View File

@ -23,6 +23,12 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.WritesAttribute;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ProcessorLog;
@ -31,11 +37,6 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.util.ObjectHolder;
import org.apache.tika.config.TikaConfig;
@ -46,6 +47,7 @@ import org.apache.tika.mime.MediaType;
import org.apache.tika.mime.MimeType;
import org.apache.tika.mime.MimeTypeException;
/**
* <p>
* Attempts to detect the MIME Type of a FlowFile by examining its contents. If
@ -71,6 +73,7 @@ import org.apache.tika.mime.MimeTypeException;
+ "an attribute with the name 'mime.type' is added with the value being the MIME Type. If the MIME Type cannot be determined, "
+ "the value will be set to 'application/octet-stream'. In addition, the attribute mime.extension will be set if a common file "
+ "extension for the MIME Type is known.")
@WritesAttribute(attribute="mime.type", description="This Processor sets the FlowFile's mime.type attribute to the detected MIME Type. If unable to detect the MIME Type, the attribute's value will be set to application/octet-stream")
public class IdentifyMimeType extends AbstractProcessor {
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles are routed to success").build();

View File

@ -16,7 +16,7 @@
*/
package org.apache.nifi.processors.standard;
import static org.apache.commons.lang3.StringUtils.*;
import static org.apache.commons.lang3.StringUtils.trimToEmpty;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@ -48,6 +48,12 @@ import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.WritesAttribute;
import org.apache.nifi.annotation.documentation.WritesAttributes;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
@ -55,20 +61,22 @@ import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.SSLContextService.ClientAuth;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
@SupportsBatching
@Tags({"http", "https", "rest", "client"})
@CapabilityDescription("An HTTP client processor which converts FlowFile attributes to HTTP headers, with configurable HTTP method, url, etc.")
@WritesAttributes({ @WritesAttribute(attribute = "invokehttp.status.code", description = "The status code that is returned"),
@WritesAttribute(attribute = "invokehttp.status.message", description = "The status message that is returned"),
@WritesAttribute(attribute = "invokehttp.response.body", description = "The response body"),
@WritesAttribute(attribute = "invokehttp.request.url", description = "The request URL"),
@WritesAttribute(attribute = "invokehttp.tx.id", description = "The transaction ID that is returned after reading the response"),
@WritesAttribute(attribute = "invokehttp.remote.dn", description = "The DN of the remote server") })
public final class InvokeHTTP extends AbstractProcessor {
//-- properties --//

View File

@ -40,7 +40,12 @@ import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.ReadsAttribute;
import org.apache.nifi.annotation.documentation.ReadsAttributes;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.WritesAttribute;
import org.apache.nifi.annotation.documentation.WritesAttributes;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
@ -70,6 +75,18 @@ import org.apache.nifi.util.ObjectHolder;
@TriggerWhenEmpty
@Tags({"merge", "content", "correlation", "tar", "zip", "stream", "concatenation", "archive", "flowfile-stream", "flowfile-stream-v3"})
@CapabilityDescription("Merges a Group of FlowFiles together based on a user-defined strategy and packages them into a single FlowFile. It is recommended that the Processor be configured with only a single incoming connection, as Group of FlowFiles will not be created from FlowFiles in different connections. This processor updates the mime.type attribute as appropriate.")
@ReadsAttributes({ @ReadsAttribute(attribute = "fragment.identifier", description = "Applicable only if the <Merge Strategy> property is set to Defragment. All FlowFiles with the same value for this attribute will be bundled together"),
@ReadsAttribute(attribute = "fragment.index", description = "Applicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute and must be a unique integer between 0 and the value of the fragment.count attribute. This attribute indicates the order in which the fragments should be assembled"),
@ReadsAttribute(attribute = "fragment.count", description = "Applicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected in the given bundle"),
@ReadsAttribute(attribute = "segment.original.filename", description = "Applicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same bundle must have the same value for this attribute. The value of this attribute will be used for the filename of the completed merged FlowFile"),
@ReadsAttribute(attribute = "tar.permissions", description = "Applicable only if the <Merge Format> property is set to TAR. The value of this attribute must be 3 characters; each character must be in the range 0 to 7 (inclusive) and indicates the file permissions that should be used for the FlowFile's TAR entry. If this attribute is missing or has an invalid value, the default value of 644 will be used") })
@WritesAttributes({ @WritesAttribute(attribute = "filename", description = "When more than 1 file is merged, the filename comes from the segment.original.filename attribute. If that attribute does not exist in the source FlowFiles, then the filename is set to the number of nanoseconds matching system time. Then a filename extension may be applied:<ul><li>"
+"<li>if Merge Format is TAR, then the filename will be appended with .tar</li>"
+"<li>if Merge Format is ZIP, then the filename will be appended with .zip</li>"
+"<li>if Merge Format is FlowFileStream, then the filename will be appended with .pkg</li></ul>"),
@WritesAttribute(attribute = "merge.count", description = "The number of FlowFiles that were merged into this bundle"),
@WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. Effectively this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output") })
@SeeAlso(SegmentContent.class)
public class MergeContent extends BinFiles {
// preferred attributes

View File

@ -30,6 +30,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.WritesAttribute;
import org.apache.nifi.annotation.documentation.WritesAttributes;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -39,11 +46,6 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
@ -53,6 +55,9 @@ import org.apache.nifi.processor.util.StandardValidators;
@Tags({"monitor", "flow", "active", "inactive", "activity", "detection"})
@CapabilityDescription("Monitors the flow for activity and sends out an indicator when the flow has not had any data for "
+ "some specified amount of time and again when the flow's activity is restored")
@WritesAttributes({
@WritesAttribute(attribute = "inactivityStartMillis", description = "The time at which Inactivity began, in the form of milliseconds since Epoch"),
@WritesAttribute(attribute = "inactivityDurationMillis", description = "The number of milliseconds that the inactivity has spanned") })
public class MonitorActivity extends AbstractProcessor {
public static final PropertyDescriptor THRESHOLD = new PropertyDescriptor.Builder()

View File

@ -80,6 +80,12 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpCoreContext;
import org.apache.http.util.EntityUtils;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.ReadsAttribute;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@ -92,11 +98,6 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
@ -120,6 +121,7 @@ import com.sun.jersey.api.client.ClientResponse.Status;
@SupportsBatching
@Tags({"http", "https", "remote", "copy", "archive"})
@CapabilityDescription("Performs an HTTP Post with the content of the FlowFile")
@ReadsAttribute(attribute="mime.type", description="If not sending data as a FlowFile, the mime.type attribute will be used to set the HTTP Header for Content-Type")
public class PostHTTP extends AbstractProcessor {
public static final String CONTENT_TYPE = "Content-Type";

View File

@ -26,20 +26,22 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.FTPTransfer;
@SupportsBatching
@Tags({"remote", "copy", "egress", "put", "ftp", "archive", "files"})
@CapabilityDescription("Sends FlowFiles to an FTP Server")
@SeeAlso(GetFTP.class)
public class PutFTP extends PutFileTransfer<FTPTransfer> {
private static final Pattern PRE_SEND_CMD_PATTERN = Pattern.compile("^pre\\.cmd\\.(\\d+)$");

View File

@ -34,6 +34,10 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -43,9 +47,6 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
@ -53,6 +54,7 @@ import org.apache.nifi.util.StopWatch;
@SupportsBatching
@Tags({"put", "local", "copy", "archive", "files", "filesystem"})
@CapabilityDescription("Writes the contents of a FlowFile to the local file system")
@SeeAlso(GetFile.class)
public class PutFile extends AbstractProcessor {
public static final String REPLACE_RESOLUTION = "replace";

View File

@ -28,6 +28,7 @@ import static org.apache.nifi.processors.standard.util.JmsFactory.PROP_TYPE_OBJE
import static org.apache.nifi.processors.standard.util.JmsFactory.PROP_TYPE_SHORT;
import static org.apache.nifi.processors.standard.util.JmsFactory.PROP_TYPE_STRING;
import static org.apache.nifi.processors.standard.util.JmsProperties.ATTRIBUTES_TO_JMS_PROPS;
import static org.apache.nifi.processors.standard.util.JmsProperties.BATCH_SIZE;
import static org.apache.nifi.processors.standard.util.JmsProperties.CLIENT_ID_PREFIX;
import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_NAME;
import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_TYPE;
@ -44,7 +45,6 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_MA
import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
import static org.apache.nifi.processors.standard.util.JmsProperties.REPLY_TO_QUEUE;
import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
import static org.apache.nifi.processors.standard.util.JmsProperties.BATCH_SIZE;
import static org.apache.nifi.processors.standard.util.JmsProperties.URL;
import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME;
@ -71,11 +71,11 @@ import javax.jms.Session;
import javax.jms.StreamMessage;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
@ -86,9 +86,11 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processors.standard.util.JmsFactory;
import org.apache.nifi.processors.standard.util.WrappedMessageProducer;
import org.apache.nifi.stream.io.StreamUtils;
@Tags({"jms", "send", "put"})
@CapabilityDescription("Creates a JMS Message from the contents of a FlowFile and sends the message to a JMS Server")
@SeeAlso({GetJMSQueue.class, GetJMSTopic.class})
public class PutJMS extends AbstractProcessor {
public static final Charset UTF8 = Charset.forName("UTF-8");

View File

@ -20,17 +20,19 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
@SupportsBatching
@Tags({"remote", "copy", "egress", "put", "sftp", "archive", "files"})
@CapabilityDescription("Sends FlowFiles to an SFTP Server")
@SeeAlso(GetSFTP.class)
public class PutSFTP extends PutFileTransfer<SFTPTransfer> {
private List<PropertyDescriptor> properties;

View File

@ -34,26 +34,27 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.WritesAttribute;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
import org.apache.nifi.util.search.Search;
import org.apache.nifi.util.search.SearchTerm;
import org.apache.nifi.util.search.ahocorasick.AhoCorasick;
@ -64,6 +65,7 @@ import org.apache.nifi.util.search.ahocorasick.SearchState;
@SupportsBatching
@Tags({"aho-corasick", "scan", "content", "byte sequence", "search", "find", "dictionary"})
@CapabilityDescription("Scans the content of FlowFiles for terms that are found in a user-supplied dictionary. If a term is matched, the UTF-8 encoded version of the term will be added to the FlowFile using the 'matching.term' attribute")
@WritesAttribute(attribute="matching.term", description="The term that caused the Processor to route the FlowFile to the 'matched' relationship; if FlowFile is routed to the 'unmatched' relationship, this attribute is not added")
public class ScanContent extends AbstractProcessor {
public static final String TEXT_ENCODING = "text";

View File

@ -29,7 +29,10 @@ import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.WritesAttribute;
import org.apache.nifi.annotation.documentation.WritesAttributes;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -48,6 +51,15 @@ import org.apache.nifi.processor.util.StandardValidators;
@CapabilityDescription("Segments a FlowFile into multiple smaller segments on byte boundaries. Each segment is given the following attributes: "
+ "fragment.identifier, fragment.index, fragment.count, segment.original.filename; these attributes can then be used by the "
+ "MergeContent processor in order to reconstitute the original FlowFile")
@WritesAttributes({ @WritesAttribute(attribute = "segment.identifier", description = "All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute. This attribute is added to maintain backward compatibility, but the fragment.identifier is preferred, as it is designed to work in conjunction with the MergeContent Processor"),
@WritesAttribute(attribute = "segment.index", description = "A one-up number that indicates the ordering of the segments that were created from a single parent FlowFile. This attribute is added to maintain backward compatibility, but the fragment.index is preferred, as it is designed to work in conjunction with the MergeContent Processor"),
@WritesAttribute(attribute = "segment.count", description = "The number of segments generated from the parent FlowFile. This attribute is added to maintain backward compatibility, but the fragment.count is preferred, as it is designed to work in conjunction with the MergeContent Processor"),
@WritesAttribute(attribute = "fragment.identifier", description = "All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"),
@WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the segments that were created from a single parent FlowFile"),
@WritesAttribute(attribute = "fragment.count", description = "The number of segments generated from the parent FlowFile"),
@WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile"),
@WritesAttribute(attribute = "segment.original.filename ", description = "The filename will be updated to include the parent's filename, the segment index, and the segment count") })
@SeeAlso(MergeContent.class)
public class SegmentContent extends AbstractProcessor {
public static final String SEGMENT_ID = "segment.identifier";

View File

@ -28,35 +28,42 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.codec.binary.Hex;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.WritesAttribute;
import org.apache.nifi.annotation.documentation.WritesAttributes;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.util.NaiveSearchRingBuffer;
import org.apache.nifi.util.Tuple;
import org.apache.commons.codec.binary.Hex;
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"content", "split", "binary"})
@CapabilityDescription("Splits incoming FlowFiles by a specified byte sequence")
@WritesAttributes({ @WritesAttribute(attribute = "fragment.identifier", description = "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"),
@WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"),
@WritesAttribute(attribute = "fragment.count", description = "The number of split FlowFiles generated from the parent FlowFile"),
@WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile") })
@SeeAlso(MergeContent.class)
public class SplitContent extends AbstractProcessor {
// attribute keys

View File

@ -30,6 +30,9 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.WritesAttribute;
import org.apache.nifi.annotation.documentation.WritesAttributes;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@ -58,6 +61,13 @@ import java.util.UUID;
@SupportsBatching
@Tags({"split", "text"})
@CapabilityDescription("Splits a text file into multiple smaller text files on line boundaries, each having up to a configured number of lines")
@WritesAttributes({
@WritesAttribute(attribute = "text.line.count", description = "The number of lines of text from the original FlowFile that were copied to this FlowFile"),
@WritesAttribute(attribute = "fragment.identifier", description = "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"),
@WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"),
@WritesAttribute(attribute = "fragment.count", description = "The number of split FlowFiles generated from the parent FlowFile"),
@WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile") })
@SeeAlso(MergeContent.class)
public class SplitText extends AbstractProcessor {
// attribute keys

View File

@ -30,42 +30,53 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.ReadsAttribute;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.WritesAttribute;
import org.apache.nifi.annotation.documentation.WritesAttributes;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FlowFileUnpackager;
import org.apache.nifi.util.FlowFileUnpackagerV1;
import org.apache.nifi.util.FlowFileUnpackagerV2;
import org.apache.nifi.util.FlowFileUnpackagerV3;
import org.apache.nifi.util.ObjectHolder;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"Unpack", "un-merge", "tar", "zip", "archive", "flowfile-stream", "flowfile-stream-v3"})
@CapabilityDescription("Unpacks the content of FlowFiles that have been packaged with one of several different Packaging Formats, emitting one to many FlowFiles for each input FlowFile")
@ReadsAttribute(attribute = "mime.type", description = "If the <Packaging Format> property is set to use mime.type attribute, this attribute is used to determine the FlowFile's MIME Type. In this case, if the attribute is set to application/tar, the TAR Packaging Format will be used. If the attribute is set to application/zip, the ZIP Packaging Format will be used. If the attribute is set to application/flowfile-v3 or application/flowfile-v2 or application/flowfile-v1, the appropriate FlowFile Packaging Format will be used. If this attribute is missing, the FlowFile will be routed to 'failure'. Otherwise, if the attribute's value is not one of those mentioned above, the FlowFile will be routed to 'success' without being unpacked")
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "If the FlowFile is successfully unpacked, its MIME Type is no longer known, so the mime.type attribute is set to application/octet-stream."),
@WritesAttribute(attribute = "fragment.identifier", description = "All unpacked FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"),
@WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the unpacked FlowFiles that were created from a single parent FlowFile"),
@WritesAttribute(attribute = "fragment.count", description = "The number of unpacked FlowFiles generated from the parent FlowFile"),
@WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile. Extensions of .tar, .zip or .pkg are removed because the MergeContent processor automatically adds those extensions if it is used to rebuild the original FlowFile") })
@SeeAlso(MergeContent.class)
public class UnpackContent extends AbstractProcessor {
public static final String AUTO_DETECT_FORMAT = "use mime.type attribute";