mirror of https://github.com/apache/nifi.git
NIFI-152: Updated developer guide
parent
5830f005f5
commit
bafa945a76
|
@ -22,64 +22,60 @@ Apache NiFi Team <dev@nifi.incubator.apache.org>
|
|||
|
||||
== Introduction
|
||||
|
||||
This Developer Guide is written by developers for developers. It is expected that before reading this
|
||||
guide, you have a basic understanding of Apache NiFi (incubating) and the concepts of dataflow. If not,
|
||||
please see the link:overview.html[NiFi Overview] and the link:user-guide.html[NiFi User Guide] to familiar
|
||||
yourself with the concepts of NiFi.
|
||||
|
||||
The intent of this Developer Guide is to provide the reader with the information needed to understand how Apache NiFi (incubating)
|
||||
extensions are developed and help to explain the thought process behind developing the components. It provides an introduction to
|
||||
and explanation of the API that is used to develop extensions. It does not, however, go into great detail about each
|
||||
of the methods in the API, as this guide is intended to supplement the JavaDocs of the API rather than replace them.
|
||||
This guide also assumes that the reader is familiar with Java 7 and Apache Maven.
|
||||
|
||||
The intent of this guide is to provide the reader with the information needed to understand how NiFi extensions
|
||||
are developed and help to explain the thought process behind developing the components.
|
||||
This guide is written by developers for developers. It is expected that before reading this
|
||||
guide, you have a basic understanding of NiFi and the concepts of dataflow. If not, please see the link:overview.html[NiFi Overview]
|
||||
and the link:user-guide.html[NiFi User Guide] to familiarize yourself with the concepts of NiFi.
|
||||
|
||||
|
||||
[[components]]
|
||||
== NiFi Components
|
||||
|
||||
NiFi provides several extension points to provide developers the
|
||||
ability to add functionality
|
||||
to the application to meet their needs. The following list provides a
|
||||
high-level description
|
||||
of the most common extension points:
|
||||
ability to add functionality to the application to meet their needs. The following list provides a
|
||||
high-level description of the most common extension points:
|
||||
|
||||
- Processor
|
||||
- The Processor interface is the mechanism through which NiFi
|
||||
exposes access to
|
||||
FlowFiles, their attributes, and their content. The Processor is
|
||||
the basic building
|
||||
block used to comprise a NiFi dataflow. This interface is used
|
||||
to accomplish
|
||||
* The Processor interface is the mechanism through which NiFi exposes access to
|
||||
<<flowfile>>s, their attributes, and their content. The Processor is the basic building
|
||||
block used to comprise a NiFi dataflow. This interface is used to accomplish
|
||||
all of the following tasks:
|
||||
|
||||
- Create FlowFiles
|
||||
- Read FlowFile content
|
||||
- Write FlowFile content
|
||||
- Read FlowFile attributes
|
||||
- Update FlowFile attributes
|
||||
- Ingest data
|
||||
- Egress data
|
||||
- Route data
|
||||
- Extract data
|
||||
- Modify data
|
||||
** Create FlowFiles
|
||||
** Read FlowFile content
|
||||
** Write FlowFile content
|
||||
** Read FlowFile attributes
|
||||
** Update FlowFile attributes
|
||||
** Ingest data
|
||||
** Egress data
|
||||
** Route data
|
||||
** Extract data
|
||||
** Modify data
|
||||
|
||||
- ReportingTask
|
||||
- The ReportingTask interface is a mechanism that NiFi exposes to allow metrics,
|
||||
monitoring information, and internal NiFI state to be published to external
|
||||
sources, such as log files, e-mail, and remote web services.
|
||||
* The ReportingTask interface is a mechanism that NiFi exposes to allow metrics,
|
||||
monitoring information, and internal NiFi state to be published to external
|
||||
endpoints, such as log files, e-mail, and remote web services.
|
||||
|
||||
- ControllerService
|
||||
- A ControllerService provides shared state across Processors, other ControllerServices,
|
||||
* A ControllerService provides shared state and functionality across Processors, other ControllerServices,
|
||||
and ReportingTasks within a single JVM. An example use case may include loading a very
|
||||
large dataset into memory. By performing this work in a ControllerService, the data
|
||||
can be loaded once and be exposed to all Processors via this service.
|
||||
can be loaded once and be exposed to all Processors via this service, rather than requiring
|
||||
many different Processors to load the dataset themselves.
|
||||
|
||||
- FlowFilePrioritizer
|
||||
- The FlowFilePrioritizer interface provides a mechanism by which FlowFiles
|
||||
* The FlowFilePrioritizer interface provides a mechanism by which <<flowfile>>s
|
||||
in a queue can be prioritized, or sorted, so that the FlowFiles can be processed in an order
|
||||
that is most effective for a particular use case.
|
||||
|
||||
- AuthorityProvider
|
||||
- An AuthorityProvide is responsible for determining which privileges and roles, if any,
|
||||
* An AuthorityProvider is responsible for determining which privileges and roles, if any,
|
||||
a given user should be granted.
|
||||
|
||||
|
||||
|
@ -103,20 +99,15 @@ Processors must adhere to the following rules:
|
|||
While `Processor` is an interface that can be implemented directly, it
|
||||
will be extremely rare to do so, as
|
||||
the `org.apache.nifi.processor.AbstractProcessor` is the base class
|
||||
for almost all Processor
|
||||
implementations. The `AbstractProcessor` class provides a significant
|
||||
amount of functionality, which makes
|
||||
the task of developing a Processor much easier and more convenient.
|
||||
For the scope of this document,
|
||||
we will focus primarily on the `AbstractProcessor` class when dealing
|
||||
for almost all Processor implementations. The `AbstractProcessor` class provides a significant
|
||||
amount of functionality, which makes the task of developing a Processor much easier and more convenient.
|
||||
For the scope of this document, we will focus primarily on the `AbstractProcessor` class when dealing
|
||||
with the Processor API.
|
||||
|
||||
.Concurrency Note
|
||||
NiFi is a highly concurrent framework. This means that all extensions
|
||||
must be thread-safe.
|
||||
If unfamiliar with writing concurrent software in Java, it is highly
|
||||
recommended that you familiarize
|
||||
yourself with the principles of Java concurrency.
|
||||
must be thread-safe. If unfamiliar with writing concurrent software in Java, it is highly
|
||||
recommended that you familiarize yourself with the principles of Java concurrency.
|
||||
|
||||
|
||||
[[supporting_api]]
|
||||
|
@ -138,27 +129,18 @@ immutable. Modifications to a FlowFile are made possible by the ProcessSession.
|
|||
[[process_session]]
|
||||
==== ProcessSession
|
||||
The ProcessSession, often referred to as simply a "session," provides
|
||||
a mechanism by which FlowFiles
|
||||
can be created, destroyed, examined, cloned, and transferred to other
|
||||
Processors. Additionally, a
|
||||
ProcessSession provides mechanism for creating modified versions of
|
||||
FlowFiles, by adding or removing
|
||||
attributes, or by modifying the FlowFile's content. The ProcessSession
|
||||
also exposes a mechanism for
|
||||
emitting provenance events that provide for the ability to track the
|
||||
lineage and history of a FlowFile.
|
||||
After operations are performed on one or more FlowFiles, a
|
||||
ProcessSession can be either committed or
|
||||
rolled back.
|
||||
a mechanism by which FlowFiles can be created, destroyed, examined, cloned, and transferred to other
|
||||
Processors. Additionally, a ProcessSession provides a mechanism for creating modified versions of
|
||||
FlowFiles, by adding or removing attributes, or by modifying the FlowFile's content. The ProcessSession
|
||||
also exposes a mechanism for emitting provenance events that provide for the ability to track the
|
||||
lineage and history of a FlowFile. After operations are performed on one or more FlowFiles, a
|
||||
ProcessSession can be either committed or rolled back.
|
||||
|
||||
[[process_context]]
|
||||
==== ProcessContext
|
||||
The ProcessContext provides a bridge between a Processor and the
|
||||
framework. It provides information
|
||||
about how the Processor is currently configured and allows the
|
||||
Processor to perform
|
||||
Framework-specific tasks, such as yielding its resources so that the
|
||||
framework will schedule other
|
||||
The ProcessContext provides a bridge between a Processor and the framework. It provides information
|
||||
about how the Processor is currently configured and allows the Processor to perform
|
||||
Framework-specific tasks, such as yielding its resources so that the framework will schedule other
|
||||
Processors to run without consuming resources unnecessarily.
|
||||
|
||||
|
||||
|
@ -1337,30 +1319,248 @@ If the callback succeeds, a CONTENT_MODIFIED Provenance Event is emitted.
|
|||
|
||||
== Error Handling
|
||||
|
||||
When writing a Processor, there are several different unexpected cases that can occur.
|
||||
It is important that Processor developers understand the mechanics of how the NiFi framework
|
||||
behaves if Processors do not handle errors themselves, and it's important to understand
|
||||
what error handling is expected of Processors. Here, we will discuss how Processors should
|
||||
handle unexpected errors during the course of their work.
|
||||
|
||||
|
||||
=== Exceptions within the Processor
|
||||
|
||||
During the execution of the `onTrigger` method of a Processor, many things can potentially go
|
||||
awry. Common failure conditions include:
|
||||
|
||||
- Incoming data is not in the expected format.
|
||||
- Network connections to external services fail.
|
||||
- Reading or writing data to a disk fails.
|
||||
- There is a bug in the Processor or a dependent library.
|
||||
|
||||
Any of these conditions can result in an Exception being thrown from the Processor. From the framework
|
||||
perspective, there are two types of Exceptions that can escape a Processor: `ProcessException` and
|
||||
all others.
|
||||
|
||||
If a ProcessException is thrown from the Processor, the framework will assume that this is a failure that
|
||||
is a known outcome. Moreover, it is a condition where attempting to process the data again later may
|
||||
be successful. As a result, the framework will roll back the session that was being processed and penalize
|
||||
the FlowFiles that were being processed.
|
||||
|
||||
If any other Exception escapes the Processor, though, the framework will assume that it is a failure that
|
||||
was not taken into account by the developer. In this case, the framework will also roll back the session
|
||||
and penalize the FlowFiles. However, this situation can be very problematic. For example,
|
||||
the Processor may be in a bad state and may continually run, depleting system resources, without providing
|
||||
any useful work. This is fairly common, for instance, when a NullPointerException is thrown continually.
|
||||
In order to avoid this case, if an Exception other than ProcessException is able to escape the Processor's
|
||||
`onTrigger` method, the framework will also "Administratively Yield" the Processor. This means that the
|
||||
Processor will not be triggered to run again for some amount of time. The amount of time is configured
|
||||
in the `nifi.properties` file but is 10 seconds by default.
|
||||
|
||||
|
||||
=== Exceptions within a callback: IOException, RuntimeException
|
||||
|
||||
=== Exceptions within the Processor: ProcessException, others.
|
||||
More often than not, when an Exception occurs in a Processor, it occurs from within a callback (i.e.,
|
||||
`InputStreamCallback`, `OutputStreamCallback`, or `StreamCallback`), that is, during the processing of a
|
||||
FlowFile's content. Callbacks are allowed to throw either `RuntimeException` or `IOException`. In the case
|
||||
of RuntimeException, this Exception will propagate back to the `onTrigger` method. In the case of an
|
||||
`IOException`, the Exception will be wrapped within a ProcessException and this ProcessException will then
|
||||
be thrown from the Framework.
|
||||
|
||||
For this reason, it is recommended that Processors that use callbacks do so within a `try/catch` block
|
||||
and catch `ProcessException` as well as any other `RuntimeException` that they expect their callback to
|
||||
throw. It is *not* recommended that Processors catch the general `Exception` or `Throwable` cases, however.
|
||||
This is discouraged for two reasons.
|
||||
|
||||
First, if an unexpected RuntimeException is thrown, it is likely a bug
|
||||
and allowing the framework to roll back the session ensures that no data is lost and that DataFlow Managers
|
||||
are able to deal with the data as they see fit by keeping the data queued up in place.
|
||||
|
||||
Second, when an IOException is thrown from a callback, there really are two types of IOExceptions: those thrown
|
||||
from Processor code (for example, the data is not in the expected format or a network connection fails), and
|
||||
those that are thrown from the Content Repository (where the FlowFile content is stored). If the latter is the case,
|
||||
the framework will catch this IOException and wrap it into a `FlowFileAccessException`, which extends `RuntimeException`.
|
||||
This is done explicitly so that the Exception will escape the `onTrigger` method and the framework can handle this
|
||||
condition appropriately. Catching the general Exception prevents this from happening.
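
The recommended pattern can be sketched in plain Java. This is a minimal model, not NiFi code: `ProcessException`, `InputStreamCallback`, and `read` below are simplified stand-ins written only so the example compiles without NiFi on the classpath, and `CallbackErrorHandlingSketch` and `handle` are hypothetical names. The stand-in `read` method mimics the behavior described above, wrapping an `IOException` from the callback in a `ProcessException` and letting any `RuntimeException` propagate.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

final class CallbackErrorHandlingSketch {

    /** Stand-in for NiFi's ProcessException (assumption: simplified for illustration). */
    static class ProcessException extends RuntimeException {
        ProcessException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Stand-in for NiFi's InputStreamCallback. */
    interface InputStreamCallback {
        void process(InputStream in) throws IOException;
    }

    /** Stand-in for a session read: IOException is wrapped, RuntimeException propagates. */
    static void read(byte[] content, InputStreamCallback callback) {
        try (InputStream in = new ByteArrayInputStream(content)) {
            callback.process(in);
        } catch (IOException e) {
            throw new ProcessException("Failed to process content", e);
        }
    }

    /** Returns the name of the relationship the content would be routed to. */
    static String handle(byte[] content) {
        try {
            read(content, new InputStreamCallback() {
                @Override
                public void process(InputStream in) throws IOException {
                    // Expected failure: the data is not in the format we require.
                    if (in.read() != '{') {
                        throw new IOException("content does not start with '{'");
                    }
                }
            });
            return "success";
        } catch (ProcessException e) {
            // A known failure mode: route to failure instead of rolling back.
            return "failure";
        }
        // Deliberately no catch (Exception e): an unexpected RuntimeException
        // escapes so that the framework can roll back the session.
    }

    public static void main(String[] args) {
        System.out.println(handle("{}".getBytes(StandardCharsets.UTF_8)));   // prints "success"
        System.out.println(handle("oops".getBytes(StandardCharsets.UTF_8))); // prints "failure"
    }
}
```

Because `handle` catches only `ProcessException`, a bug such as a `NullPointerException` is not swallowed and still reaches the caller.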
|
||||
|
||||
=== Catching ProcessException from callback
|
||||
|
||||
=== Penalization vs. Yielding
|
||||
|
||||
=== Session Rollback: with and within penalization
|
||||
When an issue occurs during processing, the framework exposes two methods to allow Processor developers to avoid performing
|
||||
unnecessary work: "penalization" and "yielding." These two concepts can become confusing for developers new to the NiFi API.
|
||||
A developer is able to penalize a FlowFile by calling the `penalize(FlowFile)` method of ProcessSession. This causes the
|
||||
FlowFile itself to be inaccessible to downstream Processors for a period of time. The amount of time that the FlowFile is
|
||||
inaccessible is determined by the DataFlow Manager by setting the "Penalty Duration" setting in the Processor Configuration
|
||||
dialog. The default value is 30 seconds. Typically, this is done when a Processor determines that the data cannot be processed
|
||||
due to environmental reasons that are expected to sort themselves out. A great example of this is the PutSFTP processor, which
|
||||
will penalize a FlowFile if a file already exists on the SFTP server that has the same filename. In this case, the Processor
|
||||
penalizes the FlowFile and routes it to failure. A DataFlow Manager can then route failure back to the same PutSFTP Processor.
|
||||
This way, if a file exists with the same filename, the Processor will not attempt to send the file again for 30 seconds
|
||||
(or whatever period the DFM has configured the Processor to use). In the meantime, it is able to continue to process other
|
||||
FlowFiles.
|
||||
|
||||
On the other hand, yielding allows a Processor developer to indicate to the framework that it will not be able to perform
|
||||
any useful function for some period of time. This commonly happens with a Processor that is communicating with a remote
|
||||
resource. If the Processor cannot connect to the remote resource, or if the remote resource is expected to provide data
|
||||
but reports that it has none, the Processor should call `yield` on the `ProcessContext` object and then return. By doing
|
||||
this, the Processor is telling the framework that it should not waste resources triggering this Processor to run, because
|
||||
there's nothing that it can do - it's better to use those resources to allow other Processors to run.
|
||||
|
||||
|
||||
=== Session Rollback
|
||||
|
||||
Thus far, when we have discussed the `ProcessSession`, we have typically referred to it simply as a mechanism for accessing
|
||||
FlowFiles. However, it provides another very important capability, which is transactionality. All methods that are called
|
||||
on a ProcessSession happen as a transaction. When we decide to end the transaction, we can do so either by calling
|
||||
`commit()` or by calling `rollback()`. Typically, this is handled by the `AbstractProcessor` class: if the `onTrigger` method
|
||||
throws an Exception, the AbstractProcessor will catch the Exception, call `session.rollback()`, and then re-throw the Exception.
|
||||
Otherwise, the AbstractProcessor will call `commit()` on the ProcessSession.
|
||||
|
||||
There are times, however, that developers will want to roll back a session explicitly. This can be accomplished at any time
|
||||
by calling the `rollback()` or `rollback(boolean)` method. If using the latter, the boolean indicates whether or not those
|
||||
FlowFiles that have been pulled from queues (via the ProcessSession `get` methods) should be penalized before being added
|
||||
back to their queues.
|
||||
|
||||
When `rollback` is called, any modifications that have been made to the FlowFiles in that session are discarded, including
|
||||
both content modification and attribute modification. Additionally, all Provenance Events are rolled back (with the exception
|
||||
of any SEND event that was emitted by passing a value of `true` for the `force` argument). The FlowFiles that were pulled from
|
||||
the input queues are then transferred back to the input queues (and optionally penalized) so that they can be processed again.
|
||||
|
||||
On the other hand, when the `commit` method is called, the FlowFile's new state is persisted in the FlowFile Repository, and
|
||||
any Provenance Events that occurred are persisted in the Provenance Repository. The previous content is destroyed (unless
|
||||
another FlowFile references the same piece of content), and the FlowFiles are transferred to the outbound queues so that the
|
||||
next Processors can operate on the data.
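
The commit and rollback semantics described above can be illustrated with a toy model. This is only a sketch under simplifying assumptions: `ToySession` is a hypothetical class, not the NiFi `ProcessSession`, each FlowFile is reduced to a map of attributes, and penalization, content, and Provenance Events are left out entirely.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a transactional session (hypothetical; not the NiFi API). */
final class ToySession {

    private final Deque<Map<String, String>> inputQueue;
    private final List<Map<String, String>> outputQueue;
    // Unmodified originals of everything pulled during this transaction.
    private final List<Map<String, String>> pulled = new ArrayList<>();
    // Working copies that become visible downstream only on commit.
    private final List<Map<String, String>> staged = new ArrayList<>();

    ToySession(Deque<Map<String, String>> inputQueue, List<Map<String, String>> outputQueue) {
        this.inputQueue = inputQueue;
        this.outputQueue = outputQueue;
    }

    /** Pulls the next item and returns a working copy, or null if the queue is empty. */
    Map<String, String> get() {
        Map<String, String> original = inputQueue.poll();
        if (original == null) {
            return null;
        }
        pulled.add(original);
        Map<String, String> copy = new HashMap<>(original);
        staged.add(copy);
        return copy;
    }

    /** Makes the modified copies available to the next stage. */
    void commit() {
        outputQueue.addAll(staged);
        pulled.clear();
        staged.clear();
    }

    /** Discards all modifications and returns the originals to the input queue. */
    void rollback() {
        // Re-insert in reverse so the original queue order is preserved.
        for (int i = pulled.size() - 1; i >= 0; i--) {
            inputQueue.addFirst(pulled.get(i));
        }
        pulled.clear();
        staged.clear();
    }

    public static void main(String[] args) {
        Deque<Map<String, String>> in = new ArrayDeque<>();
        Map<String, String> flowFile = new HashMap<>();
        flowFile.put("filename", "a.txt");
        in.add(flowFile);
        List<Map<String, String>> out = new ArrayList<>();

        ToySession session = new ToySession(in, out);
        Map<String, String> working = session.get();
        working.put("filename", "renamed.txt");
        session.rollback();

        // The attribute change was discarded and the item is queued again.
        System.out.println(in.peek().get("filename")); // prints "a.txt"
    }
}
```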
|
||||
|
||||
It is also important to note how this behavior is affected by using the `org.apache.nifi.annotation.behavior.SupportsBatching`
|
||||
annotation. If a Processor utilizes this annotation, calls to `ProcessSession.commit` may not take effect immediately. Rather,
|
||||
these commits may be batched together in order to provide higher throughput. However, if at any point, the Processor rolls back
|
||||
the ProcessSession, all changes since the last call to `commit` will be discarded and all "batched" commits will take effect.
|
||||
These "batched" commits are not rolled back.
|
||||
|
||||
=== Administrative Yielding
|
||||
|
||||
|
||||
|
||||
== General Design Considerations
|
||||
|
||||
=== Cohesion / Reusability : Do one thing and do it well!
|
||||
When designing a Processor, there are a few important design considerations to keep in mind. This section of the Developer Guide
|
||||
brings to the forefront some of the ideas that a developer should be thinking about when creating a Processor.
|
||||
|
||||
=== Consider the User
|
||||
|
||||
One of the most important concepts to keep in mind when developing a Processor (or any other component) is the user
|
||||
experience that you are creating. It's important to remember that as the developer of such a component, you may have
|
||||
important knowledge about the context that others do not have. Documentation should always be supplied so that those
|
||||
less familiar with the process are able to use it with ease.
|
||||
|
||||
When thinking about the user experience, it is also important to note that consistency is very important. It is best
|
||||
to stick with the standard <<naming-convensions>>. This is true for Processor names, Property names and values, Relationship
|
||||
names, and any other aspect that the user will experience.
|
||||
|
||||
Simplicity is crucial! Avoid adding properties that you don't expect users to understand or change. As developers, we are
|
||||
told that hard-coding values is bad. But this sometimes results in developers exposing properties for which, when asked for clarification,
|
||||
they tell users to just leave the default value. This leads to confusion and complexity.
|
||||
|
||||
|
||||
=== Cohesion and Reusability
|
||||
|
||||
For the sake of making a single, cohesive unit, developers are sometimes tempted to combine several functions into a single Processor.
|
||||
This is especially true when a Processor expects input data to be in format X so that the Processor can convert the data into
|
||||
format Y and send the newly-formatted data to some external service.
|
||||
|
||||
Taking this approach of formatting the data for a particular endpoint and then sending the data to that endpoint within the same Processor
|
||||
has several drawbacks:
|
||||
|
||||
- The Processor becomes very complex, as it has to perform the data translation task as well as the task of
|
||||
sending the data to the remote service.
|
||||
- If the Processor is unable to communicate with the remote service, it will route the data to a `failure` Relationship. In this case,
|
||||
the Processor will be responsible for performing the data translation again. And if it fails again, the translation is done yet again.
|
||||
- If we have five different Processors that translate the incoming data into this new format before sending the data, we have a great
|
||||
deal of duplicated code. If the schema changes, for instance, many Processors must be updated.
|
||||
- This intermediate data is thrown away when the Processor finishes sending to the remote service. The intermediate data format
|
||||
may well be useful to other Processors.
|
||||
|
||||
In order to avoid these issues, and make Processors more reusable, a Processor should always stick to the principle of "do one thing and do
|
||||
it well." Such a Processor should be broken into two separate Processors: one to convert the data from Format X to Format Y, and another
|
||||
Processor to send data to the remote resource.
|
||||
|
||||
|
||||
[[naming-convensions]]
|
||||
=== Naming Conventions
|
||||
|
||||
In order to deliver a consistent look and feel to users, it is advisable that Processors keep with standard naming conventions. The following
|
||||
is a list of standard conventions that are used:
|
||||
|
||||
- Processors that pull data from a remote system are named Get<Service> or Get<Protocol>, depending on whether they poll data from arbitrary
|
||||
sources over a known Protocol (such as GetHTTP or GetFTP) or if they pull data from a known service (such as GetKafka).
|
||||
- Processors that push data to a remote system are named Put<Service> or Put<Protocol>.
|
||||
- Relationship names are lower-cased and use spaces to delineate words.
|
||||
- Property names capitalize significant words, as would be done with the title of a book.
|
||||
|
||||
|
||||
|
||||
=== Processor Behavior Annotations
|
||||
|
||||
When creating a Processor, the developer is able to provide hints to the framework about how to utilize the Processor most
|
||||
effectively. This is done by applying annotations to the Processor's class. The annotations that can be applied to a
|
||||
Processor exist in three sub-packages of `org.apache.nifi.annotation`. Those in the `documentation` sub-package are used
|
||||
to provide documentation to the user. Those in the `lifecycle` sub-package instruct the framework which methods should
|
||||
be called on the Processor in order to respond to the appropriate life-cycle events. Those in the `behavior` sub-package
|
||||
help the framework understand how to interact with the Processor in terms of scheduling and general behavior.
|
||||
|
||||
The following annotations from the `org.apache.nifi.annotation.behavior` package can be used to modify how the framework
|
||||
will handle your Processor:
|
||||
|
||||
- `EventDriven`: Instructs the framework that the Processor can be scheduled using the Event-Driven scheduling
|
||||
strategy. This strategy is still experimental at this point, but can result in reduced resource utilization
|
||||
on dataflows that do not handle extremely high data rates.
|
||||
|
||||
- `SideEffectFree`: Indicates that the Processor does not have any side effects external to NiFi. As a result, the
|
||||
framework is free to invoke the Processor many times with the same input without causing any unexpected
|
||||
results to occur. This implies idempotent behavior. This can be used by the framework to improve efficiency by
|
||||
performing actions such as transferring a ProcessSession from one Processor to another, such that if
|
||||
a problem occurs many Processors' actions can be rolled back and performed again.
|
||||
|
||||
- `SupportsBatching`: This annotation indicates that it is okay for the framework to batch together multiple
|
||||
ProcessSession commits into a single commit. If this annotation is present, the user will be able to choose
|
||||
whether they prefer high throughput or lower latency in the Processor's Scheduling tab. This annotation should
|
||||
be applied to most Processors, but it comes with a caveat: if the Processor calls `ProcessSession.commit`,
|
||||
there is no guarantee that the data has been safely stored in NiFi's Content, FlowFile, and Provenance Repositories.
|
||||
As a result, it is not appropriate for those Processors that receive data from an external source, commit the session,
|
||||
and then delete the remote data or confirm a transaction with a remote resource.
|
||||
|
||||
- `TriggerSerially`: When this annotation is present, the framework will not allow the user to schedule more than one
|
||||
concurrent thread to execute the `onTrigger` method at a time. Instead, the number of threads ("Concurrent Tasks")
|
||||
will always be set to `1`. This does *not*, however, mean that the Processor does not have to be thread-safe,
|
||||
as the thread that is executing `onTrigger` may change between invocations.
|
||||
|
||||
- `TriggerWhenAnyDestinationAvailable`: By default, NiFi will not schedule a Processor to run if any of its outbound
|
||||
queues is full. This allows back-pressure to be applied all the way up a chain of Processors. However, some Processors
|
||||
may need to run even if one of the outbound queues is full. This annotation indicates that the Processor should run
|
||||
if any Relationship is "available." A Relationship is said to be "available" if none of the connections that use
|
||||
that Relationship is full. For example, the DistributeLoad Processor makes use of this annotation. If the "round robin"
|
||||
scheduling strategy is used, the Processor will not run if any outbound queue is full. However, if the "next available"
|
||||
scheduling strategy is used, the Processor will run if any Relationship at all is available and will route FlowFiles
|
||||
only to those relationships that are available.
|
||||
|
||||
- `TriggerWhenEmpty`: The default behavior is to trigger a Processor to run only if its input queue has at least one
|
||||
FlowFile or if the Processor has no input queues (which is typical of a "source" Processor). Applying this annotation
|
||||
will cause the framework to ignore the size of the input queues and trigger the Processor regardless of whether or
|
||||
not there is any data on an input queue. This is useful, for example, if the Processor needs to be triggered to run
|
||||
periodically to time out a network connection.
|
||||
|
||||
|
||||
=== Data Buffering
|
||||
|
||||
An important point to keep in mind is that NiFi provides a generic data processing capability. Data can be in any format. Processors
|
||||
are generally scheduled with several threads. A common mistake that developers new to NiFi make is to buffer all the contents of a
|
||||
FlowFile in memory. While there are cases when this is required, it should be avoided if at all possible, unless it is well-known
|
||||
what format the data is in. For example, a Processor responsible for executing XPath against an XML document will need to load the
|
||||
entire contents of the data into memory. This is generally acceptable, as XML is not expected to be extremely large. However, a Processor
|
||||
that searches for a specific byte sequence may be used to search files that are hundreds of gigabytes or more. Attempting to load this
|
||||
into memory can cause a lot of problems - especially if multiple threads are processing different FlowFiles simultaneously.
|
||||
|
||||
Instead of buffering this data into memory, it is advisable to evaluate the data as it is streamed from the Content Repository
|
||||
(i.e., scan the content from the `InputStream` that is provided to your callback by `ProcessSession.read`). Of course, in this case,
|
||||
we don't want to read from the Content Repository for each byte, so we would use a BufferedInputStream or somehow buffer some small
|
||||
amount of data, as appropriate.
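
As a rough sketch of the streaming approach described above, the following plain-JDK example scans an `InputStream` for a byte sequence while holding only a pattern-sized window in memory. The names `StreamingByteSearch` and `indexOf` are hypothetical and chosen for illustration; in a real Processor, similar logic would run inside the callback passed to `ProcessSession.read`.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

final class StreamingByteSearch {

    /**
     * Returns the offset of the first occurrence of pattern in the stream,
     * or -1 if the pattern never occurs. Memory use is bounded by the
     * pattern length plus the BufferedInputStream's internal buffer.
     */
    static long indexOf(InputStream rawIn, byte[] pattern) throws IOException {
        if (pattern.length == 0) {
            return 0L;
        }
        // Buffer reads so that we do not hit the underlying source once per byte.
        InputStream in = new BufferedInputStream(rawIn);
        byte[] window = new byte[pattern.length]; // ring buffer of the most recent bytes
        long bytesRead = 0;
        int next;
        while ((next = in.read()) != -1) {
            window[(int) (bytesRead % pattern.length)] = (byte) next;
            bytesRead++;
            if (bytesRead >= pattern.length && windowMatches(window, bytesRead, pattern)) {
                return bytesRead - pattern.length;
            }
        }
        return -1L;
    }

    private static boolean windowMatches(byte[] window, long bytesRead, byte[] pattern) {
        // Once the ring is full, the oldest byte sits at the next write position.
        int start = (int) (bytesRead % pattern.length);
        for (int i = 0; i < pattern.length; i++) {
            if (window[(start + i) % pattern.length] != pattern[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        byte[] content = "hello, nifi dataflow".getBytes(StandardCharsets.US_ASCII);
        byte[] pattern = "nifi".getBytes(StandardCharsets.US_ASCII);
        System.out.println(indexOf(new ByteArrayInputStream(content), pattern)); // prints 7
    }
}
```

The comparison at each position is deliberately simple; for long patterns a dedicated algorithm such as Knuth-Morris-Pratt would avoid re-checking the whole window on every byte.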
|
||||
|
||||
|
||||
|
||||
|
@ -1618,26 +1818,104 @@ Processor is valid or not, according to our expectations.
|
|||
|
||||
=== Enqueue FlowFiles
|
||||
|
||||
Before triggering a Processor to run, it is usually necessary to enqueue FlowFiles for the Processor to process.
|
||||
This can be achieved by using the `enqueue` methods of the `TestRunner` class. The `enqueue` method has several
|
||||
different overloads, and allows data to be added in the form of a `byte[]`, `InputStream`, or `Path`. Each of these
|
||||
methods also supports a variation that allows a `Map<String, String>` to be added to support FlowFile attributes.
|
||||
|
||||
Additionally, there is an `enqueue` method that takes a var-args of FlowFile objects. This can be useful, for example,
|
||||
to obtain the output of a Processor and then feed this to the input of the Processor.
|
||||
|
||||
|
||||
=== Run the Processor
|
||||
|
||||
After configuring the Controller Services and enqueuing the necessary FlowFiles, the Processor can be triggered
|
||||
to run by calling the `run` method of `TestRunner`. If this method is called without any arguments, it will
|
||||
invoke any method in the Processor with an `@OnScheduled` annotation, call the Processor's `onTrigger` method once,
|
||||
and then run the `@OnUnscheduled` and finally `@OnStopped` methods.
|
||||
|
||||
If it is desirable to run several iterations of the `onTrigger` method before the other `@OnUnscheduled` and
|
||||
`@OnStopped` life-cycle events are triggered, the `run(int)` method can be used to specify how many iterations
|
||||
of `onTrigger` should be called.
|
||||
|
||||
There are times when we want to trigger the Processor to run but not trigger the `@OnUnscheduled` and `@OnStopped`
|
||||
life-cycle events. This is useful, for instance, to inspect the Processor's state before these events occur. This
|
||||
can be achieved using the `run(int, boolean)` and passing `false` as the second argument. After doing this, though,
|
||||
calling the `@OnScheduled` life-cycle methods again could cause an issue. To avoid this, we can run `onTrigger` again
|
||||
without causing these events to occur by using the `run(int,boolean,boolean)` version of the `run` method and
|
||||
passing `false` as the third argument.
|
||||
|
||||
If it is useful to test behavior that occurs with multiple threads, this can also be achieved by calling the
`setThreadCount` method of `TestRunner`. The default is 1 thread. If using multiple threads, it is important
to remember that the `run` call of `TestRunner` specifies how many times the Processor should be triggered, not
the number of times that the Processor should be triggered per thread. So, if the thread count is set to 2 but
`run(1)` is called, only a single thread will be used.

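To make the distinction between total triggers and per-thread triggers concrete, here is a minimal sketch that shares a
fixed number of tasks across a thread pool. It is only a model of the behavior described above, not the framework's
implementation; `ThreadCountSketch`, `triggers`, and `threadsUsed` are hypothetical names.

[source,java]
----
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a total trigger count shared by a thread pool; not NiFi code.
class ThreadCountSketch {
    final AtomicInteger triggers = new AtomicInteger();
    final Set<String> threadsUsed = Collections.synchronizedSet(new HashSet<String>());

    // `iterations` is the total number of triggers, not a per-thread count.
    void run(int iterations, int threadCount) {
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        for (int i = 0; i < iterations; i++) {
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    triggers.incrementAndGet();
                    threadsUsed.add(Thread.currentThread().getName());
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ThreadCountSketch sketch = new ThreadCountSketch();
        sketch.run(1, 2);
        // prints "1 trigger(s) on 1 thread(s)"
        System.out.println(sketch.triggers.get() + " trigger(s) on "
                + sketch.threadsUsed.size() + " thread(s)");
    }
}
----

With a thread count of 2, the iteration count therefore needs to be at least 2 before a second thread can take part.
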
=== Validate Output

After a Processor has finished running, a unit test will generally want to validate that the FlowFiles went where
they were expected to go. This can be achieved using the `TestRunner` methods `assertAllFlowFilesTransferred` and
`assertTransferCount`. The former method takes as arguments a Relationship and an integer to dictate how many
FlowFiles should have been transferred to that Relationship. The method fails the unit test if a different number of
FlowFiles was transferred to the given Relationship *or* if any FlowFile was transferred to any other Relationship.
The `assertTransferCount` method validates only that the FlowFile count was the expected number for the given Relationship.

After validating the counts, we can then obtain the actual output FlowFiles via the `getFlowFilesForRelationship` method.
This method returns a `List<MockFlowFile>`. It's important to note that the type of the List is `MockFlowFile`, rather
than the `FlowFile` interface. This is done because `MockFlowFile` comes with many methods for validating the contents.

For example, `MockFlowFile` has methods for asserting that FlowFile Attributes exist (`assertAttributeExists`), asserting
that other attributes are not present (`assertAttributeNotExists`), or that Attributes have the correct value
(`assertAttributeEquals`, `assertAttributeNotEquals`). Similar methods exist for verifying the contents of the FlowFile.
The contents of a FlowFile can be compared to a `byte[]`, an `InputStream`, a file, or a String. If the data is expected
to be textual, the String version is preferred, as it provides a more intuitive error message if the output is not
as expected.

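The difference between the two count assertions can be illustrated with a small stand-alone model. This is a sketch of
the semantics described above rather than NiFi code: `TransferCountSketch` is a hypothetical class, Relationships are
represented by plain strings, and the checks return a boolean instead of failing a test.

[source,java]
----
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the two count assertions; not NiFi code.
class TransferCountSketch {
    private final Map<String, Integer> transferred = new HashMap<String, Integer>();

    // Records that one FlowFile was routed to the named relationship.
    void transfer(String relationship) {
        Integer current = transferred.get(relationship);
        transferred.put(relationship, current == null ? 1 : current + 1);
    }

    // Models assertTransferCount: looks only at the named relationship.
    boolean transferCountIs(String relationship, int expected) {
        Integer actual = transferred.get(relationship);
        return (actual == null ? 0 : actual) == expected;
    }

    // Models assertAllFlowFilesTransferred: also requires that no other
    // relationship received any FlowFile.
    boolean allTransferredTo(String relationship, int expected) {
        for (Map.Entry<String, Integer> entry : transferred.entrySet()) {
            if (!entry.getKey().equals(relationship) && entry.getValue() > 0) {
                return false;
            }
        }
        return transferCountIs(relationship, expected);
    }

    public static void main(String[] args) {
        TransferCountSketch output = new TransferCountSketch();
        output.transfer("success");
        output.transfer("success");
        output.transfer("failure");
        System.out.println(output.transferCountIs("success", 2));  // prints true
        System.out.println(output.allTransferredTo("success", 2)); // prints false
    }
}
----

The stricter check is usually the better default in a test, because it also catches FlowFiles that were routed somewhere unexpected.
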
=== Mocking External Resources

One of the biggest problems when testing a NiFi processor that connects to a remote resource is that we don't want to
actually connect to that remote resource from a unit test. We can stand up a simple server ourselves in the unit test
and configure the Processor to communicate with it, but then we have to understand and implement the server-specific
specification, and we may not be able to properly send back the error messages and other responses that we would like to test.

Generally, the approach taken here is to have a method in the Processor that is responsible for obtaining a connection
or a client to a remote resource. We generally mark this method as protected. In the unit test, instead of creating
the `TestRunner` by calling `TestRunners.newTestRunner(Class)` and providing the Processor class, we create
a subclass of the Processor in our unit test and use this:

[source,java]
----
@Test
public void testConnectionFailure() {
    final TestRunner runner = TestRunners.newTestRunner(new MyProcessor() {
        protected Client getClient() {
            // Return a mocked out client here.
            return new Client() {
                public void connect() throws IOException {
                    throw new IOException();
                }

                // ...
                // other client methods
                // ...
            };
        }
    });

    // rest of unit test.
}
----

This allows us to implement a Client that mocks out all of the network communications and returns the different
error results that we want to test, as well as ensure that our logic is correct for handling successful calls
to the client.

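The same pattern can be tried out without any NiFi dependencies. The following self-contained sketch assumes a
hypothetical `Client` interface with a single `connect` method and a hypothetical `MyComponent` class standing in for
the Processor; only the protected factory method and the overriding subclass correspond to the approach described above.

[source,java]
----
import java.io.IOException;

// Hypothetical client abstraction; a real one would wrap network calls.
interface Client {
    void connect() throws IOException;
}

// Hypothetical component standing in for a Processor.
class MyComponent {
    // Protected factory method: the seam that a unit test can override.
    protected Client getClient() {
        return new Client() {
            @Override
            public void connect() throws IOException {
                throw new IOException("real network access is not available in this sketch");
            }
        };
    }

    // Returns the name of the route that the input would take.
    String process() {
        try {
            getClient().connect();
            return "success";
        } catch (IOException e) {
            return "failure";
        }
    }
}

class MockingSketch {
    // Builds a component whose client is replaced by the given mock.
    static MyComponent withClient(final Client client) {
        return new MyComponent() {
            @Override
            protected Client getClient() {
                return client;
            }
        };
    }

    public static void main(String[] args) {
        Client working = new Client() {
            @Override
            public void connect() {
                // succeeds without touching the network
            }
        };
        System.out.println(withClient(working).process()); // prints success
    }
}
----

Because the mock is injected through an overridden method, the production class needs no test-only constructor or setter.
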
=== Additional Testing Capabilities

In addition to the above-mentioned capabilities provided by the
testing framework, the TestRunner provides several
convenience methods for verifying the behavior of a Processor. Methods

@ -1678,10 +1956,28 @@ their NiFi components as NAR packages.
To achieve this, a developer creates a new Maven Artifact, which we
refer to as the NAR artifact. The packaging is
set to `nar`. The `dependencies` section of the POM is then created so
that the NAR has a dependency on all NiFi Components that are to be included within the NAR.

In order to use a packaging of `nar`, we must use the `nifi-nar-maven-plugin` module.
This is included by adding the following snippet to the NAR's pom.xml:

[source,xml]
----
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-nar-maven-plugin</artifactId>
            <version>1.0.0-incubating</version>
            <extensions>true</extensions>
        </plugin>
    </plugins>
</build>
----

In the Apache NiFi codebase, this exists in the NiFi root POM from which all other NiFi artifacts
(with the exception of the nifi-nar-maven-plugin itself) inherit, so that we do not need to include
this in any of our other POM files.

The NAR is able to have one dependency that is of type `nar`. If more
than one dependency is specified that is of type

@ -1789,16 +2085,41 @@ API artifacts into the same NAR is often acceptable.
== How to contribute to Apache NiFi

We are always excited to have contributions from the community - especially from new contributors!
We are interested in accepting contributions of code, as well as documentation and even artwork that
can be applied as icons or styling to the application.

|
||||
=== Technologies
|
||||
|
||||
The back end of Apache NiFi is written in Java. The web tier makes use of JAX-RS and JavaScript is extensively
|
||||
used to provide a user interface. We depend on several third-party JavaScript libraries, including D3 and JQuery,
|
||||
among others. We make use of Apache Maven for our builds and Git for our version control system.
|
||||
|
||||
|
||||
=== Where to Start?
|
||||
|
||||
link:http://issues.apache.org/jira/browse/NIFI[NiFi's Jira page] can be used to find tickets that are tagged as "beginner",
|
||||
or you can dig into any of the tickets for creating Processors. Processors should be self-contained and not rely on other
|
||||
outside components (except for Controller Services), so they make for excellent starting points for new NiFi developers to
|
||||
get started. This exposes the developer to the NiFi API and is the most extensible part of the dataflow system.
|
||||
|
||||
|
||||
=== Supplying a contribution
|
||||
|
||||
Contributions can be provided either by creating a patch:
|
||||
|
||||
`git format-patch`
|
||||
|
||||
and attaching that patch to a ticket, or by generating a Pull Request.
|
||||
|
||||
|
||||
=== Contact Us

The developer mailing list (dev@nifi.incubator.apache.org) is monitored pretty closely, and we tend to respond pretty
quickly. If you have a question, don't hesitate to shoot us an e-mail - we're here to help! Unfortunately, though, e-mails
can get lost in the shuffle, so if you do send an e-mail and don't get a response within a day or two, it's our fault - don't
worry about bothering us. Just ping the mailing list again.