From 7d02ab1e2fc9a5a424b3cfe9cd4a872f5c5c97ae Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Mon, 8 Apr 2019 13:52:24 -0400 Subject: [PATCH] NIFI-6145: Add Groovy file idioms (withInput/OutputStream, withReader/Writer) to ExecuteGroovyScript This closes #3415 Signed-off-by: Mike Thomsen --- .../groovyx/flow/GroovySessionFile.java | 33 ++++++++++ .../groovyx/ExecuteGroovyScriptTest.java | 62 +++++++++++++++++++ 2 files changed, 95 insertions(+) diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/flow/GroovySessionFile.java b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/flow/GroovySessionFile.java index 25ef2fbdb4..831b0a4287 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/flow/GroovySessionFile.java +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/flow/GroovySessionFile.java @@ -28,12 +28,15 @@ import groovy.lang.GroovyObject; import org.codehaus.groovy.runtime.InvokerHelper; +import java.io.BufferedReader; +import java.io.BufferedWriter; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.Writer; +import java.nio.charset.UnsupportedCharsetException; /** * SessionFile with groovy specific methods. @@ -281,4 +284,34 @@ public class GroovySessionFile extends SessionFile implements GroovyObject { }); } + public GroovySessionFile withInputStream(Closure c) throws IOException { + InputStream inStream = session.read(this); + c.call(inStream); + inStream.close(); + return this; + } + + public GroovySessionFile withOutputStream(Closure c) throws IOException { + OutputStream outStream = session.write(this); + c.call(outStream); + outStream.close(); + return this; + } + + public GroovySessionFile withReader(String charset, Closure c) throws IOException, UnsupportedCharsetException { + InputStream inStream = session.read(this); + BufferedReader br = new BufferedReader(new InputStreamReader(inStream, charset)); + c.call(br); + br.close(); + return this; + } + + public GroovySessionFile withWriter(String charset, Closure c) throws IOException, UnsupportedCharsetException { + OutputStream outStream = session.write(this); + BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(outStream, charset)); + c.call(bw); + bw.close(); + return this; + } + } diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java index 8cce6d1723..b2b3e2d0cb 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java @@ -365,6 +365,68 @@ public class ExecuteGroovyScriptTest { runner.assertAllFlowFilesTransferred(proc.REL_SUCCESS.getName(), 1); } + @Test + public void test_withInputStream() { + runner.setProperty(ExecuteGroovyScript.SCRIPT_BODY, "import java.util.stream.*\n" + + "def ff = session.get(); if(!ff)return;\n" + + "ff.withInputStream{inputStream -> String r = new BufferedReader(new InputStreamReader(inputStream)).lines()" + + ".collect(Collectors.joining(\"\\n\")); assert r=='1234' }; REL_SUCCESS << ff; "); + + runner.assertValid(); + + runner.enqueue("1234".getBytes(StandardCharsets.UTF_8)); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExecuteGroovyScript.REL_SUCCESS.getName(), 1); + } + + @Test + public void test_withOutputStream() { + runner.setProperty(ExecuteGroovyScript.SCRIPT_BODY, "import java.util.stream.*\n" + + "def ff = session.get(); if(!ff)return;\n" + + "ff.withOutputStream{outputStream -> outputStream.write('5678'.bytes)}; REL_SUCCESS << ff; "); + + runner.assertValid(); + + runner.enqueue("1234".getBytes(StandardCharsets.UTF_8)); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExecuteGroovyScript.REL_SUCCESS.getName(), 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExecuteGroovyScript.REL_SUCCESS).get(0); + flowFile.assertContentEquals("5678"); + } + + @Test + public void test_withReader() { + runner.setProperty(ExecuteGroovyScript.SCRIPT_BODY, "import java.util.stream.*\n" + + "def ff = session.get(); if(!ff)return;\n" + + "ff.withReader('UTF-8'){reader -> String r = new BufferedReader(reader).lines()" + + ".collect(Collectors.joining(\"\\n\")); assert r=='1234' }; REL_SUCCESS << ff; "); + + runner.assertValid(); + + runner.enqueue("1234".getBytes(StandardCharsets.UTF_8)); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExecuteGroovyScript.REL_SUCCESS.getName(), 1); + } + + @Test + public void test_withWriter() throws Exception { + runner.setProperty(ExecuteGroovyScript.SCRIPT_BODY, "import java.util.stream.*\n" + + "def ff = session.get(); if(!ff)return;\n" + + "ff.withWriter('UTF-16LE'){writer -> writer.write('5678')}; REL_SUCCESS << ff; "); + + runner.assertValid(); + + runner.enqueue("1234".getBytes(StandardCharsets.UTF_8)); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExecuteGroovyScript.REL_SUCCESS.getName(), 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExecuteGroovyScript.REL_SUCCESS).get(0); + flowFile.assertContentEquals("5678".getBytes(StandardCharsets.UTF_16LE)); + } + private HashMap map(String key, String value) { HashMap attrs = new HashMap<>();