Merge branch 'develop' into nifi-site-to-site-client

This commit is contained in:
Mark Payne 2015-02-22 11:18:26 -05:00
commit 5291084dc0
84 changed files with 1070 additions and 307 deletions

View File

@ -14,7 +14,7 @@
-->
# Apache NiFi
Apache NiFi is a dataflow system based on the concepts of flow-based programming. It is currently apart of the Apache Incubator.
Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data.
## Table of Contents
@ -28,6 +28,10 @@ Apache NiFi is a dataflow system based on the concepts of flow-based programming
follow the directions found there.
- Build nifi. Change directory to 'nifi' and follow the directions found there.
## Documentation
See http://nifi.incubator.apache.org/ for the latest documentation.
## License
Except as otherwise noted this software is licensed under the

View File

@ -28,6 +28,7 @@
<a href="#">Documentation</a>
<ul class="dropdown">
<li><a href="faq.html">FAQ</a></li>
<li><a href="screencasts.html">Screencasts</a></li>
<li><a href="overview.html">NiFi Overview</a></li>
<li><a href="user-guide.html">User Guide</a></li>
<li><a href="developer-guide.html">Developer Guide</a></li>

View File

@ -11,5 +11,40 @@ title: Apache NiFi Screencasts
<div class="medium-space"></div>
<div class="row">
<div class="large-12 columns">
<a href="#" data-reveal-id="toolbar-overview">NiFi Toolbar Overview</a>
<div id="toolbar-overview" class="reveal-modal medium" data-reveal>
<h2>NiFi Toolbar Overview</h2>
<div class="flex-video widescreen" style="display: block;">
<iframe width="560" height="315" src="https://www.youtube.com/embed/LGXRAVUzL4U" frameborder="0" allowfullscreen></iframe>
</div>
<a class="close-reveal-modal">&#215;</a>
</div>
<br/>
<a href="#" data-reveal-id="creating-process-groups">Creating Process Groups</a>
<div id="creating-process-groups" class="reveal-modal medium" data-reveal>
<h2>Creating Process Groups</h2>
<div class="flex-video widescreen" style="display: block;">
<iframe width="560" height="315" src="https://www.youtube.com/embed/hAveiDgDj-8" frameborder="0" allowfullscreen></iframe>
</div>
<a class="close-reveal-modal">&#215;</a>
</div>
<br/>
<a href="#" data-reveal-id="creating-templates">Creating Templates</a>
<div id="creating-templates" class="reveal-modal medium" data-reveal>
<h2>Creating Templates</h2>
<div class="flex-video widescreen" style="display: block;">
<iframe width="560" height="315" src="https://www.youtube.com/embed/PpmL-IMoCnU" frameborder="0" allowfullscreen></iframe>
</div>
<a class="close-reveal-modal">&#215;</a>
</div>
<br/>
<a href="#" data-reveal-id="managing-templates">Managing Templates</a>
<div id="managing-templates" class="reveal-modal medium" data-reveal>
<h2>Managing Templates</h2>
<div class="flex-video widescreen" style="display: block;">
<iframe width="560" height="315" src="https://www.youtube.com/embed/HU5_3PlNmtQ" frameborder="0" allowfullscreen></iframe>
</div>
<a class="close-reveal-modal">&#215;</a>
</div>
</div>
</div>

View File

@ -14,7 +14,7 @@
-->
# Apache NiFi
Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data. It is currently apart of the Apache Incubator.
Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data.
## Table of Contents

View File

@ -21,6 +21,5 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-api</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>jar</packaging>
</project>

View File

@ -21,7 +21,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-assembly</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<description>This is the assembly Apache NiFi (incubating)</description>
<build>

View File

@ -20,7 +20,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-data-provenance-utils</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
<dependency>

View File

@ -20,7 +20,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-expression-language</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>jar</packaging>
<build>
<plugins>

View File

@ -20,7 +20,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-flowfile-packager</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
<dependency>

View File

@ -21,7 +21,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-logging-utils</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<description>Utilities for logging</description>
<dependencies>
<dependency>

View File

@ -21,7 +21,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-processor-utils</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
<dependency>

View File

@ -21,5 +21,4 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-properties</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
</project>

View File

@ -20,7 +20,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-security-utils</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<description>Contains security functionality.</description>
<dependencies>
<dependency>

View File

@ -21,7 +21,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-socket-utils</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<description>Utilities for socket communication</description>
<dependencies>
<dependency>

View File

@ -21,7 +21,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-web-utils</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>

View File

@ -21,7 +21,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-write-ahead-log</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
<dependency>

View File

