mirror of https://github.com/apache/nifi.git
NIFI-7016: Fix Groovy File functions available for FlowFile in ExecuteGroovyScript
This closes #7016 Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
parent
26d02fff49
commit
84fdb5e32f
|
@ -109,7 +109,7 @@ public class GroovySessionFile extends SessionFile implements GroovyObject {
|
||||||
* @param c Closure that will receive writer as a parameter to write file content
|
* @param c Closure that will receive writer as a parameter to write file content
|
||||||
* @return reference to self
|
* @return reference to self
|
||||||
*/
|
*/
|
||||||
public GroovySessionFile write(String charset, Closure c) {
|
public GroovySessionFile write(String charset, Closure<?> c) {
|
||||||
this.write(new OutputStreamCallback() {
|
this.write(new OutputStreamCallback() {
|
||||||
public void process(OutputStream out) throws IOException {
|
public void process(OutputStream out) throws IOException {
|
||||||
Writer w = new OutputStreamWriter(out, charset);
|
Writer w = new OutputStreamWriter(out, charset);
|
||||||
|
@ -166,7 +166,7 @@ public class GroovySessionFile extends SessionFile implements GroovyObject {
|
||||||
* or two parameters InputStream and OutputStream to perform read and write.
|
* or two parameters InputStream and OutputStream to perform read and write.
|
||||||
* @return reference to self
|
* @return reference to self
|
||||||
*/
|
*/
|
||||||
public GroovySessionFile write(Closure c) {
|
public GroovySessionFile write(Closure<?> c) {
|
||||||
if (c.getMaximumNumberOfParameters() == 1) {
|
if (c.getMaximumNumberOfParameters() == 1) {
|
||||||
this.write(new OutputStreamCallback() {
|
this.write(new OutputStreamCallback() {
|
||||||
public void process(OutputStream out) throws IOException {
|
public void process(OutputStream out) throws IOException {
|
||||||
|
@ -189,7 +189,7 @@ public class GroovySessionFile extends SessionFile implements GroovyObject {
|
||||||
* @param c Closure that receives one parameter OutputStream to perform append.
|
* @param c Closure that receives one parameter OutputStream to perform append.
|
||||||
* @return reference to self
|
* @return reference to self
|
||||||
*/
|
*/
|
||||||
public GroovySessionFile append(Closure c) {
|
public GroovySessionFile append(Closure<?> c) {
|
||||||
this.append(new OutputStreamCallback() {
|
this.append(new OutputStreamCallback() {
|
||||||
public void process(OutputStream out) throws IOException {
|
public void process(OutputStream out) throws IOException {
|
||||||
c.call(out);
|
c.call(out);
|
||||||
|
@ -224,7 +224,7 @@ public class GroovySessionFile extends SessionFile implements GroovyObject {
|
||||||
* @param c Closure with one parameter - Writer.
|
* @param c Closure with one parameter - Writer.
|
||||||
* @return reference to self
|
* @return reference to self
|
||||||
*/
|
*/
|
||||||
public GroovySessionFile append(String charset, Closure c) {
|
public GroovySessionFile append(String charset, Closure<?> c) {
|
||||||
this.append(new OutputStreamCallback() {
|
this.append(new OutputStreamCallback() {
|
||||||
public void process(OutputStream out) throws IOException {
|
public void process(OutputStream out) throws IOException {
|
||||||
Writer w = new OutputStreamWriter(out, charset);
|
Writer w = new OutputStreamWriter(out, charset);
|
||||||
|
@ -260,9 +260,9 @@ public class GroovySessionFile extends SessionFile implements GroovyObject {
|
||||||
*
|
*
|
||||||
* @param c Closure with one parameter InputStream.
|
* @param c Closure with one parameter InputStream.
|
||||||
*/
|
*/
|
||||||
public void read(Closure c) {
|
public void read(Closure<?> c) {
|
||||||
this.read(new InputStreamCallback() {
|
this.read(new InputStreamCallback() {
|
||||||
public void process(InputStream in) throws IOException {
|
public void process(InputStream in) {
|
||||||
c.call(in);
|
c.call(in);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -274,7 +274,7 @@ public class GroovySessionFile extends SessionFile implements GroovyObject {
|
||||||
* @param charset charset to use for Reader
|
* @param charset charset to use for Reader
|
||||||
* @param c Closure with one parameter Reader.
|
* @param c Closure with one parameter Reader.
|
||||||
*/
|
*/
|
||||||
public void read(String charset, Closure c) {
|
public void read(String charset, Closure<?> c) {
|
||||||
this.read(new InputStreamCallback() {
|
this.read(new InputStreamCallback() {
|
||||||
public void process(InputStream in) throws IOException {
|
public void process(InputStream in) throws IOException {
|
||||||
InputStreamReader r = new InputStreamReader(in, charset);
|
InputStreamReader r = new InputStreamReader(in, charset);
|
||||||
|
@ -284,34 +284,33 @@ public class GroovySessionFile extends SessionFile implements GroovyObject {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public GroovySessionFile withInputStream(Closure c) throws IOException {
|
public Object withInputStream(Closure<?> c) throws IOException {
|
||||||
InputStream inStream = session.read(this);
|
InputStream inStream = session.read(this);
|
||||||
c.call(inStream);
|
final Object returnObject = c.call(inStream);
|
||||||
inStream.close();
|
inStream.close();
|
||||||
return this;
|
return returnObject;
|
||||||
}
|
}
|
||||||
|
|
||||||
public GroovySessionFile withOutputStream(Closure c) throws IOException {
|
public Object withOutputStream(Closure<?> c) throws IOException {
|
||||||
OutputStream outStream = session.write(this);
|
OutputStream outStream = session.write(this);
|
||||||
c.call(outStream);
|
final Object returnObject = c.call(outStream);
|
||||||
outStream.close();
|
outStream.close();
|
||||||
return this;
|
return returnObject;
|
||||||
}
|
}
|
||||||
|
|
||||||
public GroovySessionFile withReader(String charset, Closure c) throws IOException, UnsupportedCharsetException {
|
public Object withReader(String charset, Closure<?> c) throws IOException, UnsupportedCharsetException {
|
||||||
InputStream inStream = session.read(this);
|
InputStream inStream = session.read(this);
|
||||||
BufferedReader br = new BufferedReader(new InputStreamReader(inStream, charset));
|
BufferedReader br = new BufferedReader(new InputStreamReader(inStream, charset));
|
||||||
c.call(br);
|
final Object returnObject = c.call(br);
|
||||||
br.close();
|
br.close();
|
||||||
return this;
|
return returnObject;
|
||||||
}
|
}
|
||||||
|
|
||||||
public GroovySessionFile withWriter(String charset, Closure c) throws IOException, UnsupportedCharsetException {
|
public Object withWriter(String charset, Closure<?> c) throws IOException, UnsupportedCharsetException {
|
||||||
OutputStream outStream = session.write(this);
|
OutputStream outStream = session.write(this);
|
||||||
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(outStream, charset));
|
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(outStream, charset));
|
||||||
c.call(bw);
|
final Object returnObject = c.call(bw);
|
||||||
bw.close();
|
bw.close();
|
||||||
return this;
|
return returnObject;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -488,6 +488,23 @@ public class ExecuteGroovyScriptTest {
|
||||||
flowFile.assertContentEquals("5678".getBytes(StandardCharsets.UTF_16LE));
|
flowFile.assertContentEquals("5678".getBytes(StandardCharsets.UTF_16LE));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_withInputStreamReturnsClosureValue() {
|
||||||
|
runner.setProperty(ExecuteGroovyScript.SCRIPT_BODY, "import java.util.stream.*\n"
|
||||||
|
+ "def ff = session.get(); if(!ff)return;\n"
|
||||||
|
+ "def outputBody = ff.withInputStream{inputStream -> '5678'}\n"
|
||||||
|
+ "ff.withOutputStream{outputStream -> outputStream.write(outputBody.bytes)}; REL_SUCCESS << ff; ");
|
||||||
|
|
||||||
|
runner.assertValid();
|
||||||
|
|
||||||
|
runner.enqueue("1234".getBytes(StandardCharsets.UTF_8));
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(ExecuteGroovyScript.REL_SUCCESS.getName(), 1);
|
||||||
|
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExecuteGroovyScript.REL_SUCCESS).get(0);
|
||||||
|
flowFile.assertContentEquals("5678");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test_onStart_onStop() {
|
public void test_onStart_onStop() {
|
||||||
runner.setProperty(ExecuteGroovyScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_onStart_onStop.groovy");
|
runner.setProperty(ExecuteGroovyScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_onStart_onStop.groovy");
|
||||||
|
|
Loading…
Reference in New Issue