Added detailed documentation for the ExecuteFlumeSource/Sink processors

Signed-off-by: Matt Gilman <matt.c.gilman@gmail.com>
This commit is contained in:
Joey Echeverria 2015-07-12 18:44:16 -07:00 committed by Matt Gilman
parent 286e4738b8
commit e7e15facc4
5 changed files with 280 additions and 8 deletions

View File

@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.List;
import java.util.Set;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.conf.Configurables;
@ -44,14 +43,14 @@ import org.apache.nifi.processor.util.StandardValidators;
/**
* This processor runs a Flume sink
*/
@Tags({"flume", "hadoop", "get", "sink"})
@CapabilityDescription("Write FlowFile data to a Flume sink")
@Tags({"flume", "hadoop", "put", "sink"})
@CapabilityDescription("Execute a Flume sink. Each input FlowFile is converted into a Flume Event for processing by the sink.")
@TriggerSerially
public class ExecuteFlumeSink extends AbstractFlumeProcessor {
public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder()
.name("Sink Type")
.description("The fully-qualified name of the Sink class")
.description("The component type name for the sink. For some sinks, this is a short, symbolic name (e.g. hdfs). For others, it's the fully-qualified name of the Sink class. See the Flume User Guide for details.")
.required(true)
.addValidator(createSinkValidator())
.build();

View File

@ -50,13 +50,13 @@ import org.apache.nifi.processor.util.StandardValidators;
* This processor runs a Flume source
*/
@Tags({"flume", "hadoop", "get", "source"})
@CapabilityDescription("Generate FlowFile data from a Flume source")
@CapabilityDescription("Execute a Flume source. Each Flume Event is sent to the success relationship as a FlowFile")
@TriggerSerially
public class ExecuteFlumeSource extends AbstractFlumeProcessor {
public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder()
.name("Source Type")
.description("The fully-qualified name of the Source class")
.description("The component type name for the source. For some sources, this is a short, symbolic name (e.g. spooldir). For others, it's the fully-qualified name of the Source class. See the Flume User Guide for details.")
.required(true)
.addValidator(createSourceValidator())
.build();

View File

@ -12,5 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.flume.FlumeSourceProcessor
org.apache.nifi.processors.flume.FlumeSinkProcessor
org.apache.nifi.processors.flume.ExecuteFlumeSource
org.apache.nifi.processors.flume.ExecuteFlumeSink

View File

@ -0,0 +1,159 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>ExecuteFlumeSink</title>
<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
</head>
<body>
<h2>Data Model</h2>
<p>
This processor executes an Apache Flume sink. FlowFiles are wrapped in Flume's
Event interface. The content of the FlowFile becomes the body of the Event and
the attributes of the FlowFile become Event headers. The following special
headers are also set:
</p>
<table id="headers">
<tr>
<th>Flume Event Header</th>
<th>FlowFile Attribute</th>
</tr>
<tr>
<td>nifi.entry.date</td>
<td>FlowFile#getEntryDate()</td>
</tr>
<tr>
<td>nifi.id</td>
<td>FlowFile#getId()</td>
</tr>
<tr>
<td>nifi.last.queue.date</td>
<td>FlowFile#getLastQueueDate()</td>
</tr>
<tr>
<td>nifi.lineage.identifiers.${i}</td>
<td>FlowFile#getLineageIdentifiers()[i]</td>
</tr>
<tr>
<td>nifi.lineage.start.date</td>
<td>FlowFile#getLineageStartDate()</td>
</tr>
<tr>
<td>nifi.size</td>
<td>FlowFile#getSize()</td>
</tr>
</table>
<h2>Warning</h2>
<p>
In NiFi, the contents of a FlowFile are accessed via a stream, but in Flume it is
stored in a byte array. This means the full content will be loaded into memory when
a FlowFile is processed by the ExecuteFlumeSink processor. You should consider the
typical size of the FlowFiles you'll process and the batch size, if any, your sink
is configured with when setting NiFi's heap size.
</p>
<h2>Configuration Details</h2>
<p>
This processor is designed to execute arbitrary Flume sinks. Most of the details
of configuring the sink is deferred to Flume's built-in configuration system.
For details on the available settings for each sink type, refer to the Flume
<a href="http://flume.apache.org/FlumeUserGuide.html#flume-sinks">User Guide</a>.
Configuring the Flume sink is a four step process:
</p>
<ol>
<li>Set the Sink Type property to a valid Flume sink type.</li>
<li>
Set the Agent Name property to the name of the agent in your
Flume configuration. This is the prefix of the properties in the Flume
configuration file. Example: <code>tier1</code>
</li>
<li>
Set the Sink Name property to the name of the sink in your Flume
configuration. If Agent Name is <code>tier1</code>, then the Sink Name
is the value of the <code>tier1.sinks</code> property. Example: <code>sink-1</code>
</li>
<li>
Copy and paste the configuration for the sink from your Flume configuration
file into the Flume Configuration property. Assuming you're using
the same Agent Name and Sink Name as in the examples above, this will be all
of the properties that start with <code>tier1.sinks.sink-1</code>.
Do not copy the <code>tier1.sinks.sink-1.type</code> or
<code>tier1.sinks.sink-1.channel</code> properties.
</li>
</ol>
<h2>Usage Example</h2>
<p>
Assuming you had the following existing Flume configuration file:
</p>
<pre>
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1</pre>
<p>
Then you'd configure the ExecuteFlumeSink as follows:
</p>
<table id="example">
<tr>
<th>Property</th>
<th>Value</th>
</tr>
<tr>
<td>Sink Type</td>
<td>hdfs</td>
</tr>
<tr>
<td>Agent Name</td>
<td>a1</td>
</tr>
<tr>
<td>Sink Name</td>
<td>k1</td>
</tr>
<tr>
<td>Flume Configuration</td>
<td>
<code>
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S<br>
a1.sinks.k1.hdfs.filePrefix = events-<br>
a1.sinks.k1.hdfs.round = true<br>
a1.sinks.k1.hdfs.roundValue = 10<br>
a1.sinks.k1.hdfs.roundUnit = minute
</code>
</td>
</tr>
</table>
</body>
</html>

View File

@ -0,0 +1,114 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>ExecuteFlumeSource</title>
<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
</head>
<body>
<h2>Data Model</h2>
<p>
This processor executes an Apache Flume source. Each Flume Event is turned into
a FlowFile. The content of the FlowFile is set to the body of the Event and
the Event headers become FlowFile attributes.
</p>
<h2>Configuration Details</h2>
<p>
This processor is designed to execute arbitrary Flume sources. Most of the details
of configuring the source is deferred to Flume's built-in configuration system.
For details on the available settings for each source type, refer to the Flume
<a href="http://flume.apache.org/FlumeUserGuide.html#flume-sources">User Guide</a>.
Configuring the Flume source is a four step process:
</p>
<ol>
<li>Set the Source Type property to a valid Flume source type.</li>
<li>
Set the Agent Name property to the name of the agent in your
Flume configuration. This is the prefix of the properties in the Flume
configuration file. Example: <code>tier1</code>
</li>
<li>
Set the Source Name property to the name of the source in your Flume
configuration. If Agent Name is <code>tier1</code>, then the Source Name
is the value of the <code>tier1.sources</code> property. Example: <code>src-1</code>
</li>
<li>
Copy and paste the configuration for the source from your Flume configuration
file into the Flume Configuration property. Assuming you're using
the same Agent Name and Source Name as in the examples above, this will be all
of the properties that start with <code>tier1.sources.src-1</code>.
Do not copy the <code>tier1.sources.src-1.type</code> or
<code>tier1.sources.src-1.channel</code> properties.
</li>
</ol>
<h2>Usage Example</h2>
<p>
Assuming you had the following existing Flume configuration file:
</p>
<pre>
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1</pre>
<p>
Then you'd configure the ExecuteFlumeSource as follows:
</p>
<table id="example">
<tr>
<th>Property</th>
<th>Value</th>
</tr>
<tr>
<td>Source Type</td>
<td>multiport_syslogtcp</td>
</tr>
<tr>
<td>Agent Name</td>
<td>a1</td>
</tr>
<tr>
<td>Source Name</td>
<td>r1</td>
</tr>
<tr>
<td>Flume Configuration</td>
<td>
<code>
a1.sources.r1.host = 0.0.0.0<br>
a1.sources.r1.ports = 10001 10002 10003<br>
a1.sources.r1.portHeader = port
</code>
</td>
</tr>
</table>
</body>
</html>