NIFI-6497: Allow FreeFormTextRecordSetWriter to access FlowFile Attributes

This closes #4275.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Matthew Burgess 2020-05-14 16:33:55 -04:00 committed by Mark Payne
parent 13418ccb91
commit a3cc2c58ff
6 changed files with 248 additions and 4 deletions

View File

@ -170,6 +170,7 @@
<exclude>src/test/resources/syslog/syslog5424/log_all.txt</exclude>
<exclude>src/test/resources/syslog/syslog5424/log_mix.txt</exclude>
<exclude>src/test/resources/syslog/syslog5424/log_mix_in_error.txt</exclude>
<exclude>src/test/resources/text/testschema</exclude>
<exclude>src/test/resources/xml/people.xml</exclude>
<exclude>src/test/resources/xml/people2.xml</exclude>
<exclude>src/test/resources/xml/people3.xml</exclude>

View File

@ -43,8 +43,9 @@ import java.util.Map;
@Tags({"text", "freeform", "expression", "language", "el", "record", "recordset", "resultset", "writer", "serialize"})
@CapabilityDescription("Writes the contents of a RecordSet as free-form text. The configured "
+ "text is able to make use of the Expression Language to reference each of the fields that are available "
+ "in a Record. Each record in the RecordSet will be separated by a single newline character.")
+ "text is able to make use of the Expression Language to reference each of the fields that are available "
+ "in a Record, as well as the attributes in the FlowFile and variables. If there is a name collision, the field name/value is used before attributes or variables. "
+ "Each record in the RecordSet will be separated by a single newline character.")
public class FreeFormTextRecordSetWriter extends SchemaRegistryRecordSetWriter implements RecordSetWriterFactory {
static final PropertyDescriptor TEXT = new PropertyDescriptor.Builder()
.name("Text")
@ -81,7 +82,7 @@ public class FreeFormTextRecordSetWriter extends SchemaRegistryRecordSetWriter i
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out, final Map<String, String> variables) {
return new FreeFormTextWriter(textValue, characterSet, out);
return new FreeFormTextWriter(textValue, characterSet, out, variables);
}
@Override

View File

