diff --git a/nifi/nifi-docs/src/main/asciidoc/developer-guide.adoc b/nifi/nifi-docs/src/main/asciidoc/developer-guide.adoc index bfaa669418..1e9e5092f1 100644 --- a/nifi/nifi-docs/src/main/asciidoc/developer-guide.adoc +++ b/nifi/nifi-docs/src/main/asciidoc/developer-guide.adoc @@ -19,27 +19,1786 @@ NiFi Developer's Guide Apache NiFi Team :homepage: http://nifi.incubator.apache.org -The designed points of extension -------------------------------- -Processor, Prioritizer, ... -The NiFi Archive (NAR) and NiFi Classloading -------------------------------------------- -A NAR is ... +== Introduction -How to build extensions ----------------------- -Understanding Nars the first step is... +This Developer Guide is written by developers for developers. It is expected that before reading this +guide, you have a basic understanding of Apache NiFi (incubating) and the concepts of dataflow. If not, +please see the link:overview.html[NiFi Overview] and the link:user-guide.html[NiFi User Guide] to familiarize +yourself with the concepts of NiFi. -Design considerations --------------------- -The right abstraction ... +This guide also assumes that the reader is familiar with Java 7 and Apache Maven. + +The intent of this guide is to provide the reader with the information needed to understand how NiFi extensions +are developed and to help explain the thought process behind developing the components. + + +[[components]] +== NiFi Components + +NiFi provides several extension points that give developers the +ability to add functionality +to the application to meet their needs. The following list provides a +high-level description +of the most common extension points: + +- Processor + - The Processor interface is the mechanism through which NiFi +exposes access to + FlowFiles, their attributes, and their content. The Processor is +the basic building + block used to compose a NiFi dataflow.
This interface is used +to accomplish + all of the following tasks: + + - Create FlowFiles + - Read FlowFile content + - Write FlowFile content + - Read FlowFile attributes + - Update FlowFile attributes + - Ingest data + - Egress data + - Route data + - Extract data + - Modify data + +- ReportingTask + - The ReportingTask interface is a mechanism that NiFi exposes to allow metrics, + monitoring information, and internal NiFi state to be published to external + sources, such as log files, e-mail, and remote web services. + +- ControllerService + - A ControllerService provides shared state across Processors, other ControllerServices, + and ReportingTasks within a single JVM. An example use case may include loading a very + large dataset into memory. By performing this work in a ControllerService, the data + can be loaded once and be exposed to all Processors via this service. + +- FlowFilePrioritizer + - The FlowFilePrioritizer interface provides a mechanism by which FlowFiles + in a queue can be prioritized, or sorted, so that the FlowFiles can be processed in an order + that is most effective for a particular use case. + +- AuthorityProvider + - An AuthorityProvider is responsible for determining which privileges and roles, if any, + a given user should be granted. + + +[[processor_api]] +== Processor API + +The Processor is the most widely used Component available in NiFi. +Processors are the only Component +to which access is given to create, remove, modify, or inspect +FlowFiles (data and attributes). + +All Processors are loaded and instantiated using Java's ServiceLoader +mechanism. This means that all +Processors must adhere to the following rules: + + - The Processor must have a default constructor. + - The Processor's JAR file must contain an entry in the META-INF/services directory named + `org.apache.nifi.processor.Processor`. This is a text file where each line contains the + fully-qualified class name of a Processor.
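+ +For example, if a JAR contained a single Processor whose fully-qualified class name were `org.example.processors.MyProcessor` (a hypothetical name, used here only for illustration), the file `META-INF/services/org.apache.nifi.processor.Processor` would consist of the single line: + +---- +org.example.processors.MyProcessor +----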
+ +While `Processor` is an interface that can be implemented directly, it +will be extremely rare to do so, as +the `org.apache.nifi.processor.AbstractProcessor` is the base class +for almost all Processor +implementations. The `AbstractProcessor` class provides a significant +amount of functionality, which makes +the task of developing a Processor much easier and more convenient. +For the scope of this document, +we will focus primarily on the `AbstractProcessor` class when dealing +with the Processor API. + +.Concurrency Note +NiFi is a highly concurrent framework. This means that all extensions +must be thread-safe. +If unfamiliar with writing concurrent software in Java, it is highly +recommended that you familiarize +yourself with the principles of Java concurrency. + + +[[supporting_api]] +=== Supporting API + +In order to understand the Processor API, we must first understand - +at least at a high level - several supporting classes and interfaces, which are discussed below. + +[[flowfile]] +==== FlowFile +A FlowFile is a logical notion that correlates a piece of data with a +set of Attributes about that data. +Such attributes include a FlowFile's unique identifier, as well as its +name, size, and any number of other +flow-specific values. While the contents and attributes of a FlowFile +can change, the FlowFile object is +immutable. Modifications to a FlowFile are made possible by the ProcessSession. + +[[process_session]] +==== ProcessSession +The ProcessSession, often referred to as simply a "session," provides +a mechanism by which FlowFiles +can be created, destroyed, examined, cloned, and transferred to other +Processors. Additionally, a +ProcessSession provides a mechanism for creating modified versions of +FlowFiles, by adding or removing +attributes, or by modifying the FlowFile's content. The ProcessSession +also exposes a mechanism for +emitting provenance events that provide for the ability to track the +lineage and history of a FlowFile.
+After operations are performed on one or more FlowFiles, a +ProcessSession can be either committed or +rolled back. + +[[process_context]] +==== ProcessContext +The ProcessContext provides a bridge between a Processor and the +framework. It provides information +about how the Processor is currently configured and allows the +Processor to perform +Framework-specific tasks, such as yielding its resources so that the +framework will schedule other +Processors to run without consuming resources unnecessarily. + + +[[property_descriptor]] +==== PropertyDescriptor +PropertyDescriptor defines a property that is to be used by a +Processor, ReportingTask, or ControllerService. +The definition of a property includes its name, a description of the +property, an optional default value, +validation logic, and an indicator as to whether or not the property +is required in order for the Processor +to be valid. PropertyDescriptors are created by instantiating an +instance of the `PropertyDescriptor.Builder` +class, calling the appropriate methods to fill in the details about +the property, and finally calling +the `build` method. + + +[[validator]] +==== Validator +A PropertyDescriptor may specify one or more Validators that can be +used to ensure that the user-entered value +for a property is valid. If a Validator indicates that a property +value is invalid, the Component will not be +able to be run or used until the property becomes valid. + + +[[validation_context]] +==== ValidationContext +When validating property values, a ValidationContext can be used to +obtain ControllerServices, +create PropertyValue objects, and compile and evaluate property values +using the Expression Language. + + +[[property_value]] +==== PropertyValue +All property values returned to a Processor are returned in the form +of a PropertyValue object. 
This +object has convenience methods for converting the value from a String +to other forms, such as numbers +and time periods, as well as providing an API for evaluating the +Expression Language. + + +[[relationship]] +==== Relationship +Relationships define the routes to which a FlowFile may be transferred +from a Processor. Relationships +are created by instantiating an instance of the `Relationship.Builder` +class, calling the appropriate methods +to fill in the details of the Relationship, and finally calling the +`build` method. + + +[[processor_initialization_context]] +==== ProcessorInitializationContext +After a Processor is created, its `initialize` method will be called +with an `InitializationContext` object. +This object exposes configuration to the Processor that will not +change throughout the life of the Processor, +such as the unique identifier of the Processor. + +[[ProcessorLog]] +==== ProcessorLog +Processors are encouraged to perform their logging via the +`ProcessorLog` interface, rather than obtaining +a direct instance of a third-party logger. This is because logging via +the ProcessorLog allows the framework +to render log messages that exceed a configurable severity level to +the User Interface, allowing those who +monitor the dataflow to be notified when important events occur. +Additionally, it provides a consistent logging +format for all Processors by logging stack traces when in DEBUG mode +and providing the Processor's unique +identifier in log messages. + + + + + +[[AbstractProcessor]] +=== AbstractProcessor API + +Since the vast majority of Processors will be created by extending the +AbstractProcessor, it is the +abstract class that we will examine in this section. The +AbstractProcessor provides several methods that +will be of interest to Processor developers. + + +==== Processor Initialization + +When a Processor is created, before any other methods are invoked, the +`init` method of the +AbstractProcessor will be invoked.
The method takes a single argument, +which is of type +`ProcessorInitializationContext`. The context object supplies the +Processor with a ProcessorLog, +the Processor's unique identifier, and a ControllerServiceLookup that +can be used to interact with the +configured ControllerServices. Each of these objects is stored by the +AbstractProcessor and may be obtained by +subclasses via the `getLogger`, `getIdentifier`, and +`getControllerServiceLookup` methods, respectively. + + +==== Exposing Processor's Relationships + +In order for a Processor to transfer a FlowFile to a new destination +for follow-on processing, the +Processor must first be able to expose to the Framework all of the +Relationships that it currently supports. +This allows users of the application to connect Processors to one +another by creating +Connections between Processors and assigning the appropriate +Relationships to those Connections. + +A Processor exposes the valid set of Relationships by overriding the +`getRelationships` method. +This method takes no arguments and returns a `Set` of `Relationship` +objects. For most Processors, this Set +will be static, but other Processors will generate the Set +dynamically, based on user configuration. +For those Processors for which the Set is static, it is advisable to +create an immutable Set in the Processor's +constructor or init method and return that value, rather than +dynamically generating the Set. This +pattern lends itself to cleaner code and better performance. + + +==== Exposing Processor Properties + +Most Processors will require some amount of user configuration before +they are able to be used. The properties +that a Processor supports are exposed to the Framework via the +`getSupportedPropertyDescriptors` method. +This method takes no arguments and returns a `List` of +`PropertyDescriptor` objects. 
The order of the objects in the +List is important in that it dictates the order in which the +properties will be rendered in the User Interface. + +A `PropertyDescriptor` object is constructed by creating a new +instance of the `PropertyDescriptor.Builder` object, +calling the appropriate methods on the builder, and finally calling +the `build` method. + +While this method covers most of the use cases, it is sometimes +desirable to allow users to configure +additional properties whose names are not known. This can be achieved +by overriding the +`getSupportedDynamicPropertyDescriptor` method. This method takes a +`String` as its only argument, which +indicates the name of the property. The method returns a +`PropertyDescriptor` object that can be used to validate +both the name of the property, as well as the value. Any +PropertyDescriptor that is returned from this method +should be built setting the value of `isDynamic` to true in the +`PropertyDescriptor.Builder` class. The default +behavior of AbstractProcessor is to not allow any dynamically created +properties. + + +==== Validating Processor Properties + +A Processor is not able to be started if its configuration is not +valid. Validation of a Processor property can +be achieved by setting a Validator on a PropertyDescriptor or by +restricting the allowable values for a +property via the PropertyDescriptor.Builder's `allowableValues` method +or `identifiesControllerService` method. + +There are times, though, when validating a Processor's properties +individually is not sufficient. For this purpose, +the AbstractProcessor exposes a `customValidate` method. The method +takes a single argument of type `ValidationContext`. +The return value of this method is a `Collection` of +`ValidationResult` objects that describe any problems that were +found during validation. Only those ValidationResult objects whose +`isValid` method returns `false` should be returned.
+This method will be invoked only if all properties are valid according +to their associated Validators and Allowable Values. +I.e., this method will be called only if all properties are valid +in-and-of themselves, and this method allows for +validation of a Processor's configuration as a whole. + + +==== Responding to Changes in Configuration + +It is sometimes desirable to have a Processor eagerly react when its +properties are changed. The `onPropertyModified` +method allows a Processor to do just that. When a user changes the +property values for a Processor, the +`onPropertyModified` method will be called for each modified property. +The method takes three arguments: the PropertyDescriptor that +indicates which property was modified, +the old value, and the new value. If the property had no previous +value, the second argument will be `null`. If the property +was removed, the third argument will be `null`. It is important to +note that this method will be called regardless of whether +or not the values are valid. This method will be called only when a +value is actually modified, rather than being +called when a user updates a Processor without changing its value. At +the point that this method is invoked, it is guaranteed +that the thread invoking this method is the only thread currently +executing code in the Processor, unless the Processor itself +creates its own threads. + + +==== Performing the Work + +When a Processor has work to do, it is scheduled to do so by having +its `onTrigger` method called by the framework. +The method takes two arguments: a `ProcessContext` and a +`ProcessSession`. The first step in the `onTrigger` method +is often to obtain a FlowFile on which the work is to be performed by +calling one of the `get` methods on the ProcessSession. +For Processors that ingest data into NiFi from external sources, this +step is skipped. 
The Processor is then free to examine +FlowFile attributes; add, remove, or modify attributes; read or modify +FlowFile content; and transfer FlowFiles to the appropriate +Relationships. + + +==== When Processors are Triggered + +A Processor's `onTrigger` method will be called only when it is +scheduled to run and when work exists for the Processor. +Work is said to exist for a Processor if any of the following conditions is met: + +- A Connection whose destination is the Processor has at least one +FlowFile in its queue +- The Processor has no incoming Connections +- The Processor is annotated with the @TriggerWhenEmpty annotation + +Several factors exist that will contribute to when a Processor's +`onTrigger` method is invoked. First, the Processor will not +be triggered unless a user has configured the Processor to run. If a +Processor is scheduled to run, the Framework periodically +(the period is configured by users in the User Interface) checks if +there is work for the Processor to do, as described above. +If so, the Framework will check downstream destinations of the +Processor. If any of the Processor's outbound Connections is full, +by default, the Processor will not be scheduled to run. + +However, the `@TriggerWhenAnyDestinationAvailable` annotation may be +added to the Processor's class. In this case, the requirement +is changed so that only one downstream destination must be "available" +(a destination is considered "available" if the Connection's +queue is not full), rather than requiring that all downstream +destinations be available. + +Also related to Processor scheduling is the `@TriggerSerially` +annotation. Processors that use this Annotation will never have more +than one thread running the `onTrigger` method simultaneously. It is +crucial to note, though, that the thread executing the code +may change from invocation to invocation. Therefore, care must still +be taken to ensure that the Processor is thread-safe!
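+ +To make the `onTrigger` pattern described above concrete, the skeleton below shows a minimal implementation. This is an illustrative sketch only, not a complete Processor: it assumes a class that extends `AbstractProcessor` and that defines a `REL_SUCCESS` Relationship elsewhere in the class; the attribute name and value are arbitrary. + +[source,java] +---- +@Override +public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { +    // Obtain a FlowFile to work on; if none is queued, simply return. +    FlowFile flowFile = session.get(); +    if (flowFile == null) { +        return; +    } + +    // FlowFiles are immutable, so keep the reference returned by the session. +    flowFile = session.putAttribute(flowFile, "example.attribute", "example value"); + +    // Route the FlowFile to success; AbstractProcessor commits the session +    // when onTrigger returns without throwing an Exception. +    session.transfer(flowFile, REL_SUCCESS); +} +----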
+ + + +=== Component Lifecycle + +The NiFi API provides lifecycle support through use of Java +Annotations. The `org.apache.nifi.annotations.lifecycle` package +contains +several annotations for lifecycle management. The following +Annotations may be applied to Java methods in a NiFi component to +indicate to +the framework when the methods should be called. For the discussion of +Component Lifecycle, we will define a NiFi component as a +Processor, ControllerService, or ReportingTask. + +==== @OnAdded + +The `@OnAdded` annotation causes a method to be invoked as soon as a +component is created. The +component's `initialize` method (or `init` method, if subclassing +`AbstractProcessor`) will be invoked after the component is +constructed, +followed by methods that are annotated with `@OnAdded`. If any method +annotated with `@OnAdded` throws an Exception, an error will +be returned to the user, and that component will not be added to the +flow. Furthermore, other methods with this +Annotation will not be invoked. This method will be called only once +for the lifetime of a component. +Methods with this Annotation must take zero arguments. + + +==== @OnRemoved + +The `@OnRemoved` annotation causes a method to be invoked before a +component is removed from the flow. +This allows resources to be cleaned up before removing a component. +Methods with this annotation must take zero arguments. +If a method with this annotation throws an Exception, the component +will still be removed. + +==== @OnScheduled + +This annotation indicates that a method should be called every time +the component is scheduled to run. Because ControllerServices +are not scheduled, using this annotation on a ControllerService does +not make sense and will not be honored. It should be +used only for Processors and Reporting Tasks. If any method with this +annotation throws an Exception, other methods with this +annotation will not be invoked, and a notification will be presented +to the user.
In this case, methods annotated with +`@OnUnscheduled` are then triggered, followed by methods with the +`@OnStopped` annotation (during this state, if any of these +methods throws an Exception, those Exceptions are ignored). The +component will then yield its execution for some period of time, +referred to as the "Administrative Yield Duration," which is a value +that is configured in the `nifi.properties` file. Finally, the +process will start again, until all of the methods annotated with +`@OnScheduled` have returned without throwing any Exception. +Methods with this annotation may take zero arguments or may take a +single argument. If the single argument variation is used, +the argument must be of type `ProcessContext` if the component is a +Processor or `ConfigurationContext` if the component +is a ReportingTask. + +==== @OnUnscheduled + +Methods with this annotation will be called whenever a Processor or +ReportingTask is no longer scheduled to run. At that time, many threads +may still be active in the Processor's `onTrigger` method. If such a method +throws an Exception, a log message will be generated, and the +Exception will be otherwise +ignored and other methods with this annotation will still be invoked. +Methods with this annotation may take zero arguments or may take a +single argument. +If the single argument variation is used, the argument must be of type +`ProcessContext` if the component is a Processor or +`ConfigurationContext` if the +component is a ReportingTask. + + +==== @OnStopped + +Methods with this annotation will be called when a Processor or +ReportingTask is no longer scheduled to run +and all threads have returned from the `onTrigger` method. If such a +method throws an Exception, +a log message will be generated, and the Exception will otherwise be +ignored; other methods with +this annotation will still be invoked. Methods with this annotation +must take zero arguments.
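+ +As an illustration of how these annotations are typically paired, the sketch below initializes and tears down a connection pool. The `CustomConnectionPool` type, the `connectionPool` field, and the `HOSTNAME` property are hypothetical, used here only for illustration: + +[source,java] +---- +@OnScheduled +public void setupPool(final ProcessContext context) { +    // Called each time the Processor is scheduled to run. The pool is +    // created here, but individual connections are not yet opened. +    this.connectionPool = new CustomConnectionPool(context.getProperty(HOSTNAME).getValue()); +} + +@OnStopped +public void shutdownPool() { +    // Called once the Processor is no longer scheduled and all threads +    // have returned from onTrigger, so resources can safely be released. +    if (connectionPool != null) { +        connectionPool.shutdown(); +        connectionPool = null; +    } +} +----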
+ + +==== @OnShutdown + +Any method that is annotated with the `@OnShutdown` annotation will be +called when NiFi is successfully +shut down. If such a method throws an Exception, a log message will be +generated, and the +Exception will be otherwise ignored and other methods with this +annotation will still be invoked. +Methods with this annotation must take zero arguments. Note: while +NiFi will attempt to invoke methods +with this annotation on all components that use it, this is not always +possible. For example, the process +may be killed unexpectedly, in which case it does not have a chance to +invoke these methods. Therefore, +while methods using this annotation can be used to clean up resources, +for instance, they should not be +relied upon for critical business logic. + + + + +=== Reporting Processor Activity + +Processors are responsible for reporting their activity so that users +are able to understand what happens +to their data. Processors should log events via the ProcessorLog, +which is accessible via the InitializationContext +or by calling the `getLogger` method of `AbstractProcessor`. + +Additionally, Processors should use the `ProvenanceReporter` +interface, obtained via the ProcessSession's +`getProvenanceReporter` method. The ProvenanceReporter should be used +to indicate any time that content is +received from an external source or sent to an external location. The +ProvenanceReporter also has methods for +reporting when a FlowFile is cloned, forked, or modified, and when +multiple FlowFiles are merged into a single FlowFile +as well as associating a FlowFile with some other identifier. However, +these functions are less critical to report, as +the framework is able to detect these things and emit appropriate +events on the Processor's behalf.
Yet, it is a best practice +for the Processor developer to emit these events, as it becomes +explicit in the code that these events are being emitted, and +the developer is able to provide additional details to the events, +such as the amount of time that the action took or +pertinent information about the action that was taken. If the +Processor emits an event, the framework will not emit a duplicate +event. Instead, it always assumes that the Processor developer knows +what is happening in the context of the Processor +better than the framework does. The framework may, however, emit a +different event. For example, if a Processor modifies both the +content of a FlowFile and its attributes and then emits only an +ATTRIBUTES_MODIFIED event, the framework will emit a CONTENT_MODIFIED +event. The framework will not emit an ATTRIBUTES_MODIFIED event if any +other event is emitted for that FlowFile (either by the +Processor or the framework). This is due to the fact that all +Provenance Events know about the attributes of the FlowFile before the +event occurred as well as those attributes that occurred as a result +of the processing of that FlowFile, and as a result the +ATTRIBUTES_MODIFIED is generally considered redundant and would result +in a rendering of the FlowFile lineage being very verbose. +It is, however, acceptable for a Processor to emit this event along +with others, if the event is considered pertinent from the +perspective of the Processor. + + + + + + + +== Documenting a Component + +NiFi attempts to make the user experience as simple and convenient as +possible by providing a significant amount of documentation +to the user from within the NiFi application itself via the User +Interface. In order for this to happen, of course, Processor +developers must provide that documentation to the framework. NiFi +exposes a few different mechanisms for supplying documentation to +the framework.
+ + +=== Documenting Properties + +Individual properties can be documented by calling the `description` +method of a PropertyDescriptor's builder as such: + +[source,java] +---- +public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor.Builder() + .name("My Property") + .description("Description of the Property") + ... + .build(); +---- + +If the property is to provide a set of allowable values, those values +are presented to the user in a drop-down field in the UI. +Each of those values can also be given a description: + +[source,java] +---- +public static final AllowableValue EXTENSIVE = new AllowableValue("Extensive", "Extensive", + "Everything will be logged - use with caution!"); +public static final AllowableValue VERBOSE = new AllowableValue("Verbose", "Verbose", + "Quite a bit of logging will occur"); +public static final AllowableValue REGULAR = new AllowableValue("Regular", "Regular", + "Typical logging will occur"); + +public static final PropertyDescriptor LOG_LEVEL = new PropertyDescriptor.Builder() + .name("Amount to Log") + .description("How much the Processor should log") + .allowableValues(REGULAR, VERBOSE, EXTENSIVE) + .defaultValue(REGULAR.getValue()) + ... + .build(); +---- + + +=== Documenting Relationships + +Processor Relationships are documented in much the same way that +properties are - by calling the `description` method of a +Relationship's builder: + +[source,java] +---- +public static final Relationship MY_RELATIONSHIP = new Relationship.Builder() + .name("My Relationship") + .description("This relationship is used only if the Processor fails to process the data.") + .build(); +---- + + +=== Documenting Capability and Keywords + +The `org.apache.nifi.annotations.documentation` package provides Java +annotations that can be used to document components. 
The +CapabilityDescription +annotation can be added to a Processor, Reporting Task, or Controller +Service and is intended to provide a brief description of the +functionality +provided by the component. The Tags annotation has a `value` variable +that is defined to be an Array of Strings. As such, it is used +by providing multiple values as a comma-separated list of `String`s +with curly braces. These values are then incorporated into the UI by +allowing +users to filter the components based on a tag (i.e., a keyword). +Additionally, the UI provides a tag cloud that allows users to select +the tags that +they want to filter by. The tags that are largest in the cloud are +those tags that exist the most on the components in that instance of +NiFi. An +example of using these annotations is provided below: + +[source, java] +---- +@Tags({"example", "documentation", "developer guide", "processor", "tags"}) +@CapabilityDescription("Example Processor that provides no real functionality but is provided" + + " for an example in the Developer Guide") +public class ExampleProcessor extends AbstractProcessor { + ... +} +---- + + +=== Advanced Documentation + +When the documentation methods above are not sufficient, NiFi provides +the ability to expose more advanced documentation to the user via the +"Usage" documentation. When a user right-clicks on a Processor, NiFi +provides a "Usage" menu item in the context menu. Additionally, the +UI exposes a "Help" link in the top-right corner, from which the same +Usage information can be found. + +The advanced documentation of a Processor is provided as an HTML file. +This file should exist within a directory whose name is the +fully-qualified +name of the component, and this directory's parent should be named +`docs` and exist in the root of the Processor's jar. +The mechanism provided for this will be changing as of the 0.1.0 +release.
At that time, this section will be updated to reflect +the new procedures for providing this advanced documentation. + + + +== Common Processor Patterns + +While there are many different Processors available to NiFi users, the +vast majority of them fall into +one of several common design patterns. Below, we discuss these +patterns, when the patterns are appropriate, +reasons we follow these patterns, and things to watch out for when +applying such patterns. Note that the patterns +and recommendations discussed below are general guidelines and not +hardened rules. + + +[[ingress]] +=== Data Ingress + +A Processor that ingests data into NiFi has a single Relationship +named `success`. This Processor generates +new FlowFiles via the ProcessSession `create` method and does not pull +FlowFiles from incoming Connections. +The Processor name starts with "Get" or "Listen," depending on whether +it polls an external source or exposes +some interface to which external sources can connect. The name ends +with the protocol used for communications. +Processors that follow this pattern include `GetFile`, `GetSFTP`, +`ListenHTTP`, and `GetHTTP`. + +This Processor may create or initialize a Connection Pool in a method +that uses the `@OnScheduled` annotation. +However, because communications problems may prevent connections from +being established or cause connections +to be terminated, connections themselves are not created at this +point. Rather, the connections are +created or leased from the pool in the `onTrigger` method. + +The `onTrigger` method of this Processor begins by leasing a +connection from the Connection Pool, if possible, +or otherwise creates a connection to the external service. When no +data is available from the +external source, the `yield` method of the ProcessContext is called by +the Processor and the method returns so +that this Processor avoids continually running and depleting resources +without benefit.
Otherwise, this +Processor then creates a FlowFile via the ProcessSession's `create` +method and assigns an appropriate +filename and path to the FlowFile (by adding the `filename` and `path` +attributes), as well as any other +attributes that may be appropriate. An OutputStream to the FlowFile's content is +obtained via the ProcessSession's `write` method, passing a new +OutputStreamCallback (which is usually +an anonymous inner class). From within this callback, the Processor is +able to write to the FlowFile and streams +the content from the external resource to the FlowFile's OutputStream. +If the desire is to write the entire contents +of an InputStream to the FlowFile, the `importFrom` method of +ProcessSession may be more convenient to use than the +`write` method. + +When this Processor expects to receive many small files, it may be +advisable to create several FlowFiles from a +single session before committing the session. Typically, this allows +the Framework to treat the content of the +newly created FlowFiles much more efficiently. + +This Processor generates a Provenance event indicating that it has +received data and specifies from +where the data came. This Processor should log the creation of the +FlowFile so that the FlowFile's +origin can be determined by analyzing logs, if necessary. + +This Processor acknowledges receipt of the data and/or removes the +data from the external source in order +to prevent receipt of duplicate files. *This is done only after the +ProcessSession by which the FlowFile was +created has been committed!* Failure to adhere to this principle may +result in data loss, as restarting NiFi +before the session has been committed will result in the temporary +file being deleted. Note, however, that it +is possible using this approach to receive duplicate data because the +application could be restarted after +committing the session and before acknowledging or removing the data +from the external source. 
In general, though, potential data duplication is preferred over
potential data loss. The connection is finally returned or added to
the Connection Pool, depending on whether the connection was leased
from the Connection Pool to begin with or was created in the
`onTrigger` method.

If there is a communications problem, the connection is typically
terminated and not returned (or added) to the Connection Pool.
Connections to remote systems are torn down and the Connection Pool is
shut down in a method annotated with the `@OnStopped` annotation so
that resources can be reclaimed.


=== Data Egress

A Processor that publishes data to an external source has two
Relationships: `success` and `failure`. The Processor name starts with
"Put" followed by the protocol that is used for data transmission.
Processors that follow this pattern include `PutEmail`, `PutSFTP`, and
`PostHTTP` (note that the name does not begin with "Put" because this
would lead to confusion, since PUT and POST have special meanings when
dealing with HTTP).

This Processor may create or initialize a Connection Pool in a method
that uses the `@OnScheduled` annotation. However, because
communications problems may prevent connections from being established
or cause connections to be terminated, connections themselves are not
created at this point. Rather, the connections are created or leased
from the pool in the `onTrigger` method.

The `onTrigger` method first obtains a FlowFile from the
ProcessSession via the `get` method. If no FlowFile is available, the
method returns without obtaining a connection to the remote resource.

If at least one FlowFile is available, the Processor obtains a
connection from the Connection Pool, if possible, or otherwise creates
a new connection. If the Processor is neither able to lease a
connection from the Connection Pool nor create a new connection, the
FlowFile is routed to `failure`, the event is logged, and the method
returns.
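
The lease-or-create, return-on-success, terminate-on-error discipline described
for the Connection Pool can be sketched with standard library types alone. The
following is an illustrative sketch, not NiFi API code: `ConnectionPool` and its
nested `Connection` are hypothetical stand-ins for whatever client library the
Processor wraps.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in types; a real Processor would wrap its client library.
class ConnectionPool {

    static class Connection {
        boolean open = true;
        void close() { open = false; }
    }

    private final Deque<Connection> pool = new ArrayDeque<>();

    // Lease a pooled connection if one exists; otherwise create a new one.
    synchronized Connection lease() {
        final Connection leased = pool.poll();
        return (leased != null) ? leased : new Connection();
    }

    // After a successful transfer, return the connection for reuse.
    synchronized void release(final Connection connection) {
        pool.push(connection);
    }

    // After a communications problem, terminate rather than pool the connection.
    synchronized void invalidate(final Connection connection) {
        connection.close();
    }

    // In a real Processor this would be invoked from a method annotated
    // with @OnStopped so that resources are reclaimed.
    synchronized void shutdown() {
        while (!pool.isEmpty()) {
            pool.pop().close();
        }
    }

    synchronized int size() {
        return pool.size();
    }
}
```

In a real Processor, such a pool would be created or initialized in a method
annotated with `@OnScheduled` and torn down in one annotated with `@OnStopped`,
exactly as described above.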

If a connection was obtained, the Processor obtains an InputStream to
the FlowFile's content by invoking the `read` method on the
ProcessSession and passing an InputStreamCallback (which is often an
anonymous inner class) and from within that callback transmits the
contents of the FlowFile to the destination. The event is logged along
with the amount of time taken to transfer the file and the data rate
at which the file was transferred. A SEND event is reported to the
ProvenanceReporter by obtaining the reporter from the ProcessSession
via the `getProvenanceReporter` method and calling the `send` method
on the reporter. The connection is returned or added to the Connection
Pool, depending on whether the connection was leased from the pool or
newly created by the `onTrigger` method.

If there is a communications problem, the connection is typically
terminated and not returned (or added) to the Connection Pool. If
there is an issue sending the data to the remote resource, the desired
approach for handling the error depends on a few considerations. If
the issue is related to a network condition, the FlowFile is generally
routed to `failure`. The FlowFile is not penalized because there is
not necessarily a problem with the data. Unlike the case of the
<<ingress,Data Ingress>> Processor, we typically do not call `yield`
on the ProcessContext. This is because in the case of ingest, the
FlowFile does not exist until the Processor is able to perform its
function. However, in the case of a Put Processor, the DataFlow
Manager may choose to route `failure` to a different Processor. This
can allow for a "backup" system to be used in the case of problems
with one system or can be used for load distribution across many
systems.

If a problem occurs that is data-related, one of two approaches should
be taken. First, if the problem is likely to sort itself out, the
FlowFile is penalized and then routed to `failure`.
This is the case, for instance, with PutFTP, when a FlowFile cannot be
transferred because of a file naming conflict. The presumption is that
the file will eventually be removed from the directory so that the new
file can be transferred. As a result, we penalize the FlowFile and
route to `failure` so that we can try again later. In the other case,
if there is an actual problem with the data (such as the data not
conforming to some required specification), a different approach may
be taken. In this case, it may be advantageous to break apart the
`failure` relationship into a `failure` and a `communications failure`
relationship. This allows the DataFlow Manager to determine how to
handle each of these cases individually. In these situations, it is
important to document the differences between the two Relationships
well, clarifying each in the "description" supplied when creating the
Relationship.

Connections to remote systems are torn down and the Connection Pool is
shut down in a method annotated with `@OnStopped` so that resources
can be reclaimed.


=== Route Based on Content (One-to-One)

A Processor that routes data based on its content will take one of two
forms: Route an incoming FlowFile to exactly one destination, or route
incoming data to 0 or more destinations. Here, we will discuss the
first case.

This Processor has two relationships: `matched` and `unmatched`. If a
particular data format is expected, the Processor will also have a
`failure` relationship that is used when the input is not of the
expected format. The Processor exposes a Property that indicates the
routing criteria.

If the Property that specifies routing criteria requires processing,
such as compiling a Regular Expression, this processing is done in a
method annotated with `@OnScheduled`, if possible. The result is then
stored in a member variable that is marked as `volatile`.

The `onTrigger` method obtains a single FlowFile.
The method reads the contents of the FlowFile via the ProcessSession's
`read` method, evaluating the Match Criteria as the data is streamed.
The Processor then determines whether the FlowFile should be routed to
`matched` or `unmatched` based on whether or not the criteria matched,
and routes the FlowFile to the appropriate relationship.

The Processor then emits a Provenance ROUTE event indicating the
Relationship to which the Processor routed the FlowFile.

This Processor is annotated with the `@SideEffectFree` and
`@SupportsBatching` annotations from the
`org.apache.nifi.annotations.behavior` package.


=== Route Based on Content (One-to-Many)

If a Processor will route a single FlowFile to potentially many
relationships, this Processor will be slightly different from the
above-described Processor for Routing Data Based on Content. This
Processor typically has Relationships that are dynamically defined by
the user as well as an `unmatched` relationship.

In order for the user to be able to define additional Properties, the
`getSupportedDynamicPropertyDescriptor` method must be overridden.
This method returns a PropertyDescriptor with the supplied name and an
applicable Validator to ensure that the user-specified Matching
Criteria is valid.

In this Processor, the Set of Relationships that is returned by the
`getRelationships` method is a member variable that is marked
`volatile`. This Set is initially constructed with a single
Relationship named `unmatched`. The `onPropertyModified` method is
overridden so that when a Property is added or removed, a new
Relationship is created with the same name. If the Processor has
Properties that are not user-defined, it is important to check if the
specified Property is user-defined. This can be achieved by calling
the `isDynamic` method of the PropertyDescriptor that is passed to
this method.
If this Property is dynamic, a new Set of Relationships is then
created, and the previous set of Relationships is copied into it. This
new Set either has the newly created Relationship added to it or
removed from it, depending on whether a new Property was added to the
Processor or a Property was removed (Property removal is detected by
checking whether the third argument to this function is `null`). The
member variable holding the Set of Relationships is then updated to
point to this new Set.

If the Properties that specify routing criteria require processing,
such as compiling a Regular Expression, this processing is done in a
method annotated with `@OnScheduled`, if possible. The result is then
stored in a member variable that is marked as `volatile`. This member
variable is generally of type `Map` where the key is of type
`Relationship` and the value's type is defined by the result of
processing the property value.

The `onTrigger` method obtains a FlowFile via the `get` method of
ProcessSession. If no FlowFile is available, it returns immediately.
Otherwise, a Set of type Relationship is created. The method reads the
contents of the FlowFile via the ProcessSession's `read` method,
evaluating each of the Match Criteria as the data is streamed. For any
criterion that matches, the Relationship associated with that Match
Criterion is added to the Set of Relationships.

After reading the contents of the FlowFile, the method checks if the
Set of Relationships is empty. If so, the original FlowFile has an
attribute added to it to indicate the Relationship to which it was
routed and is routed to `unmatched`. This is logged, a Provenance
ROUTE event is emitted, and the method returns. If the size of the Set
is equal to 1, the original FlowFile has an attribute added to it to
indicate the Relationship to which it was routed and is routed to the
Relationship specified by the entry in the Set.

This is logged, a Provenance ROUTE event is emitted for the FlowFile,
and the method returns.

In the event that the Set contains more than 1 Relationship, the
Processor creates a clone of the FlowFile for each Relationship,
except for the first. This is done via the `clone` method of the
ProcessSession. There is no need to report a CLONE Provenance Event,
as the framework will handle this for you. The original FlowFile and
each clone are routed to their appropriate Relationship with an
attribute indicating the name of the Relationship. A Provenance ROUTE
event is emitted for each FlowFile. This is logged, and the method
returns.

This Processor is annotated with the `@SideEffectFree` and
`@SupportsBatching` annotations from the
`org.apache.nifi.annotations.behavior` package.


=== Route Streams Based on Content (One-to-Many)

The previous description of Route Based on Content (One-to-Many)
provides an abstraction for creating a very powerful Processor.
However, it assumes that each FlowFile will be routed in its entirety
to zero or more Relationships. What if the incoming data format is a
"stream" of many different pieces of information - and we want to send
different pieces of this stream to different Relationships? For
example, imagine that we want to have a RouteCSV Processor such that
it is configured with multiple Regular Expressions. If a line in the
CSV file matches a Regular Expression, that line should be included in
the outbound FlowFile to the associated relationship. If a Regular
Expression is associated with the Relationship "has-apples" and that
Regular Expression matches 1,000 of the lines in the FlowFile, there
should be one outbound FlowFile for the "has-apples" relationship that
has 1,000 lines in it.

If a different Regular Expression is associated with the Relationship
"has-oranges" and that Regular Expression matches 50 lines in the
FlowFile, there should be one outbound FlowFile for the "has-oranges"
relationship that has 50 lines in it. I.e., one FlowFile comes in and
two FlowFiles come out. The two FlowFiles may contain some of the same
lines of text from the original FlowFile, or they may be entirely
different. This is the type of Processor that we will discuss in this
section.

This Processor's name starts with "Route" and ends with the name of
the data type that it routes. In our example here, we are routing CSV
data, so the Processor is named RouteCSV. This Processor supports
dynamic properties. Each user-defined property has a name that maps to
the name of a Relationship. The value of the Property is in the format
necessary for the "Match Criteria." In our example, the value of the
property must be a valid Regular Expression.

This Processor maintains an internal `ConcurrentMap` where the key is
a `Relationship` and the value is of a type dependent on the format of
the Match Criteria. In our example, we would maintain a
`ConcurrentMap<Relationship, Pattern>`. This Processor overrides the
`onPropertyModified` method. If the new value supplied to this method
(the third argument) is `null`, the Relationship whose name is defined
by the property name (the first argument) is removed from the
ConcurrentMap. Otherwise, the new value is processed (in our example,
by calling `Pattern.compile(newValue)`) and this value is added to the
ConcurrentMap with the key again being the Relationship whose name is
specified by the property name.

This Processor will override the `customValidate` method. In this
method, it will retrieve all Properties from the `ValidationContext`
and count the number of PropertyDescriptors that are dynamic (by
calling `isDynamic()` on the PropertyDescriptor).
If the number of dynamic PropertyDescriptors is 0, this indicates that
the user has not added any Relationships, so the Processor returns a
`ValidationResult` indicating that the Processor is not valid because
it has no Relationships added.

The Processor returns all of the Relationships specified by the user
when its `getRelationships` method is called and will also return an
`unmatched` Relationship. Because this Processor will have to read and
write to the Content Repository (which can be relatively expensive),
if this Processor is expected to be used for very high data volumes,
it may be advantageous to add a Property that allows the user to
specify whether or not they care about the data that does not match
any of the Match Criteria.

When the `onTrigger` method is called, the Processor obtains a
FlowFile via `ProcessSession.get`. If no data is available, the
Processor returns. Otherwise, the Processor creates a
`Map<Relationship, FlowFile>`. We will refer to this Map as
`flowFileMap`. The Processor reads the incoming FlowFile by calling
`ProcessSession.read` and provides an `InputStreamCallback`. From
within the Callback, the Processor reads the first piece of data from
the FlowFile. The Processor then evaluates each of the Match Criteria
against this piece of data. If a particular criterion (in our example,
a Regular Expression) matches, the Processor obtains the FlowFile from
`flowFileMap` that belongs to the appropriate Relationship. If no
FlowFile yet exists in the Map for this Relationship, the Processor
creates a new FlowFile by calling `session.create(incomingFlowFile)`
and then adds the new FlowFile to `flowFileMap`. The Processor then
writes this piece of data to the FlowFile by calling `session.append`
with an `OutputStreamCallback`. From within this OutputStreamCallback,
we have access to the new FlowFile's OutputStream, so we are able to
write the data to the new FlowFile. We then return from the
OutputStreamCallback.

After iterating over each of the Match Criteria, if none of them
match, we perform the same routines as above for the `unmatched`
relationship (unless the user configures us to not write out unmatched
data). Now that we have called `session.append`, we have a new version
of the FlowFile. As a result, we need to update our `flowFileMap` to
associate the Relationship with the new FlowFile.

If, at any point, an Exception is thrown, we will need to route the
incoming FlowFile to `failure`. We will also need to remove each of
the newly created FlowFiles, as we won't be transferring them
anywhere. We can accomplish this by calling
`session.remove(flowFileMap.values())`. At this point, we will log the
error and return.

Otherwise, if all is successful, we can now iterate through the
`flowFileMap` and transfer each FlowFile to the corresponding
Relationship. The original FlowFile is then either removed or routed
to an `original` relationship. For each of the newly created
FlowFiles, we also emit a Provenance ROUTE event indicating which
Relationship the FlowFile went to. It is also helpful to include in
the details of the ROUTE event how many pieces of information were
included in this FlowFile. This allows DataFlow Managers to easily
see, when looking at the Provenance Lineage view, how many pieces of
information went to each of the relationships for a given input
FlowFile.

Additionally, some Processors may need to "group" the data that is
sent to each Relationship so that each FlowFile that is sent to a
relationship has the same value. In our example, we may want to allow
the Regular Expression to have a Capturing Group and, if two different
lines in the CSV match the Regular Expression but have different
values for the Capturing Group, we want them to be added to two
different FlowFiles. The matching value could then be added to each
FlowFile as an Attribute.
This can be accomplished by modifying the `flowFileMap` such that it
is defined as `Map<Relationship, Map<T, FlowFile>>` where `T` is the
type of the Grouping Function (in our example, the Group would be a
`String` because it is the result of evaluating a Regular Expression's
Capturing Group).



=== Route Based on Attributes

This Processor is almost identical to the Route Data Based on Content
Processors described above. It takes two different forms: One-to-One
and One-to-Many, as do the Content-Based Routing Processors. This
Processor, however, does not make any call to ProcessSession's `read`
method, as it does not read FlowFile content. This Processor is
typically very fast, so the `@SupportsBatching` annotation can be very
important in this case.



=== Split Content (One-to-Many)

This Processor generally requires no user configuration, with the
exception of the size of each Split to create. The `onTrigger` method
obtains a FlowFile from its input queues. A List of type FlowFile is
created. The original FlowFile is read via the ProcessSession's `read`
method, and an InputStreamCallback is used. Within the
InputStreamCallback, the content is read until a point is reached at
which the FlowFile should be split. If no split is needed, the
Callback returns, and the original FlowFile is routed to `success`. In
this case, a Provenance ROUTE event is emitted. Typically, ROUTE
events are not emitted when routing a FlowFile to `success` because
this generates a very verbose lineage that becomes difficult to
navigate. However, in this case, the event is useful because we would
otherwise expect a FORK event and the absence of any event is likely
to cause confusion. The fact that the FlowFile was not split but was
instead transferred to `success` is logged, and the method returns.
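
The bookkeeping that the InputStreamCallback performs while looking for split
points is ordinary stream accounting. Below is a standard-library-only sketch,
assuming newline-delimited records and a hypothetical `maxLines` parameter
standing in for the "size of each Split" configuration; no NiFi classes are
involved, and it models only the bookkeeping, not the streaming I/O itself.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: models the split-point bookkeeping, not the actual
// streaming I/O against a FlowFile's InputStream.
class LineSplitter {

    // "maxLines" is a hypothetical stand-in for the "size of each Split"
    // property; records accumulate until a split point is reached.
    static List<String> split(final String content, final int maxLines) {
        final List<String> splits = new ArrayList<>();
        final StringBuilder current = new StringBuilder();
        int lineCount = 0;

        for (final String line : content.split("\n")) {
            current.append(line).append('\n');
            if (++lineCount == maxLines) {   // split point reached: emit a split
                splits.add(current.toString());
                current.setLength(0);
                lineCount = 0;
            }
        }
        if (current.length() > 0) {          // emit any trailing partial split
            splits.add(current.toString());
        }
        return splits;
    }
}
```

If the input fits within a single split, the list contains one element, which
corresponds to the case described above in which the original FlowFile is
simply routed to `success` without being split.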

If a point is reached at which a FlowFile needs to be split, a new
FlowFile is created via the ProcessSession's `create(FlowFile)` method
or the `clone(FlowFile, long, long)` method. The next section of code
depends on whether the `create` method is used or the `clone` method
is used. Both methods are described below. Which solution is
appropriate must be determined on a case-by-case basis.

The Create Method is most appropriate when the data will not be
directly copied from the original FlowFile to the new FlowFile. For
example, if only some of the data will be copied, or if the data will
be modified in some way before being copied to the new FlowFile, this
method is necessary. However, if the content of the new FlowFile will
be an exact copy of a portion of the original FlowFile, the Clone
Method is much preferred.

*Create Method*
If using the `create` method, the method is called with the original
FlowFile as the argument so that the newly created FlowFile will
inherit the attributes of the original FlowFile and a Provenance FORK
event will be created by the framework.

The code then enters a `try/finally` block. Within the `finally`
block, the newly created FlowFile is added to the List of FlowFiles
that have been created. This is done within a `finally` block so that
if an Exception is thrown, the newly created FlowFile will be
appropriately cleaned up. Within the `try` block, the Processor
initiates a new callback by calling the ProcessSession's `write`
method with an OutputStreamCallback. The appropriate data is then
copied from the InputStream of the original FlowFile to the
OutputStream for the new FlowFile.

*Clone Method*
If the content of the newly created FlowFile is to be only a
contiguous subset of the bytes of the original FlowFile, it is
preferred to use the `clone(FlowFile, long, long)` method instead of
the `create(FlowFile)` method of the ProcessSession.
In this case, the offset of the original FlowFile at which the new
FlowFile's content should begin is passed as the second argument to
the `clone` method. The length of the new FlowFile is passed as the
third argument to the `clone` method. For example, if the original
FlowFile was 10,000 bytes and we called `clone(flowFile, 500, 100)`,
the FlowFile that would be returned to us would be identical to
`flowFile` with respect to its attributes. However, the content of the
newly created FlowFile would be 100 bytes in length and would start at
offset 500 of the original FlowFile. That is, the contents of the
newly created FlowFile would be the same as if you had copied bytes
500 through 599 of the original FlowFile.

After the clone has been created, it is added to the List of FlowFiles.

This method is greatly preferred over the Create Method, when
applicable, because no disk I/O is required. The framework is able to
simply create a new FlowFile that references a subset of the original
FlowFile's content, rather than actually copying the data. However,
this is not always possible. For example, if header information must
be copied from the beginning of the original FlowFile and added to the
beginning of each Split, then this method is not possible.


*Both Methods*
Regardless of whether the Clone Method or the Create Method is used,
the following is applicable.

If at any point in the InputStreamCallback, a condition is reached in
which processing cannot continue (for example, the input is
malformed), a `ProcessException` should be thrown. The call to the
ProcessSession's `read` method is wrapped in a `try/catch` block where
`ProcessException` is caught. If an Exception is caught, a log message
is generated explaining the error. The List of newly created FlowFiles
is removed via the ProcessSession's `remove` method. The original
FlowFile is routed to `failure`.
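
The offset and length semantics of `clone(FlowFile, long, long)` described
above amount to taking a byte-range view of the original content. The
arithmetic can be illustrated with the standard library alone, using a plain
byte array as a stand-in for a FlowFile's content:

```java
import java.util.Arrays;

// Illustrative stand-in: a real clone does not copy bytes at all; it merely
// references a region of the original content, which is why it avoids disk I/O.
class CloneRange {

    // Models the region that clone(flowFile, offset, length) would expose.
    static byte[] view(final byte[] content, final int offset, final int length) {
        return Arrays.copyOfRange(content, offset, offset + length);
    }
}
```

For the example above, `view(content, 500, 100)` yields the 100 bytes at
offsets 500 through 599 of a 10,000-byte original.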
+ +If no problems arise, the original FlowFile is routed to `original` +and all newly created FlowFiles +are updated to include the following attributes: + +[options="header"] +|=== +| Attribute Name | Description +| `split.parent.uuid` | The UUID of the original FlowFile +| `split.index` | A one-up number indicating which FlowFile in the list this is (the first FlowFile + created will have a value `0`, the second will have a value `1`, etc.) +| `split.count` | The total number of split FlowFiles that were created +|=== + +The newly created FlowFiles are routed to `success`; this event is +logged; and the method returns. + + +=== Update Attributes Based on Content + +This Processor is very similar to the Route Based on Content +Processors discussed above. Rather than +routing a FlowFile to `matched` or `unmatched`, the FlowFile is +generally routed to `success` or `failure` +and attributes are added to the FlowFile as appropriate. The +attributes to be added are configured in a +manner similar to that of the Route Based on Content (One-to-Many), +with the user defining their own +properties. The name of the property indicates the name of an +attribute to add. The value of the +property indicates some Matching Criteria to be applied to the data. +If the Matching Criteria matches +the data, an attribute is added with the name the same as that of the +Property. The value of the +attribute is the criteria from the content that matched. + +For example, a Processor that evaluates XPath Expressions may allow +user-defined XPaths to be +entered. If the XPath matches the content of a FlowFile, that FlowFile +will have an attribute added with +the name being equal to that of the Property name and a value equal to +the textual content of the XML Element or +Attribute that matched the XPath. The `failure` relationship would +then be used if the incoming FlowFile +was not valid XML in this example. 
The `success` relationship would be used regardless of whether or not
any matches were found. This can then be used to route the FlowFile
when appropriate.

This Processor emits a Provenance Event of type ATTRIBUTES_MODIFIED.


=== Enrich/Modify Content

The Enrich/Modify Content pattern is very common and very generic.
This pattern is responsible for any general content modification. For
the majority of cases, this Processor is marked with the
`@SideEffectFree` and `@SupportsBatching` annotations. The Processor
has any number of required and optional Properties, depending on the
Processor's function. The Processor generally has a `success` and
`failure` relationship. The `failure` relationship is generally used
when the input file is not in the expected format.

This Processor obtains a FlowFile and updates it using the
ProcessSession's `write(StreamCallback)` method so that it is able to
both read from the FlowFile's content and write to the next version of
the FlowFile's content. If errors are encountered during the callback,
the callback will throw a `ProcessException`. The call to the
ProcessSession's `write` method is wrapped in a `try/catch` block that
catches `ProcessException` and routes the FlowFile to `failure`.

If the callback succeeds, a CONTENT_MODIFIED Provenance Event is emitted.



== Error Handling

=== Exceptions within a callback: IOException, RuntimeException

=== Exceptions within the Processor: ProcessException, others.

=== Catching ProcessException from callback

=== Penalization vs. Yielding

=== Session Rollback: with and without penalization

=== Administrative Yielding



== General Design Considerations

=== Cohesion / Reusability: Do one thing and do it well!

=== Naming Conventions

=== Processor Behavior Annotations

=== Data Buffering





[[controller-services]]
== Controller Services

The `ControllerService` interface allows developers to share
functionality and state across the JVM in a clean and consistent
manner. The interface resembles that of the `Processor` interface but
does not have an `onTrigger` method because Controller Services are
not scheduled to run periodically, and Controller Services do not have
Relationships because they are not integrated into the flow directly.
Rather, they are used by Processors, Reporting Tasks, and other
Controller Services.

[[developing-controller-service]]
=== Developing a ControllerService

Just like with the Processor interface, the ControllerService
interface exposes methods for configuration, validation, and
initialization. These methods are all identical to those of the
Processor interface except that the `initialize` method is passed a
`ControllerServiceInitializationContext`, rather than a
`ProcessorInitializationContext`.

Controller Services come with an additional constraint that Processors
do not have. A Controller Service must be defined by an interface that
extends `ControllerService`. Implementations can then be interacted
with only through their interface. A Processor, for instance, will
never be given a concrete implementation of a ControllerService and
therefore must reference the service only via interfaces that extend
`ControllerService`.

This constraint is in place mainly because a Processor can exist in
one NiFi Archive (NAR) while the implementation of the Controller
Service that the Processor uses can exist in a different NAR. This is
accomplished by the framework by dynamically implementing the exposed
interfaces in such a way that the framework can switch to the
appropriate ClassLoader and invoke the desired method on the concrete
implementation.
However, in order to make this work, the Processor and the Controller
Service implementation must share the same definition of the
Controller Service interface. Therefore, both of these NARs must
depend on the NAR that houses the Controller Service's interface. See
the NiFi Archives (NARs) section for more information.


[[interacting-with-controller-service]]
=== Interacting with a ControllerService

ControllerServices may be obtained by a Processor, another
ControllerService, or a ReportingTask by means of the
ControllerServiceLookup or by using the `identifiesControllerService`
method of the PropertyDescriptor's Builder class. The
ControllerServiceLookup can be obtained by a Processor from the
ProcessorInitializationContext that is passed to the `initialize`
method. Likewise, it is obtained by a ControllerService from the
ControllerServiceInitializationContext and by a ReportingTask via the
ReportingConfiguration object passed to the `initialize` method.

For most use cases, though, using the `identifiesControllerService`
method of a PropertyDescriptor Builder is preferred and is the least
complicated method. In order to use this method, we create a
PropertyDescriptor that references a Controller Service as follows:

[source,java]
----
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
    .name("SSL Context Service")
    .description("Specifies the SSL Context Service that can be used to create secure connections")
    .required(true)
    .identifiesControllerService(SSLContextService.class)
    .build();
----

Using this method, the user will be prompted to supply the SSL Context
Service that should be used. This is done by providing the user with a
drop-down menu from which they are able to choose any of the
SSLContextService configurations that have been configured, regardless
of the implementation.
+ +In order to make use of this service, the Processor can use code such as: + +[source,java] +---- +final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE) + .asControllerService(SSLContextService.class); +---- + +Note here that `SSLContextService` is an interface that extends +ControllerService. The only implementation +at this time is the `StandardSSLContextService`. However, the +Processor developer need not worry about this +detail. + + + + + + + +== Reporting Tasks + +So far, we have mentioned little about how to convey to the outside +world how NiFi and its components +are performing. Is the system able to keep up with the incoming data +rate? How much more can +the system handle? How much data is processed at the peak time of day +versus the least busy time of day? + +In order to answer these questions, and many more, NiFi provides a +capability for reporting status, +statistics, metrics, and monitoring information to external services +by means of the `ReportingTask` +interface. ReportingTasks are given access to a host of information to +determine how the system is performing. + + +=== Developing a Reporting Task + +Just like with the Processor and ControllerService interfaces, the +ReportingTask interface exposes methods for +configuration, validation, and initialization. These methods are all +identical to those of the +Processor and ControllerService interfaces except that the +`initialize` method is passed a `ReportingConfiguration` +object, as opposed to the initialization objects received by the other +Components. The ReportingTask also has +an `onTrigger` method that is invoked by the framework to trigger the +task to perform its job. + +Within the `onTrigger` method, the ReportingTask is given access to a +ReportingContext, from which configuration +and information about the NiFi instance can be obtained. 
The BulletinRepository allows Bulletins to be queried and allows the ReportingTask to submit its own
Bulletins, so that information will be rendered to users. The ControllerServiceLookup that is accessible
via the Context provides access to ControllerServices that have been configured. However, this method of
obtaining Controller Services is not the preferred method. Rather, the preferred method for obtaining a
Controller Service is to reference the Controller Service in a PropertyDescriptor, as is discussed in the
<<interacting-with-controller-service>> section.

The `EventAccess` object that is exposed via the ReportingContext provides access to the `ProcessGroupStatus`,
which exposes statistics about the amount of data processed in the past five minutes by Process Groups,
Processors, Connections, and other Components. Additionally, the EventAccess object provides access to
the `ProvenanceEventRecord`s that have been stored in the `ProvenanceEventRepository`. These
Provenance Events are emitted by Processors when data is received from external sources, emitted to
external services, removed from the system, modified, or routed according to some decision that was made.

Each ProvenanceEvent has the ID of the FlowFile, the type of Event, the creation time of the Event, and
all FlowFile attributes associated with the FlowFile at the time that the FlowFile was accessed by the
component, as well as the FlowFile attributes that were associated with the FlowFile as a result of the
processing that the event describes. This provides a great deal of information to ReportingTasks, allowing
reports to be generated in many different ways to expose the metrics and monitoring capabilities needed
for any number of operational concerns.


== Testing

Testing the components that will be used within a larger framework can often be very cumbersome
and tricky. With NiFi, we strive to make testing components as easy as possible.
In order to do this, we have created a `nifi-mock` module that can be used in conjunction with JUnit to
provide extensive testing of components.

The Mock Framework is mostly aimed at testing Processors, as these are by far the most commonly
developed extension point. However, the framework does provide the ability to test Controller Services
as well.

Components have typically been tested by creating functional tests to verify component behavior. This is
done because often a Processor will consist of a handful of helper methods, but the logic will largely be
encompassed within the `onTrigger` method. The `TestRunner` interface allows us to test Processors
and Controller Services by converting more "primitive" objects, such as files and byte arrays, into
FlowFiles. It handles creating the ProcessSessions and ProcessContexts needed for a Processor to do its job,
as well as invoking the necessary lifecycle methods, in order to ensure that the Processor behaves the
same way in the unit tests as it does in production.


=== Instantiate TestRunner

Most unit tests for a Processor or a Controller Service start by creating an instance of the `TestRunner`
class. In order to add the necessary classes to your project, you can use the following Maven dependency:

[source,xml]
----
<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-mock</artifactId>
    <version>${nifi.version}</version>
</dependency>
----

We create a new `TestRunner` by calling the static `newTestRunner` method of the `TestRunners` class
(located in the `org.apache.nifi.util` package). This method takes a single argument. That argument can
either be the class of the Processor to test or an instance of a Processor.

=== Add ControllerServices

After creating a new Test Runner, we can add any Controller Services to the Test Runner that our Processor
will need in order to perform its job. We do this by calling the `addControllerService` method and supplying
both an identifier for the Controller Service and an instance of the Controller Service.
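As a brief sketch of how this fits together (the Processor class `MyProcessor` and the identifier
`"ssl-context"` are hypothetical placeholders chosen for illustration), adding a Controller Service to a
Test Runner might look like the following:

[source,java]
----
// Create a Test Runner for the Processor under test
final TestRunner runner = TestRunners.newTestRunner(MyProcessor.class);

// Add a Controller Service instance, keyed by an identifier of our choosing
final SSLContextService sslService = new StandardSSLContextService();
runner.addControllerService("ssl-context", sslService);
----

The identifier supplied here is the value that the Processor's Controller Service property would later be
set to in order to reference this service instance.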
If the Controller Service needs to be configured, its properties can be set by
calling the `setProperty(ControllerService, PropertyDescriptor, String)`, `setProperty(ControllerService, String, String)`,
or `setProperty(ControllerService, PropertyDescriptor, AllowableValue)` method. Each of these methods returns a
`ValidationResult`. This object can then be inspected to ensure that the property is valid by calling `isValid`.
Annotation data can be set by calling the `setAnnotationData(ControllerService, String)` method.

We can now ensure that the Controller Service is valid by calling `assertValid(ControllerService)`, or ensure
that the configured values are not valid, if testing the Controller Service itself, by calling
`assertNotValid(ControllerService)`.

Once a Controller Service has been added to the Test Runner and configured, it can be enabled by calling the
`enableControllerService(ControllerService)` method. If the Controller Service is not valid, this method
will throw an IllegalStateException. Otherwise, the service is now ready to use.



=== Set Property Values

After configuring any necessary Controller Services, we need to configure our Processor. We can do this by
calling the same methods as we do for Controller Services, without specifying any Controller Service. That is,
we can call `setProperty(PropertyDescriptor, String)`, and so on. Each of the `setProperty` methods again
returns a `ValidationResult` that can be used to ensure that the property value is valid.

Similarly, we can also call `assertValid()` and `assertNotValid()` to ensure that the configuration of the
Processor is valid or not, according to our expectations.
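For example, configuring and validating a Processor might look like the following sketch (the Processor
class `MyProcessor`, its `FILENAME` property, and the value supplied for it are hypothetical placeholders):

[source,java]
----
final TestRunner runner = TestRunners.newTestRunner(MyProcessor.class);

// Set a property and inspect the result of validating that single value
final ValidationResult result = runner.setProperty(MyProcessor.FILENAME, "/data/input.txt");
assertTrue(result.isValid());

// Ensure that the Processor's configuration as a whole is valid
runner.assertValid();
----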
=== Enqueue FlowFiles

FlowFiles can be added to the Processor's input queue by calling one of the overloaded `enqueue` methods
of the Test Runner. These overloads accept the FlowFile content in "primitive" forms, such as a `byte[]`
or a `Path`, and optionally accept a `Map` of key/value pairs to use as the FlowFile's attributes.


=== Run the Processor

Once the input queue has been populated, the Processor can be triggered by calling the `run` method of the
Test Runner. Calling `run` with no arguments invokes the Processor's `onTrigger` method once and then
invokes the appropriate lifecycle methods; an overloaded version of `run` allows the number of `onTrigger`
invocations to be specified.


=== Validate Output Counts

After running the Processor, the number of FlowFiles that were routed to each Relationship can be verified
by calling `assertTransferCount(Relationship, int)`, or `assertAllFlowFilesTransferred(Relationship, int)`
to additionally ensure that no FlowFiles were routed to any other Relationship.


=== Validate Output Results

The FlowFiles that were routed to a given Relationship can be obtained by calling
`getFlowFilesForRelationship(Relationship)`. The `MockFlowFile` objects that are returned provide
convenience methods, such as `assertContentEquals` and `assertAttributeEquals`, for verifying the content
and attributes of the output.


=== Mocking Connections



=== Additional Testing Capabilities
In addition to the above-mentioned capabilities provided by the testing framework, the TestRunner provides several
convenience methods for verifying the behavior of a Processor. Methods are provided for ensuring that the Processor's
Input Queue has been emptied. Unit Tests are able to obtain the ProcessContext, ProcessSessionFactory, ProvenanceReporter,
and other framework-specific entities that will be used by the TestRunner. The `shutdown` method provides the ability to
test Processor methods that are annotated to be run only on shutdown of NiFi. Annotation Data can be set for Processors
that make use of Custom User Interfaces. Finally, the number of threads that should be used to run the Processor can
be set via the `setThreadCount(int)` method.



[[nars]]
== NiFi Archives (NARs)

When software from many different organizations is all hosted within the same environment, Java ClassLoaders quickly
become a concern. If multiple components have a dependency on the same library but each depends on a different
version, many problems arise, typically resulting in unexpected behavior or `NoClassDefFoundError` errors occurring.
In order to prevent these issues from becoming problematic, NiFi introduces the notion of a NiFi Archive, or NAR.

A NAR allows several components and their dependencies to be packaged together into a single package.
The NAR package is then provided ClassLoader isolation from other NAR packages. Developers should always deploy
their NiFi components as NAR packages.

To achieve this, a developer creates a new Maven Artifact, which we refer to as the NAR artifact. The packaging is
set to `nar`.
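As an illustrative sketch, a minimal NAR artifact POM might look like the following. The `org.my`
coordinates and artifact names are hypothetical placeholders; the `nifi-nar-maven-plugin`, registered as a
build extension, is what makes the `nar` packaging type available to Maven:

[source,xml]
----
<project>
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.my</groupId>
    <artifactId>my-processor-nar</artifactId>
    <version>1.0</version>
    <packaging>nar</packaging>

    <dependencies>
        <!-- the JAR artifact that contains the components to bundle into the NAR -->
        <dependency>
            <groupId>org.my</groupId>
            <artifactId>my-processors</artifactId>
            <version>1.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- provides the 'nar' packaging type; must be registered as an extension -->
            <plugin>
                <groupId>org.apache.nifi</groupId>
                <artifactId>nifi-nar-maven-plugin</artifactId>
                <extensions>true</extensions>
            </plugin>
        </plugins>
    </build>
</project>
----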
The `dependencies` section of the POM is then created so that the NAR has a dependency on all NiFi
Components that are to be included within the NAR.

TODO: DISCUSS HOW TO INCLUDE THE NAR MAVEN PLUGIN.

A NAR is able to have one dependency that is of type `nar`. If more than one dependency of type `nar` is
specified, the nifi-nar-maven-plugin will fail with an error. If NAR A adds a dependency on NAR B, this
will *not* result in NAR A packaging all of the components of NAR B. Rather, this will add a
`Nar-Dependency-Id` element to the `MANIFEST.MF` file of NAR A. This will result in setting the ClassLoader
of NAR B as the Parent ClassLoader of NAR A. In this case, we refer to NAR B as the _Parent_ of NAR A.

This linkage of Parent ClassLoaders is the mechanism that NiFi uses in order to enable Controller Services
to be shared across all NARs. As mentioned in the Controller Services section, a Controller Service must be
separated into an interface that extends `ControllerService` and an implementation that implements that
interface. Controller Services can be referenced from any Processor, regardless of which NAR it is in, as
long as both the Controller Service implementation and the Processor share the same definition of the
Controller Service interface.

In order to share this same definition, both the Processor's NAR and the Controller Service implementation's
NAR must have as a Parent the Controller Service definition's NAR.
An example hierarchy may look like this:

.Controller Service NAR Layout
------------------------------------------------------------------------------------------------
root
├── my-controller-service-api
│   ├── pom.xml
│   └── src
│       └── main
│           └── java
│               └── org
│                   └── my
│                       └── services
│                           └── MyService.java
│
├── my-controller-service-api-nar
│   └── pom.xml <1>
│
├── my-controller-service-impl
│   ├── pom.xml <2>
│   └── src
│       ├── main
│       │   ├── java
│       │   │   └── org
│       │   │       └── my
│       │   │           └── services
│       │   │               └── MyServiceImpl.java
│       │   └── resources
│       │       └── META-INF
│       │           └── services
│       │               └── org.apache.nifi.controller.ControllerService
│       └── test
│           └── java
│               └── org
│                   └── my
│                       └── services
│                           └── TestMyServiceImpl.java
│
├── my-controller-service-nar
│   └── pom.xml <3>
│
└── other-processor-nar
    └── pom.xml <3>
------------------------------------------------------------------------------------------------
<1> This POM file has a type of `nar`. It has a dependency on `nifi-standard-services-api-nar`.
<2> This POM file is of type `jar`. It has a dependency on `my-controller-service-api`. It does *not* have
a dependency on any `nar` artifacts.
<3> This POM file has a type of `nar`. It has a dependency on `my-controller-service-api-nar`.


While this hierarchy may seem very complex at first, after creating it once or twice, it becomes
far less complicated. Note here that the `my-controller-service-api-nar` has a dependency on
`nifi-standard-services-api-nar`. This is done so that any NAR that has a dependency on
`my-controller-service-api-nar` will also be able to access all of the Controller Services that are
provided by the `nifi-standard-services-api-nar`, such as the SSLContextService. In this same vein, it is
not necessary to create a different "service-api" NAR for each service.
Instead, it often makes sense to have a single "service-api" NAR that encapsulates the APIs for many
different Controller Services, as is done by the `nifi-standard-services-api-nar`. Generally, the API will
not include extensive dependencies, and as a result, ClassLoader isolation may be less important, so
lumping together many API artifacts into the same NAR is often acceptable.


== Consider the User Experience

The user...


== How to contribute to Apache NiFi

Git, Maven, ASF processes, NiFi processes, ...