NIFI-8273: Fix typos in docs/clarify some statements. Addressed handling of arrays as partitions

This closes #4948.
This commit is contained in:
Mark Payne 2021-09-23 09:12:27 -04:00
parent db5b618550
commit 5b7af511fc
4 changed files with 17 additions and 17 deletions

View File

@ -50,6 +50,7 @@ import javax.script.ScriptException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -67,7 +68,8 @@ import java.util.Set;
explanation = "Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.") explanation = "Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")
}) })
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = "partition", description = "The partition of the outgoing flow file."), @WritesAttribute(attribute = "partition", description = "The partition of the outgoing flow file. If the script indicates that the partition has a null value, the attribute will be set to " +
"the literal string \"<null partition>\" (without quotes). Otherwise, the attribute is set to the String representation of whatever value is returned by the script."),
@WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"), @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
@WritesAttribute(attribute = "record.count", description = "The number of records within the flow file."), @WritesAttribute(attribute = "record.count", description = "The number of records within the flow file."),
@WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer."), @WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer."),
@ -167,8 +169,8 @@ public class ScriptedPartitionRecord extends ScriptedRecordProcessor {
final RecordSet recordSet = reader.createRecordSet(); final RecordSet recordSet = reader.createRecordSet();
final PushBackRecordSet pushBackSet = new PushBackRecordSet(recordSet); final PushBackRecordSet pushBackSet = new PushBackRecordSet(recordSet);
final Map<String, FlowFile> outgoingFlowFiles = new HashMap<>(); final Map<Object, FlowFile> outgoingFlowFiles = new HashMap<>();
final Map<String, RecordSetWriter> recordSetWriters = new HashMap<>(); final Map<Object, RecordSetWriter> recordSetWriters = new HashMap<>();
// Reading in records and evaluate script // Reading in records and evaluate script
while (pushBackSet.isAnotherRecord()) { while (pushBackSet.isAnotherRecord()) {
@ -177,7 +179,7 @@ public class ScriptedPartitionRecord extends ScriptedRecordProcessor {
getLogger().debug("Evaluated scripted against {} (index {}), producing result of {}", record, counts.getRecordCount(), evaluatedValue); getLogger().debug("Evaluated scripted against {} (index {}), producing result of {}", record, counts.getRecordCount(), evaluatedValue);
counts.incrementRecordCount(); counts.incrementRecordCount();
final String partition = (evaluatedValue == null) ? null : evaluatedValue.toString(); final Object partition = (evaluatedValue instanceof Object[]) ? Arrays.asList((Object[]) evaluatedValue) : evaluatedValue;
RecordSetWriter writer = recordSetWriters.get(partition); RecordSetWriter writer = recordSetWriters.get(partition);
if (writer == null) { if (writer == null) {
@ -196,13 +198,13 @@ public class ScriptedPartitionRecord extends ScriptedRecordProcessor {
// Sending outgoing flow files // Sending outgoing flow files
int fragmentIndex = 0; int fragmentIndex = 0;
for (final String partition : outgoingFlowFiles.keySet()) { for (final Object partition : outgoingFlowFiles.keySet()) {
final RecordSetWriter writer = recordSetWriters.get(partition); final RecordSetWriter writer = recordSetWriters.get(partition);
final FlowFile outgoingFlowFile = outgoingFlowFiles.get(partition); final FlowFile outgoingFlowFile = outgoingFlowFiles.get(partition);
final Map<String, String> attributes = new HashMap<>(incomingFlowFile.getAttributes()); final Map<String, String> attributes = new HashMap<>(incomingFlowFile.getAttributes());
attributes.put("mime.type", writer.getMimeType()); attributes.put("mime.type", writer.getMimeType());
attributes.put("partition", partition); attributes.put("partition", partition == null ? "<null partition>" : partition.toString());
attributes.put("fragment.index", String.valueOf(fragmentIndex)); attributes.put("fragment.index", String.valueOf(fragmentIndex));
attributes.put("fragment.count", String.valueOf(outgoingFlowFiles.size())); attributes.put("fragment.count", String.valueOf(outgoingFlowFiles.size()));

View File

@ -33,14 +33,14 @@ td {text-align: left}
<h3>Description</h3> <h3>Description</h3>
<p> <p>
The ScriptedPartitionRecord provides the ability to use a scripting language, such as Groovy or Jython, to quickly and easily partition a Record based on the contents of it. The ScriptedPartitionRecord provides the ability to use a scripting language, such as Groovy or Jython, to quickly and easily partition a Record based on its contents.
There are multiple ways to reach the same behaviour, such as using PartitionRecord, but working with user-provided scripts opens a wide range of possibilities on the decision There are multiple ways to reach the same behaviour, such as using PartitionRecord, but working with user-provided scripts opens a wide range of possibilities on the decision
logic of partitioning the individual records. logic of partitioning the individual records.
</p> </p>
<p> <p>
The provided script is evaluated once for each Record that is encountered in the incoming FlowFile. Each time that the script is invoked, it is expected to return an <code>object</code> or a <code>null</code> value. The provided script is evaluated once for each Record that is encountered in the incoming FlowFile. Each time that the script is invoked, it is expected to return an <code>object</code> or a <code>null</code> value.
The string representation of the return value is used as "partition". The <code>null</code> value is handled separately without conversion into string. The string representation of the return value is used as the record's "partition". The <code>null</code> value is handled separately without conversion into a string.
All Records with the same partition will then be batched into one FlowFile and routed to the <code>success</code> Relationship. All Records with the same partition will then be batched into one FlowFile and routed to the <code>success</code> Relationship.
</p> </p>
@ -89,17 +89,14 @@ td {text-align: left}
<h3>Return Value</h3> <h3>Return Value</h3>
<p> <p>
The script is invoked separately for each Record. It is accepted to return with any Object values might be represented as string. This string value will be used as the partition of the given Record. The script is invoked separately for each Record. It is acceptable to return any Object that can be represented as a string. This string value will be used as the partition of the given
Alternately it is possible to return with <code>null</code> value. Record. Additionally, the script may return <code>null</code>.
Each time the script is invoked, it is expected to return a <code>boolean</code> value. Return values other than <code>boolean</code>, including <code>null</code> value will be handled as
unexpected script behaviour and handled accordingly: the processing will be interrupted and the incoming FlowFile will be transferred to the <code>failure</code> relationship without further execution.
</p> </p>
<h2>Example</h2> <h2>Example</h2>
<p> <p>
The following script The following script will partition the input on the value of the "stellarType" field.
</p> </p>
<p> <p>

View File

@ -34,8 +34,9 @@ td {text-align: left}
<p> <p>
The ScriptedValidateRecord Processor provides the ability to use a scripting language, such as Groovy or Jython in order to validate Records in an incoming FlowFile. The ScriptedValidateRecord Processor provides the ability to use a scripting language, such as Groovy or Jython in order to validate Records in an incoming FlowFile.
NiFi provides several different Processors that can be used to work with Records in different ways. Each of these processors has its pros and cons. The provided script will be evaluated against each Record in an incoming FlowFile. Each of those records will then be routed to either the "valid" or "invalid" FlowFile.
The ScriptedValidateRecord is intended to work together with these processors and be used as a preliminary step in order to prevent more complex processing steps from working with incorrect Records. As a result, each incoming FlowFile may be broken up into two individual FlowFiles (if some records are valid and some are invalid, according to the script), or the incoming FlowFile
may have all of its Records kept together in a single FlowFile, routed to either "valid" or "invalid" (if all records are valid or if all records are invalid, according to the script).
</p> </p>
<p> <p>

View File

@ -37,7 +37,7 @@ public class TestScriptedPartitionRecord extends TestScriptedRouterProcessor {
private static final String PARTITION_1 = "partition1"; private static final String PARTITION_1 = "partition1";
private static final String PARTITION_2 = "partition2"; private static final String PARTITION_2 = "partition2";
private static final Integer PARTITION_3 = 3; private static final Integer PARTITION_3 = 3;
private static final String PARTITION_4 = null; private static final String PARTITION_4 = "<null partition>";
@Test @Test
public void testIncomingFlowFileContainsNoRecords() { public void testIncomingFlowFileContainsNoRecords() {