mirror of https://github.com/apache/nifi.git
NIFI-152: Updated developer guide
parent
5830f005f5
commit
bafa945a76
|
@ -22,64 +22,60 @@ Apache NiFi Team <dev@nifi.incubator.apache.org>
|
||||||
|
|
||||||
== Introduction
|
== Introduction
|
||||||
|
|
||||||
This Developer Guide is written by developers for developers. It is expected that before reading this
|
The intent of this Developer Guide is to provide the reader with the information needed to understand how Apache NiFi (incubating)
|
||||||
guide, you have a basic understanding of Apache NiFi (incubating) and the concepts of dataflow. If not,
|
extensions are developed and help to explain the thought process behind developing the components. It provides an introduction to
|
||||||
please see the link:overview.html[NiFi Overview] and the link:user-guide.html[NiFi User Guide] to familiar
|
and explanation of the API that is used to develop extensions. It does not, however, go into great detail about each
|
||||||
yourself with the concepts of NiFi.
|
of the methods in the API, as this guide is intended to supplement the JavaDocs of the API rather than replace them.
|
||||||
|
|
||||||
This guide also assumes that the reader is familiar with Java 7 and Apache Maven.
|
This guide also assumes that the reader is familiar with Java 7 and Apache Maven.
|
||||||
|
|
||||||
The intent of this guide is to provide the reader with the information needed to understand how NiFi extensions
|
This guide is written by developers for developers. It is expected that before reading this
|
||||||
are developed and help to explain the thought process behind developing the components.
|
guide, you have a basic understanding of NiFi and the concepts of dataflow. If not, please see the link:overview.html[NiFi Overview]
|
||||||
|
and the link:user-guide.html[NiFi User Guide] to familiarize yourself with the concepts of NiFi.
|
||||||
|
|
||||||
|
|
||||||
[[components]]
|
[[components]]
|
||||||
== NiFi Components
|
== NiFi Components
|
||||||
|
|
||||||
NiFi provides several extension points to provide developers the
|
NiFi provides several extension points to provide developers the
|
||||||
ability to add functionality
|
ability to add functionality to the application to meet their needs. The following list provides a
|
||||||
to the application to meet their needs. The following list provides a
|
high-level description of the most common extension points:
|
||||||
high-level description
|
|
||||||
of the most common extension points:
|
|
||||||
|
|
||||||
- Processor
|
- Processor
|
||||||
- The Processor interface is the mechanism through which NiFi
|
* The Processor interface is the mechanism through which NiFi exposes access to
|
||||||
exposes access to
|
<<flowfile>>s, their attributes, and their content. The Processor is the basic building
|
||||||
FlowFiles, their attributes, and their content. The Processor is
|
block used to comprise a NiFi dataflow. This interface is used to accomplish
|
||||||
the basic building
|
|
||||||
block used to comprise a NiFi dataflow. This interface is used
|
|
||||||
to accomplish
|
|
||||||
all of the following tasks:
|
all of the following tasks:
|
||||||
|
|
||||||
- Create FlowFiles
|
** Create FlowFiles
|
||||||
- Read FlowFile content
|
** Read FlowFile content
|
||||||
- Write FlowFile content
|
** Write FlowFile content
|
||||||
- Read FlowFile attributes
|
** Read FlowFile attributes
|
||||||
- Update FlowFile attributes
|
** Update FlowFile attributes
|
||||||
- Ingest data
|
** Ingest data
|
||||||
- Egress data
|
** Egress data
|
||||||
- Route data
|
** Route data
|
||||||
- Extract data
|
** Extract data
|
||||||
- Modify data
|
** Modify data
|
||||||
|
|
||||||
- ReportingTask
|
- ReportingTask
|
||||||
- The ReportingTask interface is a mechanism that NiFi exposes to allow metrics,
|
* The ReportingTask interface is a mechanism that NiFi exposes to allow metrics,
|
||||||
monitoring information, and internal NiFI state to be published to external
|
monitoring information, and internal NiFi state to be published to external
|
||||||
sources, such as log files, e-mail, and remote web services.
|
endpoints, such as log files, e-mail, and remote web services.
|
||||||
|
|
||||||
- ControllerService
|
- ControllerService
|
||||||
- A ControllerService provides shared state across Processors, other ControllerServices,
|
* A ControllerService provides shared state and functionality across Processors, other ControllerServices,
|
||||||
and ReportingTasks within a single JVM. An example use case may include loading a very
|
and ReportingTasks within a single JVM. An example use case may include loading a very
|
||||||
large dataset into memory. By performing this work in a ControllerService, the data
|
large dataset into memory. By performing this work in a ControllerService, the data
|
||||||
can be loaded once and be exposed to all Processors via this service.
|
can be loaded once and be exposed to all Processors via this service, rather than requiring
|
||||||
|
many different Processors to load the dataset themselves.
|
||||||
|
|
||||||
- FlowFilePrioritizer
|
- FlowFilePrioritizer
|
||||||
- The FlowFilePrioritizer interface provides a mechanism by which FlowFiles
|
* The FlowFilePrioritizer interface provides a mechanism by which <<flowfile>>s
|
||||||
in a queue can be prioritized, or sorted, so that the FlowFiles can be processed in an order
|
in a queue can be prioritized, or sorted, so that the FlowFiles can be processed in an order
|
||||||
that is most effective for a particular use case.
|
that is most effective for a particular use case.
|
||||||
|
|
||||||
- AuthorityProvider
|
- AuthorityProvider
|
||||||
- An AuthorityProvide is responsible for determining which privileges and roles, if any,
|
* An AuthorityProvider is responsible for determining which privileges and roles, if any,
|
||||||
a given user should be granted.
|
a given user should be granted.
|
||||||
|
|
||||||
|
|
||||||
|
@ -103,20 +99,15 @@ Processors must adhere to the following rules:
|
||||||
While `Processor` is an interface that can be implemented directly, it
|
While `Processor` is an interface that can be implemented directly, it
|
||||||
will be extremely rare to do so, as
|
will be extremely rare to do so, as
|
||||||
the `org.apache.nifi.processor.AbstractProcessor` is the base class
|
the `org.apache.nifi.processor.AbstractProcessor` is the base class
|
||||||
for almost all Processor
|
for almost all Processor implementations. The `AbstractProcessor` class provides a significant
|
||||||
implementations. The `AbstractProcessor` class provides a significant
|
amount of functionality, which makes the task of developing a Processor much easier and more convenient.
|
||||||
amount of functionality, which makes
|
For the scope of this document, we will focus primarily on the `AbstractProcessor` class when dealing
|
||||||
the task of developing a Processor much easier and more convenient.
|
|
||||||
For the scope of this document,
|
|
||||||
we will focus primarily on the `AbstractProcessor` class when dealing
|
|
||||||
with the Processor API.
|
with the Processor API.
|
||||||
|
|
||||||
.Concurrency Note
|
.Concurrency Note
|
||||||
NiFi is a highly concurrent framework. This means that all extensions
|
NiFi is a highly concurrent framework. This means that all extensions
|
||||||
must be thread-safe.
|
must be thread-safe. If unfamiliar with writing concurrent software in Java, it is highly
|
||||||
If unfamiliar with writing concurrent software in Java, it is highly
|
recommended that you familiarize yourself with the principles of Java concurrency.
|
||||||
recommended that you familiarize
|
|
||||||
yourself with the principles of Java concurrency.
|
|
||||||
|
|
||||||
|
|
||||||
[[supporting_api]]
|
[[supporting_api]]
|
||||||
|
@ -138,27 +129,18 @@ immutable. Modifications to a FlowFile are made possible by the ProcessSession.
|
||||||
[[process_session]]
|
[[process_session]]
|
||||||
==== ProcessSession
|
==== ProcessSession
|
||||||
The ProcessSession, often referred to as simply a "session," provides
|
The ProcessSession, often referred to as simply a "session," provides
|
||||||
a mechanism by which FlowFiles
|
a mechanism by which FlowFiles can be created, destroyed, examined, cloned, and transferred to other
|
||||||
can be created, destroyed, examined, cloned, and transferred to other
|
Processors. Additionally, a ProcessSession provides a mechanism for creating modified versions of
|
||||||
Processors. Additionally, a
|
FlowFiles, by adding or removing attributes, or by modifying the FlowFile's content. The ProcessSession
|
||||||
ProcessSession provides mechanism for creating modified versions of
|
also exposes a mechanism for emitting provenance events that provide for the ability to track the
|
||||||
FlowFiles, by adding or removing
|
lineage and history of a FlowFile. After operations are performed on one or more FlowFiles, a
|
||||||
attributes, or by modifying the FlowFile's content. The ProcessSession
|
ProcessSession can be either committed or rolled back.
|
||||||
also exposes a mechanism for
|
|
||||||
emitting provenance events that provide for the ability to track the
|
|
||||||
lineage and history of a FlowFile.
|
|
||||||
After operations are performed on one or more FlowFiles, a
|
|
||||||
ProcessSession can be either committed or
|
|
||||||
rolled back.
|
|
||||||
|
|
||||||
[[process_context]]
|
[[process_context]]
|
||||||
==== ProcessContext
|
==== ProcessContext
|
||||||
The ProcessContext provides a bridge between a Processor and the
|
The ProcessContext provides a bridge between a Processor and the framework. It provides information
|
||||||
framework. It provides information
|
about how the Processor is currently configured and allows the Processor to perform
|
||||||
about how the Processor is currently configured and allows the
|
framework-specific tasks, such as yielding its resources so that the framework will schedule other
|
||||||
Processor to perform
|
|
||||||
Framework-specific tasks, such as yielding its resources so that the
|
|
||||||
framework will schedule other
|
|
||||||
Processors to run without consuming resources unnecessarily.
|
Processors to run without consuming resources unnecessarily.
|
||||||
|
|
||||||
|
|
||||||
|
@ -1337,30 +1319,248 @@ If the callback succeeds, a CONTENT_MODIFIED Provenance Event is emitted.
|
||||||
|
|
||||||
== Error Handling
|
== Error Handling
|
||||||
|
|
||||||
|
When writing a Processor, there are several different unexpected cases that can occur.
|
||||||
|
It is important that Processor developers understand the mechanics of how the NiFi framework
|
||||||
|
behaves if Processors do not handle errors themselves, and it's important to understand
|
||||||
|
what error handling is expected of Processors. Here, we will discuss how Processors should
|
||||||
|
handle unexpected errors during the course of their work.
|
||||||
|
|
||||||
|
|
||||||
|
=== Exceptions within the Processor
|
||||||
|
|
||||||
|
During the execution of the `onTrigger` method of a Processor, many things can potentially go
|
||||||
|
awry. Common failure conditions include:
|
||||||
|
|
||||||
|
- Incoming data is not in the expected format.
|
||||||
|
- Network connections to external services fail.
|
||||||
|
- Reading or writing data to a disk fails.
|
||||||
|
- There is a bug in the Processor or a dependent library.
|
||||||
|
|
||||||
|
Any of these conditions can result in an Exception being thrown from the Processor. From the framework
|
||||||
|
perspective, there are two types of Exceptions that can escape a Processor: `ProcessException` and
|
||||||
|
all others.
|
||||||
|
|
||||||
|
If a ProcessException is thrown from the Processor, the framework will assume that this is a failure that
|
||||||
|
is a known outcome. Moreover, it is a condition where attempting to process the data again later may
|
||||||
|
be successful. As a result, the framework will roll back the session that was being processed and penalize
|
||||||
|
the FlowFiles that were being processed.
|
||||||
|
|
||||||
|
If any other Exception escapes the Processor, though, the framework will assume that it is a failure that
|
||||||
|
was not taken into account by the developer. In this case, the framework will also roll back the session
|
||||||
|
and penalize the FlowFiles. However, in this case, we can get into some very problematic cases. For example,
|
||||||
|
the Processor may be in a bad state and may continually run, depleting system resources, without providing
|
||||||
|
any useful work. This is fairly common, for instance, when a NullPointerException is thrown continually.
|
||||||
|
In order to avoid this case, if an Exception other than ProcessException is able to escape the Processor's
|
||||||
|
`onTrigger` method, the framework will also "Administratively Yield" the Processor. This means that the
|
||||||
|
Processor will not be triggered to run again for some amount of time. The amount of time is configured
|
||||||
|
in the `nifi.properties` file but is 10 seconds by default.
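
The framework's reaction to an escaped Exception, as described above, can be summarized in a small model. This is only an illustrative sketch: `ProcessException` here is a local stand-in class rather than the NiFi type, and `FrameworkAction`, `EscapedExceptionModel`, and `actionsFor` are hypothetical names invented for this example. They do not correspond to framework internals.

```java
import java.util.EnumSet;
import java.util.Set;

// Local stand-in for NiFi's ProcessException, defined only so the sketch compiles on its own.
class ProcessException extends RuntimeException {
    ProcessException(String message) {
        super(message);
    }
}

// Hypothetical names for the framework reactions described in the text.
enum FrameworkAction { ROLLBACK_SESSION, PENALIZE_FLOWFILES, ADMINISTRATIVELY_YIELD }

class EscapedExceptionModel {
    // Returns the reactions the text describes for an Exception that escapes onTrigger.
    static Set<FrameworkAction> actionsFor(Throwable escaped) {
        Set<FrameworkAction> actions =
                EnumSet.of(FrameworkAction.ROLLBACK_SESSION, FrameworkAction.PENALIZE_FLOWFILES);
        if (!(escaped instanceof ProcessException)) {
            // Anything else is treated as unanticipated, so the Processor is also paused for a while.
            actions.add(FrameworkAction.ADMINISTRATIVELY_YIELD);
        }
        return actions;
    }
}
```

In this model, a `NullPointerException` yields all three actions, while a `ProcessException` yields only the rollback and the penalization.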
|
||||||
|
|
||||||
|
|
||||||
=== Exceptions within a callback: IOException, RuntimeException
|
=== Exceptions within a callback: IOException, RuntimeException
|
||||||
|
|
||||||
=== Exceptions within the Processor: ProcessException, others.
|
More often than not, when an Exception occurs in a Processor, it occurs from within a callback (i.e.,
|
||||||
|
`InputStreamCallback`, `OutputStreamCallback`, or `StreamCallback`), that is, during the processing of a
|
||||||
|
FlowFile's content. Callbacks are allowed to throw either `RuntimeException` or `IOException`. In the case
|
||||||
|
of RuntimeException, this Exception will propagate back to the `onTrigger` method. In the case of an
|
||||||
|
`IOException`, the Exception will be wrapped within a ProcessException and this ProcessException will then
|
||||||
|
be thrown from the Framework.
|
||||||
|
|
||||||
|
For this reason, it is recommended that Processors that use callbacks do so within a `try/catch` block
|
||||||
|
and catch `ProcessException` as well as any other `RuntimeException` that they expect their callback to
|
||||||
|
throw. It is *not* recommended that Processors catch the general `Exception` or `Throwable` cases, however.
|
||||||
|
This is discouraged for two reasons.
|
||||||
|
|
||||||
|
First, if an unexpected RuntimeException is thrown, it is likely a bug
|
||||||
|
and allowing the framework to roll back the session ensures that no data is lost and that DataFlow Managers
|
||||||
|
are able to deal with the data as they see fit by keeping the data queued up in place.
|
||||||
|
|
||||||
|
Second, when an IOException is thrown from a callback, there really are two types of IOExceptions: those thrown
|
||||||
|
from Processor code (for example, the data is not in the expected format or a network connection fails), and
|
||||||
|
those that are thrown from the Content Repository (where the FlowFile content is stored). If the latter is the case,
|
||||||
|
the framework will catch this IOException and wrap it into a `FlowFileAccessException`, which extends `RuntimeException`.
|
||||||
|
This is done explicitly so that the Exception will escape the `onTrigger` method and the framework can handle this
|
||||||
|
condition appropriately. Catching the general Exception prevents this from happening.
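
The recommended pattern can be sketched as follows. To keep the example compilable without NiFi on the classpath, `ProcessException` and `InputStreamCallback` are simplified local stand-ins, and `CallbackSketch`, `read`, and `handle` are hypothetical names. The `read` method only mimics the wrapping behavior described above and is not the real `ProcessSession.read`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Local stand-ins so the sketch compiles without NiFi on the classpath.
class ProcessException extends RuntimeException {
    ProcessException(String message, Throwable cause) {
        super(message, cause);
    }
}

interface InputStreamCallback {
    void process(InputStream in) throws IOException;
}

class CallbackSketch {
    // Mimics the described behavior: an IOException thrown by the callback is wrapped
    // in a ProcessException, while a RuntimeException propagates unchanged.
    static void read(byte[] content, InputStreamCallback callback) {
        try (InputStream in = new ByteArrayInputStream(content)) {
            callback.process(in);
        } catch (IOException e) {
            throw new ProcessException("Failed to process content", e);
        }
    }

    // Returns the name of the relationship the data would be routed to.
    static String handle(byte[] content) {
        try {
            read(content, in -> {
                if (in.read() != '{') {
                    throw new IOException("content does not start with '{'");
                }
            });
            return "success";
        } catch (ProcessException e) {
            // Anticipated failure: handle it here rather than letting it escape.
            return "failure";
        }
        // Deliberately no catch (Exception e): an unexpected RuntimeException should
        // escape so that the framework can roll back the session.
    }
}
```

The important design choice is the narrow `catch`: only the anticipated `ProcessException` is handled, so anything unexpected still reaches the framework.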
|
||||||
|
|
||||||
=== Catching ProcessException from callback
|
|
||||||
|
|
||||||
=== Penalization vs. Yielding
|
=== Penalization vs. Yielding
|
||||||
|
|
||||||
=== Session Rollback: with and within penalization
|
When an issue occurs during processing, the framework exposes two methods to allow Processor developers to avoid performing
|
||||||
|
unnecessary work: "penalization" and "yielding." These two concepts can become confusing for developers new to the NiFi API.
|
||||||
|
A developer is able to penalize a FlowFile by calling the `penalize(FlowFile)` method of ProcessSession. This causes the
|
||||||
|
FlowFile itself to be inaccessible to downstream Processors for a period of time. The amount of time that the FlowFile is
|
||||||
|
inaccessible is determined by the DataFlow Manager by setting the "Penalty Duration" setting in the Processor Configuration
|
||||||
|
dialog. The default value is 30 seconds. Typically, this is done when a Processor determines that the data cannot be processed
|
||||||
|
due to environmental reasons that are expected to sort themselves out. A great example of this is the PutSFTP processor, which
|
||||||
|
will penalize a FlowFile if a file already exists on the SFTP server that has the same filename. In this case, the Processor
|
||||||
|
penalizes the FlowFile and routes it to failure. A DataFlow Manager can then route failure back to the same PutSFTP Processor.
|
||||||
|
This way, if a file exists with the same filename, the Processor will not attempt to send the file again for 30 seconds
|
||||||
|
(or whatever period the DFM has configured the Processor to use). In the meantime, it is able to continue to process other
|
||||||
|
FlowFiles.
|
||||||
|
|
||||||
|
On the other hand, yielding allows a Processor developer to indicate to the framework that it will not be able to perform
|
||||||
|
any useful function for some period of time. This commonly happens with a Processor that is communicating with a remote
|
||||||
|
resource. If the Processor cannot connect to the remote resource, or if the remote resource is expected to provide data
|
||||||
|
but reports that it has none, the Processor should call `yield` on the `ProcessContext` object and then return. By doing
|
||||||
|
this, the Processor is telling the framework that it should not waste resources triggering this Processor to run, because
|
||||||
|
there's nothing that it can do - it's better to use those resources to allow other Processors to run.
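
The difference between the two calls can be sketched as follows. `Session` and `Context` are minimal stand-ins that expose only the two methods named above (a `String` stands in for a FlowFile), and `BackoffSketch` with its methods is a hypothetical name used for illustration, not part of the NiFi API.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-ins for the two NiFi calls named in the text; not the real interfaces.
interface Session {
    void penalize(String flowFile);
}

interface Context {
    void yield();
}

class BackoffSketch {
    // A problem with one FlowFile: penalize only that FlowFile so others can still be processed.
    static void onDuplicateFilename(Session session, String flowFile) {
        session.penalize(flowFile);
    }

    // A problem that prevents the Processor from doing any useful work: yield the whole Processor.
    static void onRemoteUnavailable(Context context) {
        context.yield();
    }

    // Records which call each situation leads to.
    static List<String> demo() {
        List<String> calls = new ArrayList<>();
        Session session = flowFile -> calls.add("penalize " + flowFile);
        Context context = () -> calls.add("yield");
        onDuplicateFilename(session, "report.csv");
        onRemoteUnavailable(context);
        return calls;
    }
}
```

Penalization is scoped to a single FlowFile, whereas yielding is scoped to the Processor as a whole.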
|
||||||
|
|
||||||
|
|
||||||
|
=== Session Rollback
|
||||||
|
|
||||||
|
Thus far, when we have discussed the `ProcessSession`, we have typically referred to it simply as a mechanism for accessing
|
||||||
|
FlowFiles. However, it provides another very important capability, which is transactionality. All methods that are called
|
||||||
|
on a ProcessSession happen as a transaction. When we decide to end the transaction, we can do so either by calling
|
||||||
|
`commit()` or by calling `rollback()`. Typically, this is handled by the `AbstractProcessor` class: if the `onTrigger` method
|
||||||
|
throws an Exception, the AbstractProcessor will catch the Exception, call `session.rollback()`, and then re-throw the Exception.
|
||||||
|
Otherwise, the AbstractProcessor will call `commit()` on the ProcessSession.
|
||||||
|
|
||||||
|
There are times, however, that developers will want to roll back a session explicitly. This can be accomplished at any time
|
||||||
|
by calling the `rollback()` or `rollback(boolean)` method. If using the latter, the boolean indicates whether or not those
|
||||||
|
FlowFiles that have been pulled from queues (via the ProcessSession `get` methods) should be penalized before being added
|
||||||
|
back to their queues.
|
||||||
|
|
||||||
|
When `rollback` is called, any modifications that have occurred to the FlowFiles in that session are discarded, including
|
||||||
|
both content modification and attribute modification. Additionally, all Provenance Events are rolled back (with the exception
|
||||||
|
of any SEND event that was emitted by passing a value of `true` for the `force` argument). The FlowFiles that were pulled from
|
||||||
|
the input queues are then transferred back to the input queues (and optionally penalized) so that they can be processed again.
|
||||||
|
|
||||||
|
On the other hand, when the `commit` method is called, the FlowFile's new state is persisted in the FlowFile Repository, and
|
||||||
|
any Provenance Events that occurred are persisted in the Provenance Repository. The previous content is destroyed (unless
|
||||||
|
another FlowFile references the same piece of content), and the FlowFiles are transferred to the outbound queues so that the
|
||||||
|
next Processors can operate on the data.
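
The commit and rollback semantics described above can be illustrated with a toy model. `ToySession` is a hypothetical class that tracks only attributes. It is not the NiFi `ProcessSession` and ignores content, Provenance Events, and queues.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the transactional behavior described above; not the NiFi ProcessSession.
class ToySession {
    private final Map<String, String> committed = new HashMap<>();
    private final Map<String, String> pending = new HashMap<>();

    void putAttribute(String name, String value) {
        pending.put(name, value); // held back until the transaction ends
    }

    void commit() {
        committed.putAll(pending); // persist the new state
        pending.clear();
    }

    void rollback() {
        pending.clear(); // discard everything since the last commit
    }

    String committedValue(String name) {
        return committed.get(name);
    }
}
```

For example, setting an attribute and committing, then changing it and rolling back, leaves the first value as the committed one.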
|
||||||
|
|
||||||
|
It is also important to note how this behavior is affected by using the `org.apache.nifi.annotation.behavior.SupportsBatching`
|
||||||
|
annotation. If a Processor utilizes this annotation, calls to `ProcessSession.commit` may not take effect immediately. Rather,
|
||||||
|
these commits may be batched together in order to provide higher throughput. However, if at any point, the Processor rolls back
|
||||||
|
the ProcessSession, all changes since the last call to `commit` will be discarded and all "batched" commits will take effect.
|
||||||
|
These "batched" commits are not rolled back.
|
||||||
|
|
||||||
=== Administrative Yielding
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
== General Design Considerations
|
== General Design Considerations
|
||||||
|
|
||||||
=== Cohesion / Reusability : Do one thing and do it well!
|
When designing a Processor, there are a few important design considerations to keep in mind. This section of the Developer Guide
|
||||||
|
brings to the forefront some of the ideas that a developer should be thinking about when creating a Processor.
|
||||||
|
|
||||||
|
=== Consider the User
|
||||||
|
|
||||||
|
One of the most important concepts to keep in mind when developing a Processor (or any other component) is the user
|
||||||
|
experience that you are creating. It's important to remember that as the developer of such a component, you may have
|
||||||
|
important knowledge about the context that others do not have. Documentation should always be supplied so that those
|
||||||
|
less familiar with the process are able to use it with ease.
|
||||||
|
|
||||||
|
When thinking about the user experience, it is also important to note that consistency is very important. It is best
|
||||||
|
to stick with the standard <<naming-convensions>>. This is true for Processor names, Property names and values, Relationship
|
||||||
|
names, and any other aspect that the user will experience.
|
||||||
|
|
||||||
|
Simplicity is crucial! Avoid adding properties that you don't expect users to understand or change. As developers, we are
|
||||||
|
told that hard-coding values is bad. But this sometimes results in developers exposing properties for which, when asked for clarification,
|
||||||
|
they simply tell users to leave the default value. This leads to confusion and complexity.
|
||||||
|
|
||||||
|
|
||||||
|
=== Cohesion and Reusability
|
||||||
|
|
||||||
|
For the sake of making a single, cohesive unit, developers are sometimes tempted to combine several functions into a single Processor.
|
||||||
|
This is especially true in the case where a Processor expects input data to be in format X so that the Processor can convert the data into
|
||||||
|
format Y and send the newly-formatted data to some external service.
|
||||||
|
|
||||||
|
Taking this approach of formatting the data for a particular endpoint and then sending the data to that endpoint within the same Processor
|
||||||
|
has several drawbacks:
|
||||||
|
|
||||||
|
- The Processor becomes very complex, as it has to perform the data translation task as well as the task of
|
||||||
|
sending the data to the remote service.
|
||||||
|
- If the Processor is unable to communicate with the remote service, it will route the data to a `failure` Relationship. In this case,
|
||||||
|
the Processor will be responsible for performing the data translation again. And if it fails again, the translation is done yet again.
|
||||||
|
- If we have five different Processors that translate the incoming data into this new format before sending the data, we have a great
|
||||||
|
deal of duplicated code. If the schema changes, for instance, many Processors must be updated.
|
||||||
|
- This intermediate data is thrown away when the Processor finishes sending to the remote service. The intermediate data format
|
||||||
|
may well be useful to other Processors.
|
||||||
|
|
||||||
|
In order to avoid these issues, and make Processors more reusable, a Processor should always stick to the principle of "do one thing and do
|
||||||
|
it well." Such a Processor should be broken into two separate Processors: one to convert the data from Format X to Format Y, and another
|
||||||
|
Processor to send data to the remote resource.
|
||||||
|
|
||||||
|
|
||||||
|
[[naming-convensions]]
|
||||||
=== Naming Conventions
|
=== Naming Conventions
|
||||||
|
|
||||||
|
In order to deliver a consistent look and feel to users, it is advisable that Processors keep with standard naming conventions. The following
|
||||||
|
is a list of standard conventions that are used:
|
||||||
|
|
||||||
|
- Processors that pull data from a remote system are named Get<Service> or Get<Protocol>, depending on whether they poll data from arbitrary
|
||||||
|
sources over a known Protocol (such as GetHTTP or GetFTP) or pull data from a known service (such as GetKafka).
|
||||||
|
- Processors that push data to a remote system are named Put<Service> or Put<Protocol>.
|
||||||
|
- Relationship names are lower-cased and use spaces to delineate words.
|
||||||
|
- Property names capitalize significant words, as would be done with the title of a book.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
=== Processor Behavior Annotations
|
=== Processor Behavior Annotations
|
||||||
|
|
||||||
|
When creating a Processor, the developer is able to provide hints to the framework about how to utilize the Processor most
|
||||||
|
effectively. This is done by applying annotations to the Processor's class. The annotations that can be applied to a
|
||||||
|
Processor exist in three sub-packages of `org.apache.nifi.annotation`. Those in the `documentation` sub-package are used
|
||||||
|
to provide documentation to the user. Those in the `lifecycle` sub-package instruct the framework which methods should
|
||||||
|
be called on the Processor in order to respond to the appropriate life-cycle events. Those in the `behavior` sub-package
|
||||||
|
help the framework understand how to interact with the Processor in terms of scheduling and general behavior.
|
||||||
|
|
||||||
|
The following annotations from the `org.apache.nifi.annotation.behavior` package can be used to modify how the framework
|
||||||
|
will handle your Processor:
|
||||||
|
|
||||||
|
- `EventDriven`: Instructs the framework that the Processor can be scheduled using the Event-Driven scheduling
|
||||||
|
strategy. This strategy is still experimental at this point, but can result in reduced resource utilization
|
||||||
|
on dataflows that do not handle extremely high data rates.
|
||||||
|
|
||||||
|
- `SideEffectFree`: Indicates that the Processor does not have any side effects external to NiFi. As a result, the
|
||||||
|
framework is free to invoke the Processor many times with the same input without causing any unexpected
|
||||||
|
results to occur. This implies idempotent behavior. This can be used by the framework to improve efficiency by
|
||||||
|
performing actions such as transferring a ProcessSession from one Processor to another, such that if
|
||||||
|
a problem occurs many Processors' actions can be rolled back and performed again.
|
||||||
|
|
||||||
|
- `SupportsBatching`: This annotation indicates that it is okay for the framework to batch together multiple
|
||||||
|
ProcessSession commits into a single commit. If this annotation is present, the user will be able to choose
|
||||||
|
whether they prefer high throughput or lower latency in the Processor's Scheduling tab. This annotation should
|
||||||
|
be applied to most Processors, but it comes with a caveat: if the Processor calls `ProcessSession.commit`,
|
||||||
|
there is no guarantee that the data has been safely stored in NiFi's Content, FlowFile, and Provenance Repositories.
|
||||||
|
As a result, it is not appropriate for those Processors that receive data from an external source, commit the session,
|
||||||
|
and then delete the remote data or confirm a transaction with a remote resource.
|
||||||
|
|
||||||
|
- `TriggerSerially`: When this annotation is present, the framework will not allow the user to schedule more than one
|
||||||
|
concurrent thread to execute the `onTrigger` method at a time. Instead, the number of threads ("Concurrent Tasks")
|
||||||
|
will always be set to `1`. This does *not*, however, mean that the Processor does not have to be thread-safe,
|
||||||
|
as the thread that is executing `onTrigger` may change between invocations.
|
||||||
|
|
||||||
|
- `TriggerWhenAnyDestinationAvailable`: By default, NiFi will not schedule a Processor to run if any of its outbound
|
||||||
|
queues is full. This allows back-pressure to be applied all the way up a chain of Processors. However, some Processors
|
||||||
|
may need to run even if one of the outbound queues is full. This annotation indicates that the Processor should run
|
||||||
|
if any Relationship is "available." A Relationship is said to be "available" if none of the connections that use
|
||||||
|
that Relationship is full. For example, the DistributeLoad Processor makes use of this annotation. If the "round robin"
|
||||||
|
scheduling strategy is used, the Processor will not run if any outbound queue is full. However, if the "next available"
|
||||||
|
scheduling strategy is used, the Processor will run if any Relationship at all is available and will route FlowFiles
|
||||||
|
only to those relationships that are available.
|
||||||
|
|
||||||
|
- `TriggerWhenEmpty`: The default behavior is to trigger a Processor to run only if its input queue has at least one
|
||||||
|
FlowFile or if the Processor has no input queues (which is typical of a "source" Processor). Applying this annotation
|
||||||
|
will cause the framework to ignore the size of the input queues and trigger the Processor regardless of whether or
|
||||||
|
not there is any data on an input queue. This is useful, for example, if the Processor needs to be triggered to run
|
||||||
|
periodically to time out a network connection.
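
Behavior annotations are applied at the class level, and a framework can discover them through reflection. The sketch below uses locally declared stand-in annotations with the same simple names as two of the annotations listed above, so that it compiles without NiFi. `ConvertFormatProcessor` and `AnnotationSketch` are hypothetical names, and the reflection check only illustrates one way such hints could be read, not how the framework actually does it.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Stand-in annotations declared locally; in a real Processor these come from NiFi.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SupportsBatching {}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SideEffectFree {}

// A hypothetical Processor that only converts data and has no effects outside NiFi.
@SideEffectFree
@SupportsBatching
class ConvertFormatProcessor {
}

class AnnotationSketch {
    // Illustrates how a hint on the class can be detected at runtime.
    static boolean supportsBatching(Class<?> processorClass) {
        return processorClass.isAnnotationPresent(SupportsBatching.class);
    }
}
```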
|
||||||
|
|
||||||
|
|
||||||
=== Data Buffering
|
=== Data Buffering
|
||||||
|
|
||||||
|
An important point to keep in mind is that NiFi provides a generic data processing capability. Data can be in any format. Processors
|
||||||
|
are generally scheduled with several threads. A common mistake that developers new to NiFi make is to buffer all the contents of a
|
||||||
|
FlowFile in memory. While there are cases when this is required, it should be avoided if at all possible, unless it is well-known
|
||||||
|
what format the data is in. For example, a Processor responsible for executing XPath against an XML document will need to load the
|
||||||
|
entire contents of the data into memory. This is generally acceptable, as XML is not expected to be extremely large. However, a Processor
|
||||||
|
that searches for a specific byte sequence may be used to search files that are hundreds of gigabytes or more. Attempting to load this
|
||||||
|
into memory can cause a lot of problems - especially if multiple threads are processing different FlowFiles simultaneously.
|
||||||
|
|
||||||
|
Instead of buffering this data into memory, it is advisable to instead evaluate the data as it is streamed from the Content Repository
|
||||||
|
(i.e., scan the content from the `InputStream` that is provided to your callback by `ProcessSession.read`). Of course, in this case,
|
||||||
|
we don't want to read from the Content Repository for each byte, so we would use a BufferedInputStream or somehow buffer some small
|
||||||
|
amount of data, as appropriate.
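
As a minimal sketch of this streaming approach, the following searches an `InputStream` for a byte sequence while holding only a pattern-sized window in memory. `StreamingSearch` and its methods are hypothetical names, and the matching is a naive sliding-window comparison chosen for brevity. In a Processor, the stream would be the `InputStream` handed to the callback rather than a `ByteArrayInputStream`.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

class StreamingSearch {
    // Returns true if pattern occurs in the stream. Only pattern.length bytes of content
    // are retained, in addition to the BufferedInputStream's internal buffer.
    static boolean contains(InputStream raw, byte[] pattern) throws IOException {
        if (pattern.length == 0) {
            return true;
        }
        InputStream in = new BufferedInputStream(raw); // avoids one underlying read per byte
        byte[] window = new byte[pattern.length];      // circular buffer of the most recent bytes
        long count = 0;
        int b;
        while ((b = in.read()) != -1) {
            window[(int) (count % pattern.length)] = (byte) b;
            count++;
            if (count >= pattern.length && matches(window, count, pattern)) {
                return true;
            }
        }
        return false;
    }

    // The oldest byte in the window sits at index count % length.
    private static boolean matches(byte[] window, long count, byte[] pattern) {
        int start = (int) (count % pattern.length);
        for (int i = 0; i < pattern.length; i++) {
            if (window[(start + i) % pattern.length] != pattern[i]) {
                return false;
            }
        }
        return true;
    }

    // Convenience wrapper for trying the search on in-memory data.
    static boolean containsInBytes(byte[] content, byte[] pattern) {
        try {
            return contains(new ByteArrayInputStream(content), pattern);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Memory use depends on the length of the pattern rather than on the size of the FlowFile content, which is the property the text above recommends.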
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -1618,26 +1818,104 @@ Processor is valid or not, according to our expectations.
|
||||||
|
|
||||||
=== Enqueue FlowFiles
|
=== Enqueue FlowFiles
|
||||||
|
|
||||||
|
Before triggering a Processor to run, it is usually necessary to enqueue FlowFiles for the Processor to process.
|
||||||
|
This can be achieved by using the `enqueue` methods of the `TestRunner` class. The `enqueue` method has several
|
||||||
|
different overloads, allowing data to be added in the form of a `byte[]`, `InputStream`, or `Path`. Each of these
|
||||||
|
methods also supports a variation that allows a `Map<String, String>` to be added to support FlowFile attributes.
|
||||||
|
|
||||||
|
Additionally, there is an `enqueue` method that takes a varargs array of FlowFile objects. This can be useful, for example,
|
||||||
|
to obtain the output of a Processor and then feed this to the input of the Processor.
|
||||||
|
|
||||||
|
|
||||||
=== Run the Processor
|
=== Run the Processor
|
||||||
|
|
||||||
|
After configuring the Controller Services and enqueuing the necessary FlowFiles, the Processor can be triggered
|
||||||
|
to run by calling the `run` method of `TestRunner`. If this method is called without any arguments, it will
|
||||||
|
invoke any method in the Processor with an `@OnScheduled` annotation, call the Processor's `onTrigger` method once,
|
||||||
|
and then run the `@OnUnscheduled` and finally `@OnStopped` methods.
|
||||||
|
|
||||||
|
If it is desirable to run several iterations of the `onTrigger` method before the other `@OnUnscheduled` and
|
||||||
|
`@OnStopped` life-cycle events are triggered, the `run(int)` method can be used to specify how many iterations
|
||||||
|
of `onTrigger` should be called.
|
||||||
|
|
||||||
|
There are times when we want to trigger the Processor to run but not trigger the `@OnUnscheduled` and `@OnStopped`
|
||||||
|
life-cycle events. This is useful, for instance, to inspect the Processor's state before these events occur. This
|
||||||
|
can be achieved using the `run(int, boolean)` and passing `false` as the second argument. After doing this, though,
|
||||||
|
calling the `@OnScheduled` life-cycle methods again could cause an issue. To avoid this, we can run `onTrigger` again
|
||||||
|
without causing these events to occur by using the `run(int,boolean,boolean)` version of the `run` method and
|
||||||
|
passing `false` as the third argument.
|
||||||
|
|
||||||
|
If it is useful to test behavior that occurs with multiple threads, this can also be achieved by calling the
|
||||||
|
`setThreadCount` method of `TestRunner`. The default is 1 thread. If using multiple threads, it is important
|
||||||
|
to remember that the `run` call of `TestRunner` specifies how many times the Processor should be triggered, not
|
||||||
|
the number of times that the Processor should be triggered per thread. So, if the thread count is set to 2 but
|
||||||
|
`run(1)` is called, only a single thread will be used.
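
The distinction between total iterations and per-thread iterations can be illustrated with a plain-Java analogy. The sketch below is not the `TestRunner` implementation; `SharedIterations` and `threadsUsed` are hypothetical names. It submits a fixed number of tasks to a thread pool and reports how many distinct threads actually ran one, which shows why a single iteration can never occupy more than one thread no matter how large the pool is.

[source,java]
----
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SharedIterations {

    /**
     * Runs `iterations` tasks in total on a pool of `threadCount` threads and
     * returns the number of distinct threads that executed at least one task.
     */
    public static int threadsUsed(final int threadCount, final int iterations) throws InterruptedException {
        final Set<String> threadNames = Collections.synchronizedSet(new HashSet<String>());
        final ExecutorService pool = Executors.newFixedThreadPool(threadCount);

        // The iteration count is shared by the whole pool; it is not multiplied per thread.
        for (int i = 0; i < iterations; i++) {
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    threadNames.add(Thread.currentThread().getName());
                }
            });
        }

        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return threadNames.size();
    }
}
----

With a pool of 2 threads and 1 iteration, only one thread ever does any work. A test that is meant to exercise concurrent behavior therefore needs an iteration count at least as large as the thread count.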
|
||||||
|
|
||||||
|
|
||||||
=== Validate Output Counts
|
=== Validate Output
|
||||||
|
|
||||||
|
After a Processor has finished running, a unit test will generally want to validate that the FlowFiles went where
|
||||||
|
they were expected to go. This can be achieved using the `TestRunner` class's `assertAllFlowFilesTransferred` and
|
||||||
|
`assertTransferCount` methods. The former method takes as arguments a Relationship and an integer to dictate how many
|
||||||
|
FlowFiles should have been transferred to that Relationship. The method will fail the unit test if the number of
|
||||||
|
FlowFiles transferred to the given Relationship differs from this number *or* if any FlowFile was transferred to any other Relationship.
|
||||||
|
The `assertTransferCount` method validates only that the FlowFile count was the expected number for the given Relationship.
|
||||||
|
|
||||||
|
After validating the counts, we can then obtain the actual output FlowFiles via the `getFlowFilesForRelationship` method.
|
||||||
|
This method returns a `List<MockFlowFile>`. It's important to note that the type of the List is `MockFlowFile`, rather
|
||||||
|
than the `FlowFile` interface. This is done because `MockFlowFile` comes with many methods for validating the contents.
|
||||||
|
|
||||||
|
For example, `MockFlowFile` has methods for asserting that FlowFile Attributes exist (`assertAttributeExists`), asserting
|
||||||
|
that other attributes are not present (`assertAttributeNotExists`), or that Attributes have the correct value
|
||||||
|
(`assertAttributeEquals`, `assertAttributeNotEquals`). Similar methods exist for verifying the contents of the FlowFile.
|
||||||
|
The contents of a FlowFile can be compared to a `byte[]`, an `InputStream`, a file, or a String. If the data is expected
|
||||||
|
to be textual, the String version is preferred, as it provides a more intuitive error message if the output is not
|
||||||
|
as expected.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
=== Validate Output Results
|
=== Mocking External Resources
|
||||||
|
|
||||||
|
One of the biggest problems when testing a NiFi processor that connects to a remote resource is that we don't want to
|
||||||
|
actually connect to some remote resource from a unit test. We can stand up a simple server ourselves in the unit test
|
||||||
|
and configure the Processor to communicate with it, but then we have to understand and implement the server-specific
|
||||||
|
specification and may not be able to properly send back error messages, etc. that we would like for testing.
|
||||||
|
|
||||||
|
Generally, the approach taken here is to have a method in the Processor that is responsible for obtaining a connection
|
||||||
|
or a client to a remote resource. We generally mark this method as protected. In the unit test, instead of creating
|
||||||
|
the `TestRunner` by calling `TestRunners.newTestRunner(Class)` and providing the Processor class, we instead create
|
||||||
|
a subclass of the Processor in our unit test and use this:
|
||||||
|
|
||||||
=== Mocking Connections
|
[source,java]
|
||||||
|
----
|
||||||
|
@Test
|
||||||
|
public void testConnectionFailure() {
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(new MyProcessor() {
|
||||||
|
protected Client getClient() {
|
||||||
|
// Return a mocked out client here.
|
||||||
|
return new Client() {
|
||||||
|
public void connect() throws IOException {
|
||||||
|
throw new IOException();
|
||||||
|
}
|
||||||
|
|
||||||
|
// ...
|
||||||
|
// other client methods
|
||||||
|
// ...
|
||||||
|
};
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// rest of unit test.
|
||||||
|
}
|
||||||
|
----
|
||||||
|
|
||||||
|
This allows us to implement a Client that mocks out all of the network communications and returns the different
|
||||||
|
error results that we want to test, as well as ensure that our logic is correct for handling successful calls
|
||||||
|
to the client.
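
The same pattern can be tried out without any NiFi dependencies. The following self-contained sketch uses hypothetical types throughout: `Client` stands in for a remote-resource client, `Fetcher` plays the role of the Processor, and the returned strings stand in for Relationships. None of these names come from the NiFi API. It shows only how a protected factory method gives a test a place to substitute a mocked client.

[source,java]
----
import java.io.IOException;

public class MockableClientExample {

    /** Hypothetical client abstraction for some remote resource. */
    public interface Client {
        String fetch() throws IOException;
    }

    /** Hypothetical component that plays the role of the Processor. */
    public static class Fetcher {

        // Protected factory method: this is the seam that a unit test overrides.
        protected Client getClient() {
            return new Client() {
                @Override
                public String fetch() throws IOException {
                    // This sketch has no real remote resource to talk to.
                    throw new IOException("no remote resource available");
                }
            };
        }

        // Returns the name of the route the result would be sent to.
        public String process() {
            try {
                getClient().fetch();
                return "success";
            } catch (final IOException e) {
                return "failure";
            }
        }
    }

    /** Builds a Fetcher whose client is replaced by the given mock, then runs it. */
    public static String routeWith(final Client mockClient) {
        final Fetcher fetcher = new Fetcher() {
            @Override
            protected Client getClient() {
                return mockClient;
            }
        };
        return fetcher.process();
    }
}
----

A test can then pass in one client that returns a value and another that throws an `IOException`, and assert that the first is routed to `success` and the second to `failure`, all without opening a network connection.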
|
||||||
|
|
||||||
|
|
||||||
=== Additional Testing Capabilities
|
=== Additional Testing Capabilities
|
||||||
|
|
||||||
In addition to the above-mentioned capabilities provided by the
|
In addition to the above-mentioned capabilities provided by the
|
||||||
testing framework, the TestRunner provides several
|
testing framework, the TestRunner provides several
|
||||||
convenience methods for verifying the behavior of a Processor. Methods
|
convenience methods for verifying the behavior of a Processor. Methods
|
||||||
|
@ -1678,10 +1956,28 @@ their NiFi components as NAR packages.
|
||||||
To achieve this, a developer creates a new Maven Artifact, which we
|
To achieve this, a developer creates a new Maven Artifact, which we
|
||||||
refer to as the NAR artifact. The packaging is
|
refer to as the NAR artifact. The packaging is
|
||||||
set to `nar`. The `dependencies` section of the POM is then created so
|
set to `nar`. The `dependencies` section of the POM is then created so
|
||||||
that the NAR has a dependency on all NiFi
|
that the NAR has a dependency on all NiFi Components that are to be included within the NAR.
|
||||||
Components that are to be included within the NAR.
|
|
||||||
|
|
||||||
TODO: DISCUSS HOW TO INCLUDE THE NAR MAVEN PLUGIN.
|
In order to use a packaging of `nar`, we must use the `nifi-nar-maven-plugin` module.
|
||||||
|
This is included by adding the following snippet to the NAR's pom.xml:
|
||||||
|
|
||||||
|
[source,xml]
|
||||||
|
----
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-nar-maven-plugin</artifactId>
|
||||||
|
<version>1.0.0-incubating</version>
|
||||||
|
<extensions>true</extensions>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
|
----
|
||||||
|
|
||||||
|
In the Apache NiFi codebase, this exists in the NiFi root POM from which all other NiFi artifacts
|
||||||
|
(with the exception of the nifi-nar-maven-plugin itself) inherit, so that we do not need to include
|
||||||
|
this in any of our other POM files.
|
||||||
|
|
||||||
The NAR is able to have one dependency that is of type `nar`. If more
|
The NAR is able to have one dependency that is of type `nar`. If more
|
||||||
than one dependency is specified that is of type
|
than one dependency is specified that is of type
|
||||||
|
@ -1789,16 +2085,41 @@ API artifacts into the same NAR is often acceptable.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
== Consider the User Experience
|
|
||||||
|
|
||||||
The user...
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
== How to contribute to Apache NiFi
|
== How to contribute to Apache NiFi
|
||||||
|
|
||||||
Git, Maven, ASF processes, NiFi processes, ...
|
We are always excited to have contributions from the community - especially from new contributors!
|
||||||
|
We are interested in accepting contributions of code, as well as documentation and even artwork that
|
||||||
|
can be applied as icons or styling to the application.
|
||||||
|
|
||||||
|
|
||||||
|
=== Technologies
|
||||||
|
|
||||||
|
The back end of Apache NiFi is written in Java. The web tier makes use of JAX-RS and JavaScript is extensively
|
||||||
|
used to provide a user interface. We depend on several third-party JavaScript libraries, including D3 and jQuery,
|
||||||
|
among others. We make use of Apache Maven for our builds and Git for our version control system.
|
||||||
|
|
||||||
|
|
||||||
|
=== Where to Start?
|
||||||
|
|
||||||
|
link:http://issues.apache.org/jira/browse/NIFI[NiFi's Jira page] can be used to find tickets that are tagged as "beginner",
|
||||||
|
or you can dig into any of the tickets for creating Processors. Processors should be self-contained and not rely on other
|
||||||
|
outside components (except for Controller Services), so they make excellent starting points for new NiFi developers.
|
||||||
|
Working on a Processor exposes the developer to the NiFi API, and Processors are the most extensible part of the dataflow system.
|
||||||
|
|
||||||
|
|
||||||
|
=== Supplying a contribution
|
||||||
|
|
||||||
|
Contributions can be provided either by creating a patch:
|
||||||
|
|
||||||
|
`git format-patch`
|
||||||
|
|
||||||
|
and attaching that patch to a ticket, or by generating a Pull Request.
|
||||||
|
|
||||||
|
|
||||||
|
=== Contact Us
|
||||||
|
|
||||||
|
The developer mailing list (dev@nifi.incubator.apache.org) is monitored pretty closely, and we tend to respond pretty
|
||||||
|
quickly. If you have a question, don't hesitate to shoot us an e-mail - we're here to help! Unfortunately, though, e-mails
|
||||||
|
can get lost in the shuffle, so if you do send an e-mail and don't get a response within a day or two, it's our fault - don't
|
||||||
|
worry about bothering us. Just ping the mailing list again.
|
||||||
|
|
||||||
|
|