From b588073cbbb9655f43b26d854262f449d7de6607 Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Fri, 4 Oct 2019 18:15:16 -0400 Subject: [PATCH] NIFI-6741: Add RecordPath support to scripting components This closes #3790. Signed-off-by: Andy LoPresto --- .../nifi-scripting-processors/pom.xml | 11 ++ .../processors/script/TestInvokeGroovy.java | 37 ++++++ .../resources/groovy/test_record_path.groovy | 110 ++++++++++++++++++ 3 files changed, 158 insertions(+) create mode 100644 nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_path.groovy diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml index c69cee52c4..0483968032 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml @@ -51,6 +51,11 @@ org.apache.nifi nifi-record + + org.apache.nifi + nifi-record-path + 1.10.0-SNAPSHOT + org.apache.nifi nifi-lookup-service-api @@ -107,6 +112,12 @@ 1.10.0-SNAPSHOT test + + org.apache.nifi + nifi-mock-record-utils + 1.10.0-SNAPSHOT + test + org.codehaus.groovy groovy-json diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java index 64047ae747..4e8c58da1a 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java @@ -21,6 +21,8 @@ import org.apache.commons.codec.binary.Hex; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.processor.Relationship; import org.apache.nifi.script.ScriptingComponentUtils; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.MockProcessorInitializationContext; @@ -35,6 +37,7 @@ import java.security.MessageDigest; import java.util.List; import java.util.Set; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -209,4 +212,38 @@ public class TestInvokeGroovy extends BaseScriptTest { outputFlowFile.assertContentEquals(expectedOutput); outputFlowFile.assertAttributeEquals("outAttr", expectedOutput); } + + /** + * Tests a script that has a Groovy Processor that reads records and outputs a comma-delimited list of fields selected by a given RecordPath expression + * + * @throws Exception Any error encountered while testing + */ + @Test + public void testReadRecordsWithRecordPath() throws Exception { + runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy"); + runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/groovy/test_record_path.groovy"); + runner.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/groovy"); + + final MockRecordParser readerService = new MockRecordParser(); + runner.addControllerService("reader", readerService); + runner.enableControllerService(readerService); + runner.setProperty("record-reader", "reader"); + runner.setProperty("record-path", "/age"); + readerService.addSchemaField("name", RecordFieldType.STRING); + readerService.addSchemaField("age", RecordFieldType.INT); + + readerService.addRecord("John Doe", 48); + readerService.addRecord("Jane Doe", 47); + readerService.addRecord("Jimmy Doe", 14); + + runner.assertValid(); + runner.enqueue("".getBytes(StandardCharsets.UTF_8)); + runner.run(); + + runner.assertAllFlowFilesTransferred("success", 1); + final List result = runner.getFlowFilesForRelationship("success"); + assertEquals(1, result.size()); + MockFlowFile ff = result.get(0); + ff.assertContentEquals("48\n47\n14\n"); + } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_path.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_path.groovy new file mode 100644 index 0000000000..74c1429718 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_path.groovy @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.nifi.flowfile.FlowFile +import org.apache.nifi.processor.AbstractProcessor +import org.apache.nifi.processor.ProcessContext +import org.apache.nifi.processor.ProcessSession +import org.apache.nifi.processor.Relationship +import org.apache.nifi.processor.io.StreamCallback +import org.apache.nifi.processor.util.StandardValidators +import org.apache.nifi.record.path.RecordPath +import org.apache.nifi.record.path.RecordPathResult +import org.apache.nifi.serialization.* +import org.apache.nifi.serialization.record.* +import org.apache.nifi.schema.access.SchemaNotFoundException +import java.util.concurrent.atomic.AtomicInteger +import java.util.stream.Collectors + +class MyRecordProcessor extends AbstractProcessor { + + // Properties + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("Specifies the Controller Service to use for reading incoming data") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build() + + static final PropertyDescriptor RECORD_PATH = new PropertyDescriptor.Builder() + .name("record-path") + .displayName("Record Path") + .description("Specifies the Record Path expression to evaluate against each record") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build() + + def REL_SUCCESS = new Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build() + def REL_FAILURE = new Relationship.Builder().name("failure").description('FlowFiles are routed here if an error occurs during processing').build() + + @Override + protected List getSupportedPropertyDescriptors() { + def properties = [] as ArrayList + properties.add(RECORD_READER) + properties.add(RECORD_PATH) + properties + } + + @Override + Set getRelationships() { + [REL_SUCCESS, REL_FAILURE] as Set + } + + @Override + void onTrigger(ProcessContext context, ProcessSession session) { + def flowFile = session.get() + if (!flowFile) return + + def readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory) + + final Map attributes = new HashMap<>() + final FlowFile original = flowFile + final Map originalAttributes = flowFile.attributes + final String recordPathExpression = context.getProperty(RECORD_PATH).getValue() + final RecordPath recordPath = RecordPath.compile(recordPathExpression) + try { + flowFile = session.write(flowFile, { inStream, outStream -> + def reader = readerFactory.createRecordReader(originalAttributes, inStream, 100, getLogger()) + try { + Record record + while (record = reader.nextRecord()) { + RecordPathResult result = recordPath.evaluate(record) + def line = result.selectedFields.map({f -> record.getAsString(f.field.fieldName).toString()}).collect(Collectors.joining(',')) + '\n' + outStream.write(line.bytes) + } + + } catch (final SchemaNotFoundException e) { + throw new ProcessException(e.localizedMessage, e) + } catch (final MalformedRecordException e) { + throw new ProcessException('Could not parse incoming data', e) + } finally { + reader.close() + } + } as StreamCallback) + + } catch (final Exception e) { + getLogger().error('Failed to process {}; will route to failure', [flowFile, e] as Object[]) + session.transfer(flowFile, REL_FAILURE); + return; + } + flowFile = session.putAllAttributes(flowFile, attributes) + session.transfer(flowFile, REL_SUCCESS) + } +} + +processor = new MyRecordProcessor() \ No newline at end of file