This commit is contained in:
Mark Payne 2015-11-23 14:18:50 -05:00
commit 4e2c94d659
29 changed files with 1237 additions and 235 deletions

View File

@ -259,6 +259,7 @@ public class StandardLineageResult implements ComputeLineageResult {
case FORK:
case JOIN:
case REPLAY:
case FETCH:
case CLONE: {
// For events that create FlowFile nodes, we need to create the FlowFile Nodes and associated Edges, as appropriate
for (final String childUuid : record.getChildUuids()) {

View File

@ -728,6 +728,7 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
}
break;
case RECEIVE:
case FETCH:
case SEND:
assertSet(transitUri, "Transit URI");
break;
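The hunk above makes FETCH, like RECEIVE and SEND, an event type that must carry a Transit URI. As a hedged illustration only, the sketch below shows how a processor that pulls content from an external store might report such an event; it assumes a ProvenanceReporter.fetch(FlowFile, String) method analogous to the send(...) call used elsewhere in this commit, and the URI is purely illustrative.

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;

public class FetchReportingSketch {

    // Hypothetical helper: after replacing a FlowFile's content with data fetched
    // from an external system, report a FETCH provenance event that carries the
    // transit URI of the source, satisfying the validation added above.
    void reportFetch(final ProcessSession session, final FlowFile flowFile) {
        final String transitUri = "https://example.com/objects/1234"; // illustrative only
        session.getProvenanceReporter().fetch(flowFile, transitUri);  // assumed reporter method for FETCH
    }
}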

Binary file not shown.

Size: 30 KiB

Binary file not shown.

Size: 163 KiB

Binary file not shown.

Size: 97 KiB

Binary file not shown.

Size: 98 KiB

Binary file not shown.

Size: 106 KiB

View File

@ -37,8 +37,7 @@ use a supported web browser to view the User Interface. Supported web browsers i
* Google Chrome 36+
* Safari 8
Note that there is a known issue in Internet Explorer (IE) 10 and 11 that can cause problems when moving items on the NiFi graph. If you encounter this problem, we suggest using a browser other than IE. This known issue is described here: https://connect.microsoft.com/IE/Feedback/Details/1050422.
Note that there is a known issue in Internet Explorer (IE) 10 and 11 that can cause problems when moving items on the NiFi canvas. If you encounter this problem, we suggest using a browser other than IE. This known issue is described here: https://connect.microsoft.com/IE/Feedback/Details/1050422.
[template="glossary", id="terminology"]
@ -100,6 +99,11 @@ Terminology
As a result, several components may be combined together to make a larger building block from which to create a dataflow.
These templates can also be exported as XML and imported into another NiFi instance, allowing these building blocks to be shared.
*flow.xml.gz*: Everything the DFM puts onto the NiFi User Interface canvas is written, in real time, to one file called the flow.xml.gz. This file is located in the nifi/conf directory.
Any change made on the canvas is automatically saved to this file, without the user needing to click a "save" button. In addition, the user may create a back-up copy of this file at any time
by selecting the Controller Settings button in the far-right section of the toolbar and clicking "Back-up flow" on the General tab. By default, this action saves a copy of the current flow in the nifi/conf/archive directory.
See <<Controller_Settings>> for a description of where the "Back-up flow" button may be found. (Note that in a NiFi Cluster, the NiFi Cluster Manager's copy of this file is named flow.tar, whereas this file is still named flow.xml.gz on the nodes.)
[[User_Interface]]
NiFi User Interface
@ -123,8 +127,8 @@ Along the top of the of the screen is a toolbar that contains several of these s
To the left is the Components Toolbar. This toolbar consists of the different components that can be dragged onto the canvas.
Next to the Components Toolbar is the Actions Toolbar. This toolbar consists of buttons to manipulate the existing
components on the graph. To the right of the Actions Toolbar is the Search Toolbar. This toolbar consists of a single
Search field that allows users to easily find components on the graph. Users are able to search by component name,
components on the canvas. To the right of the Actions Toolbar is the Search Toolbar. This toolbar consists of a single
Search field that allows users to easily find components on the canvas. Users are able to search by component name,
type, identifier, configuration properties, and their values.
The Management Toolbar sits to the right-hand side of the screen. This toolbar consists of buttons that are
@ -133,8 +137,8 @@ and configure system properties, such as how many system resources should be pro
image::nifi-toolbar-components.png["NiFi Components Toolbar"]
Next, we have segments that provide capabilities to easily navigate around the graph. On the left-hand side is a toolbar that
provides the ability to pan around the graph and zoom in and out. On the right-hand side is a “Birds-Eye View” of the dataflow.
Next, we have segments that provide capabilities to easily navigate around the canvas. On the left-hand side is a toolbar that
provides the ability to pan around the canvas and zoom in and out. On the right-hand side is a “Birds-Eye View” of the dataflow.
This provides a high-level view of the dataflow and allows the user to quickly and easily pan across large portions of the dataflow.
Along the top of the screen is a trail of breadcrumbs. As users navigate into and out of Process Groups, the breadcrumbs show
the depth in the flow and each Process Group that was entered to reach this depth. Each of the Process Groups listed in the breadcrumbs
@ -143,8 +147,8 @@ is a link that will take you back up to that level in the flow.
image::nifi-navigation.png["NiFi Navigation"]
[[status_bar]]
Below the breadcrumbs lives the Status bar. The Status bar provides information about how many Processors exist in the graph in
each state (Stopped, Running, Invalid, Disabled), how many Remote Process Groups exist on the graph in each state
Below the breadcrumbs lives the Status bar. The Status bar provides information about how many Processors exist on the canvas in
each state (Stopped, Running, Invalid, Disabled), how many Remote Process Groups exist on the canvas in each state
(Transmitting, Not Transmitting), the number of threads that are currently active in the flow, the amount of data that currently
exists in the flow, and the timestamp at which all of this information was last refreshed. If there are any System-Level bulletins,
these are shown in the Status bar as well. Additionally, if the instance of NiFi is clustered, the Status bar shows how many nodes
@ -175,7 +179,7 @@ image:iconProcessor.png["Processor", width=32]
*Processor*: The Processor is the most commonly used component, as it is responsible for data ingress, egress, routing, and
manipulating. There are many different types of Processors. In fact, this is a very common Extension Point in NiFi,
meaning that many vendors may implement their own Processors to perform whatever functions are necessary for their use case.
When a Processor is dragged onto the graph, the user is presented with a dialog to choose which type of Processor to use:
When a Processor is dragged onto the canvas, the user is presented with a dialog to choose which type of Processor to use:
image::add-processor.png["Add Processor Dialog"]
@ -191,7 +195,26 @@ image::add-processor-with-tag-cloud.png["Add Processor with Tag Cloud"]
Clicking the `Add` button or double-clicking on a Processor Type will add the selected Processor to the canvas at the
location that it was dropped.
*Note*: For any component added to the graph, it is possible to select it with the mouse and move it anywhere on the graph. Also, it is possible to select multiple items at once by either holding down the Shift key and selecting each item or by holding down the Shift key and dragging a selection box around the desired components.
*Note*: For any component added to the canvas, it is possible to select it with the mouse and move it anywhere on the canvas. Also, it is possible to select multiple items at once by either holding down the Shift key and selecting each item or by holding down the Shift key and dragging a selection box around the desired components.
Once a Processor has been dragged onto the canvas, the user may interact with it by right-clicking on the Processor and selecting an option from
the context menu.
image::nifi-processor-menu.png["Processor Menu", width=300]
The following options are available:
- *Configure*: This option allows the user to establish or change the configuration of the Processor. (See <<Configuring_a_Processor>>.)
- *Start* or *Stop*: This option allows the user to start or stop a Processor; the option will be either Start or Stop, depending on the current state of the Processor.
- *Stats*: This option opens a graphical representation of the Processor's statistical information over time.
- *Upstream connections*: This option allows the user to see and "jump to" upstream connections that are coming into the Processor. This is particularly useful when processors connect into and out of other Process Groups.
- *Downstream connections*: This option allows the user to see and "jump to" downstream connections that are going out of the Processor. This is particularly useful when processors connect into and out of other Process Groups.
- *Usage*: This option takes the user to the Processor's usage documentation.
- *Change color*: This option allows the user to change the color of the Processor, which can make the visual management of large flows easier.
- *Center in view*: This option centers the view of the canvas on the given Processor.
- *Copy*: This option places a copy of the selected Processor on the clipboard, so that it may be pasted elsewhere on the canvas by right-clicking on the canvas and selecting Paste. The Copy/Paste actions also may be done using the keystrokes Ctrl-C (Command-C) and Ctrl-V (Command-V).
- *Delete*: This option allows the DFM to delete a Processor from the canvas.
[[input_port]]
@ -228,6 +251,24 @@ image:iconProcessGroup.png["Process Group", width=32]
and maintain. When a Process Group is dragged onto the canvas, the DFM is prompted to name the Process Group. All Process
Groups within the same parent group must have unique names. The Process Group will then be nested within that parent group.
Once a Process Group has been dragged onto the canvas, the user may interact with it by right-clicking on the Process Group and selecting an option from
the context menu.
image::nifi-process-group-menu.png["Process Group Menu", width=300]
The following options are available:
- *Configure*: This option allows the user to establish or change the configuration of the Process Group.
- *Enter group*: This option allows the user to enter the Process Group. It is also possible to double-click on the Process Group to enter it.
- *Start*: This option allows the user to start a Process Group.
- *Stop*: This option allows the user to stop a Process Group.
- *Stats*: This option opens a graphical representation of the Process Group's statistical information over time.
- *Upstream connections*: This option allows the user to see and "jump to" upstream connections that are coming into the Process Group.
- *Downstream connections*: This option allows the user to see and "jump to" downstream connections that are going out of the Process Group.
- *Center in view*: This option centers the view of the canvas on the given Process Group.
- *Copy*: This option places a copy of the selected Process Group on the clipboard, so that it may be pasted elsewhere on the canvas by right-clicking on the canvas and selecting Paste. The Copy/Paste actions also may be done using the keystrokes Ctrl-C (Command-C) and Ctrl-V (Command-V).
- *Delete*: This option allows the DFM to delete a Process Group.
[[remote_process_group]]
@ -241,6 +282,27 @@ how busy each node is. This information is then used to load balance the data th
then interrogated periodically to determine information about any nodes that are dropped from or added to the cluster and to
recalculate the load balancing based on each node's load. For more information, see the section on <<site-to-site,Site-to-Site>>.
Once a Remote Process Group has been dragged onto the canvas, the user may interact with it by right-clicking on the Remote Process Group and selecting an option from
the context menu.
image::nifi-rpg-menu.png["Remote Process Group Menu", width=300]
The following options are available:
- *Configure*: This option allows the user to establish or change the configuration of the Remote Process Group.
- *Remote Ports*: This option allows the user to see input ports and/or output ports that exist on the remote instance of NiFi that the Remote Process Group is connected to. Note that if the Site-to-Site configuration is secure, only the ports that the connecting NiFi has been given access to will be visible.
- *Enable transmission*: Makes the transmission of data between NiFi instances active. (See <<Remote_Group_Transmission>>.)
- *Disable transmission*: Disables the transmission of data between NiFi instances.
- *Stats*: This option opens a graphical representation of the Remote Process Group's statistical information over time.
- *Upstream connections*: This option allows the user to see and "jump to" upstream connections that are coming into the Remote Process Group.
- *Downstream connections*: This option allows the user to see and "jump to" downstream connections that are going out of the Remote Process Group.
- *Refresh*: This option refreshes the view of the status of the remote NiFi instance.
- *Go to*: This option opens a view of the remote NiFi instance in a new tab of the browser. Note that if the Site-to-Site configuration is secure, the user must have access to the remote NiFi instance in order to view it.
- *Center in view*: This option centers the view of the canvas on the given Remote Process Group.
- *Copy*: This option places a copy of the selected Remote Process Group on the clipboard, so that it may be pasted elsewhere on the canvas by right-clicking on the canvas and selecting Paste. The Copy/Paste actions also may be done using the keystrokes Ctrl-C (Command-C) and Ctrl-V (Command-V).
- *Delete*: This option allows the DFM to delete a Remote Process Group from the canvas.
[[funnel]]
image:iconFunnel.png["Funnel", width=32]
@ -276,11 +338,10 @@ choosing `Configure...`
[[Configuring_a_Processor]]
=== Configuring a Processor
Once a Processor has been dragged onto the Canvas, it is ready to configure. This is done by right-clicking on the
Processor and clicking the `Configure...` option from the context menu. The configuration dialog is opened with four
To configure a processor, right-click on the Processor and select the `Configure...` option from the context menu. The configuration dialog is opened with four
different tabs, each of which is discussed below. Once you have finished configuring the Processor, you can apply
the changes by clicking the `Apply` button or cancel all changes by clicking the `Cancel` button.
@ -298,7 +359,7 @@ image::settings-tab.png["Settings Tab"]
This tab contains several different configuration items. First, it allows the DFM to change the name of the Processor.
The name of a Processor by default is the same as the Processor type. Next to the Processor Name is a checkbox, indicating
whether the Processor is Enabled. When a Processor is added to the graph, it is enabled. If the
whether the Processor is Enabled. When a Processor is added to the canvas, it is enabled. If the
Processor is disabled, it cannot be started. The disabled state is used to indicate that when a group of Processors is started,
such as when a DFM starts an entire Process Group, this (disabled) Processor should be excluded.
@ -460,8 +521,12 @@ for all the Processors that are available. Clicking on the desired Processor in
While DFMs have the ability to create Controller Services from the Configure Processor window, there is also a central place within the User Interface for adding and configuring both Controller Services and Reporting Tasks. To get there, click on the Controller Settings button in the Management section of the toolbar.
[[Controller_Settings]]
==== Controller Settings
image:controller-settings-button.png["Controller Settings Button", width=200]
The Controller Settings window has three tabs across the top: General, Controller Services, and Reporting Tasks. The General tab is for settings that pertain to general information about the NiFi instance. For example, here, the DFM can provide a unique name for the overall dataflow, as well as comments that describe the flow. Be aware that this information is visible to any other NiFi instance that connects remotely to this instance (using Remote Process Groups, a.k.a., Site-to-Site).
The General tab also provides settings for the overall maximum thread counts of the instance, as well as the ability to click "Back-up flow" to create a backup copy of the current flow, which is saved by default in the /conf/archive directory.
@ -472,7 +537,7 @@ To the right of the General tab is the Controller Services tab. From this tab, t
image:controller-services-tab.png["Controller Services Tab", width=900]
The Add Controller Service window opens. This window is similar to the Add Processor window. It provides a list of the available Controller Services on the right and a tag cloud, showing the most common catagory tags used for Controller Services, on the left. The DFM may click any tag in the tag cloud in order to narrow down the list of Controller Services to those that fit the categories desired. The DFM may also use the Filter field at the top of the window to search for the desired Contoller Service. Upon selecting a Controller Service from the list, the DFM can see a description of the the service below. Select the desired controller service and click Add, or simply double-click the name of the service to add it.
The Add Controller Service window opens. This window is similar to the Add Processor window. It provides a list of the available Controller Services on the right and a tag cloud, showing the most common category tags used for Controller Services, on the left. The DFM may click any tag in the tag cloud in order to narrow down the list of Controller Services to those that fit the categories desired. The DFM may also use the Filter field at the top of the window to search for the desired Controller Service. Upon selecting a Controller Service from the list, the DFM can see a description of the service below. Select the desired controller service and click Add, or simply double-click the name of the service to add it.
image:add-controller-service-window.png["Add Controller Service Window", width=700]
@ -513,10 +578,10 @@ The Comments tab is just an open-text field, where the DFM may include comments
When you want to run the Reporting Task, click the Start button in the far-right column of the Reporting Tasks tab.
[[Connecting_Components]]
=== Connecting Components
Once processors and other components have been added to the graph and configured, the next step is to connect them
Once processors and other components have been added to the canvas and configured, the next step is to connect them
to one another so that NiFi knows what to do with each FlowFile after it has been processed. This is accomplished by creating a
Connection between each component. When the user hovers the mouse over the center of a component, a new Connection icon (
image:addConnect.png["Connection Bubble"]
@ -557,7 +622,7 @@ File expiration is a concept by which data that cannot be processed in a timely
This is useful, for example, when the volume of data is expected to exceed the volume that can be sent to a remote site.
In this case, the expiration can be used in conjunction with Prioritizers to ensure that the highest priority data is
processed first and then anything that cannot be processed within a certain time period (one hour, for example) can be dropped. The expiration period is based on the time that the data entered the NiFi instance. In other words, if the file expiration on a given connection is set to '1 hour', and a file that has been in the NiFi instance for one hour reaches that connection, it will expire. The default
value of `0 sec` indicates that the data will never expire. When a file expiration other than '0 sec' is set, a small clock icon appears on the connection label, so the DFM can see it at-a-glance when looking at a flow on the graph.
value of `0 sec` indicates that the data will never expire. When a file expiration other than '0 sec' is set, a small clock icon appears on the connection label, so the DFM can see it at-a-glance when looking at a flow on the canvas.
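As a small illustration of the rule just described (a standalone sketch, not NiFi code), the expiration decision can be expressed as a comparison between the time a FlowFile entered the instance and the configured period, with `0 sec` meaning never expire:

import java.time.Duration;
import java.time.Instant;

public class FileExpirationSketch {

    // A FlowFile expires once the time since it entered this NiFi instance
    // exceeds the connection's File Expiration period; "0 sec" disables expiration.
    static boolean isExpired(final Instant entryTime, final Duration expiration, final Instant now) {
        if (expiration.isZero()) {
            return false;
        }
        return Duration.between(entryTime, now).compareTo(expiration) > 0;
    }

    public static void main(String[] args) {
        final Instant enteredOneHourAgo = Instant.now().minus(Duration.ofMinutes(61));
        // true: the file has been in the instance longer than the 1 hour expiration
        System.out.println(isExpired(enteredOneHourAgo, Duration.ofHours(1), Instant.now()));
    }
}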
NiFi provides two configuration elements for Back Pressure. These thresholds indicate how much data should be
@ -581,10 +646,28 @@ The following prioritizers are available:
- *FirstInFirstOutPrioritizer*: Given two FlowFiles, the one that reached the connection first will be processed first.
- *NewestFlowFileFirstPrioritizer*: Given two FlowFiles, the one that is newest in the dataflow will be processed first.
- *OldestFlowFileFirstPrioritizer*: Given two FlowFiles, the one that is oldest in the dataflow will be processed first. This is the default scheme that is used if no prioritizers are selected.
- *PriorityAttributePrioritizer*: Given two FlowFiles that both have a "priority" attribute, the one that has the highest priority value will be prprocessed first. Note that an UpdateAttribute processor should be used to add the "priority" attribute to the FlowFiles before they reach a connection that has this prioritizer set. Values for the "priority" attribute may be alphanumeric, where "a" is a higher priority than "z", and "1" is a higher priority than "9", for example.
- *PriorityAttributePrioritizer*: Given two FlowFiles that both have a "priority" attribute, the one that has the highest priority value will be processed first. Note that an UpdateAttribute processor should be used to add the "priority" attribute to the FlowFiles before they reach a connection that has this prioritizer set. Values for the "priority" attribute may be alphanumeric, where "a" is a higher priority than "z", and "1" is a higher priority than "9", for example.
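To make the PriorityAttributePrioritizer ordering concrete, here is a standalone sketch (not the actual prioritizer implementation) that orders two hypothetical sets of FlowFile attributes by their "priority" value, so that lexicographically smaller values such as "1" or "a" are processed ahead of "9" or "z":

import java.util.Collections;
import java.util.Comparator;
import java.util.Map;

public class PriorityAttributeOrderingSketch {

    // Smaller lexicographic "priority" values sort first and are processed first.
    static final Comparator<Map<String, String>> BY_PRIORITY_ATTRIBUTE =
            Comparator.comparing((Map<String, String> attrs) -> attrs.getOrDefault("priority", ""));

    public static void main(String[] args) {
        final Map<String, String> first = Collections.singletonMap("priority", "1");
        final Map<String, String> second = Collections.singletonMap("priority", "9");
        // prints a negative number: "first" would be processed ahead of "second"
        System.out.println(BY_PRIORITY_ATTRIBUTE.compare(first, second));
    }
}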
*Note*: After a connection has been drawn between two components, the connection's configuration may be changed, and the connection may be moved to a new destination; however, the processors on either side of the connection must be stopped before a configuration or destination change may be made.
image:nifi-connection.png["Connection", width=300]
To change a connection's configuration or interact with the connection in other ways, right-click on the connection to open the connection context menu.
image:nifi-connection-menu.png["Connection Menu", width=200]
The following options are available:
- *Configure*: This option allows the user to change the configuration of the connection.
- *Stats*: This option opens a graphical representation of the connection's statistical information over time.
- *Bring to front*: This option brings the connection to the front of the canvas if something else (such as another connection) is overlapping it.
- *Go to source*: This option can be useful if there is a long distance between the connection's source and destination components on the canvas. By clicking this option, the view of the canvas will jump to the source of the connection.
- *Go to destination*: Similar to the "Go to source" option, this option changes the view to the destination component on the canvas and can be useful if there is a long distance between two connected components.
- *Empty queue*: This option allows the DFM to clear the queue of FlowFiles that may be waiting to be processed. This option can be especially useful during testing, when the DFM is not concerned about deleting data from the queue. When this option is selected, users must confirm that they want to delete the data in the queue.
- *Delete*: This option allows the DFM to delete a connection between two components. Note that the components on both sides of the connection must be stopped and the connection must be empty before it can be deleted.
=== Processor Validation
Before trying to start a Processor, it's important to make sure that the Processor's configuration is valid.
@ -647,7 +730,7 @@ link:administration-guide.html[Admin Guide].
This allows new capabilities to be added while still maintaining backward compatibility with all older instances. Additionally, if a vulnerability
or deficiency is ever discovered in a protocol, it allows a newer version of NiFi to forbid communication over the compromised versions of the protocol.
In order to communicate with a remote NiFi instance via Site-to-Site, simply drag a <<remote_process_group,Remote Process Group>> onto the graph
In order to communicate with a remote NiFi instance via Site-to-Site, simply drag a <<remote_process_group,Remote Process Group>> onto the canvas
and enter the URL of the remote NiFi instance (for more information on the components of a Remote Process Group, see
<<Remote_Group_Transmission,Remote Process Group Transmission>> section of this guide.) The URL is the same
URL you would use to go to that instance's User Interface. At that point, you can drag a connection to or from the Remote Process Group
@ -664,7 +747,7 @@ communicate with. For information on configuring NiFi to run securely, see the
link:administration-guide.html[Admin Guide].
In order to allow another NiFi instance to push data to your local instance, you can simply drag an <<input_port,Input Port>> onto the Root Process Group
of your graph. After entering a name for the port, it will be added to your flow. You can now right-click on the Input Port and choose Configure in order
of your canvas. After entering a name for the port, it will be added to your flow. You can now right-click on the Input Port and choose Configure in order
to adjust the name and the number of concurrent tasks that are used for the port. If Site-to-Site is configured to run securely, you will also be given
the ability to adjust who has access to the port. If secure, only those who have been granted access to communicate with the port will be able to see
that the port exists.
@ -698,7 +781,7 @@ This section has described the steps required to build a dataflow. Now, to put i
consists of just two processors: GenerateFlowFile and LogAttribute. These processors are normally used for testing, but they can also be used
to build a quick flow for demonstration purposes and see NiFi in action.
After you drag the GenerateFlowFile and LogAttribute processors to the graph and connect them (using the guidelines provided above), configure them as follows:
After you drag the GenerateFlowFile and LogAttribute processors to the canvas and connect them (using the guidelines provided above), configure them as follows:
* GenerateFlowFile
** On the Scheduling tab, set Run schedule to: 5 sec. Note that the GenerateFlowFile processor can create many FlowFiles very quickly; that's why setting the Run schedule is important so that this flow does not overwhelm the system NiFi is running on.
@ -717,7 +800,7 @@ Now see the following section on how to start and stop the dataflow. When the da
== Command and Control of DataFlow
== Command and Control of the DataFlow
When a component is added to the NiFi canvas, it is in the Stopped state. In order to cause the component to
be triggered, the component must be started. Once started, the component can be stopped at any time. From a
@ -802,7 +885,7 @@ Only Ports and Processors can be enabled and disabled.
=== Remote Process Group Transmission
Remote Process Groups provide a mechanism for sending data to or retrieving data from a remote instance
of NiFi. When a Remote Process Group (RPG) is added to the canvas, it is added with the Transmision Disabled,
of NiFi. When a Remote Process Group (RPG) is added to the canvas, it is added with the Transmission Disabled,
as indicated by the icon (
image:iconTransmissionInactive.png["Transmission Disabled"]
) in the top-left corner. When Transmission is Disabled, it can be enabled by right-clicking on the
@ -857,7 +940,7 @@ or not compression should be used when transmitting data to or from this Port.
== Navigating within a DataFlow
NiFi provides various mechanisms for getting around a dataflow. The <<User_Interface>> section discussed various ways to navigate around
the NiFi graph; however, once a flow exists on the graph, there are additional ways to get from one component to another. The <<User Interface>> section showed that when multiple Process Groups exist in a flow, breadcrumbs appear under the toolbar, providing a way to navigate between them. In addition, to enter a Process Group that is currently visible on the graph, simply double-click it, thereby "drilling down" into it. Connections also provide a way to jump from one location to another within the flow. Right-click on a connection and select "Go to source" or "Go to destination" in order to jump to one end of the connection or another. This can be very useful in large, complex dataflows, where the connection lines may be long and span large areas of the graph. Finally, all components provide the ability to jump forward or backward within the flow. Right-click any component (e.g., a processor, process group, port, etc.) and select either "Upstream connections" or "Downstream connections". A dialog window will open, showing the available upstream or downstream connections that the user may jump to. This can be especially useful when trying to follow a dataflow in a backward direction. It is typically easy to follow the path of a dataflow from start to finish, drilling down into nested process groups; however, it can be more difficult to follow the dataflow in the other direction.
the NiFi canvas; however, once a flow exists on the canvas, there are additional ways to get from one component to another. The <<User_Interface>> section showed that when multiple Process Groups exist in a flow, breadcrumbs appear under the toolbar, providing a way to navigate between them. In addition, to enter a Process Group that is currently visible on the canvas, simply double-click it, thereby "drilling down" into it. Connections also provide a way to jump from one location to another within the flow. Right-click on a connection and select "Go to source" or "Go to destination" in order to jump to one end of the connection or another. This can be very useful in large, complex dataflows, where the connection lines may be long and span large areas of the canvas. Finally, all components provide the ability to jump forward or backward within the flow. Right-click any component (e.g., a processor, process group, port, etc.) and select either "Upstream connections" or "Downstream connections". A dialog window will open, showing the available upstream or downstream connections that the user may jump to. This can be especially useful when trying to follow a dataflow in a backward direction. It is typically easy to follow the path of a dataflow from start to finish, drilling down into nested process groups; however, it can be more difficult to follow the dataflow in the other direction.
@ -869,7 +952,7 @@ health and status. The Status bar provides information about the overall system
(See <<status_bar>> above for more information). Processors, Process Groups, and Remote Process Groups
provide fine-grained details about their operations. Connections and Process Groups provide information
about the amount of data in their queues. The Summary Page provides information about all of the components
on the graph in a tabular format and also provides System Diagnostics information that includes disk usage,
on the canvas in a tabular format and also provides System Diagnostics information that includes disk usage,
CPU utilization, and Java Heap and Garbage Collection information. In a clustered environment, this
information is available per-node or as aggregates across the entire cluster. We will explore each of these
monitoring artifacts below.
@ -1118,7 +1201,7 @@ image:iconNotSecure.png["Not Secure"]
will be counted.
** image:iconTransmissionActive.png["Transmitting"]
*Transmitting Ports*: The number of Output Ports from whcih this NiFi is connected and currently configured
*Transmitting Ports*: The number of Output Ports from which this NiFi is connected and currently configured
to pull data from. Ports can be turned on and off by enabling and disabling transmission on the Remote Process
Group (see <<Remote_Group_Transmission>>) or via the <<Remote_Port_Configuration>> dialog.
@ -1158,7 +1241,7 @@ image:iconSummary.png["Summary"]
image::summary-table.png["Summary Table"]
This dialog provides a great deal of information about each of the components on the graph. Below, we have annotated
This dialog provides a great deal of information about each of the components on the canvas. Below, we have annotated
the different elements within the dialog in order to make the discussion of the dialog easier.
image::summary-annotated.png["Summary Table Annotated"]
@ -1294,7 +1377,7 @@ error message if unable to create the template for some reason.
.Note
********************************************************************************************************************
It is important to note that if any Processor that is Templated has a sensitive property (such as a password), the value of that
sensitive property is not included in the Template. As a result, when dragging the Template onto the graph, newly
sensitive property is not included in the Template. As a result, when dragging the Template onto the canvas, newly
created Processors may not be valid if they are missing values for their sensitive properties. Additionally, any
Connection that was selected when making the Template is not included in the Template if either the source or the
destination of the Connection is not also included in the Template.
@ -1458,9 +1541,9 @@ image:expanded-events.png["Expanded Events", width=300]
Other Management Features
-------------------------
In addition to the Summary Page, Data Provenance Page, Template Management Page, and Bulletin Board Page, there are other tools in the Management Toolbar (See <<User_Interface>>) that are useful to the DFM. The Flow Configuration History, which is available by clicking on the clock icon ( image:iconFlowHistory.png["Flow History", width=28] ) in the Management Toolbar, shows all the changes that have been made to the dataflow graph. The history can aid in troubleshooting, such as if a recent change to the dataflow has caused a problem and needs to be fixed. The DFM can see what changes have been made and adjust the flow as needed to fix the problem. While NiFi does not have an "undo" feature, the DFM can make new changes to the dataflow that will fix the problem.
In addition to the Summary Page, Data Provenance Page, Template Management Page, and Bulletin Board Page, there are other tools in the Management Toolbar (See <<User_Interface>>) that are useful to the DFM. The Flow Configuration History, which is available by clicking on the clock icon ( image:iconFlowHistory.png["Flow History", width=28] ) in the Management Toolbar, shows all the changes that have been made to the dataflow. The history can aid in troubleshooting, such as if a recent change to the dataflow has caused a problem and needs to be fixed. The DFM can see what changes have been made and adjust the flow as needed to fix the problem. While NiFi does not have an "undo" feature, the DFM can make new changes to the dataflow that will fix the problem.
Two other tools in the Management Toolbar are the Controller Settings page ( image:iconSettings.png["Settings", width=28] ) and the Users page ( image:iconUsers.png["Users", width=28] ). The Controller Settings page provides the ability to change the name of the NiFi instance, add comments describing the NiFi instance, set the maximum number of threads that are available to the application, and create a back-up copy of the dataflow(s) currently on the graph. It also provides tabs where DFMs may add and configure Controller Services and Reporting Tasks (see <<Controller_Services_and_Reporting_Tasks>>). The Users page is used to manage user access, which is described in the Admin Guide.
Two other tools in the Management Toolbar are the Controller Settings page ( image:iconSettings.png["Settings", width=28] ) and the Users page ( image:iconUsers.png["Users", width=28] ). The Controller Settings page provides the ability to change the name of the NiFi instance, add comments describing the NiFi instance, set the maximum number of threads that are available to the application, and create a back-up copy of the dataflow(s) currently on the canvas. It also provides tabs where DFMs may add and configure Controller Services and Reporting Tasks (see <<Controller_Services_and_Reporting_Tasks>>). The Users page is used to manage user access, which is described in the Admin Guide.

View File

@ -457,6 +457,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
bytesSent += event.getFileSize();
break;
case RECEIVE:
case FETCH:
flowFilesReceived++;
bytesReceived += event.getFileSize();
break;
@ -616,7 +617,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
if (registeredTypes.contains(ProvenanceEventType.CREATE)
|| registeredTypes.contains(ProvenanceEventType.FORK)
|| registeredTypes.contains(ProvenanceEventType.JOIN)
|| registeredTypes.contains(ProvenanceEventType.RECEIVE)) {
|| registeredTypes.contains(ProvenanceEventType.RECEIVE)
|| registeredTypes.contains(ProvenanceEventType.FETCH)) {
creationEventRegistered = true;
}
}

View File

@ -1203,6 +1203,11 @@ nf.ProvenanceTable = (function () {
formatEventDetail('Relationship', event.relationship);
}
// conditionally show FETCH details
if (event.eventType === 'FETCH') {
formatEventDetail('Transit Uri', event.transitUri);
}
// conditionally show the cluster node identifier
if (nf.Common.isDefinedAndNotNull(event.clusterNodeId)) {
// save the cluster node id

View File

@ -50,6 +50,10 @@
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
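The jackson-mapper-asl artifact added here is pulled in for the new PutHBaseJSON processor later in this commit, which parses each FlowFile as a JSON document. As a rough, hypothetical sketch of that parsing pattern with the codehaus ObjectMapper (one column qualifier/value per top-level field):

import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;

import java.util.Iterator;

public class JacksonFieldWalkSketch {

    public static void main(String[] args) throws Exception {
        final ObjectMapper mapper = new ObjectMapper();
        final JsonNode root = mapper.readTree("{\"field1\":\"value1\",\"field2\":\"value2\"}");

        // walk the top-level fields, roughly how a field-per-column mapping is built
        final Iterator<String> fieldNames = root.getFieldNames();
        while (fieldNames.hasNext()) {
            final String name = fieldNames.next();
            System.out.println(name + " -> " + root.get(name).getTextValue());
        }
    }
}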

View File

@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Base class for processors that put data to HBase.
*/
public abstract class AbstractPutHBase extends AbstractProcessor {
protected static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("HBase Client Service")
.description("Specifies the Controller Service to use for accessing HBase.")
.required(true)
.identifiesControllerService(HBaseClientService.class)
.build();
protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("Table Name")
.description("The name of the HBase Table to put data into")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor ROW_ID = new PropertyDescriptor.Builder()
.name("Row Identifier")
.description("Specifies the Row ID to use when inserting data into HBase")
.required(false) // not all sub-classes will require this
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor COLUMN_FAMILY = new PropertyDescriptor.Builder()
.name("Column Family")
.description("The Column Family to use when inserting data into HBase")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor COLUMN_QUALIFIER = new PropertyDescriptor.Builder()
.name("Column Qualifier")
.description("The Column Qualifier to use when inserting data into HBase")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The maximum number of FlowFiles to process in a single execution. The FlowFiles will be " +
"grouped by table, and a single Put per table will be performed.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("25")
.build();
protected static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after it has been successfully stored in HBase")
.build();
protected static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if it cannot be sent to HBase")
.build();
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
List<FlowFile> flowFiles = session.get(batchSize);
if (flowFiles == null || flowFiles.size() == 0) {
return;
}
final Map<String,List<PutFlowFile>> tablePuts = new HashMap<>();
// Group FlowFiles by HBase Table
for (final FlowFile flowFile : flowFiles) {
final PutFlowFile putFlowFile = createPut(session, context, flowFile);
if (putFlowFile == null) {
// sub-classes should log appropriate error messages before returning null
session.transfer(flowFile, REL_FAILURE);
} else if (!putFlowFile.isValid()) {
if (StringUtils.isBlank(putFlowFile.getTableName())) {
getLogger().error("Missing table name for FlowFile {}; routing to failure", new Object[]{flowFile});
} else if (StringUtils.isBlank(putFlowFile.getRow())) {
getLogger().error("Missing row id for FlowFile {}; routing to failure", new Object[]{flowFile});
} else if (putFlowFile.getColumns() == null || putFlowFile.getColumns().isEmpty()) {
getLogger().error("No columns provided for FlowFile {}; routing to failure", new Object[]{flowFile});
} else {
// really shouldn't get here, but just in case
getLogger().error("Failed to produce a put for FlowFile {}; routing to failure", new Object[]{flowFile});
}
session.transfer(flowFile, REL_FAILURE);
} else {
List<PutFlowFile> putFlowFiles = tablePuts.get(putFlowFile.getTableName());
if (putFlowFiles == null) {
putFlowFiles = new ArrayList<>();
tablePuts.put(putFlowFile.getTableName(), putFlowFiles);
}
putFlowFiles.add(putFlowFile);
}
}
getLogger().debug("Sending {} FlowFiles to HBase in {} put operations", new Object[]{flowFiles.size(), tablePuts.size()});
final long start = System.nanoTime();
final List<PutFlowFile> successes = new ArrayList<>();
final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
for (Map.Entry<String, List<PutFlowFile>> entry : tablePuts.entrySet()) {
try {
hBaseClientService.put(entry.getKey(), entry.getValue());
successes.addAll(entry.getValue());
} catch (Exception e) {
getLogger().error(e.getMessage(), e);
for (PutFlowFile putFlowFile : entry.getValue()) {
getLogger().error("Failed to send {} to HBase due to {}; routing to failure", new Object[]{putFlowFile.getFlowFile(), e});
final FlowFile failure = session.penalize(putFlowFile.getFlowFile());
session.transfer(failure, REL_FAILURE);
}
}
}
final long sendMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
getLogger().debug("Sent {} FlowFiles to HBase successfully in {} milliseconds", new Object[]{successes.size(), sendMillis});
for (PutFlowFile putFlowFile : successes) {
session.transfer(putFlowFile.getFlowFile(), REL_SUCCESS);
final String details = "Put " + putFlowFile.getColumns().size() + " cells to HBase";
session.getProvenanceReporter().send(putFlowFile.getFlowFile(), getTransitUri(putFlowFile), details, sendMillis);
}
}
protected String getTransitUri(PutFlowFile putFlowFile) {
return "hbase://" + putFlowFile.getTableName() + "/" + putFlowFile.getRow();
}
/**
* Sub-classes provide the implementation to create a put from a FlowFile.
*
* @param session
* the current session
* @param context
* the current context
* @param flowFile
* the FlowFile to create a Put from
*
* @return a PutFlowFile instance for the given FlowFile
*/
protected abstract PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile);
}
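To show how this base class is intended to be extended, here is a hypothetical minimal subclass (not part of this commit) whose createPut() stores the entire FlowFile content in a single cell. It reuses the property descriptors defined above and the same PutColumn/PutFlowFile constructors and session.read(...) pattern that PutHBaseCell uses later in this change; a real subclass would also override getSupportedPropertyDescriptors() and getRelationships().

package org.apache.nifi.hbase;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.stream.io.StreamUtils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;

// Hypothetical example subclass: one cell per FlowFile, properties evaluated per FlowFile.
public class PutHBaseSingleCellSketch extends AbstractPutHBase {

    @Override
    protected PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile) {
        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
        final String row = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
        final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
        final String columnQualifier = context.getProperty(COLUMN_QUALIFIER).evaluateAttributeExpressions(flowFile).getValue();

        // read the entire FlowFile content into memory to use as the cell value
        final byte[] buffer = new byte[(int) flowFile.getSize()];
        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(final InputStream in) throws IOException {
                StreamUtils.fillBuffer(in, buffer);
            }
        });

        return new PutFlowFile(tableName, row,
                Collections.singletonList(new PutColumn(columnFamily, columnQualifier, buffer)), flowFile);
    }
}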

View File

@ -41,6 +41,7 @@ import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
import org.apache.nifi.hbase.util.ObjectSerDe;
import org.apache.nifi.hbase.util.StringSerDe;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
@ -83,7 +84,7 @@ import java.util.regex.Pattern;
@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the data was pulled from"),
@WritesAttribute(attribute = "mime.type", description = "Set to application/json to indicate that output is JSON")
})
public class GetHBase extends AbstractHBaseProcessor {
public class GetHBase extends AbstractProcessor {
static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.hbase;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@ -24,91 +23,36 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@EventDriven
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"hadoop", "hbase"})
@CapabilityDescription("Adds the Contents of a FlowFile to HBase as the value of a single cell")
public class PutHBaseCell extends AbstractProcessor {
protected static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("HBase Client Service")
.description("Specifies the Controller Service to use for accessing HBase.")
.required(true)
.identifiesControllerService(HBaseClientService.class)
.build();
protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("Table Name")
.description("The name of the HBase Table to put data into")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor ROW = new PropertyDescriptor.Builder()
.name("Row Identifier")
.description("Specifies the Row ID to use when inserting data into HBase")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor COLUMN_FAMILY = new PropertyDescriptor.Builder()
.name("Column Family")
.description("The Column Family to use when inserting data into HBase")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor COLUMN_QUALIFIER = new PropertyDescriptor.Builder()
.name("Column Qualifier")
.description("The Column Qualifier to use when inserting data into HBase")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The maximum number of FlowFiles to process in a single execution. The FlowFiles will be " +
"grouped by table, and a single Put per table will be performed.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("25")
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after it has been successfully stored in HBase")
.build();
static final Relationship FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if it cannot be sent to HBase")
.build();
public class PutHBaseCell extends AbstractPutHBase {
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(HBASE_CLIENT_SERVICE);
properties.add(TABLE_NAME);
properties.add(ROW);
properties.add(ROW_ID);
properties.add(COLUMN_FAMILY);
properties.add(COLUMN_QUALIFIER);
properties.add(BATCH_SIZE);
@ -119,84 +63,27 @@ public class PutHBaseCell extends AbstractProcessor {
public Set<Relationship> getRelationships() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(FAILURE);
rels.add(REL_FAILURE);
return rels;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
List<FlowFile> flowFiles = session.get(batchSize);
if (flowFiles == null || flowFiles.size() == 0) {
return;
}
protected PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile) {
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String row = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
final String columnQualifier = context.getProperty(COLUMN_QUALIFIER).evaluateAttributeExpressions(flowFile).getValue();
final Map<String,List<PutFlowFile>> tablePuts = new HashMap<>();
// Group FlowFiles by HBase Table
for (final FlowFile flowFile : flowFiles) {
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String row = context.getProperty(ROW).evaluateAttributeExpressions(flowFile).getValue();
final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
final String columnQualifier = context.getProperty(COLUMN_QUALIFIER).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isBlank(tableName) || StringUtils.isBlank(row) || StringUtils.isBlank(columnFamily) || StringUtils.isBlank(columnQualifier)) {
getLogger().error("Invalid FlowFile {} missing table, row, column familiy, or column qualifier; routing to failure", new Object[]{flowFile});
session.transfer(flowFile, FAILURE);
} else {
final byte[] buffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, buffer);
}
});
final PutFlowFile putFlowFile = new PutFlowFile(tableName, row, columnFamily, columnQualifier, buffer, flowFile);
List<PutFlowFile> putFlowFiles = tablePuts.get(tableName);
if (putFlowFiles == null) {
putFlowFiles = new ArrayList<>();
tablePuts.put(tableName, putFlowFiles);
}
putFlowFiles.add(putFlowFile);
final byte[] buffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, buffer);
}
}
});
getLogger().debug("Sending {} FlowFiles to HBase in {} put operations", new Object[] {flowFiles.size(), tablePuts.size()});
final long start = System.nanoTime();
final List<PutFlowFile> successes = new ArrayList<>();
final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
for (Map.Entry<String, List<PutFlowFile>> entry : tablePuts.entrySet()) {
try {
hBaseClientService.put(entry.getKey(), entry.getValue());
successes.addAll(entry.getValue());
} catch (Exception e) {
getLogger().error(e.getMessage(), e);
for (PutFlowFile putFlowFile : entry.getValue()) {
getLogger().error("Failed to send {} to HBase due to {}; routing to failure", new Object[]{putFlowFile.getFlowFile(), e});
final FlowFile failure = session.penalize(putFlowFile.getFlowFile());
session.transfer(failure, FAILURE);
}
}
}
final long sendMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
getLogger().debug("Sent {} FlowFiles to HBase successfully in {} milliseconds", new Object[] {successes.size(), sendMillis});
for (PutFlowFile putFlowFile : successes) {
session.transfer(putFlowFile.getFlowFile(), REL_SUCCESS);
session.getProvenanceReporter().send(putFlowFile.getFlowFile(), getTransitUri(putFlowFile), sendMillis);
}
}
protected String getTransitUri(PutFlowFile putFlowFile) {
return "hbase://" + putFlowFile.getTableName() + "/" + putFlowFile.getRow() + "/" + putFlowFile.getColumnFamily()
+ ":" + putFlowFile.getColumnQualifier();
final Collection<PutColumn> columns = Collections.singletonList(new PutColumn(columnFamily, columnQualifier, buffer));
return new PutFlowFile(tableName, row, columns, flowFile);
}
}

View File

@ -0,0 +1,230 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.ObjectHolder;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@EventDriven
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"hadoop", "hbase", "put", "json"})
@CapabilityDescription("Adds rows to HBase based on the contents of incoming JSON documents. Each FlowFile must contain a single " +
"UTF-8 encoded JSON document, and any FlowFiles where the root element is not a single document will be routed to failure. " +
"Each JSON field name and value will become a column qualifier and value of the HBase row. Any fields with a null value " +
"will be skipped, and fields with a complex value will be handled according to the Complex Field Strategy. " +
"The row id can be specified either directly on the processor through the Row Identifier property, or can be extracted from the JSON " +
"document by specifying the Row Identifier Field Name property. This processor will hold the contents of all FlowFiles for the given batch " +
"in memory at one time.")
public class PutHBaseJSON extends AbstractPutHBase {
protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
.name("Row Identifier Field Name")
.description("Specifies the name of a JSON element whose value should be used as the row id for the given JSON document.")
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final String FAIL_VALUE = "Fail";
protected static final String WARN_VALUE = "Warn";
protected static final String IGNORE_VALUE = "Ignore";
protected static final String TEXT_VALUE = "Text";
protected static final AllowableValue COMPLEX_FIELD_FAIL = new AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any elements contain complex values.");
protected static final AllowableValue COMPLEX_FIELD_WARN = new AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include field in row sent to HBase.");
protected static final AllowableValue COMPLEX_FIELD_IGNORE = new AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include in row sent to HBase.");
protected static final AllowableValue COMPLEX_FIELD_TEXT = new AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the complex field as the value of the given column.");
protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new PropertyDescriptor.Builder()
.name("Complex Field Strategy")
.description("Indicates how to handle complex fields, i.e. fields that do not have a single text value.")
.expressionLanguageSupported(false)
.required(true)
.allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT)
.defaultValue(COMPLEX_FIELD_TEXT.getValue())
.build();
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(HBASE_CLIENT_SERVICE);
properties.add(TABLE_NAME);
properties.add(ROW_ID);
properties.add(ROW_FIELD_NAME);
properties.add(COLUMN_FAMILY);
properties.add(BATCH_SIZE);
properties.add(COMPLEX_FIELD_STRATEGY);
return properties;
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_FAILURE);
return rels;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final Collection<ValidationResult> results = new ArrayList<>();
final String rowId = validationContext.getProperty(ROW_ID).getValue();
final String rowFieldName = validationContext.getProperty(ROW_FIELD_NAME).getValue();
if (StringUtils.isBlank(rowId) && StringUtils.isBlank(rowFieldName)) {
results.add(new ValidationResult.Builder()
.subject(this.getClass().getSimpleName())
.explanation("Row Identifier or Row Identifier Field Name is required")
.valid(false)
.build());
} else if (!StringUtils.isBlank(rowId) && !StringUtils.isBlank(rowFieldName)) {
results.add(new ValidationResult.Builder()
.subject(this.getClass().getSimpleName())
.explanation("Row Identifier and Row Identifier Field Name can not be used together")
.valid(false)
.build());
}
return results;
}
@Override
protected PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile) {
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String rowId = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
final boolean extractRowId = !StringUtils.isBlank(rowFieldName);
final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
// Parse the JSON document
final ObjectMapper mapper = new ObjectMapper();
final ObjectHolder<JsonNode> rootNodeRef = new ObjectHolder<>(null);
try {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
try (final InputStream bufferedIn = new BufferedInputStream(in)) {
rootNodeRef.set(mapper.readTree(bufferedIn));
}
}
});
} catch (final ProcessException pe) {
getLogger().error("Failed to parse {} as JSON due to {}; routing to failure", new Object[]{flowFile, pe.toString()}, pe);
return null;
}
final JsonNode rootNode = rootNodeRef.get();
if (rootNode.isArray()) {
getLogger().error("Root node of JSON must be a single document, found array for {}; routing to failure", new Object[]{flowFile});
return null;
}
final Collection<PutColumn> columns = new ArrayList<>();
final ObjectHolder<String> rowIdHolder = new ObjectHolder<>(null);
// convert each field/value to a column for the put; nulls are skipped and complex values are handled per the Complex Field Strategy
final Iterator<String> fieldNames = rootNode.getFieldNames();
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
final ObjectHolder<String> fieldValueHolder = new ObjectHolder<>(null);
final JsonNode fieldNode = rootNode.get(fieldName);
if (fieldNode.isNull()) {
getLogger().debug("Skipping {} because value was null", new Object[]{fieldName});
} else if (fieldNode.isValueNode()) {
fieldValueHolder.set(fieldNode.asText());
} else {
// for non-null, non-value nodes, determine what to do based on the handling strategy
switch (complexFieldStrategy) {
case FAIL_VALUE:
getLogger().error("Complex value found for {}; routing to failure", new Object[]{fieldName});
return null;
case WARN_VALUE:
getLogger().warn("Complex value found for {}; skipping", new Object[]{fieldName});
break;
case TEXT_VALUE:
// use toString() here because asText() is only guaranteed to be supported on value nodes
// some other types of nodes, like ArrayNode, provide toString implementations
fieldValueHolder.set(fieldNode.toString());
break;
case IGNORE_VALUE:
// silently skip
break;
default:
break;
}
}
// if we have a field value, then see if this is the row id field, if so store the value for later
// otherwise add a new column where the fieldName and fieldValue are the column qualifier and value
if (fieldValueHolder.get() != null) {
if (extractRowId && fieldName.equals(rowFieldName)) {
rowIdHolder.set(fieldValueHolder.get());
} else {
columns.add(new PutColumn(columnFamily, fieldName, fieldValueHolder.get().getBytes(StandardCharsets.UTF_8)));
}
}
}
// if we are expecting a field name to use for the row id and the incoming document doesn't have it
// log an error message so the user can see what the field names were and return null so it gets routed to failure
if (extractRowId && rowIdHolder.get() == null) {
final String fieldNameStr = StringUtils.join(rootNode.getFieldNames(), ",");
getLogger().error("Row ID field named '{}' not found in field names '{}'; routing to failure", new Object[] {rowFieldName, fieldNameStr});
return null;
}
final String putRowId = (extractRowId ? rowIdHolder.get() : rowId);
return new PutFlowFile(tableName, putRowId, columns, flowFile);
}
}
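As a worked illustration (the document and property values below are assumed, not taken from this commit): with Row Identifier Field Name set to "id" and the default Text complex field strategy, createPut would turn the sample document into a put whose row id is "user-1", whose simple fields become column qualifiers, whose null field is skipped, and whose nested object is stored as its JSON text. The sketch below replays that field walk with the same Jackson calls the processor uses:

import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.util.Iterator;

public class PutHBaseJsonMappingSketch {
    public static void main(String[] args) throws IOException {
        // Hypothetical input document; "id" plays the role of the Row Identifier Field Name.
        final String json = "{ \"id\" : \"user-1\", \"name\" : \"alice\", "
                + "\"address\" : { \"city\" : \"Rome\" }, \"nickname\" : null }";
        final JsonNode root = new ObjectMapper().readTree(json);

        final Iterator<String> fieldNames = root.getFieldNames();
        while (fieldNames.hasNext()) {
            final String fieldName = fieldNames.next();
            final JsonNode fieldNode = root.get(fieldName);
            if (fieldNode.isNull()) {
                continue; // null fields are skipped, as in createPut
            }
            // Value nodes use asText(); complex nodes use toString() under the Text strategy.
            final String value = fieldNode.isValueNode() ? fieldNode.asText() : fieldNode.toString();
            if ("id".equals(fieldName)) {
                System.out.println("row id = " + value);                   // user-1
            } else {
                System.out.println("column " + fieldName + " = " + value); // name=alice, address={"city":"Rome"}
            }
        }
    }
}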

View File

@ -14,4 +14,5 @@
# limitations under the License.
org.apache.nifi.hbase.GetHBase
org.apache.nifi.hbase.PutHBaseCell
org.apache.nifi.hbase.PutHBaseCell
org.apache.nifi.hbase.PutHBaseJSON

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertTrue;
public class HBaseTestUtil {
public static void verifyPut(final String row, final String columnFamily, final Map<String,String> columns, final List<PutFlowFile> puts) {
boolean foundPut = false;
for (final PutFlowFile put : puts) {
if (!row.equals(put.getRow())) {
continue;
}
if (put.getColumns() == null || put.getColumns().size() != columns.size()) {
continue;
}
// start off assuming we have all the columns
boolean foundAllColumns = true;
for (Map.Entry<String, String> entry : columns.entrySet()) {
// determine if we have the current expected column
boolean foundColumn = false;
for (PutColumn putColumn : put.getColumns()) {
final String colVal = new String(putColumn.getBuffer(), StandardCharsets.UTF_8);
if (columnFamily.equals(putColumn.getColumnFamily()) && entry.getKey().equals(putColumn.getColumnQualifier())
&& entry.getValue().equals(colVal)) {
foundColumn = true;
break;
}
}
// if we didn't have the current expected column we know we don't have all expected columns
if (!foundColumn) {
foundAllColumns = false;
break;
}
}
// if we found all the expected columns this was a match so we can break
if (foundAllColumns) {
foundPut = true;
break;
}
}
assertTrue(foundPut);
}
public static void verifyEvent(final List<ProvenanceEventRecord> events, final String uri, final ProvenanceEventType eventType) {
boolean foundEvent = false;
for (final ProvenanceEventRecord event : events) {
if (event.getTransitUri().equals(uri) && event.getEventType().equals(eventType)) {
foundEvent = true;
break;
}
}
assertTrue(foundEvent);
}
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.hbase;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
@ -33,7 +34,7 @@ import java.util.Map;
public class MockHBaseClientService extends AbstractControllerService implements HBaseClientService {
private Map<String,ResultCell[]> results = new HashMap<>();
private Map<String, List<PutFlowFile>> puts = new HashMap<>();
private Map<String, List<PutFlowFile>> flowFilePuts = new HashMap<>();
private boolean throwException = false;
@Override
@ -42,7 +43,12 @@ public class MockHBaseClientService extends AbstractControllerService implements
throw new IOException("exception");
}
this.puts.put(tableName, new ArrayList<>(puts));
this.flowFilePuts.put(tableName, new ArrayList<>(puts));
}
@Override
public void put(String tableName, String rowId, Collection<PutColumn> columns) throws IOException {
throw new UnsupportedOperationException();
}
@Override
@ -92,8 +98,8 @@ public class MockHBaseClientService extends AbstractControllerService implements
results.put(rowKey, cellArray);
}
public Map<String, List<PutFlowFile>> getPuts() {
return puts;
public Map<String, List<PutFlowFile>> getFlowFilePuts() {
return flowFilePuts;
}
public void setThrowException(boolean throwException) {

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.hbase;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
@ -43,7 +44,7 @@ public class TestPutHBaseCell {
final TestRunner runner = TestRunners.newTestRunner(PutHBaseCell.class);
runner.setProperty(PutHBaseCell.TABLE_NAME, tableName);
runner.setProperty(PutHBaseCell.ROW, row);
runner.setProperty(PutHBaseCell.ROW_ID, row);
runner.setProperty(PutHBaseCell.COLUMN_FAMILY, columnFamily);
runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, columnQualifier);
runner.setProperty(PutHBaseCell.BATCH_SIZE, "1");
@ -58,12 +59,14 @@ public class TestPutHBaseCell {
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getPuts());
assertEquals(1, hBaseClient.getPuts().size());
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName);
List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
assertEquals(1, puts.size());
verifyPut(row, columnFamily, columnQualifier, content, puts.get(0));
assertEquals(1, runner.getProvenanceEvents().size());
}
@Test
@ -89,12 +92,14 @@ public class TestPutHBaseCell {
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getPuts());
assertEquals(1, hBaseClient.getPuts().size());
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName);
List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
assertEquals(1, puts.size());
verifyPut(row, columnFamily, columnQualifier, content, puts.get(0));
assertEquals(1, runner.getProvenanceEvents().size());
}
@Test
@ -115,7 +120,9 @@ public class TestPutHBaseCell {
runner.run();
runner.assertTransferCount(PutHBaseCell.REL_SUCCESS, 0);
runner.assertTransferCount(PutHBaseCell.FAILURE, 1);
runner.assertTransferCount(PutHBaseCell.REL_FAILURE, 1);
assertEquals(0, runner.getProvenanceEvents().size());
}
@Test
@ -142,7 +149,9 @@ public class TestPutHBaseCell {
runner.run();
runner.assertTransferCount(PutHBaseCell.REL_SUCCESS, 1);
runner.assertTransferCount(PutHBaseCell.FAILURE, 1);
runner.assertTransferCount(PutHBaseCell.REL_FAILURE, 1);
assertEquals(1, runner.getProvenanceEvents().size());
}
@Test
@ -171,13 +180,15 @@ public class TestPutHBaseCell {
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content1);
assertNotNull(hBaseClient.getPuts());
assertEquals(1, hBaseClient.getPuts().size());
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName);
List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
assertEquals(2, puts.size());
verifyPut(row1, columnFamily, columnQualifier, content1, puts.get(0));
verifyPut(row2, columnFamily, columnQualifier, content2, puts.get(1));
assertEquals(2, runner.getProvenanceEvents().size());
}
@Test
@ -202,7 +213,9 @@ public class TestPutHBaseCell {
runner.enqueue(content2.getBytes("UTF-8"), attributes2);
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.FAILURE, 2);
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 2);
assertEquals(0, runner.getProvenanceEvents().size());
}
@Test
@ -229,13 +242,15 @@ public class TestPutHBaseCell {
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content1);
assertNotNull(hBaseClient.getPuts());
assertEquals(1, hBaseClient.getPuts().size());
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName);
List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
assertEquals(2, puts.size());
verifyPut(row, columnFamily, columnQualifier, content1, puts.get(0));
verifyPut(row, columnFamily, columnQualifier, content2, puts.get(1));
assertEquals(2, runner.getProvenanceEvents().size());
}
private Map<String, String> getAtrributeMapWithEL(String tableName, String row, String columnFamily, String columnQualifier) {
@ -250,7 +265,7 @@ public class TestPutHBaseCell {
private TestRunner getTestRunnerWithEL(PutHBaseCell proc) {
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutHBaseCell.TABLE_NAME, "${hbase.tableName}");
runner.setProperty(PutHBaseCell.ROW, "${hbase.row}");
runner.setProperty(PutHBaseCell.ROW_ID, "${hbase.row}");
runner.setProperty(PutHBaseCell.COLUMN_FAMILY, "${hbase.columnFamily}");
runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, "${hbase.columnQualifier}");
return runner;
@ -266,9 +281,14 @@ public class TestPutHBaseCell {
private void verifyPut(String row, String columnFamily, String columnQualifier, String content, PutFlowFile put) {
assertEquals(row, put.getRow());
assertEquals(columnFamily, put.getColumnFamily());
assertEquals(columnQualifier, put.getColumnQualifier());
assertEquals(content, new String(put.getBuffer(), StandardCharsets.UTF_8));
assertNotNull(put.getColumns());
assertEquals(1, put.getColumns().size());
final PutColumn column = put.getColumns().iterator().next();
assertEquals(columnFamily, column.getColumnFamily());
assertEquals(columnQualifier, column.getColumnQualifier());
assertEquals(content, new String(column.getBuffer(), StandardCharsets.UTF_8));
}
}

View File

@ -0,0 +1,423 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class TestPutHBaseJSON {
public static final String DEFAULT_TABLE_NAME = "nifi";
public static final String DEFAULT_ROW = "row1";
public static final String DEFAULT_COLUMN_FAMILY = "family1";
@Test
public void testCustomValidate() throws InitializationException {
// missing row id and row id field name should be invalid
TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
getHBaseClientService(runner);
runner.assertNotValid();
// setting both properties should still be invalid
runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, "rowId");
runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowFieldName");
runner.assertNotValid();
// only a row id field name should make it valid
runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowFieldName");
runner.assertValid();
// only a row id should make it valid
runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, "rowId");
runner.assertValid();
}
@Test
public void testSingleJsonDocAndProvidedRowId() throws IOException, InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
runner.enqueue(content.getBytes("UTF-8"));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
final Map<String,String> expectedColumns = new HashMap<>();
expectedColumns.put("field1", "value1");
expectedColumns.put("field2", "value2");
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
assertEquals("hbase://" + DEFAULT_TABLE_NAME + "/" + DEFAULT_ROW, event.getTransitUri());
}
@Test
public void testSingleJsonDocAndExtractedRowId() throws IOException, InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowField");
final String content = "{ \"rowField\" : \"myRowId\", \"field1\" : \"value1\", \"field2\" : \"value2\" }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
// should be a put with row id of myRowId, and rowField shouldn't end up in the columns
final Map<String,String> expectedColumns1 = new HashMap<>();
expectedColumns1.put("field1", "value1");
expectedColumns1.put("field2", "value2");
HBaseTestUtil.verifyPut("myRowId", DEFAULT_COLUMN_FAMILY, expectedColumns1, puts);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertEquals(1, events.size());
HBaseTestUtil.verifyEvent(runner.getProvenanceEvents(), "hbase://" + DEFAULT_TABLE_NAME + "/myRowId", ProvenanceEventType.SEND);
}
@Test
public void testSingleJsonDocAndExtractedRowIdMissingField() throws IOException, InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowField");
final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_FAILURE).get(0);
outFile.assertContentEquals(content);
// should be no provenance events
assertEquals(0, runner.getProvenanceEvents().size());
// no puts should have made it to the client
assertEquals(0, hBaseClient.getFlowFilePuts().size());
}
@Test
public void testMultipleJsonDocsRouteToFailure() throws IOException, InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
final String content1 = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
final String content2 = "{ \"field3\" : \"value3\", \"field4\" : \"value4\" }";
final String content = "[ " + content1 + " , " + content2 + " ]";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_FAILURE).get(0);
outFile.assertContentEquals(content);
// should be no provenance events
assertEquals(0, runner.getProvenanceEvents().size());
// no puts should have made it to the client
assertEquals(0, hBaseClient.getFlowFilePuts().size());
}
@Test
public void testELWithProvidedRowId() throws IOException, InitializationException {
final TestRunner runner = getTestRunner("${hbase.table}", "${hbase.colFamily}", "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, "${hbase.rowId}");
final Map<String,String> attributes = new HashMap<>();
attributes.put("hbase.table", "myTable");
attributes.put("hbase.colFamily", "myColFamily");
attributes.put("hbase.rowId", "myRowId");
final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
runner.enqueue(content.getBytes("UTF-8"), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get("myTable");
assertEquals(1, puts.size());
final Map<String,String> expectedColumns = new HashMap<>();
expectedColumns.put("field1", "value1");
expectedColumns.put("field2", "value2");
HBaseTestUtil.verifyPut("myRowId", "myColFamily", expectedColumns, puts);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertEquals(1, events.size());
HBaseTestUtil.verifyEvent(runner.getProvenanceEvents(), "hbase://myTable/myRowId", ProvenanceEventType.SEND);
}
@Test
public void testELWithExtractedRowId() throws IOException, InitializationException {
final TestRunner runner = getTestRunner("${hbase.table}", "${hbase.colFamily}", "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "${hbase.rowField}");
final Map<String,String> attributes = new HashMap<>();
attributes.put("hbase.table", "myTable");
attributes.put("hbase.colFamily", "myColFamily");
attributes.put("hbase.rowField", "field1");
final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
runner.enqueue(content.getBytes("UTF-8"), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get("myTable");
assertEquals(1, puts.size());
final Map<String,String> expectedColumns = new HashMap<>();
expectedColumns.put("field2", "value2");
HBaseTestUtil.verifyPut("value1", "myColFamily", expectedColumns, puts);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertEquals(1, events.size());
HBaseTestUtil.verifyEvent(runner.getProvenanceEvents(), "hbase://myTable/value1", ProvenanceEventType.SEND);
}
@Test
public void testNullAndArrayElementsWithWarnStrategy() throws InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_WARN.getValue());
// should route to success because there is at least one valid field
final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
// should have skipped field1 and field3
final Map<String,String> expectedColumns = new HashMap<>();
expectedColumns.put("field2", "value2");
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
}
@Test
public void testNullAndArrayElementsWithIgnoreStrategy() throws InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_IGNORE.getValue());
// should route to success because there is at least one valid field
final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
// should have skipped field1 and field3
final Map<String,String> expectedColumns = new HashMap<>();
expectedColumns.put("field2", "value2");
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
}
@Test
public void testNullAndArrayElementsWithFailureStrategy() throws InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_FAIL.getValue());
// should route to failure because the Fail strategy rejects any document containing a complex field
final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_FAILURE).get(0);
outFile.assertContentEquals(content);
// should be no provenance events
assertEquals(0, runner.getProvenanceEvents().size());
// no puts should have made it to the client
assertEquals(0, hBaseClient.getFlowFilePuts().size());
}
@Test
public void testNullAndArrayElementsWithTextStrategy() throws InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_TEXT.getValue());
// should route to success because there is at least one valid field
final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
// should have skipped the null field3 and stored field1 as its JSON text
final Map<String,String> expectedColumns = new HashMap<>();
expectedColumns.put("field1", "[{\"child_field1\":\"child_value1\"}]");
expectedColumns.put("field2", "value2");
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
}
@Test
public void testNestedDocWithTextStrategy() throws InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_TEXT.getValue());
// should route to success because there is at least one valid field
final String content = "{ \"field1\" : { \"child_field1\" : \"child_value1\" }, \"field2\" : \"value2\", \"field3\" : null }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
// should have skipped the null field3 and stored the nested field1 as its JSON text
final Map<String,String> expectedColumns = new HashMap<>();
expectedColumns.put("field1", "{\"child_field1\":\"child_value1\"}");
expectedColumns.put("field2", "value2");
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
}
@Test
public void testAllElementsAreNullOrArrays() throws InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_WARN.getValue());
// should route to failure since it would produce a put with no columns
final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : null }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_FAILURE).get(0);
outFile.assertContentEquals(content);
// should be no provenance events
assertEquals(0, runner.getProvenanceEvents().size());
// no puts should have made it to the client
assertEquals(0, hBaseClient.getFlowFilePuts().size());
}
@Test
public void testInvalidJson() throws InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
final String content = "NOT JSON";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
}
private TestRunner getTestRunner(String table, String columnFamily, String batchSize) {
final TestRunner runner = TestRunners.newTestRunner(PutHBaseJSON.class);
runner.setProperty(PutHBaseJSON.TABLE_NAME, table);
runner.setProperty(PutHBaseJSON.COLUMN_FAMILY, columnFamily);
runner.setProperty(PutHBaseJSON.BATCH_SIZE, batchSize);
return runner;
}
private MockHBaseClientService getHBaseClientService(final TestRunner runner) throws InitializationException {
final MockHBaseClientService hBaseClient = new MockHBaseClientService();
runner.addControllerService("hbaseClient", hBaseClient);
runner.enableControllerService(hBaseClient);
runner.setProperty(PutHBaseCell.HBASE_CLIENT_SERVICE, "hbaseClient");
return hBaseClient;
}
}

View File

@ -347,6 +347,8 @@ public class StandardRecordReader implements RecordReader {
} else if (eventType == ProvenanceEventType.RECEIVE) {
builder.setTransitUri(readNullableString(dis));
builder.setSourceSystemFlowFileIdentifier(readNullableString(dis));
} else if (eventType == ProvenanceEventType.FETCH) {
builder.setTransitUri(readNullableString(dis));
} else if (eventType == ProvenanceEventType.SEND) {
builder.setTransitUri(readNullableString(dis));
} else if (eventType == ProvenanceEventType.ADDINFO) {

View File

@ -235,6 +235,8 @@ public class StandardRecordWriter implements RecordWriter {
} else if (recordType == ProvenanceEventType.RECEIVE) {
writeNullableString(out, record.getTransitUri());
writeNullableString(out, record.getSourceSystemFlowFileIdentifier());
} else if (recordType == ProvenanceEventType.FETCH) {
writeNullableString(out, record.getTransitUri());
} else if (recordType == ProvenanceEventType.SEND) {
writeNullableString(out, record.getTransitUri());
} else if (recordType == ProvenanceEventType.ADDINFO) {

View File

@ -66,7 +66,7 @@ public abstract class AbstractSyslogProcessor extends AbstractProcessor {
BODY("syslog.body"),
VALID("syslog.valid"),
PROTOCOL("syslog.protocol"),
PORT("syslog.pprt");
PORT("syslog.port");
private String key;

View File

@ -67,7 +67,6 @@ import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -96,7 +95,6 @@ import org.joda.time.format.DateTimeFormatter;
@SupportsBatching
@Tags({"http", "https", "rest", "client"})
@InputRequirement(Requirement.INPUT_ALLOWED)
@TriggerWhenEmpty
@CapabilityDescription("An HTTP client processor which converts FlowFile attributes to HTTP headers, with configurable HTTP method, url, etc.")
@WritesAttributes({
@WritesAttribute(attribute = "invokehttp.status.code", description = "The status code that is returned"),
@ -610,7 +608,11 @@ public final class InvokeHTTP extends AbstractProcessor {
// emit provenance event
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().fetch(responseFlowFile, url.toExternalForm(), millis);
if(requestFlowFile != null) {
session.getProvenanceReporter().fetch(responseFlowFile, url.toExternalForm(), millis);
} else {
session.getProvenanceReporter().receive(responseFlowFile, url.toExternalForm(), millis);
}
}
}
@ -775,13 +777,9 @@ public final class InvokeHTTP extends AbstractProcessor {
private void route(FlowFile request, FlowFile response, ProcessSession session, ProcessContext context, int statusCode){
// check if we should penalize the request
if (!isSuccess(statusCode)) {
if (request == null) {
context.yield();
} else {
request = session.penalize(request);
}
// check if we should yield the processor
if (!isSuccess(statusCode) && request == null) {
context.yield();
}
// If the property to output the response flowfile regardless of status code is set then transfer it
@ -805,6 +803,7 @@ public final class InvokeHTTP extends AbstractProcessor {
// 5xx -> RETRY
} else if (statusCode / 100 == 5) {
if (request != null) {
request = session.penalize(request);
session.transfer(request, REL_RETRY);
}

View File

@ -20,6 +20,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultHandler;
@ -73,6 +74,16 @@ public interface HBaseClientService extends ControllerService {
*/
void put(String tableName, Collection<PutFlowFile> puts) throws IOException;
/**
* Puts the given row to HBase with the provided columns.
*
* @param tableName the name of an HBase table
* @param rowId the id of the row to put
* @param columns the columns of the row to put
* @throws IOException thrown when there are communication errors with HBase
*/
void put(String tableName, String rowId, Collection<PutColumn> columns) throws IOException;
/**
* Scans the given table using the optional filter criteria and passing each result to the provided handler.
*
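The single-row put added to the interface above lets a caller write one row's columns without wrapping them in a PutFlowFile. A minimal, hypothetical usage sketch (the table, row id and column values are assumed):

import org.apache.nifi.hbase.HBaseClientService;
import org.apache.nifi.hbase.put.PutColumn;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;

public class SingleRowPutSketch {
    // Hypothetical helper that writes two columns for one row using the new method.
    static void writeStatus(final HBaseClientService hbase) throws IOException {
        final Collection<PutColumn> columns = Arrays.asList(
                new PutColumn("status", "state", "RUNNING".getBytes(StandardCharsets.UTF_8)),
                new PutColumn("status", "updated", "2015-11-23".getBytes(StandardCharsets.UTF_8)));
        hbase.put("nifi-status", "node-1", columns);
    }
}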

View File

@ -14,10 +14,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
package org.apache.nifi.hbase.put;
import org.apache.nifi.processor.AbstractProcessor;
/**
* Encapsulates the information for one column of a put operation.
*/
public class PutColumn {
public abstract class AbstractHBaseProcessor extends AbstractProcessor {
private final String columnFamily;
private final String columnQualifier;
private final byte[] buffer;
public PutColumn(final String columnFamily, final String columnQualifier, final byte[] buffer) {
this.columnFamily = columnFamily;
this.columnQualifier = columnQualifier;
this.buffer = buffer;
}
public String getColumnFamily() {
return columnFamily;
}
public String getColumnQualifier() {
return columnQualifier;
}
public byte[] getBuffer() {
return buffer;
}
}

View File

@ -16,8 +16,11 @@
*/
package org.apache.nifi.hbase.put;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.flowfile.FlowFile;
import java.util.Collection;
/**
* Wrapper to encapsulate all of the information for the Put along with the FlowFile.
*/
@ -25,18 +28,13 @@ public class PutFlowFile {
private final String tableName;
private final String row;
private final String columnFamily;
private final String columnQualifier;
private final byte[] buffer;
private final Collection<PutColumn> columns;
private final FlowFile flowFile;
public PutFlowFile(String tableName, String row, String columnFamily, String columnQualifier,
byte[] buffer, FlowFile flowFile) {
public PutFlowFile(String tableName, String row, Collection<PutColumn> columns, FlowFile flowFile) {
this.tableName = tableName;
this.row = row;
this.columnFamily = columnFamily;
this.columnQualifier = columnQualifier;
this.buffer = buffer;
this.columns = columns;
this.flowFile = flowFile;
}
@ -48,20 +46,26 @@ public class PutFlowFile {
return row;
}
public String getColumnFamily() {
return columnFamily;
}
public String getColumnQualifier() {
return columnQualifier;
}
public byte[] getBuffer() {
return buffer;
public Collection<PutColumn> getColumns() {
return columns;
}
public FlowFile getFlowFile() {
return flowFile;
}
public boolean isValid() {
if (StringUtils.isBlank(tableName) || StringUtils.isBlank(row) || flowFile == null || columns == null || columns.isEmpty()) {
return false;
}
for (PutColumn column : columns) {
if (StringUtils.isBlank(column.getColumnQualifier()) || StringUtils.isBlank(column.getColumnFamily()) || column.getBuffer() == null) {
return false;
}
}
return true;
}
}
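The new isValid check rejects a put that is missing its table name, row, FlowFile reference, or any fully populated column. A small sketch of that behavior, with the values assumed and the FlowFile deliberately left null to show a failing case:

import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;

import java.nio.charset.StandardCharsets;
import java.util.Collections;

public class PutFlowFileValiditySketch {
    public static void main(String[] args) {
        final PutColumn column = new PutColumn("family1", "qualifier1",
                "value1".getBytes(StandardCharsets.UTF_8));

        // No FlowFile reference, so the put is reported as invalid.
        final PutFlowFile withoutFlowFile =
                new PutFlowFile("nifi", "row1", Collections.singletonList(column), null);
        System.out.println(withoutFlowFile.isValid()); // false

        // A blank row id is rejected as well, regardless of the columns supplied.
        final PutFlowFile blankRow =
                new PutFlowFile("nifi", "", Collections.singletonList(column), null);
        System.out.println(blankRow.isValid()); // false
    }
}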

View File

@ -43,6 +43,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
@ -195,15 +196,33 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
put = new Put(putFlowFile.getRow().getBytes(StandardCharsets.UTF_8));
rowPuts.put(putFlowFile.getRow(), put);
}
put.addColumn(putFlowFile.getColumnFamily().getBytes(StandardCharsets.UTF_8),
putFlowFile.getColumnQualifier().getBytes(StandardCharsets.UTF_8),
putFlowFile.getBuffer());
for (final PutColumn column : putFlowFile.getColumns()) {
put.addColumn(
column.getColumnFamily().getBytes(StandardCharsets.UTF_8),
column.getColumnQualifier().getBytes(StandardCharsets.UTF_8),
column.getBuffer());
}
}
table.put(new ArrayList<>(rowPuts.values()));
}
}
@Override
public void put(final String tableName, final String rowId, final Collection<PutColumn> columns) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
Put put = new Put(rowId.getBytes(StandardCharsets.UTF_8));
for (final PutColumn column : columns) {
put.addColumn(
column.getColumnFamily().getBytes(StandardCharsets.UTF_8),
column.getColumnQualifier().getBytes(StandardCharsets.UTF_8),
column.getBuffer());
}
table.put(put);
}
}
@Override
public void scan(final String tableName, final Collection<Column> columns, final String filterExpression, final long minTime, final ResultHandler handler)
throws IOException {

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
@ -41,6 +42,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@ -130,8 +132,9 @@ public class TestHBase_1_1_2_ClientService {
final String columnQualifier = "qualifier1";
final String content = "content1";
final PutFlowFile putFlowFile = new PutFlowFile(tableName, row, columnFamily, columnQualifier,
content.getBytes(StandardCharsets.UTF_8), null);
final Collection<PutColumn> columns = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
content.getBytes(StandardCharsets.UTF_8)));
final PutFlowFile putFlowFile = new PutFlowFile(tableName, row, columns, null);
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
@ -168,11 +171,13 @@ public class TestHBase_1_1_2_ClientService {
final String content1 = "content1";
final String content2 = "content2";
final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row, columnFamily, columnQualifier,
content1.getBytes(StandardCharsets.UTF_8), null);
final Collection<PutColumn> columns1 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
content1.getBytes(StandardCharsets.UTF_8)));
final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row, columns1, null);
final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row, columnFamily, columnQualifier,
content2.getBytes(StandardCharsets.UTF_8), null);
final Collection<PutColumn> columns2 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
content2.getBytes(StandardCharsets.UTF_8)));
final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row, columns2, null);
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
@ -214,11 +219,13 @@ public class TestHBase_1_1_2_ClientService {
final String content1 = "content1";
final String content2 = "content2";
final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row1, columnFamily, columnQualifier,
content1.getBytes(StandardCharsets.UTF_8), null);
final Collection<PutColumn> columns1 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
content1.getBytes(StandardCharsets.UTF_8)));
final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row1, columns1, null);
final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row2, columnFamily, columnQualifier,
content2.getBytes(StandardCharsets.UTF_8), null);
final Collection<PutColumn> columns2 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
content2.getBytes(StandardCharsets.UTF_8)));
final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row2, columns2, null);
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);