@ -21,21 +21,45 @@ Apache NiFi Team <dev@nifi.incubator.apache.org>
How to install and start NiFi
-----------------------------
NOTE: This section is incomplete.
* Linux
** Decompress and untar into desired installation directory.
* Linux/Unix/OSX
** Decompress and untar into desired installation directory
** Make any desired edits in files found under <installdir>/conf
** Execute the following commands
*** At a minimum, we recommend editing the _nifi.properties_ file and entering a password for the nifi.sensitive.props.key (see <<system_properties>> below)
** From the <installdir>/bin directory, execute the following commands by typing ./nifi.sh <command>:
*** start: starts NiFi in the background
*** stop: stops NiFi that is running in the background
*** status: provides the current status of NiFi
*** run: runs NiFi in the foreground and waits for a Ctl-C to initiate shutdown of NiFi
*** intall: installs NiFi as a service that can then be controlled via
*** run: runs NiFi in the foreground and waits for a Ctrl-C to initiate shutdown of NiFi
*** install: installs NiFi as a service that can then be controlled via
**** service nifi start
**** service nifi stop
**** service nifi status
* Windows
** Decompress into the desired installation directory
** Make any desired edits in the files found under <installdir>/conf
*** At a minimum, we recommend editing the _nifi.properties_ file and entering a password for the nifi.sensitive.props.key (see <<system_properties>> below)
** Navigate to the <installdir>/bin directory
** Double-click run-nifi.bat. This runs NiFi in the foreground and waits for a Ctrl-C to initiate shutdown of NiFi
** Alternatively, to start NiFi in the background, double-click start-nifi.bat
** To stop NiFi running in the background, double-click stop-nifi.bat
** To see the current status of NiFi, double-click status-nifi.bat
When NiFi first starts up, the following files and directories are created:
* content_repository
* database_repository
* flowfile_repository
* provenance_repository
* work directory
* logs directory
* Within the conf directory, the _flow.xml.gz_ file and the templates directory are created
See the <<system_properties>> section of this guide for more information about configuring NiFi repositories and configuration files.
Best Practice Configuration
---------------------------
NOTE: Typical Linux defaults are not necessarily well tuned for the needs of an IO intensive application like
@ -76,14 +100,14 @@ it but to adjust do something like
sudo sysctl -w net.ipv4.netfilter.ip_conntrack_tcp_timeout_time_wait="1"
----
Tell linux you never want NiFi to swap::
Tell Linux you never want NiFi to swap::
Swapping is fantastic for some applications. It isn't good for something like
NiFi that always wants to be running. To tell linux you'd like swapping off you
NiFi that always wants to be running. To tell Linux you'd like swapping off you
can edit '/etc/sysctl.conf' to add the following line
----
vm.swappiness = 0
----
For the partions handling the various NiFi repos turn off things like 'atime'.
For the partitions handling the various NiFi repos turn off things like 'atime'.
Doing so can cause a surprising bump in throughput. Edit the '/etc/fstab' file
and for the partition(s) of interest add the 'noatime' option.
@ -94,15 +118,14 @@ NOTE: This section is incomplete.
Controlling Levels of Access
----------------------------
NOTE: This section is incomplete.
Once NiFi is configured to run securely as discussed in the previous section, it is necessary
to manually designate an ADMIN user in the authorized-users.xml file, which is located in the
to manually designate an ADMIN user in the _authorized-users.xml_ file, which is located in the
root installation's conf directory. After this ADMIN user has been added, s/he may grant access
to other users, systems, and other instances of NiFi, through the User Interface (UI) without having to manually edit the authorized-users.xml
to other users, systems, and other instances of NiFi, through the User Interface (UI) without having to manually edit the _authorized-users.xml_
file. If you are the administrator, you would add yourself as the ADMIN user in this file.
Open the authorized-users.xml file in a text editor. You will notice that it includes a template
Open the _authorized-users.xml_ file in a text editor. You will notice that it includes a template
to guide you, with example entries that are commented out.
It is only necessary to manually add one user, the ADMIN user,
@ -128,9 +151,9 @@ Here is an example entry using the name John Smith:
</users>
----
After the authorized-users.xml file has been edited and saved, restart NiFi.
After the _authorized-users.xml_ file has been edited and saved, restart NiFi.
Once the application starts, the ADMIN user is
able to access the UI at the https URL that is configured in the nifi.properties file.
able to access the UI at the https URL that is configured in the _nifi.properties_ file.
From the UI, click on the Users icon ( image:iconUsers.png["Users", width=32] ) in the
Management Toolbar (upper-right corner of the UI), and the User Management Page opens.
@ -154,6 +177,88 @@ in the remote cluster can be included in the same group. When the ADMIN wants to
cluster, s/he can grant it to the group and avoid having to grant it individually to each node in the cluster.
[[clustering]]
Clustering Configuration
------------------------
This section provides a quick overview of NiFi Clustering and instructions on how to set up a basic cluster. In the future, we hope to provide supplemental documentation that covers the NiFi Cluster Architecture in depth.
The design of NiFi clustering is a simple master/slave model where there is a master and one or more slaves. While the model is that of master and slave, if the master dies, the slaves are all instructed to continue operating as they were to ensure the dataflow remains live. The absence of the master simply means new slaves cannot come on-line and flow changes cannot occur until the master is restored. In NiFi clustering, we call the master the NiFi Cluster Manager (NCM), and the slaves are called Nodes. See a full description of each in the Terminology section below.
*Why Cluster?* +
NiFi Administrators or Dataflow Managers (DFMs) may find that using one instance of NiFi on a single server is not enough to process the amount of data they have. So, one solution is to run the same dataflow on multiple NiFi servers. However, this creates a management problem, because each time DFMs want to change or update the dataflow, they must make those changes on each server and then monitor each server individually. By clustering the NiFi servers, it's possible to have that increased processing capability along with a single interface through which to make dataflow changes and monitor the dataflow. Clustering allows the DFM to make each change only once, and that change is then replicated to all the nodes of the cluster. Through the single interface, the DFM may also monitor the health and status of all the nodes.
NiFi Clustering is unique and has its own terminology. It's important to understand the following terms before setting up a cluster.
[template="glossary", id="terminology"]
*Terminology* +
*NiFi Cluster Manager*: A NiFi Cluster Manager (NCM) is an instance of NiFi that provides the sole management point for the cluster. It communicates dataflow changes to the nodes and receives health and status information from the nodes. It also ensures that a uniform dataflow is maintained across the cluster. When DFMs manage a dataflow in a cluster, they do so through the User Interface of the NCM (i.e., via the URL of the NCM's User Interface). Fundamentally, the NCM keeps the state of the cluster consistent.
*Nodes*: Each cluster is made up of the NCM and one or more nodes. The nodes do the actual data processing. (The NCM does not process any data; all data runs through the nodes.) While nodes are connected to a cluster, the DFM may not access the User Interface for any of the individual nodes. The User Interface of a node may only be accessed if the node is manually removed from the cluster.
*Primary Node*: Every cluster has one Primary Node. On this node, it is possible to run "Isolated Processors" (see below). By default, the NCM will elect the first node that connects to the cluster as the Primary Node; however, the DFM may select a new node as the Primary Node in the Cluster Management page of the User Interface if desired. If the cluster restarts, the NCM will "remember" which node was the Primary Node and wait for that node to re-connect before allowing the DFM to make any changes to the dataflow. The ADMIN may adjust how long the NCM waits for the Primary Node to reconnect by adjusting the property _nifi.cluster.manager.safemode.duration_ in the _nifi.properties_ file, which is discussed in the <<system_properties>> section of this document.
*Isolated Processors*: In a NiFi cluster, the same dataflow runs on all the nodes. As a result, every component in the flow runs on every node. However, there may be cases when the DFM would not want every processor to run on every node. The most common case is when using a processor like the GetSFTP processor, which is pulling from a remote directory. If the GetSFTP on every node tries simultaneously to pull from the same remote directory, there could be race conditions. Therefore, the DFM could configure the GetSFTP on the Primary Node to run in isolation, meaning that it only runs on that node. It could pull in data and -with the proper dataflow configuration- load-balance it across the rest of the nodes in the cluster. Note that while this feature exists, it is also very common to simply use a standalone NiFi instance to pull data and feed it to the cluster. It just depends on the resources available and how the Administrator decides to configure the cluster.
*Heartbeats*: The nodes communicate their health and status to the NCM via "heartbeats", which let the NCM know they are still connected to the cluster and working properly. By default, the nodes emit heartbeats to the NCM every 5 seconds, and if the NCM does not receive a heartbeat from a node within 45 seconds, it disconnects the node due to "lack of heartbeat". (The 5-second and 45-second settings are configurable in the _nifi.properties_ file. See the <<system_properties>> section of this document for more information.) The reason that the NCM disconnects the node is because the NCM needs to ensure that every node in the cluster is in sync, and if a node is not heard from regularly, the NCM cannot be sure it is still in sync with the rest of the cluster. If, after 45 seconds, the node does send a new heartbeat, the NCM will automatically reconnect the node to the cluster. Both the disconnection due to lack of heartbeat and the reconnection once a heartbeat is received are reported to the DFM in the NCM's User Interface.
*Communication within the Cluster* +
As noted, the nodes communicate with the NCM via heartbeats. The NCM-to-node communication may be set up as multicast or unicast, depending on the properties that are configured in the _nifi.properties_ file (See <<system_properties>> ). By default, unicast is used. It is important to note that the nodes in a NiFi cluster are not aware of each other. They only communicate with the NCM. Therefore, if one of the nodes goes down, the other nodes in the cluster will not automatically pick up the load of the missing node. It is possible for the DFM to configure the dataflow for failover contingencies; however, this is dependent on the dataflow design and does not happen automatically.
When the DFM makes changes to the dataflow, the NCM communicates those changes to the nodes and waits for each node to respond, indicating that it has made the change on its local flow. If the DFM wants to make another change, the NCM will only allow this to happen once all the nodes have acknowledged that they've implemented the last change. As such, the speed with which dataflow changes may be made is as fast as the slowest node. When all nodes are located in close proximity and the network is stable, this response time is not an issue. However, if your cluster is comprised of nodes that are geographically dispersed and/or operating over a latent network, there may be times when DFMs cannot make changes as quickly as they would like. Keep this in mind when setting up a cluster.
*Dealing with Disconnected Nodes* +
A DFM may manually disconnect a node from the cluster. But if a node becomes disconnected for any other reason (such as due to lack of heartbeat), the NCM will show a bulletin on the User Interface, and the DFM will not be able to make any changes to the dataflow until the issue of the disconnected node is resolved. The DFM or the Administrator will need to troubleshoot the issue with the node and resolve it before any new changes may be made to the dataflow. However, it is worth noting that just because a node is disconnected does not mean that it is definitely down; it just means that the NCM cannot communicate with the node.
*Basic Cluster Setup* +
This section describes the setup for a simple two-node, non-secure, unicast cluster comprised of three instances of NiFi:
* The NCM
* Node 1
* Node 2
Administrators may install each instance on a separate server; however, it is also perfectly fine to install the NCM and one of the nodes on the same server, as the NCM is very lightweight. Just keep in mind that the ports assigned to each instance must not collide if the NCM and one of the nodes share the same server.
For each instance, the clustering properties in the _nifi.properties_ file will need to be updated. All the clustering properties are described in the <<system_properties>> section of this guide; however, in this section, we will focus on the minimum properties that must be set for a simple cluster.
For all three instances, the Cluster Common Properties can be left with the default settings. Note, however, that if you change these settings, they must be set the same on every instance in the cluster (NCM and nodes).
For the NCM, the minimum properties to configure are as follows:
* Under the Web Properties, set either the http or https port that you want the NCM to run on. If the NCM and one of the nodes are on the same server, make sure this port is different from the web port used by the node.
* Under the Cluster Manager Properties, set the following:
** nifi.cluster.is.manager - Set this to _true_.
** nifi.cluster.manager.protocol.port - Set this to an open port that is higher than 1024 (anything lower requires root). Take note of this setting, as you will need to reference it when you set up the nodes.
For Node 1, the minimum properties to configure are as follows:
* Under the Web Properties, set either the http or https port that you want Node 1 to run on. If the NCM is running on the same server, choose a different web port for Node 1.
* Under Cluster Node Properties, set the following:
** nifi.cluster.is.node - Set this to _true_.
** nifi.cluster.node.protocol.port - Set this to an open port that is higher than 1024 (anything lower requires root). If Node 1 and the NCM are on the same server, make sure this port is different from the nifi.cluster.manager.protocol.port.
** nifi.cluster.node.unicast.manager.protocol.port - Set this to exactly the same port that was set on the NCM for the property nifi.cluster.manager.protocol.port.
For Node 2, the minimum properties to configure are as follows:
* Under the Web Properties, set either the http or https port that you want Node 2 to run on.
* Under the Cluster Node Properties, set the following:
** nifi.cluster.is.node - Set this to _true_.
** nifi.cluster.node.protocol.port - Set this to an open port that is higher than 1024 (anything lower requires root).
** nifi.cluster.node.unicast.manager.protocol.port - Set this to exactly the same port that was set on the NCM for the property nifi.cluster.manager.protocol.port.
Now, it is possible to start up the cluster. Technically, it does not matter which instance starts up first. However, you could start the NCM first, then Node 1 and then Node 2. Since the first node that connects is automatically elected as the Primary Node, this sequence should create a cluster where Node 1 is the Primary Node. Navigate to the URL for the NCM in your web browser, and the User Interface should look similar to the following:
image:ncm.png["NCM User Interface", width=940]
[[system_properties]]
System Properties
-----------------
The _nifi.properties_ file in the conf directory is the main configuration file for controlling how NiFi runs. This section provides an overview of the properties in this file and includes some notes on how to configure it in a way that will make upgrading easier. *After making changes to this file, restart NiFi in order
@ -248,7 +353,7 @@ FlowFile Repository, if also on that disk, could become corrupt. To avoid this s
|nifi.content.repository.directory.default*|The location of the Content Repository. The default value is ./content_repository.
|nifi.content.repository.archive.max.retention.period|If archiving is enabled (see nifi.content.repository.archive.enabled below), then
this property specifies the maximum amount of time to keep the archived data. It is blank by default.
|nifi.content.repository.archive.max.usage.percentage|If archiving is enabled (see nifi.content.repository.archive.enabled below), then this property also must have a value to indiciate the maximum percentage of disk space that may be used for archiving. It is blank by default.
|nifi.content.repository.archive.max.usage.percentage|If archiving is enabled (see nifi.content.repository.archive.enabled below), then this property also must have a value to indicate the maximum percentage of disk space that may be used for archiving. It is blank by default.
|nifi.content.repository.archive.enabled|To enable archiving, set this to _true_ and specify a value for the nifi.content.repository.archive.max.usage.percentage property above. By default, archiving is not enabled.
|nifi.content.repository.always.sync|If set to _true_, any change to the repository will be synchronized to the disk, meaning that NiFi will ask the operating system not to cache the information. This is very expensive and can significantly reduce NiFi performance. However, if it is _false_, there could be the potential for data loss if either there is a sudden power loss or the operating system crashes. The default value is _false_.
|nifi.content.viewer.url|The URL for a web-based content viewer if one is available. It is blank by default.
@ -332,7 +437,7 @@ Security Configuration section of this Administrator's Guide.
|====
|*Property*|*Description*
|nifi.sensitive.props.key|This is the password used to encrypt any sensitive property values that are configured in processors. By default, it is blank, but the system administrator should provide a value for it. It can be a string of any length. Be aware that once this password is set and one or more sensitive processor properties has been configured, this password should not be changed.
|nifi.sensitive.props.key|This is the password used to encrypt any sensitive property values that are configured in processors. By default, it is blank, but the system administrator should provide a value for it. It can be a string of any length. Be aware that once this password is set and one or more sensitive processor properties have been configured, this password should not be changed.
|nifi.sensitive.props.algorithm|The algorithm used to encrypt sensitive properties. The default value is PBEWITHMD5AND256BITAES-CBC-OPENSSL.
|nifi.sensitive.props.provider|The sensitive property provider. The default value is BC.
|nifi.security.keystore*|The full path and name of the keystore. It is blank by default.

Binary file not shown.

After

Width:  |  Height:  |  Size: 332 KiB

View File

@ -39,7 +39,7 @@ Data access exceeds capacity to consume::
Sometimes a given data source can outpace some part of the processing or delivery chain - it only takes one weak-link to have an issue.
Boundary conditions are mere suggestions::
You will get data that is too big, too small, too fast, too slow, corrupt, wrong, wrong format
You will invariably get data that is too big, too small, too fast, too slow, corrupt, wrong, or in the wrong format.
What is noise one day becomes signal the next::
Priorities of an organization change - rapidly. Enabling new flows and changing existing ones must be fast.
@ -92,7 +92,7 @@ and either commit that work or rollback.
| Connection | Bounded Buffer |
Connections provide the actual linkage between processors. These act as queues
and allow various processes to interact at differing rates. These queues then
can be prioritized dynamically and can have upper bounds on load which enable
can be prioritized dynamically and can have upper bounds on load, which enable
back pressure.
| Flow Controller | Scheduler |
@ -102,14 +102,14 @@ Flow Controller acts as the broker facilitating the exchange of FlowFiles
between processors.
| Process Group | subnet |
A Process Group is a specific set of processes and their connections which can
A Process Group is a specific set of processes and their connections, which can
receive data via input ports and send data out via output ports. In
this manner process groups allow creation of entirely new components simply by
composition of other components.
|===========================
This design model, also similar to <<seda>>, provides many beneficial consequences which help NiFi
This design model, also similar to <<seda>>, provides many beneficial consequences that help NiFi
to be a very effective platform for building powerful and scalable dataflows.
A few of these benefits include:
@ -141,7 +141,7 @@ FlowFile Repository::
The FlowFile Repository is where NiFi keeps track of the state of what it knows about a given FlowFile that is presently active in the flow. The implementation of the repository is pluggable. The default approach is a persistent Write-Ahead Log that lives on a specified disk partition.
Content Repository::
The Content Repository is where the actual content bytes of a given FlowFile live. The implementation of the repository is pluggable. The default approach is a fairly simple mechanism which stores blocks of data in the file system. More than one file system storage location can be specified so as to get different physical partitions engaged to reduce contention on any single volume.
The Content Repository is where the actual content bytes of a given FlowFile live. The implementation of the repository is pluggable. The default approach is a fairly simple mechanism, which stores blocks of data in the file system. More than one file system storage location can be specified so as to get different physical partitions engaged to reduce contention on any single volume.
Provenance Repository::
The Provenance Repository is where all provenance event data is stored. The repository construct is pluggable with the default implementation being to use one or more physical disk volumes. Within each location event data is indexed and searchable.
@ -161,8 +161,8 @@ instructed to continue operating as they were to ensure the data flow remains li
The absence of the NCM simply means new nodes cannot come on-line and flow changes
cannot occur until the NCM is restored.
Performance Expections and Characteristics of NiFi
--------------------------------------------------
Performance Expectations and Characteristics of NiFi
----------------------------------------------------
NiFi is designed to fully leverage the capabilities of the underlying host system
it is operating on. This maximization of resources is particularly strong with
regard to CPU and disk. Many more details will
@ -181,7 +181,7 @@ efficiently reach 100 or more MB/s of throughput. That is because linear growth
is expected for each physical partition and content repository added to NiFi. This will
bottleneck at some point on the FlowFile repository and provenance repository.
We plan to provide a benchmarking/performance test template to
include in the build which will allow users to easily test their system and
include in the build, which will allow users to easily test their system and
to identify where bottlenecks are and at which point they might become a factor. It
should also make it easy for system administrators to make changes and to verify the impact.
@ -272,8 +272,8 @@ Designed for Extension::
Points of extension;;
Processors, Controller Services, Reporting Tasks, Prioritizers, Custom User Interfaces
Classloader Isolation;;
For any component based system one problem that can quickly occur is dependency nightmares. NiFi addresses this by providing a custom class loader model
ensuring that each extension bundle is exposed to a very limited set of dependencies. As a result extensions can be built with little concern for whether
For any component-based system, dependency nightmares can quickly occur. NiFi addresses this by providing a custom class loader model,
ensuring that each extension bundle is exposed to a very limited set of dependencies. As a result, extensions can be built with little concern for whether
they might conflict with another extension. The concept of these extension bundles is called 'NiFi Archives' and will be discussed in greater detail
in the developer's guide.
Clustering (scale-out)::

View File

@ -21,7 +21,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-mock</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>

View File

@ -20,7 +20,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-framework-nar</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>nar</packaging>
<description>NiFi: Framework Nar</description>
<dependencies>

View File

@ -21,7 +21,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-administration</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<build>
<resources>
<resource>

View File

@ -24,7 +24,6 @@
<packaging>jar</packaging>
<description>The messaging protocol for clustered NiFi</description>
<dependencies>
<!-- application dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>

View File

@ -21,7 +21,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-file-authorization-provider</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<build>
<resources>
<resource>

View File

@ -21,7 +21,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-framework-core-api</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>

View File

@ -94,7 +94,7 @@ public class StandardFunnel implements Funnel {
position = new AtomicReference<>(new Position(0D, 0D));
scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
penalizationPeriod = new AtomicReference<>("30 sec");
yieldPeriod = new AtomicReference<>("1 sec");
yieldPeriod = new AtomicReference<>("250 millis");
yieldExpiration = new AtomicLong(0L);
schedulingPeriod = new AtomicReference<>("0 millis");
schedulingNanos = new AtomicLong(30000);

View File

@ -21,6 +21,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -34,8 +35,9 @@ import org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask;
import org.apache.nifi.controller.tasks.ReportingTaskWrapper;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.FormatUtils;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -130,13 +132,16 @@ public class QuartzSchedulingAgent implements SchedulingAgent {
final List<AtomicBoolean> triggers = new ArrayList<>();
for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) {
final Runnable continuallyRunTask;
final Callable<Boolean> continuallyRunTask;
if (connectable.getConnectableType() == ConnectableType.PROCESSOR) {
final ProcessorNode procNode = (ProcessorNode) connectable;
ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, encryptor);
final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor);
ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, standardProcContext);
continuallyRunTask = runnableTask;
} else {
continuallyRunTask = new ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, encryptor);
final ConnectableProcessContext connProcContext = new ConnectableProcessContext(connectable, encryptor);
continuallyRunTask = new ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, connProcContext);
}
final AtomicBoolean canceled = new AtomicBoolean(false);
@ -147,7 +152,13 @@ public class QuartzSchedulingAgent implements SchedulingAgent {
return;
}
continuallyRunTask.run();
try {
continuallyRunTask.call();
} catch (final RuntimeException re) {
throw re;
} catch (final Exception e) {
throw new ProcessException(e);
}
if (canceled.get()) {
return;

View File

@ -16,9 +16,10 @@
*/
package org.apache.nifi.controller.scheduling;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -27,7 +28,7 @@ public class ScheduleState {
private final AtomicInteger activeThreadCount = new AtomicInteger(0);
private final AtomicBoolean scheduled = new AtomicBoolean(false);
private final List<ScheduledFuture<?>> futures = new ArrayList<>();
private final Set<ScheduledFuture<?>> futures = new HashSet<ScheduledFuture<?>>();
private final AtomicBoolean mustCallOnStoppedMethods = new AtomicBoolean(false);
private volatile long lastStopTime = -1;
@ -79,12 +80,17 @@ public class ScheduleState {
*
* @param newFutures
*/
public void setFutures(final List<ScheduledFuture<?>> newFutures) {
public synchronized void setFutures(final Collection<ScheduledFuture<?>> newFutures) {
futures.clear();
futures.addAll(newFutures);
}
public List<ScheduledFuture<?>> getFutures() {
return Collections.unmodifiableList(futures);
public synchronized void replaceFuture(final ScheduledFuture<?> oldFuture, final ScheduledFuture<?> newFuture) {
futures.remove(oldFuture);
futures.add(newFuture);
}
public synchronized Set<ScheduledFuture<?>> getFutures() {
return Collections.unmodifiableSet(futures);
}
}

View File

@ -18,8 +18,10 @@ package org.apache.nifi.controller.scheduling;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
@ -31,15 +33,17 @@ import org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask;
import org.apache.nifi.controller.tasks.ReportingTaskWrapper;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TimerDrivenSchedulingAgent implements SchedulingAgent {
private static final Logger logger = LoggerFactory.getLogger(TimerDrivenSchedulingAgent.class);
private static final long NO_WORK_YIELD_NANOS = TimeUnit.MILLISECONDS.toNanos(10L);
private final FlowController flowController;
private final FlowEngine flowEngine;
private final ProcessContextFactory contextFactory;
@ -72,20 +76,105 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent {
logger.info("{} started.", taskNode.getReportingTask());
}
@Override
public void schedule(final Connectable connectable, final ScheduleState scheduleState) {
final Runnable runnable;
if (connectable.getConnectableType() == ConnectableType.PROCESSOR) {
final ProcessorNode procNode = (ProcessorNode) connectable;
ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, encryptor);
runnable = runnableTask;
} else {
runnable = new ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, encryptor);
}
final List<ScheduledFuture<?>> futures = new ArrayList<>();
for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) {
final ScheduledFuture<?> future = flowEngine.scheduleWithFixedDelay(runnable, 0L, connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
final Callable<Boolean> continuallyRunTask;
final ProcessContext processContext;
// Determine the task to run and create it.
if (connectable.getConnectableType() == ConnectableType.PROCESSOR) {
final ProcessorNode procNode = (ProcessorNode) connectable;
final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor);
final ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController,
contextFactory, scheduleState, standardProcContext);
continuallyRunTask = runnableTask;
processContext = standardProcContext;
} else {
processContext = new ConnectableProcessContext(connectable, encryptor);
continuallyRunTask = new ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, processContext);
}
final AtomicReference<ScheduledFuture<?>> futureRef = new AtomicReference<>();
final Runnable yieldDetectionRunnable = new Runnable() {
@Override
public void run() {
// Call the continually run task. It will return a boolean indicating whether or not we should yield
// based on a lack of work for to do for the component.
final boolean shouldYield;
try {
shouldYield = continuallyRunTask.call();
} catch (final RuntimeException re) {
throw re;
} catch (final Exception e) {
throw new ProcessException(e);
}
// If the component is yielded, cancel its future and re-submit it to run again
// after the yield has expired.
final long newYieldExpiration = connectable.getYieldExpiration();
if ( newYieldExpiration > System.currentTimeMillis() ) {
final long yieldMillis = System.currentTimeMillis() - newYieldExpiration;
final ScheduledFuture<?> scheduledFuture = futureRef.get();
if ( scheduledFuture == null ) {
return;
}
// If we are able to cancel the future, create a new one and update the ScheduleState so that it has
// an accurate accounting of which futures are outstanding; we must then also update the futureRef
// so that we can do this again the next time that the component is yielded.
if (scheduledFuture.cancel(false)) {
final long yieldNanos = TimeUnit.MILLISECONDS.toNanos(yieldMillis);
final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, yieldNanos,
connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
synchronized (scheduleState) {
if ( scheduleState.isScheduled() ) {
scheduleState.replaceFuture(scheduledFuture, newFuture);
futureRef.set(newFuture);
}
}
}
} else if ( shouldYield ) {
// Component itself didn't yield but there was no work to do, so the framework will choose
// to yield the component automatically for a short period of time.
final ScheduledFuture<?> scheduledFuture = futureRef.get();
if ( scheduledFuture == null ) {
return;
}
// If we are able to cancel the future, create a new one and update the ScheduleState so that it has
// an accurate accounting of which futures are outstanding; we must then also update the futureRef
// so that we can do this again the next time that the component is yielded.
if (scheduledFuture.cancel(false)) {
final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, NO_WORK_YIELD_NANOS,
connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
synchronized (scheduleState) {
if ( scheduleState.isScheduled() ) {
scheduleState.replaceFuture(scheduledFuture, newFuture);
futureRef.set(newFuture);
}
}
}
}
}
};
// Schedule the task to run
final ScheduledFuture<?> future = flowEngine.scheduleWithFixedDelay(yieldDetectionRunnable, 0L,
connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
// now that we have the future, set the atomic reference so that if the component is yielded we
// are able to then cancel this future.
futureRef.set(future);
// Keep track of the futures so that we can update the ScheduleState.
futures.add(future);
}

View File

@ -16,16 +16,16 @@
*/
package org.apache.nifi.controller.tasks;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.scheduling.ConnectableProcessContext;
import org.apache.nifi.controller.scheduling.ProcessContextFactory;
import org.apache.nifi.controller.scheduling.ScheduleState;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.Connectables;
@ -33,28 +33,33 @@ import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ContinuallyRunConnectableTask implements Runnable {
/**
* Continually runs a Connectable as long as the processor has work to do. {@link #call()} will return
* <code>true</code> if the Connectable should be yielded, <code>false</code> otherwise.
*/
public class ContinuallyRunConnectableTask implements Callable<Boolean> {
private static final Logger logger = LoggerFactory.getLogger(ContinuallyRunConnectableTask.class);
private final Connectable connectable;
private final ScheduleState scheduleState;
private final ProcessSessionFactory sessionFactory;
private final ConnectableProcessContext processContext;
private final ProcessContext processContext;
public ContinuallyRunConnectableTask(final ProcessContextFactory contextFactory, final Connectable connectable, final ScheduleState scheduleState, final StringEncryptor encryptor) {
public ContinuallyRunConnectableTask(final ProcessContextFactory contextFactory, final Connectable connectable, final ScheduleState scheduleState, final ProcessContext processContext) {
this.connectable = connectable;
this.scheduleState = scheduleState;
this.sessionFactory = new StandardProcessSessionFactory(contextFactory.newProcessContext(connectable, new AtomicLong(0L)));
this.processContext = new ConnectableProcessContext(connectable, encryptor);
this.processContext = processContext;
}
@SuppressWarnings("deprecation")
@Override
public void run() {
@SuppressWarnings("deprecation")
public Boolean call() {
if (!scheduleState.isScheduled()) {
return;
return false;
}
// Connectable should run if the following conditions are met:
// 1. It's an Input Port or or is a Remote Input Port or has incoming FlowFiles queued
// 2. Any relationship is available (since there's only 1
@ -62,8 +67,9 @@ public class ContinuallyRunConnectableTask implements Runnable {
// it means the same thing)
// 3. It is not yielded.
final boolean triggerWhenEmpty = connectable.isTriggerWhenEmpty();
boolean flowFilesQueued = true;
final boolean shouldRun = (connectable.getYieldExpiration() < System.currentTimeMillis())
&& (triggerWhenEmpty || Connectables.flowFilesQueued(connectable)) && (connectable.getRelationships().isEmpty() || Connectables.anyRelationshipAvailable(connectable));
&& (triggerWhenEmpty || (flowFilesQueued = Connectables.flowFilesQueued(connectable))) && (connectable.getRelationships().isEmpty() || Connectables.anyRelationshipAvailable(connectable));
if (shouldRun) {
scheduleState.incrementActiveThreadCount();
@ -92,6 +98,12 @@ public class ContinuallyRunConnectableTask implements Runnable {
scheduleState.decrementActiveThreadCount();
}
} else if (!flowFilesQueued) {
// FlowFiles must be queued in order to run but there are none queued;
// yield for just a bit.
return true;
}
return true;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.controller.tasks;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@ -31,7 +32,6 @@ import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.scheduling.ProcessContextFactory;
import org.apache.nifi.controller.scheduling.ScheduleState;
import org.apache.nifi.controller.scheduling.SchedulingAgent;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.ProcessSessionFactory;
@ -43,7 +43,12 @@ import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ContinuallyRunProcessorTask implements Runnable {
/**
* Continually runs a processor as long as the processor has work to do. {@link #call()} will return
* <code>true</code> if the processor should be yielded, <code>false</code> otherwise.
*/
public class ContinuallyRunProcessorTask implements Callable<Boolean> {
private static final Logger logger = LoggerFactory.getLogger(ContinuallyRunProcessorTask.class);
@ -56,7 +61,8 @@ public class ContinuallyRunProcessorTask implements Runnable {
private final int numRelationships;
public ContinuallyRunProcessorTask(final SchedulingAgent schedulingAgent, final ProcessorNode procNode,
final FlowController flowController, final ProcessContextFactory contextFactory, final ScheduleState scheduleState, final StringEncryptor encryptor) {
final FlowController flowController, final ProcessContextFactory contextFactory, final ScheduleState scheduleState,
final StandardProcessContext processContext) {
this.schedulingAgent = schedulingAgent;
this.procNode = procNode;
@ -65,28 +71,28 @@ public class ContinuallyRunProcessorTask implements Runnable {
this.flowController = flowController;
context = contextFactory.newProcessContext(procNode, new AtomicLong(0L));
this.processContext = new StandardProcessContext(procNode, flowController, encryptor);
this.processContext = processContext;
}
@SuppressWarnings("deprecation")
@Override
public void run() {
@SuppressWarnings("deprecation")
public Boolean call() {
// make sure processor is not yielded
boolean shouldRun = (procNode.getYieldExpiration() < System.currentTimeMillis());
if (!shouldRun) {
return;
return false;
}
// make sure that either we're not clustered or this processor runs on all nodes or that this is the primary node
shouldRun = !procNode.isIsolated() || !flowController.isClustered() || flowController.isPrimary();
if (!shouldRun) {
return;
return false;
}
// make sure that either proc has incoming FlowFiles or has no incoming connections or is annotated with @TriggerWhenEmpty
shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode);
if (!shouldRun) {
return;
return true;
}
if (numRelationships > 0) {
@ -109,7 +115,7 @@ public class ContinuallyRunProcessorTask implements Runnable {
}
if (!shouldRun) {
return;
return false;
}
scheduleState.incrementActiveThreadCount();
@ -124,11 +130,11 @@ public class ContinuallyRunProcessorTask implements Runnable {
invocationCount++;
if (!batch) {
return;
return false;
}
if (System.nanoTime() > finishNanos) {
return;
return false;
}
shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode);
@ -180,6 +186,8 @@ public class ContinuallyRunProcessorTask implements Runnable {
logger.error("", e);
}
}
return false;
}
}

View File

@ -20,7 +20,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-security</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<description>Contains security functionality common to NiFi.</description>
<dependencies>
<dependency>

View File

@ -21,7 +21,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-site-to-site</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<dependencies>
<dependency> <!-- This can be removed after testing.... -->
<groupId>org.apache.nifi</groupId>

View File

@ -21,5 +21,4 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-user-actions</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
</project>

View File

@ -19,10 +19,14 @@
<div class="dialog-content">
<div class="setting">
<div class="setting-name">Color</div>
<div class="setting-field">
<input type="text" id="fill-color" value="#FFFFFF"/>
</div>
<div class="setting-name" style="margin-top: 10px;">Value</div>
<div class="setting-field">
<input type="text" id="fill-color-value" value="#FFFFFF"/>
</div>
<div class="setting-name" style="margin-top: 5px;">Preview</div>
<div class="setting-name" style="margin-top: 10px;">Preview</div>
<div class="setting-field">
<div id="fill-color-processor-preview">
<div id="fill-color-processor-preview-name">Processor</div>

View File

@ -88,10 +88,14 @@
z-index: 1301;
display: none;
width: 195px;
height: 345px;
height: 400px;
border: 1px solid #eee;
}
#fill-color-value {
width: 165px;
}
#fill-color-processor-preview {
width: 173px;
height: 56px;

View File

@ -843,20 +843,39 @@ nf.Actions = (function () {
* @param {type} selection The selection
*/
fillColor: function (selection) {
if (selection.size() === 1 && (nf.CanvasUtils.isProcessor(selection) || nf.CanvasUtils.isLabel(selection))) {
var selectionData = selection.datum();
var color = nf[selectionData.type].defaultColor();
// use the specified color if appropriate
if (nf.Common.isDefinedAndNotNull(selectionData.component.style['background-color'])) {
color = selectionData.component.style['background-color'];
var selectedProcessors = selection.filter(function(d) {
return nf.CanvasUtils.isProcessor(d3.select(this));
});
var selectedLabels = selection.filter(function(d) {
return nf.CanvasUtils.isLabel(d3.select(this));
});
var allProcessors = selectedProcessors.size() === selection.size();
var allLabels = selectedLabels.size() === selection.size();
if (allProcessors || allLabels) {
var color;
if (allProcessors) {
color = nf.Processor.defaultColor();
} else {
color = nf.Label.defaultColor();
}
// if there is only one component selected, get its color otherwise use default
if (selection.size() === 1) {
var selectionData = selection.datum();
// use the specified color if appropriate
if (nf.Common.isDefinedAndNotNull(selectionData.component.style['background-color'])) {
color = selectionData.component.style['background-color'];
}
}
// set the color
$('#fill-color-value').minicolors('value', color);
$('#fill-color').minicolors('value', color);
// update the preview visibility
if (nf.CanvasUtils.isProcessor(selection)) {
if (allProcessors) {
$('#fill-color-processor-preview').show();
$('#fill-color-label-preview').hide();
} else {
@ -1065,7 +1084,6 @@ nf.Actions = (function () {
// refresh the birdseye/toolbar
nf.Birdseye.refresh();
nf.CanvasToolbar.refresh();
// remove the original snippet
nf.Snippet.remove(snippet.id).fail(reject);
@ -1078,7 +1096,6 @@ nf.Actions = (function () {
// refresh the birdseye/toolbar
nf.Birdseye.refresh();
nf.CanvasToolbar.refresh();
});
// reject the deferred

View File

@ -170,22 +170,22 @@ nf.CanvasHeader = (function () {
buttonText: 'Apply',
handler: {
click: function () {
// close the dialog
$('#fill-color-dialog').modal('hide');
// ensure the selection is a processor or label
var selection = nf.CanvasUtils.getSelection();
if (selection.size() === 1 && (nf.CanvasUtils.isProcessor(selection) || nf.CanvasUtils.isLabel(selection))) {
// color the selected components
selection.each(function (d) {
var selected = d3.select(this);
var revision = nf.Client.getRevision();
var selectionData = selection.datum();
var selectedData = selected.datum();
// get the color and update the styles
var color = $('#fill-color-value').val();
var color = $('#fill-color').minicolors('value');
// update the style for the specified component
$.ajax({
type: 'PUT',
url: selectionData.component.uri,
url: selectedData.component.uri,
data: {
'version': revision.version,
'clientId': revision.clientId,
@ -197,7 +197,7 @@ nf.CanvasHeader = (function () {
nf.Client.setRevision(response.revision);
// update the processor
if (nf.CanvasUtils.isProcessor(selection)) {
if (nf.CanvasUtils.isProcessor(selected)) {
nf.Processor.set(response.processor);
} else {
nf.Label.set(response.label);
@ -210,7 +210,10 @@ nf.CanvasHeader = (function () {
});
}
});
}
});
// close the dialog
$('#fill-color-dialog').modal('hide');
}
}
}, {
@ -221,13 +224,24 @@ nf.CanvasHeader = (function () {
$('#fill-color-dialog').modal('hide');
}
}
}]
}],
handler: {
close: function () {
// clear the current color
$('#fill-color-value').val('');
$('#fill-color').minicolors('value', '');
}
}
});
// initialize the fill color picker
$('#fill-color-value').minicolors({
$('#fill-color').minicolors({
inline: true,
change: function (hex, opacity) {
// update the value
$('#fill-color-value').val(hex);
// always update the preview
$('#fill-color-processor-preview, #fill-color-label-preview').css({
'border-color': hex,
'background': 'linear-gradient(to bottom, #ffffff, ' + hex + ')',
@ -235,6 +249,24 @@ nf.CanvasHeader = (function () {
});
}
});
// updates the color if its a valid hex color string
var updateColor = function () {
var hex = $('#fill-color-value').val();
// only update the fill color when its a valid hex color string
if (/(^#[0-9A-F]{6}$)|(^#[0-9A-F]{3}$)/i.test(hex)) {
$('#fill-color').minicolors('value', hex);
}
};
// initialize the fill color value
$('#fill-color-value').on('blur', updateColor).on('keyup', function(e) {
var code = e.keyCode ? e.keyCode : e.which;
if (code === $.ui.keyCode.ENTER) {
updateColor();
}
});
// mousewheel -> IE, Chrome
// DOMMouseScroll -> FF

View File

@ -141,14 +141,19 @@ nf.CanvasToolbar = (function () {
actions['group'].disable();
}
// determine how many colorable components are selected
var colorableComponents = selection.filter(function (d) {
var selected = d3.select(this);
return nf.CanvasUtils.isProcessor(selected) || nf.CanvasUtils.isLabel(selected);
// determine if the current selection is entirely processors or labels
var selectedProcessors = selection.filter(function(d) {
return nf.CanvasUtils.isProcessor(d3.select(this));
});
var selectedLabels = selection.filter(function(d) {
return nf.CanvasUtils.isLabel(d3.select(this));
});
var allProcessors = selectedProcessors.size() === selection.size();
var allLabels = selectedLabels.size() === selection.size();
// if there are any colorable components enable the button
if (colorableComponents.size() === 1 && colorableComponents.size() === selection.size()) {
if (allProcessors || allLabels) {
actions['fill'].enable();
} else {
actions['fill'].disable();

View File

@ -373,6 +373,9 @@ nf.CanvasToolbox = (function () {
// show the dialog
$('#new-processor-dialog').modal('show');
// set the focus in the filter field
$('#processor-type-filter').focus();
// adjust the grid canvas now that its been rendered
grid.resizeCanvas();
@ -427,12 +430,11 @@ nf.CanvasToolbox = (function () {
*/
var promptForInputPortName = function (pt) {
var addInputPort = function () {
// hide the dialog
$('#new-port-dialog').modal('hide');
// get the name of the input port and clear the textfield
var portName = $('#new-port-name').val();
$('#new-port-name').val('');
// hide the dialog
$('#new-port-dialog').modal('hide');
// create the input port
createInputPort(portName, pt);
@ -514,12 +516,11 @@ nf.CanvasToolbox = (function () {
*/
var promptForOutputPortName = function (pt) {
var addOutputPort = function () {
// hide the dialog
$('#new-port-dialog').modal('hide');
// get the name of the output port and clear the textfield
var portName = $('#new-port-name').val();
$('#new-port-name').val('');
// hide the dialog
$('#new-port-dialog').modal('hide');
// create the output port
createOutputPort(portName, pt);
@ -641,12 +642,11 @@ nf.CanvasToolbox = (function () {
*/
var promptForRemoteProcessGroupUri = function (pt) {
var addRemoteProcessGroup = function () {
// hide the dialog
$('#new-remote-process-group-dialog').modal('hide');
// get the uri of the controller and clear the textfield
var remoteProcessGroupUri = $('#new-remote-process-group-uri').val();
$('#new-remote-process-group-uri').val('');
// hide the dialog
$('#new-remote-process-group-dialog').modal('hide');
// create the remote process group
createRemoteProcessGroup(remoteProcessGroupUri, pt);
@ -882,7 +882,9 @@ nf.CanvasToolbox = (function () {
nf.Client.setRevision(response.revision);
// add the label to the graph
nf.Label.add(response.label, true);
nf.Graph.add({
'labels': [response.label]
}, true);
// update the birdseye
nf.Birdseye.refresh();
@ -1121,19 +1123,34 @@ nf.CanvasToolbox = (function () {
// configure the new port dialog
$('#new-port-dialog').modal({
headerText: 'Add Port',
overlayBackground: false
overlayBackground: false,
handler: {
close: function () {
$('#new-port-name').val('');
}
}
});
// configure the new process group dialog
$('#new-process-group-dialog').modal({
headerText: 'Add Process Group',
overlayBackground: false
overlayBackground: false,
handler: {
close: function () {
$('#new-process-group-name').val('');
}
}
});
// configure the new remote process group dialog
$('#new-remote-process-group-dialog').modal({
headerText: 'Add Remote Process Group',
overlayBackground: false
overlayBackground: false,
handler: {
close: function () {
$('#new-remote-process-group-uri').val('');
}
}
});
// configure the instantiate template dialog
@ -1162,12 +1179,11 @@ nf.CanvasToolbox = (function () {
promptForGroupName: function (pt) {
return $.Deferred(function (deferred) {
var addGroup = function () {
// hide the dialog
$('#new-process-group-dialog').modal('hide');
// get the name of the group and clear the textfield
var groupName = $('#new-process-group-name').val();
$('#new-process-group-name').val('');
// hide the dialog
$('#new-process-group-dialog').modal('hide');
// create the group and resolve the deferred accordingly
createGroup(groupName, pt).done(function (response) {

View File

@ -393,7 +393,7 @@ nf.CanvasUtils = (function () {
*/
disableImageHref: function (selection) {
selection.on('click.disableImageHref', function () {
if (d3.event.ctrlKey) {
if (d3.event.ctrlKey || d3.event.shiftKey) {
d3.event.preventDefault();
}
});

View File

@ -409,10 +409,10 @@ nf.Canvas = (function () {
// update the selection box
selectionBox.attr(d);
// prevent further propagation (to parents)
d3.event.stopPropagation();
}
// prevent further propagation (to parents)
d3.event.stopPropagation();
}
})
.on('mouseup.selection', function () {
@ -510,12 +510,24 @@ nf.Canvas = (function () {
$(window).on('resize', function () {
updateGraphSize();
}).on('keydown', function (evt) {
var isCtrl = evt.ctrlKey || evt.metaKey;
// consider escape, before checking dialogs
if (!isCtrl && evt.keyCode === 27) {
// esc
nf.Actions.hideDialogs();
evt.preventDefault();
return;
}
// if a dialog is open, disable canvas shortcuts
if ($('.dialog').is(':visible')) {
return;
}
if (evt.ctrlKey || evt.metaKey) {
// handle shortcuts
if (isCtrl) {
if (evt.keyCode === 82) {
// ctrl-r
nf.Actions.reloadStatus();
@ -543,11 +555,6 @@ nf.Canvas = (function () {
// delete
nf.Actions['delete'](nf.CanvasUtils.getSelection());
evt.preventDefault();
} else if (evt.keyCode === 27) {
// esc
nf.Actions.hideDialogs();
evt.preventDefault();
}
}

View File

@ -20,6 +20,21 @@ nf.Connectable = (function () {
var canvas;
var origin;
/**
* Determines if we want to allow adding connections in the current state:
*
* 1) When shift is down, we could be adding components to the current selection.
* 2) When the selection box is visible, we are in the process of moving all the
* components currently selected.
* 3) When the drag selection box is visible, we are in the process or selecting components
* using the selection box.
*
* @returns {boolean}
*/
var allowConnection = function () {
return !d3.event.shiftKey && d3.select('rect.drag-selection').empty() && d3.select('rect.selection').empty();
};
return {
init: function () {
canvas = d3.select('#canvas');
@ -102,17 +117,27 @@ nf.Connectable = (function () {
}).attr('d', function (pathDatum) {
if (!destination.empty() && destination.classed('connectable-destination')) {
var destinationData = destination.datum();
// show the line preview as appropriate
if (pathDatum.sourceId === destinationData.component.id) {
var x = pathDatum.x;
var y = pathDatum.y;
var componentOffset = pathDatum.sourceWidth / 2;
var xOffset = nf.Connection.config.selfLoopXOffset;
var yOffset = nf.Connection.config.selfLoopYOffset;
return 'M' + x + ' ' + y + 'L' + (x + componentOffset + xOffset) + ' ' + (y - yOffset) + 'L' + (x + componentOffset + xOffset) + ' ' + (y + yOffset) + 'Z';
} else {
// get the position on the destination perimeter
var end = nf.CanvasUtils.getPerimeterPoint(pathDatum, {
'x': destinationData.component.position.x,
'y': destinationData.component.position.y,
'width': destinationData.dimensions.width,
'height': destinationData.dimensions.height
});
// get the position on the destination perimeter
var end = nf.CanvasUtils.getPerimeterPoint(pathDatum, {
'x': destinationData.component.position.x,
'y': destinationData.component.position.y,
'width': destinationData.dimensions.width,
'height': destinationData.dimensions.height
});
// direct line between components to provide a 'snap feel'
return 'M' + pathDatum.x + ' ' + pathDatum.y + 'L' + end.x + ' ' + end.y;
// direct line between components to provide a 'snap feel'
return 'M' + pathDatum.x + ' ' + pathDatum.y + 'L' + end.x + ' ' + end.y;
}
} else {
return 'M' + pathDatum.x + ' ' + pathDatum.y + 'L' + d3.event.x + ' ' + d3.event.y;
}
@ -122,57 +147,70 @@ nf.Connectable = (function () {
// stop further propagation
d3.event.sourceEvent.stopPropagation();
// get the add connect img
var addConnect = d3.select(this);
// get the connector, if it the current point is not over a new destination
// the connector will be removed. otherwise it will be removed after the
// connection has been configured/cancelled
var connector = d3.select('path.connector');
var connectorData = connector.datum();
// get the destination
var destination = d3.select('g.connectable-destination');
// we are not over a new destination
if (destination.empty()) {
// get the source to determine if we are still over it
var source = d3.select('#id-' + connectorData.sourceId);
var sourceData = source.datum();
// get the mouse position relative to the source
var position = d3.mouse(source.node());
// if the position is outside the component, remove the add connect img
if (position[0] < 0 || position[0] > sourceData.dimensions.width || position[1] < 0 || position[1] > sourceData.dimensions.height) {
addConnect.remove();
} else {
// reset the add connect img by restoring the position and place in the DOM
addConnect.classed('dragging', false).attr('transform', function () {
return 'translate(' + d.origX + ', ' + d.origY + ')';
});
source.node().appendChild(this);
}
// remove the connector
connector.remove();
} else {
var connectorData = connector.datum();
var destinationData = destination.datum();
// if this is a self loop we need to insert some bend points
if (connectorData.sourceId === destinationData.component.id) {
connector.attr('d', function (pathDatum) {
var x = pathDatum.x;
var y = pathDatum.y;
var componentOffset = pathDatum.sourceWidth / 2;
var xOffset = nf.Connection.config.selfLoopXOffset;
var yOffset = nf.Connection.config.selfLoopYOffset;
return 'M' + x + ' ' + y + 'L' + (x + componentOffset + xOffset) + ' ' + (y - yOffset) + 'L' + (x + componentOffset + xOffset) + ' ' + (y + yOffset) + 'Z';
});
}
// remove the add connect img
addConnect.remove();
// create the connection
var destinationData = destination.datum();
nf.ConnectionConfiguration.createConnection(connectorData.sourceId, destinationData.component.id);
}
// remove this component
d3.select(this).remove();
});
},
activate: function (components) {
components
.on('mouseenter.connectable', function (d) {
if (!d3.event.shiftKey && d3.select('rect.drag-selection').empty()) {
if (allowConnection()) {
var selection = d3.select(this);
// ensure the current component supports connection source
if (nf.CanvasUtils.isValidConnectionSource(selection)) {
// see if theres already a connector rendered
var anyConnector = d3.select('image.add-connect');
if (anyConnector.empty()) {
var addConnect = d3.select('image.add-connect');
if (addConnect.empty()) {
var x = (d.dimensions.width / 2) - 14;
var y = (d.dimensions.height / 2) - 14;
selection.append('image')
.datum({
origX: x,
origY: y
})
.call(connect)
.call(nf.CanvasUtils.disableImageHref)
.attr({
@ -188,17 +226,16 @@ nf.Connectable = (function () {
})
.on('mouseleave.connectable', function () {
// conditionally remove the connector
var connector = d3.select(this).select('image.add-connect');
if (!connector.empty() && !connector.classed('dragging')) {
connector.remove();
var addConnect = d3.select(this).select('image.add-connect');
if (!addConnect.empty() && !addConnect.classed('dragging')) {
addConnect.remove();
}
})
// Using mouseover/out to workaround chrome issue #122746
.on('mouseover.connectable', function () {
// mark that we are hovering when appropriate
var selection = d3.select(this);
selection.classed('hover', function () {
return !d3.event.shiftKey && !selection.classed('hover') && d3.select('rect.drag-selection').empty();
d3.select(this).classed('hover', function () {
return allowConnection();
});
})
.on('mouseout.connection', function () {

View File

@ -67,7 +67,6 @@ nf.Graph = (function () {
// if we are going to select the new components, deselect the previous selection
if (selectAll) {
// deselect the current selection
nf.CanvasUtils.getSelection().classed('selected', false);
}
@ -96,6 +95,11 @@ nf.Graph = (function () {
if (!nf.Common.isEmpty(processGroupContents.connections)) {
nf.Connection.add(processGroupContents.connections, selectAll);
}
// trigger the toolbar to refresh if the selection is changing
if (selectAll) {
nf.CanvasToolbar.refresh();
}
},
/**

View File

@ -48,6 +48,9 @@ nf.RemoteProcessGroupConfiguration = (function () {
}).done(function (response) {
// update the revision
nf.Client.setRevision(response.revision);
// refresh the remote process group component
nf.RemoteProcessGroup.set(response.remoteProcessGroup);
// close the details panel
$('#remote-process-group-configuration').modal('hide');

View File

@ -445,11 +445,11 @@ nf.ClusterTable = (function () {
var columnModel = [
{id: 'moreDetails', name: '&nbsp;', sortable: false, resizable: false, formatter: moreDetailsFormatter, width: 50, maxWidth: 50},
{id: 'node', field: 'node', name: 'Node Address', formatter: nodeFormatter, resizable: true, sortable: true},
{id: 'activeThreadCount', field: 'activeThreadCount', name: 'Active Thread Count', resizable: true, sortable: true},
{id: 'queued', field: 'queued', name: '<span class="queued-title">Queue</span>&nbsp;/&nbsp;<span class="queued-size-title">Size</span>', resizable: true, sortable: true},
{id: 'activeThreadCount', field: 'activeThreadCount', name: 'Active Thread Count', resizable: true, sortable: true, defaultSortAsc: false},
{id: 'queued', field: 'queued', name: '<span class="queued-title">Queue</span>&nbsp;/&nbsp;<span class="queued-size-title">Size</span>', resizable: true, sortable: true, defaultSortAsc: false},
{id: 'status', field: 'status', name: 'Status', formatter: statusFormatter, resizable: true, sortable: true},
{id: 'uptime', field: 'nodeStartTime', name: 'Uptime', formatter: valueFormatter, resizable: true, sortable: true},
{id: 'heartbeat', field: 'heartbeat', name: 'Last Heartbeat', formatter: valueFormatter, resizable: true, sortable: true}
{id: 'uptime', field: 'nodeStartTime', name: 'Uptime', formatter: valueFormatter, resizable: true, sortable: true, defaultSortAsc: false},
{id: 'heartbeat', field: 'heartbeat', name: 'Last Heartbeat', formatter: valueFormatter, resizable: true, sortable: true, defaultSortAsc: false}
];
// only allow the admin to modify the cluster

View File

@ -172,7 +172,7 @@ nf.CountersTable = (function () {
var countersColumns = [
{id: 'context', name: 'Context', field: 'context', sortable: true, resizable: true},
{id: 'name', name: 'Name', field: 'name', sortable: true, resizable: true},
{id: 'value', name: 'Value', field: 'value', sortable: true, resizable: true}
{id: 'value', name: 'Value', field: 'value', sortable: true, resizable: true, defaultSortAsc: false}
];
// only allow dfm's to reset counters

View File

@ -584,10 +584,10 @@ nf.ProvenanceTable = (function () {
// initialize the provenance table
var provenanceColumns = [
{id: 'moreDetails', name: '&nbsp;', sortable: false, resizable: false, formatter: moreDetailsFormatter, width: 50, maxWidth: 50},
{id: 'eventTime', name: 'Date/Time', field: 'eventTime', sortable: true, resizable: true},
{id: 'eventTime', name: 'Date/Time', field: 'eventTime', sortable: true, defaultSortAsc: false, resizable: true},
{id: 'eventType', name: 'Type', field: 'eventType', sortable: true, resizable: true},
{id: 'flowFileUuid', name: 'FlowFile Uuid', field: 'flowFileUuid', sortable: true, resizable: true},
{id: 'fileSize', name: 'Size', field: 'fileSize', sortable: true, resizable: true},
{id: 'fileSize', name: 'Size', field: 'fileSize', sortable: true, defaultSortAsc: false, resizable: true},
{id: 'componentName', name: 'Component Name', field: 'componentName', sortable: true, resizable: true, formatter: valueFormatter},
{id: 'componentType', name: 'Component Type', field: 'componentType', sortable: true, resizable: true}
];

View File

@ -301,10 +301,10 @@ nf.SummaryTable = (function () {
// define the input, read, written, and output columns (reused between both tables)
var nameColumn = {id: 'name', field: 'name', name: 'Name', sortable: true, resizable: true};
var runStatusColumn = {id: 'runStatus', field: 'runStatus', name: 'Run Status', formatter: runStatusFormatter, sortable: true};
var inputColumn = {id: 'input', field: 'input', name: '<span class="input-title">In</span>&nbsp;/&nbsp;<span class="input-size-title">Size</span>&nbsp;<span style="font-weight: normal; overflow: hidden;">5 min</span>', toolTip: 'Count / data size in the last 5 min', sortable: true, resizable: true};
var ioColumn = {id: 'io', field: 'io', name: '<span class="read-title">Read</span>&nbsp;/&nbsp;<span class="written-title">Write</span>&nbsp;<span style="font-weight: normal; overflow: hidden;">5 min</span>', toolTip: 'Data size in the last 5 min', formatter: ioFormatter, sortable: true, resizable: true};
var outputColumn = {id: 'output', field: 'output', name: '<span class="output-title">Out</span>&nbsp;/&nbsp;<span class="output-size-title">Size</span>&nbsp;<span style="font-weight: normal; overflow: hidden;">5 min</span>', toolTip: 'Count / data size in the last 5 min', sortable: true, resizable: true};
var tasksTimeColumn = {id: 'tasks', field: 'tasks', name: '<span class="tasks-title">Tasks</span>&nbsp;/&nbsp;<span class="time-title">Time</span>&nbsp;<span style="font-weight: normal; overflow: hidden;">5 min</span>', toolTip: 'Count / duration in the last 5 min', formatter: taskTimeFormatter, sortable: true, resizable: true};
var inputColumn = {id: 'input', field: 'input', name: '<span class="input-title">In</span>&nbsp;/&nbsp;<span class="input-size-title">Size</span>&nbsp;<span style="font-weight: normal; overflow: hidden;">5 min</span>', toolTip: 'Count / data size in the last 5 min', sortable: true, defaultSortAsc: false, resizable: true};
var ioColumn = {id: 'io', field: 'io', name: '<span class="read-title">Read</span>&nbsp;/&nbsp;<span class="written-title">Write</span>&nbsp;<span style="font-weight: normal; overflow: hidden;">5 min</span>', toolTip: 'Data size in the last 5 min', formatter: ioFormatter, sortable: true, defaultSortAsc: false, resizable: true};
var outputColumn = {id: 'output', field: 'output', name: '<span class="output-title">Out</span>&nbsp;/&nbsp;<span class="output-size-title">Size</span>&nbsp;<span style="font-weight: normal; overflow: hidden;">5 min</span>', toolTip: 'Count / data size in the last 5 min', sortable: true, defaultSortAsc: false, resizable: true};
var tasksTimeColumn = {id: 'tasks', field: 'tasks', name: '<span class="tasks-title">Tasks</span>&nbsp;/&nbsp;<span class="time-title">Time</span>&nbsp;<span style="font-weight: normal; overflow: hidden;">5 min</span>', toolTip: 'Count / duration in the last 5 min', formatter: taskTimeFormatter, sortable: true, defaultSortAsc: false, resizable: true};
// define the column model for the processor summary table
var processorsColumnModel = [
@ -565,7 +565,7 @@ nf.SummaryTable = (function () {
};
// define the input, read, written, and output columns (reused between both tables)
var queueColumn = {id: 'queued', field: 'queued', name: '<span class="queued-title">Queue</span>&nbsp;/&nbsp;<span class="queued-size-title">Size</span>', sortable: true, resize: true};
var queueColumn = {id: 'queued', field: 'queued', name: '<span class="queued-title">Queue</span>&nbsp;/&nbsp;<span class="queued-size-title">Size</span>', sortable: true, defaultSortAsc: false, resize: true};
// define the column model for the summary table
var connectionsColumnModel = [
@ -1264,8 +1264,8 @@ nf.SummaryTable = (function () {
var transmissionStatusColumn = {id: 'transmissionStatus', field: 'transmissionStatus', name: 'Transmitting', formatter: transmissionStatusFormatter, sortable: true, resizable: true};
var targetUriColumn = {id: 'targetUri', field: 'targetUri', name: 'Target URI', sortable: true, resizable: true};
var sentColumn = {id: 'sent', field: 'sent', name: '<span class="sent-title">Sent</span>&nbsp;/&nbsp;<span class="sent-size-title">Size</span>&nbsp;<span style="font-weight: normal; overflow: hidden;">5 min</span>', toolTip: 'Count / data size in the last 5 min', sortable: true, resizable: true};
var receivedColumn = {id: 'received', field: 'received', name: '<span class="received-title">Received</span>&nbsp;/&nbsp;<span class="received-size-title">Size</span>&nbsp;<span style="font-weight: normal; overflow: hidden;">5 min</span>', toolTip: 'Count / data size in the last 5 min', sortable: true, resizable: true};
var sentColumn = {id: 'sent', field: 'sent', name: '<span class="sent-title">Sent</span>&nbsp;/&nbsp;<span class="sent-size-title">Size</span>&nbsp;<span style="font-weight: normal; overflow: hidden;">5 min</span>', toolTip: 'Count / data size in the last 5 min', sortable: true, defaultSortAsc: false, resizable: true};
var receivedColumn = {id: 'received', field: 'received', name: '<span class="received-title">Received</span>&nbsp;/&nbsp;<span class="received-size-title">Size</span>&nbsp;<span style="font-weight: normal; overflow: hidden;">5 min</span>', toolTip: 'Count / data size in the last 5 min', sortable: true, defaultSortAsc: false, resizable: true};
// define the column model for the summary table
var remoteProcessGroupsColumnModel = [

View File

@ -204,7 +204,7 @@ nf.TemplatesTable = (function () {
// initialize the templates table
var templatesColumns = [
{id: 'timestamp', name: 'Date/Time', field: 'timestamp', sortable: true, resizable: false, formatter: valueFormatter, width: 225, maxWidth: 225},
{id: 'timestamp', name: 'Date/Time', field: 'timestamp', sortable: true, defaultSortAsc: false, resizable: false, formatter: valueFormatter, width: 225, maxWidth: 225},
{id: 'name', name: 'Name', field: 'name', sortable: true, resizable: true},
{id: 'description', name: 'Description', field: 'description', sortable: true, resizable: true, formatter: valueFormatter},
{id: 'actions', name: '&nbsp;', sortable: false, resizable: false, formatter: actionFormatter, width: 100, maxWidth: 100}
@ -231,14 +231,14 @@ nf.TemplatesTable = (function () {
// initialize the sort
sort({
columnId: 'timestamp',
sortAsc: true
sortAsc: false
}, templatesData);
// initialize the grid
var templatesGrid = new Slick.Grid('#templates-table', templatesData, templatesColumns, templatesOptions);
templatesGrid.setSelectionModel(new Slick.RowSelectionModel());
templatesGrid.registerPlugin(new Slick.AutoTooltips());
templatesGrid.setSortColumn('timestamp', true);
templatesGrid.setSortColumn('timestamp', false);
templatesGrid.onSort.subscribe(function (e, args) {
sort({
columnId: args.sortCol.field,

View File

@ -592,7 +592,7 @@ nf.UsersTable = (function () {
{id: 'userName', name: 'User', field: 'userName', sortable: true, resizable: true},
{id: 'userGroup', name: 'Group', field: 'userGroup', sortable: true, resizable: true, formatter: valueFormatter},
{id: 'authorities', name: 'Roles', field: 'authorities', sortable: true, resizable: true, formatter: roleFormatter},
{id: 'lastAccessed', name: 'Last Accessed', field: 'lastAccessed', sortable: true, resizable: true, formatter: valueFormatter},
{id: 'lastAccessed', name: 'Last Accessed', field: 'lastAccessed', sortable: true, defaultSortAsc: false, resizable: true, formatter: valueFormatter},
{id: 'status', name: 'Status', field: 'status', sortable: true, resizable: false, formatter: statusFormatter},
{id: 'actions', name: '&nbsp;', sortable: false, resizable: false, formatter: actionFormatter, width: 100, maxWidth: 100}
];

View File

@ -21,7 +21,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-framework</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>nifi-framework-core-api</module>

View File

@ -20,7 +20,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-framework-bundle</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<description>NiFi: Framework Bundle</description>
<modules>

View File

@ -20,7 +20,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-hadoop-nar</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>nar</packaging>
<dependencies>
<dependency>

View File

@ -20,7 +20,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-hadoop-bundle</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<description>A bundle of processors that work with Hadoop</description>
<modules>

View File

@ -21,7 +21,6 @@
</parent>
<artifactId>nifi-hadoop-libraries-nar</artifactId>
<packaging>nar</packaging>
<version>0.0.2-incubating-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>

View File

@ -21,7 +21,6 @@
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-libraries-bundle</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<description>A bundle which provides the Hadoop libraries</description>
<modules>

View File

@ -20,10 +20,8 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-jetty-bundle</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>nar</packaging>
<description>NiFi: Jetty Bundle</description>
<dependencies>
<dependency>
<groupId>org.eclipse.jetty</groupId>

View File

@ -20,7 +20,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-kafka-nar</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>nar</packaging>
<description>NiFi NAR for interacting with Apache Kafka</description>
<dependencies>

View File

@ -13,12 +13,12 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-kafka-bundle</artifactId>
<packaging>pom</packaging>
<modules>

View File

@ -20,7 +20,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-provenance-repository-nar</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>nar</packaging>
<dependencies>
<dependency>

View File

@ -20,7 +20,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-provenance-repository-bundle</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>nifi-persistent-provenance-repository</module>

View File

@ -20,7 +20,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-standard-nar</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>nar</packaging>
<description>NiFi Standard Extensions NAR</description>
<dependencies>

View File

@ -84,7 +84,7 @@ import org.xml.sax.InputSource;
@CapabilityDescription("Evaluates one or more XPaths against the content of a FlowFile. The results of those XPaths are assigned to "
+ "FlowFile Attributes or are written to the content of the FlowFile itself, depending on configuration of the "
+ "Processor. XPaths are entered by adding user-defined properties; the name of the property maps to the Attribute "
+ "Name into which the result will be placed (if the Destination is flowfile-content; otherwise, the property name is ignored). "
+ "Name into which the result will be placed (if the Destination is flowfile-attribute; otherwise, the property name is ignored). "
+ "The value of the property must be a valid XPath expression. If the XPath evaluates to more than one node and the Return Type is "
+ "set to 'nodeset' (either directly, or via 'auto-detect' with a Destination of "
+ "'flowfile-content', the FlowFile will be unmodified and will be routed to failure. If the XPath does not "

View File

@ -57,6 +57,11 @@ public class GetFTP extends GetFileTransfer {
properties.add(FTPTransfer.MAX_SELECTS);
properties.add(FTPTransfer.REMOTE_POLL_BATCH_SIZE);
properties.add(FTPTransfer.USE_NATURAL_ORDERING);
properties.add(FTPTransfer.PROXY_TYPE);
properties.add(FTPTransfer.PROXY_HOST);
properties.add(FTPTransfer.PROXY_PORT);
properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
this.properties = Collections.unmodifiableList(properties);
}

View File

@ -33,6 +33,8 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -40,12 +42,12 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@ -54,15 +56,16 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processors.standard.util.JmsFactory;
import org.apache.nifi.processors.standard.util.JmsProcessingSummary;
import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
import org.apache.nifi.util.BooleanHolder;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.util.IntegerHolder;
import org.apache.nifi.util.LongHolder;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StopWatch;
public abstract class JmsConsumer extends AbstractProcessor {
public static final String MAP_MESSAGE_PREFIX = "jms.mapmessage.";
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("All FlowFiles are routed to success").build();
@ -108,22 +111,17 @@ public abstract class JmsConsumer extends AbstractProcessor {
final boolean addAttributes = context.getProperty(JMS_PROPS_TO_ATTRIBUTES).asBoolean();
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final ObjectHolder<Message> lastMessageReceived = new ObjectHolder<>(null);
final ObjectHolder<Map<String, String>> attributesFromJmsProps = new ObjectHolder<>(null);
final Set<FlowFile> allFlowFilesCreated = new HashSet<>();
final IntegerHolder messagesReceived = new IntegerHolder(0);
final LongHolder bytesReceived = new LongHolder(0L);
final JmsProcessingSummary processingSummary = new JmsProcessingSummary();
final StopWatch stopWatch = new StopWatch(true);
for (int i = 0; i < batchSize; i++) {
final BooleanHolder failure = new BooleanHolder(false);
final Message message;
try {
// If we haven't received a message, wait until one is available. If we have already received at least one
// message, then we are not willing to wait for more to become available, but we are willing to keep receiving
// all messages that are immediately available.
if (messagesReceived.get() == 0) {
if (processingSummary.getMessagesReceived() == 0) {
message = consumer.receive(timeout);
} else {
message = consumer.receiveNoWait();
@ -131,7 +129,6 @@ public abstract class JmsConsumer extends AbstractProcessor {
} catch (final JMSException e) {
logger.error("Failed to receive JMS Message due to {}", e);
wrappedConsumer.close(logger);
failure.set(true);
break;
}
@ -139,48 +136,16 @@ public abstract class JmsConsumer extends AbstractProcessor {
break;
}
final IntegerHolder msgsThisFlowFile = new IntegerHolder(0);
FlowFile flowFile = session.create();
try {
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream rawOut) throws IOException {
try (final OutputStream out = new BufferedOutputStream(rawOut, 65536)) {
messagesReceived.getAndIncrement();
final Map<String, String> attributes = (addAttributes ? JmsFactory.createAttributeMap(message) : null);
attributesFromJmsProps.set(attributes);
final byte[] messageBody = JmsFactory.createByteArray(message);
out.write(messageBody);
bytesReceived.addAndGet(messageBody.length);
msgsThisFlowFile.incrementAndGet();
lastMessageReceived.set(message);
} catch (final JMSException e) {
logger.error("Failed to receive JMS Message due to {}", e);
failure.set(true);
}
}
});
} finally {
if (failure.get()) { // no flowfile created
session.remove(flowFile);
wrappedConsumer.close(logger);
} else {
allFlowFilesCreated.add(flowFile);
final Map<String, String> attributes = attributesFromJmsProps.get();
if (attributes != null) {
flowFile = session.putAllAttributes(flowFile, attributes);
}
session.getProvenanceReporter().receive(flowFile, context.getProperty(URL).getValue());
session.transfer(flowFile, REL_SUCCESS);
logger.info("Created {} from {} messages received from JMS Server and transferred to 'success'", new Object[]{flowFile, msgsThisFlowFile.get()});
}
}
processingSummary.add( map2FlowFile(context, session, message, addAttributes, logger) );
} catch (Exception e) {
logger.error("Failed to receive JMS Message due to {}", e);
wrappedConsumer.close(logger);
break;
}
}
if (allFlowFilesCreated.isEmpty()) {
if (processingSummary.getFlowFilesCreated()==0) {
context.yield();
return;
}
@ -188,21 +153,81 @@ public abstract class JmsConsumer extends AbstractProcessor {
session.commit();
stopWatch.stop();
if (!allFlowFilesCreated.isEmpty()) {
if (processingSummary.getFlowFilesCreated()>0) {
final float secs = ((float) stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F);
float messagesPerSec = ((float) messagesReceived.get()) / secs;
final String dataRate = stopWatch.calculateDataRate(bytesReceived.get());
logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}", new Object[]{messagesReceived.get(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate});
float messagesPerSec = ((float) processingSummary.getMessagesReceived()) / secs;
final String dataRate = stopWatch.calculateDataRate(processingSummary.getBytesReceived());
logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}", new Object[]{processingSummary.getMessagesReceived(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate});
}
// if we need to acknowledge the messages, do so now.
final Message lastMessage = lastMessageReceived.get();
final Message lastMessage = processingSummary.getLastMessageReceived();
if (clientAcknowledge && lastMessage != null) {
try {
lastMessage.acknowledge(); // acknowledge all received messages by acknowledging only the last.
} catch (final JMSException e) {
logger.error("Failed to acknowledge {} JMS Message(s). This may result in duplicate messages. Reason for failure: {}", new Object[]{messagesReceived.get(), e});
logger.error("Failed to acknowledge {} JMS Message(s). This may result in duplicate messages. Reason for failure: {}", new Object[]{processingSummary.getMessagesReceived(), e});
}
}
}
public static JmsProcessingSummary map2FlowFile(final ProcessContext context, final ProcessSession session, final Message message, final boolean addAttributes, ProcessorLog logger) throws Exception {
// Currently not very useful, because always one Message == one FlowFile
final IntegerHolder msgsThisFlowFile = new IntegerHolder(1);
FlowFile flowFile = session.create();
try {
// MapMessage is exception, add only name-value pairs to FlowFile attributes
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
flowFile = session.putAllAttributes(flowFile, createMapMessageValues(mapMessage));
}
// all other message types, write Message body to FlowFile content
else {
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream rawOut) throws IOException {
try (final OutputStream out = new BufferedOutputStream(rawOut, 65536)) {
final byte[] messageBody = JmsFactory.createByteArray(message);
out.write(messageBody);
} catch (final JMSException e) {
throw new ProcessException("Failed to receive JMS Message due to {}", e);
}
}
});
}
if (addAttributes)
flowFile = session.putAllAttributes(flowFile, JmsFactory.createAttributeMap(message));
session.getProvenanceReporter().receive(flowFile, context.getProperty(URL).getValue());
session.transfer(flowFile, REL_SUCCESS);
logger.info("Created {} from {} messages received from JMS Server and transferred to 'success'", new Object[]{flowFile, msgsThisFlowFile.get()});
return new JmsProcessingSummary(flowFile.getSize(), message, flowFile);
} catch (Exception e) {
session.remove(flowFile);
throw e;
}
}
public static Map<String, String> createMapMessageValues(final MapMessage mapMessage) throws JMSException {
final Map<String, String> valueMap = new HashMap<>();
final Enumeration<?> enumeration = mapMessage.getMapNames();
while (enumeration.hasMoreElements()) {
final String name = (String) enumeration.nextElement();
final Object value = mapMessage.getObject(name);
if (value==null)
valueMap.put(MAP_MESSAGE_PREFIX+name, "");
else
valueMap.put(MAP_MESSAGE_PREFIX+name, value.toString());
}
return valueMap;
}
}

View File

@ -71,7 +71,12 @@ public class PutFTP extends PutFileTransfer<FTPTransfer> {
properties.add(FTPTransfer.LAST_MODIFIED_TIME);
properties.add(FTPTransfer.PERMISSIONS);
properties.add(FTPTransfer.USE_COMPRESSION);
properties.add(FTPTransfer.PROXY_TYPE);
properties.add(FTPTransfer.PROXY_HOST);
properties.add(FTPTransfer.PROXY_PORT);
properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
this.properties = Collections.unmodifiableList(properties);
}

View File

@ -40,6 +40,7 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_BY
import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_EMPTY;
import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_STREAM;
import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_TEXT;
import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_MAP;
import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
import static org.apache.nifi.processors.standard.util.JmsProperties.REPLY_TO_QUEUE;
import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
@ -257,18 +258,22 @@ public class PutJMS extends AbstractProcessor {
switch (context.getProperty(MESSAGE_TYPE).getValue()) {
case MSG_TYPE_EMPTY: {
message = jmsSession.createTextMessage("");
break;
}
break;
case MSG_TYPE_STREAM: {
final StreamMessage streamMessage = jmsSession.createStreamMessage();
streamMessage.writeBytes(messageContent);
message = streamMessage;
break;
}
break;
case MSG_TYPE_TEXT: {
message = jmsSession.createTextMessage(new String(messageContent, UTF8));
break;
}
case MSG_TYPE_MAP: {
message = jmsSession.createMapMessage();
break;
}
break;
case MSG_TYPE_BYTE:
default: {
final BytesMessage bytesMessage = jmsSession.createBytesMessage();

View File

@@ -20,6 +20,8 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.nio.file.Paths;
@ -40,6 +42,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPHTTPClient;
import org.apache.commons.net.ftp.FTPReply;
public class FTPTransfer implements FileTransfer {
@ -49,6 +52,9 @@ public class FTPTransfer implements FileTransfer {
public static final String TRANSFER_MODE_ASCII = "ASCII";
public static final String TRANSFER_MODE_BINARY = "Binary";
public static final String FTP_TIMEVAL_FORMAT = "yyyyMMddHHmmss";
public static final String PROXY_TYPE_DIRECT = Proxy.Type.DIRECT.name();
public static final String PROXY_TYPE_HTTP = Proxy.Type.HTTP.name();
public static final String PROXY_TYPE_SOCKS = Proxy.Type.SOCKS.name();
public static final PropertyDescriptor CONNECTION_MODE = new PropertyDescriptor.Builder()
.name("Connection Mode")
@ -69,6 +75,35 @@ public class FTPTransfer implements FileTransfer {
.required(true)
.defaultValue("21")
.build();
public static final PropertyDescriptor PROXY_TYPE = new PropertyDescriptor.Builder()
.name("Proxy Type")
.description("Proxy type used for file transfers")
.allowableValues(PROXY_TYPE_DIRECT, PROXY_TYPE_HTTP, PROXY_TYPE_SOCKS)
.defaultValue(PROXY_TYPE_DIRECT)
.build();
public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
.name("Proxy Host")
.description("The fully qualified hostname or IP address of the proxy server")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder()
.name("Proxy Port")
.description("The port of the proxy server")
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor.Builder()
.name("Http Proxy Username")
.description("Http Proxy Username")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.build();
public static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor.Builder()
.name("Http Proxy Password")
.description("Http Proxy Password")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.sensitive(true)
.build();
private final ProcessorLog logger;
@ -450,7 +485,18 @@ public class FTPTransfer implements FileTransfer {
}
}
final FTPClient client = new FTPClient();
final Proxy.Type proxyType = Proxy.Type.valueOf(ctx.getProperty(PROXY_TYPE).getValue());
final String proxyHost = ctx.getProperty(PROXY_HOST).getValue();
final Integer proxyPort = ctx.getProperty(PROXY_PORT).asInteger();
FTPClient client;
if (proxyType == Proxy.Type.HTTP) {
client = new FTPHTTPClient(proxyHost, proxyPort, ctx.getProperty(HTTP_PROXY_USERNAME).getValue(), ctx.getProperty(HTTP_PROXY_PASSWORD).getValue());
} else {
client = new FTPClient();
if (proxyType == Proxy.Type.SOCKS) {
client.setSocketFactory(new SocksProxySocketFactory(new Proxy(proxyType, new InetSocketAddress(proxyHost, proxyPort))));
}
}
this.client = client;
client.setDataTimeout(ctx.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
client.setDefaultTimeout(ctx.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());

View File

@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import javax.jms.Message;
import org.apache.nifi.flowfile.FlowFile;
/**
* Data structure which allows to collect processing summary data.
*
*/
/**
 * Mutable accumulator summarizing the work performed while consuming JMS
 * messages: message/byte counters, a FlowFile counter, and the most recently
 * seen message/FlowFile (the latter retained mainly to ease testing).
 */
public class JmsProcessingSummary {

    private int messagesReceived;        // number of JMS messages consumed so far
    private long bytesReceived;          // total payload bytes consumed so far
    private Message lastMessageReceived; // most recently consumed message, or null if none yet
    private int flowFilesCreated;        // number of FlowFiles produced so far
    private FlowFile lastFlowFile;       // most recently produced FlowFile, or null if none yet; helps testing

    /**
     * Creates an empty summary: all counters zero, no last message/FlowFile.
     */
    public JmsProcessingSummary() {
        this.messagesReceived = 0;
        this.bytesReceived = 0;
        this.lastMessageReceived = null;
        this.flowFilesCreated = 0;
        this.lastFlowFile = null;
    }

    /**
     * Creates a summary describing a single consumed message that produced a
     * single FlowFile.
     *
     * @param bytesReceived       payload size of the consumed message, in bytes
     * @param lastMessageReceived the consumed message
     * @param lastFlowFile        the FlowFile created from that message
     */
    public JmsProcessingSummary(long bytesReceived, Message lastMessageReceived, FlowFile lastFlowFile) {
        this.messagesReceived = 1;
        this.bytesReceived = bytesReceived;
        this.lastMessageReceived = lastMessageReceived;
        this.flowFilesCreated = 1;
        this.lastFlowFile = lastFlowFile;
    }

    /**
     * Folds another summary into this one. Counters accumulate; the
     * "last seen" references are taken from the other summary only when it
     * actually observed a message / created a FlowFile, so merging an empty
     * summary can no longer clobber them with null.
     *
     * @param other the summary to merge into this one
     */
    public void add(JmsProcessingSummary other) {
        this.messagesReceived += other.messagesReceived;
        this.bytesReceived += other.bytesReceived;
        this.flowFilesCreated += other.flowFilesCreated;
        if (other.messagesReceived > 0) {
            this.lastMessageReceived = other.lastMessageReceived;
        }
        if (other.flowFilesCreated > 0) {
            this.lastFlowFile = other.lastFlowFile;
        }
    }

    public int getMessagesReceived() {
        return messagesReceived;
    }

    public long getBytesReceived() {
        return bytesReceived;
    }

    public Message getLastMessageReceived() {
        return lastMessageReceived;
    }

    public int getFlowFilesCreated() {
        return flowFilesCreated;
    }

    public FlowFile getLastFlowFile() {
        return lastFlowFile;
    }
}

View File

@@ -33,6 +33,7 @@ public class JmsProperties {
public static final String MSG_TYPE_BYTE = "byte";
public static final String MSG_TYPE_TEXT = "text";
public static final String MSG_TYPE_STREAM = "stream";
public static final String MSG_TYPE_MAP = "map";
public static final String MSG_TYPE_EMPTY = "empty";
// Standard JMS Properties
@@ -142,7 +143,7 @@
.name("Message Type")
.description("The Type of JMS Message to Construct")
.required(true)
.allowableValues(MSG_TYPE_BYTE, MSG_TYPE_STREAM, MSG_TYPE_TEXT, MSG_TYPE_EMPTY)
.allowableValues(MSG_TYPE_BYTE, MSG_TYPE_STREAM, MSG_TYPE_TEXT, MSG_TYPE_MAP, MSG_TYPE_EMPTY)
.defaultValue(MSG_TYPE_BYTE)
.build();
public static final PropertyDescriptor MESSAGE_PRIORITY = new PropertyDescriptor.Builder()

View File

@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.Socket;
import java.net.UnknownHostException;
import javax.net.SocketFactory;
public final class SocksProxySocketFactory extends SocketFactory {
private final Proxy proxy;
public SocksProxySocketFactory(Proxy proxy) {
this.proxy = proxy;
}
@Override
public Socket createSocket() throws IOException {
return new Socket(proxy);
}
@Override
public Socket createSocket(InetAddress addr, int port) throws IOException {
Socket socket = createSocket();
socket.connect(new InetSocketAddress(addr, port));
return socket;
}
@Override
public Socket createSocket(InetAddress addr, int port, InetAddress localHostAddr, int localPort) throws IOException {
Socket socket = createSocket();
socket.bind(new InetSocketAddress(localHostAddr, localPort));
socket.connect(new InetSocketAddress(addr, port));
return socket;
}
@Override
public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
Socket socket = createSocket();
socket.connect(new InetSocketAddress(host, port));
return socket;
}
@Override
public Socket createSocket(String host, int port, InetAddress localHostAddr, int localPort) throws IOException, UnknownHostException {
Socket socket = createSocket();
socket.bind(new InetSocketAddress(localHostAddr, localPort));
socket.connect(new InetSocketAddress(host, port));
return socket;
}
}

View File

@@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.*;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.TextMessage;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processors.standard.util.JmsProcessingSummary;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockProcessorInitializationContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
/**
*
*/
public class TestJmsConsumer {
static protected MapMessage createMapMessage() throws JMSException {
MapMessage mapMessage = new ActiveMQMapMessage();
mapMessage.setString("name", "Arnold");
mapMessage.setInt ("age", 97);
mapMessage.setDouble("xyz", 89686.564);
mapMessage.setBoolean("good", true);
return mapMessage;
}
/**
* Test method for {@link org.apache.nifi.processors.standard.JmsConsumer#createMapMessageAttrs(javax.jms.MapMessage)}.
* @throws JMSException
*/
@Test
public void testCreateMapMessageValues() throws JMSException {
MapMessage mapMessage = createMapMessage();
Map<String, String> mapMessageValues = JmsConsumer.createMapMessageValues(mapMessage);
assertEquals("", 4, mapMessageValues.size());
assertEquals("", "Arnold", mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"name"));
assertEquals("", "97", mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"age"));
assertEquals("", "89686.564", mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"xyz"));
assertEquals("", "true", mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"good"));
}
/**
* Test MapMessage to FlowFile conversion
*/
@Test
public void testMap2FlowFileMapMessage() throws Exception {
TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
MapMessage mapMessage = createMapMessage();
ProcessContext context = runner.getProcessContext();
ProcessSession session = runner.getProcessSessionFactory().createSession();
ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner.getProcessor(),
(MockProcessContext) runner.getProcessContext());
JmsProcessingSummary summary = JmsConsumer.map2FlowFile(context, session, mapMessage, true, pic.getLogger());
assertEquals("MapMessage should not create FlowFile content", 0, summary.getBytesReceived());
Map<String, String> attributes = summary.getLastFlowFile().getAttributes();
assertEquals("", "Arnold", attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"name"));
assertEquals("", "97", attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"age"));
assertEquals("", "89686.564", attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"xyz"));
assertEquals("", "true", attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"good"));
}
/**
* Test TextMessage to FlowFile conversion
*/
@Test
public void testMap2FlowFileTextMessage() throws Exception {
TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
TextMessage textMessage = new ActiveMQTextMessage();
String payload = "Hello world!";
textMessage.setText(payload);
ProcessContext context = runner.getProcessContext();
ProcessSession session = runner.getProcessSessionFactory().createSession();
ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner.getProcessor(),
(MockProcessContext) runner.getProcessContext());
JmsProcessingSummary summary = JmsConsumer.map2FlowFile(context, session, textMessage, true, pic.getLogger());
assertEquals("TextMessage content length should equal to FlowFile content size", payload.length(), summary.getLastFlowFile().getSize());
final byte[] buffer = new byte[payload.length()];
runner.clearTransferState();
session.read(summary.getLastFlowFile(), new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
StreamUtils.fillBuffer(in, buffer, false);
}
});
String contentString = new String(buffer,"UTF-8");
assertEquals("", payload, contentString);
}
/**
* Test BytesMessage to FlowFile conversion
*/
@Test
public void testMap2FlowFileBytesMessage() throws Exception {
TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
BytesMessage bytesMessage = new ActiveMQBytesMessage();
String sourceString = "Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data.!";
byte[] payload = sourceString.getBytes("UTF-8");
bytesMessage.writeBytes(payload);
bytesMessage.reset();
ProcessContext context = runner.getProcessContext();
ProcessSession session = runner.getProcessSessionFactory().createSession();
ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner.getProcessor(),
(MockProcessContext) runner.getProcessContext());
JmsProcessingSummary summary = JmsConsumer.map2FlowFile(context, session, bytesMessage, true, pic.getLogger());
assertEquals("BytesMessage content length should equal to FlowFile content size", payload.length, summary.getLastFlowFile().getSize());
final byte[] buffer = new byte[payload.length];
runner.clearTransferState();
session.read(summary.getLastFlowFile(), new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
StreamUtils.fillBuffer(in, buffer, false);
}
});
String contentString = new String(buffer,"UTF-8");
assertEquals("", sourceString, contentString);
}
}

View File

@ -20,7 +20,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-standard-bundle</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<description>NiFi Standard Extensions Bundle</description>
<modules>

View File

@ -20,7 +20,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-distributed-cache-services-nar</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>nar</packaging>
<dependencies>
<dependency>

View File

@ -20,7 +20,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-distributed-cache-services-bundle</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>nifi-distributed-cache-protocol</module>

View File

@ -20,7 +20,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-load-distribution-service-api</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
<dependency>

View File

@ -20,7 +20,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-ssl-context-service-nar</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>nar</packaging>
<dependencies>
<dependency>

View File

@ -20,7 +20,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-ssl-context-bundle</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>nifi-ssl-context-service</module>

View File

@ -20,7 +20,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>nar</packaging>
<dependencies>
<dependency>

View File

@ -21,7 +21,6 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-standard-services</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>nifi-distributed-cache-client-service-api</module>

View File

@ -20,9 +20,7 @@
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-update-attribute-nar</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>

View File

@ -20,11 +20,8 @@
<artifactId>nifi-nar-bundles</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-update-attribute-bundle</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>nifi-update-attribute-model</module>
<module>nifi-update-attribute-processor</module>

View File

@ -21,12 +21,9 @@
<artifactId>nifi</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>nifi-framework-bundle</module>
<module>nifi-hadoop-bundle</module>