mirror of https://github.com/apache/nifi.git
NIFI-6145: Add Groovy file idioms (withInput/OutputStream, withReader/Writer) to ExecuteGroovyScript
This closes #3415 Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
parent
0650521eb2
commit
7d02ab1e2f
|
@ -28,12 +28,15 @@ import groovy.lang.GroovyObject;
|
|||
import org.codehaus.groovy.runtime.InvokerHelper;
|
||||
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.io.Writer;
|
||||
import java.nio.charset.UnsupportedCharsetException;
|
||||
|
||||
/**
|
||||
* SessionFile with groovy specific methods.
|
||||
|
@ -281,4 +284,34 @@ public class GroovySessionFile extends SessionFile implements GroovyObject {
|
|||
});
|
||||
}
|
||||
|
||||
public GroovySessionFile withInputStream(Closure c) throws IOException {
|
||||
InputStream inStream = session.read(this);
|
||||
c.call(inStream);
|
||||
inStream.close();
|
||||
return this;
|
||||
}
|
||||
|
||||
public GroovySessionFile withOutputStream(Closure c) throws IOException {
|
||||
OutputStream outStream = session.write(this);
|
||||
c.call(outStream);
|
||||
outStream.close();
|
||||
return this;
|
||||
}
|
||||
|
||||
public GroovySessionFile withReader(String charset, Closure c) throws IOException, UnsupportedCharsetException {
|
||||
InputStream inStream = session.read(this);
|
||||
BufferedReader br = new BufferedReader(new InputStreamReader(inStream, charset));
|
||||
c.call(br);
|
||||
br.close();
|
||||
return this;
|
||||
}
|
||||
|
||||
public GroovySessionFile withWriter(String charset, Closure c) throws IOException, UnsupportedCharsetException {
|
||||
OutputStream outStream = session.write(this);
|
||||
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(outStream, charset));
|
||||
c.call(bw);
|
||||
bw.close();
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -365,6 +365,68 @@ public class ExecuteGroovyScriptTest {
|
|||
runner.assertAllFlowFilesTransferred(proc.REL_SUCCESS.getName(), 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_withInputStream() {
|
||||
runner.setProperty(ExecuteGroovyScript.SCRIPT_BODY, "import java.util.stream.*\n"
|
||||
+ "def ff = session.get(); if(!ff)return;\n"
|
||||
+ "ff.withInputStream{inputStream -> String r = new BufferedReader(new InputStreamReader(inputStream)).lines()"
|
||||
+ ".collect(Collectors.joining(\"\\n\")); assert r=='1234' }; REL_SUCCESS << ff; ");
|
||||
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue("1234".getBytes(StandardCharsets.UTF_8));
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(ExecuteGroovyScript.REL_SUCCESS.getName(), 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_withOutputStream() {
|
||||
runner.setProperty(ExecuteGroovyScript.SCRIPT_BODY, "import java.util.stream.*\n"
|
||||
+ "def ff = session.get(); if(!ff)return;\n"
|
||||
+ "ff.withOutputStream{outputStream -> outputStream.write('5678'.bytes)}; REL_SUCCESS << ff; ");
|
||||
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue("1234".getBytes(StandardCharsets.UTF_8));
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(ExecuteGroovyScript.REL_SUCCESS.getName(), 1);
|
||||
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExecuteGroovyScript.REL_SUCCESS).get(0);
|
||||
flowFile.assertContentEquals("5678");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_withReader() {
|
||||
runner.setProperty(ExecuteGroovyScript.SCRIPT_BODY, "import java.util.stream.*\n"
|
||||
+ "def ff = session.get(); if(!ff)return;\n"
|
||||
+ "ff.withReader('UTF-8'){reader -> String r = new BufferedReader(reader).lines()"
|
||||
+ ".collect(Collectors.joining(\"\\n\")); assert r=='1234' }; REL_SUCCESS << ff; ");
|
||||
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue("1234".getBytes(StandardCharsets.UTF_8));
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(ExecuteGroovyScript.REL_SUCCESS.getName(), 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_withWriter() throws Exception {
|
||||
runner.setProperty(ExecuteGroovyScript.SCRIPT_BODY, "import java.util.stream.*\n"
|
||||
+ "def ff = session.get(); if(!ff)return;\n"
|
||||
+ "ff.withWriter('UTF-16LE'){writer -> writer.write('5678')}; REL_SUCCESS << ff; ");
|
||||
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue("1234".getBytes(StandardCharsets.UTF_8));
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(ExecuteGroovyScript.REL_SUCCESS.getName(), 1);
|
||||
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExecuteGroovyScript.REL_SUCCESS).get(0);
|
||||
flowFile.assertContentEquals("5678".getBytes(StandardCharsets.UTF_16LE));
|
||||
}
|
||||
|
||||
|
||||
private HashMap<String, String> map(String key, String value) {
|
||||
HashMap<String, String> attrs = new HashMap<>();
|
||||
|
|
Loading…
Reference in New Issue