@ -38,11 +38,13 @@ public class FreeFormTextWriter extends AbstractRecordSetWriter implements Recor
private static final byte NEW_LINE = (byte) '\n';
private final PropertyValue propertyValue;
private final Charset charset;
private final Map<String, String> variables;
public FreeFormTextWriter(final PropertyValue textPropertyValue, final Charset characterSet, final OutputStream out) {
public FreeFormTextWriter(final PropertyValue textPropertyValue, final Charset characterSet, final OutputStream out, final Map<String, String> variables) {
super(new BufferedOutputStream(out));
this.propertyValue = textPropertyValue;
this.charset = characterSet;
this.variables = variables;
}
private List<String> getColumnNames(final RecordSchema schema) {
@ -71,6 +73,10 @@ public class FreeFormTextWriter extends AbstractRecordSetWriter implements Recor
final String columnValue = record.getAsString(columnName);
values.put(columnName, columnValue);
}
// Add attributes and variables (but don't override fields with the same name)
for (Map.Entry<String, String> variable : variables.entrySet()) {
values.putIfAbsent(variable.getKey(), variable.getValue());
}
final String evaluated = propertyValue.evaluateAttributeExpressions(values).getValue();
out.write(evaluated.getBytes(charset));

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.text;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
/**
 * Tests that {@link FreeFormTextRecordSetWriter} resolves Expression Language references
 * against both record fields and FlowFile attributes, and that a record field wins when
 * an attribute has the same name.
 *
 * <p>The records themselves are produced by {@link TestFreeFormTextRecordSetWriterProcessor};
 * these tests only configure the writer and supply FlowFile attributes.</p>
 */
public class TestFreeFormTextRecordSetWriter {

    /** Identifier under which the writer is registered as a controller service. */
    private static final String WRITER_ID = "writer";

    /** Location of the Avro schema handed to the writer via the Schema Text property. */
    private static final String SCHEMA_PATH = "src/test/resources/text/testschema";

    /**
     * Builds a test runner around the helper processor, registers the given writer as a
     * controller service and configures its schema and free-form text template.
     * The writer is left disabled so that callers can set further properties before
     * enabling it.
     *
     * @param writer the writer under test
     * @return the configured runner
     * @throws InitializationException if the controller service cannot be added
     * @throws IOException if the schema file cannot be read
     */
    private TestRunner setup(FreeFormTextRecordSetWriter writer) throws InitializationException, IOException {
        final TestRunner runner = TestRunners.newTestRunner(TestFreeFormTextRecordSetWriterProcessor.class);

        // Decode with an explicit charset so the test does not depend on the platform default.
        final byte[] schemaBytes = Files.readAllBytes(Paths.get(SCHEMA_PATH));
        final String outputSchemaText = new String(schemaBytes, StandardCharsets.UTF_8);

        runner.addControllerService(WRITER_ID, writer);
        runner.setProperty(TestFreeFormTextRecordSetWriterProcessor.WRITER, WRITER_ID);
        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
        runner.setProperty(writer, FreeFormTextRecordSetWriter.TEXT, "ID: ${ID}, Name: ${NAME}, Age: ${AGE}, Country: ${COUNTRY}, Username: ${user.name}");
        return runner;
    }

    /**
     * Enqueues an empty FlowFile carrying the given attributes, runs the processor once and
     * returns the content of the single FlowFile routed to success.
     *
     * @param runner the configured runner whose writer has already been enabled
     * @param attributes attributes to attach to the incoming FlowFile
     * @return the text written by the record set writer
     */
    private String runAndReadOutput(final TestRunner runner, final Map<String, String> attributes) {
        runner.enqueue("", attributes);
        runner.run();

        // In addition to making sure a flow file was output successfully, also check nothing got
        // rolled back into the incoming queue. May be a moot point as there is a fake processor,
        // but in operation the flow file could be rolled back on error.
        runner.assertQueueEmpty();
        runner.assertAllFlowFilesTransferred(TestFreeFormTextRecordSetWriterProcessor.SUCCESS, 1);

        final byte[] content = runner.getContentAsByteArray(
                runner.getFlowFilesForRelationship(TestFreeFormTextRecordSetWriterProcessor.SUCCESS).get(0));
        // Explicit charset rather than the platform default; the expected text is plain ASCII.
        return new String(content, StandardCharsets.UTF_8);
    }

    /**
     * With the processor's default of multiple records, each output line must contain the
     * record fields plus the {@code user.name} FlowFile attribute.
     */
    @Test
    public void testDefault() throws IOException, InitializationException {
        final FreeFormTextRecordSetWriter writer = new FreeFormTextRecordSetWriter();
        final TestRunner runner = setup(writer);
        runner.enableControllerService(writer);

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("user.name", "jdoe64");

        final String expected = "ID: ABC123, Name: John Doe, Age: 22, Country: USA, Username: jdoe64\nID: ABC123, Name: John Doe, Age: 22, Country: USA, Username: jdoe64\n";
        final String actual = runAndReadOutput(runner, attributes);
        assertEquals(expected, actual);
    }

    /**
     * With a single record and an attribute named {@code ID}, the record's own {@code ID}
     * value must be written, and the unresolved {@code user.name} reference must render as
     * an empty string.
     */
    @Test
    public void testDefaultSingleRecord() throws IOException, InitializationException {
        final FreeFormTextRecordSetWriter writer = new FreeFormTextRecordSetWriter();
        final TestRunner runner = setup(writer);
        runner.setProperty(TestFreeFormTextRecordSetWriterProcessor.MULTIPLE_RECORDS, "false");
        runner.enableControllerService(writer);

        final Map<String, String> attributes = new HashMap<>();
        // Test ID field value does not get overridden
        attributes.put("ID", "jdoe64");

        final String expected = "ID: ABC123, Name: John Doe, Age: 22, Country: USA, Username: \n";
        final String actual = runAndReadOutput(runner, attributes);
        assertEquals(expected, actual);
    }
}

View File

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.text;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.ListRecordSet;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Test-only processor that writes a fixed, in-memory record set to the incoming FlowFile
 * using the configured {@link FreeFormTextRecordSetWriter}, then routes the FlowFile to
 * {@link #SUCCESS}.
 *
 * <p>The incoming FlowFile's content is replaced; the FlowFile itself is passed to the
 * writer factory when creating the writer.</p>
 */
public class TestFreeFormTextRecordSetWriterProcessor extends AbstractProcessor {

    /** The record set writer controller service under test. */
    static final PropertyDescriptor WRITER = new PropertyDescriptor.Builder()
            .name("writer")
            .identifiesControllerService(FreeFormTextRecordSetWriter.class)
            .required(true)
            .build();

    /** Whether to emit two identical records ("true", the default) or just one ("false"). */
    static final PropertyDescriptor MULTIPLE_RECORDS = new PropertyDescriptor.Builder()
            .name("multiple_records")
            .allowableValues("true", "false")
            .defaultValue("true")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** The only relationship; every processed FlowFile is routed here. */
    public static final Relationship SUCCESS = new Relationship.Builder().name("success").description("success").build();

    /** Schema of the generated records: ID, NAME and COUNTRY as strings, AGE as an int. */
    private static final RecordSchema RECORD_SCHEMA = new SimpleRecordSchema(Arrays.asList(
            new RecordField("ID", RecordFieldType.STRING.getDataType()),
            new RecordField("NAME", RecordFieldType.STRING.getDataType()),
            new RecordField("AGE", RecordFieldType.INT.getDataType()),
            new RecordField("COUNTRY", RecordFieldType.STRING.getDataType())));

    /**
     * Takes one FlowFile from the session, overwrites its content with the generated record
     * set rendered by the configured writer, and transfers it to {@link #SUCCESS}.
     * Does nothing when no FlowFile is available.
     *
     * @param context provides the configured writer and the multiple-records flag
     * @param session the session to read from and write to
     * @throws ProcessException if obtaining the schema, creating the writer or writing fails;
     *         the original exception is preserved as the cause
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final FlowFile incoming = session.get();
        if (incoming == null) {
            // Nothing queued; avoid dereferencing a null FlowFile below.
            return;
        }

        final RecordSetWriterFactory writerFactory = context.getProperty(WRITER).asControllerService(RecordSetWriterFactory.class);

        final FlowFile written = session.write(incoming, out -> {
            try {
                // The "reader" RecordSchema must be passed in here as the controller service expects to inherit it from the record itself
                // See the InheritSchemaFromRecord class for more details
                final RecordSchema schema = writerFactory.getSchema(incoming.getAttributes(), RECORD_SCHEMA);
                final boolean multipleRecords = Boolean.parseBoolean(context.getProperty(MULTIPLE_RECORDS).getValue());
                final RecordSet recordSet = getRecordSet(multipleRecords);

                // try-with-resources so the writer is released even if writing fails
                try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, incoming)) {
                    writer.write(recordSet);
                    writer.flush();
                }
            } catch (Exception e) {
                // Keep the original exception as the cause so the stack trace is not lost.
                throw new ProcessException(e.getMessage(), e);
            }
        });

        session.transfer(written, SUCCESS);
    }

    /**
     * @return the two supported properties: {@link #WRITER} and {@link #MULTIPLE_RECORDS}
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(WRITER);
        descriptors.add(MULTIPLE_RECORDS);
        return descriptors;
    }

    /**
     * @return a set containing only {@link #SUCCESS}
     */
    @Override
    public Set<Relationship> getRelationships() {
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(SUCCESS);
        return relationships;
    }

    /**
     * Builds the fixed record set used by the tests.
     *
     * @param multipleRecords when {@code true} the set holds two identical records,
     *        otherwise exactly one
     * @return a record set whose records have ID=ABC123, NAME=John Doe, AGE=22, COUNTRY=USA
     */
    protected static RecordSet getRecordSet(boolean multipleRecords) {
        final Map<String, Object> recordFields = new HashMap<>();
        recordFields.put("ID", "ABC123");
        recordFields.put("NAME", "John Doe");
        recordFields.put("AGE", 22);
        recordFields.put("COUNTRY", "USA");
        // Username is an additional "field" in the output but is not present in the record and will be supplied by an attribute for the test(s).

        final List<Record> records = new ArrayList<>();
        records.add(new MapRecord(RECORD_SCHEMA, recordFields));
        if (multipleRecords) {
            records.add(new MapRecord(RECORD_SCHEMA, recordFields));
        }
        return new ListRecordSet(RECORD_SCHEMA, records);
    }
}

View File

@ -0,0 +1,11 @@
{
"namespace": "nifi",
"name": "test",
"type": "record",
"fields": [
{ "name": "ID", "type": "string" },
{ "name": "NAME", "type": "string" },
{ "name": "AGE", "type": "int" },
{ "name": "COUNTRY", "type": "string" }
]
}