From 84fdb5e32ff1a96e36b36a314006d33e1f0aa913 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Tue, 20 Jun 2023 16:56:59 -0400 Subject: [PATCH] NIFI-7016: Fix Groovy File functions available for FlowFile in ExecuteGroovyScript This closes #7016 Signed-off-by: Mike Thomsen --- .../groovyx/flow/GroovySessionFile.java | 39 +++++++++---------- .../groovyx/ExecuteGroovyScriptTest.java | 17 ++++++++ 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/flow/GroovySessionFile.java b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/flow/GroovySessionFile.java index 831b0a4287..6afcec6ef7 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/flow/GroovySessionFile.java +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/flow/GroovySessionFile.java @@ -109,7 +109,7 @@ public class GroovySessionFile extends SessionFile implements GroovyObject { * @param c Closure that will receive writer as a parameter to write file content * @return reference to self */ - public GroovySessionFile write(String charset, Closure c) { + public GroovySessionFile write(String charset, Closure c) { this.write(new OutputStreamCallback() { public void process(OutputStream out) throws IOException { Writer w = new OutputStreamWriter(out, charset); @@ -166,7 +166,7 @@ public class GroovySessionFile extends SessionFile implements GroovyObject { * or two parameters InputStream and OutputStream to perform read and write. * @return reference to self */ - public GroovySessionFile write(Closure c) { + public GroovySessionFile write(Closure c) { if (c.getMaximumNumberOfParameters() == 1) { this.write(new OutputStreamCallback() { public void process(OutputStream out) throws IOException { @@ -189,7 +189,7 @@ public class GroovySessionFile extends SessionFile implements GroovyObject { * @param c Closure that receives one parameter OutputStream to perform append. * @return reference to self */ - public GroovySessionFile append(Closure c) { + public GroovySessionFile append(Closure c) { this.append(new OutputStreamCallback() { public void process(OutputStream out) throws IOException { c.call(out); @@ -224,7 +224,7 @@ public class GroovySessionFile extends SessionFile implements GroovyObject { * @param c Closure with one parameter - Writer. * @return reference to self */ - public GroovySessionFile append(String charset, Closure c) { + public GroovySessionFile append(String charset, Closure c) { this.append(new OutputStreamCallback() { public void process(OutputStream out) throws IOException { Writer w = new OutputStreamWriter(out, charset); @@ -260,9 +260,9 @@ public class GroovySessionFile extends SessionFile implements GroovyObject { * * @param c Closure with one parameter InputStream. */ - public void read(Closure c) { + public void read(Closure c) { this.read(new InputStreamCallback() { - public void process(InputStream in) throws IOException { + public void process(InputStream in) { c.call(in); } }); @@ -274,7 +274,7 @@ public class GroovySessionFile extends SessionFile implements GroovyObject { * @param charset charset to use for Reader * @param c Closure with one parameter Reader. */ - public void read(String charset, Closure c) { + public void read(String charset, Closure c) { this.read(new InputStreamCallback() { public void process(InputStream in) throws IOException { InputStreamReader r = new InputStreamReader(in, charset); @@ -284,34 +284,33 @@ public class GroovySessionFile extends SessionFile implements GroovyObject { }); } - public GroovySessionFile withInputStream(Closure c) throws IOException { + public Object withInputStream(Closure c) throws IOException { InputStream inStream = session.read(this); - c.call(inStream); + final Object returnObject = c.call(inStream); inStream.close(); - return this; + return returnObject; } - public GroovySessionFile withOutputStream(Closure c) throws IOException { + public Object withOutputStream(Closure c) throws IOException { OutputStream outStream = session.write(this); - c.call(outStream); + final Object returnObject = c.call(outStream); outStream.close(); - return this; + return returnObject; } - public GroovySessionFile withReader(String charset, Closure c) throws IOException, UnsupportedCharsetException { + public Object withReader(String charset, Closure c) throws IOException, UnsupportedCharsetException { InputStream inStream = session.read(this); BufferedReader br = new BufferedReader(new InputStreamReader(inStream, charset)); - c.call(br); + final Object returnObject = c.call(br); br.close(); - return this; + return returnObject; } - public GroovySessionFile withWriter(String charset, Closure c) throws IOException, UnsupportedCharsetException { + public Object withWriter(String charset, Closure c) throws IOException, UnsupportedCharsetException { OutputStream outStream = session.write(this); BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(outStream, charset)); - c.call(bw); + final Object returnObject = c.call(bw); bw.close(); - return this; + return returnObject; } - } diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java index a33979a066..f146c8ef7d 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java @@ -488,6 +488,23 @@ public class ExecuteGroovyScriptTest { flowFile.assertContentEquals("5678".getBytes(StandardCharsets.UTF_16LE)); } + @Test + public void test_withInputStreamReturnsClosureValue() { + runner.setProperty(ExecuteGroovyScript.SCRIPT_BODY, "import java.util.stream.*\n" + + "def ff = session.get(); if(!ff)return;\n" + + "def outputBody = ff.withInputStream{inputStream -> '5678'}\n" + + "ff.withOutputStream{outputStream -> outputStream.write(outputBody.bytes)}; REL_SUCCESS << ff; "); + + runner.assertValid(); + + runner.enqueue("1234".getBytes(StandardCharsets.UTF_8)); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExecuteGroovyScript.REL_SUCCESS.getName(), 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExecuteGroovyScript.REL_SUCCESS).get(0); + flowFile.assertContentEquals("5678"); + } + @Test public void test_onStart_onStop() { runner.setProperty(ExecuteGroovyScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_onStart_onStop.groovy");