1
0
mirror of https://github.com/apache/nifi.git synced 2025-02-14 14:05:26 +00:00

NIFI-6543: Added unit tests for lifecycle methods

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes .
This commit is contained in:
Matthew Burgess 2020-01-10 10:25:16 -05:00 committed by Pierre Villard
parent 48d4e6d14c
commit 91e9e65a5c
No known key found for this signature in database
GPG Key ID: BEE1599F0726E9CD
4 changed files with 102 additions and 1 deletions
nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src
main/resources/docs/org.apache.nifi.processors.groovyx.ExecuteGroovyScript
test
java/org/apache/nifi/processors/groovyx
resources/groovy

@ -182,7 +182,7 @@ then you can access it from code <code>SQL.mydb.rows('select * from mytable')</c
<h2>Handling processor start &amp; stop</h2>
<p>In the extended groovy processor you can catch `start` and `stop` events by providing corresponding static methods:</p>
<p>In the extended groovy processor you can catch `start` and `stop` and `unscheduled` events by providing corresponding static methods:</p>
<pre>
import org.apache.nifi.processor.ProcessContext
import java.util.concurrent.atomic.AtomicLong
@ -203,6 +203,10 @@ then you can access it from code <code>SQL.mydb.rows('select * from mytable')</c
println "onStop $context executed ${ Const.triggerCount } times during ${ alive } seconds"
}
static onUnscheduled(ProcessContext context){
def alive = (System.currentTimeMillis() - Const.startTime.getTime()) / 1000
println "onUnscheduled $context executed ${ Const.triggerCount } times during ${ alive } seconds"
}
flowFile.'trigger.count' = Const.triggerCount.incrementAndGet()
REL_SUCCESS << flowFile

@ -40,9 +40,11 @@ import org.junit.Test;
import org.junit.FixMethodOrder;
import org.junit.runners.MethodSorters;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@ -53,6 +55,7 @@ import java.sql.DriverManager;
import java.sql.Connection;
import java.sql.Statement;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import org.apache.nifi.controller.AbstractControllerService;
@ -471,6 +474,39 @@ public class ExecuteGroovyScriptTest {
flowFile.assertContentEquals("5678".getBytes(StandardCharsets.UTF_16LE));
}
@Test
public void test_onStart_onStop() throws Exception {
runner.setProperty(ExecuteGroovyScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_onStart_onStop.groovy");
runner.assertValid();
runner.enqueue("");
final PrintStream originalOut = System.out;
final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
System.setOut(new PrintStream(outContent));
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteGroovyScript.REL_SUCCESS.getName(), 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(ExecuteGroovyScript.REL_SUCCESS.getName());
MockFlowFile resultFile = result.get(0);
resultFile.assertAttributeExists("a");
resultFile.assertAttributeEquals("a", "A");
System.setOut(originalOut);
assertEquals("onStop invoked successfully\n", outContent.toString());
// Inspect the output visually for onStop, no way to pass back values
}
@Test
public void test_onUnscheduled() throws Exception {
runner.setProperty(ExecuteGroovyScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_onUnscheduled.groovy");
runner.assertValid();
final PrintStream originalOut = System.out;
final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
System.setOut(new PrintStream(outContent));
runner.run();
System.setOut(originalOut);
assertEquals("onUnscheduled invoked successfully\n", outContent.toString());
}
private HashMap<String, String> map(String key, String value) {
HashMap<String, String> attrs = new HashMap<>();

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.nifi.processor.ProcessContext
class OnStart {
static String a = null
}
static onStart(ProcessContext context){
OnStart.a = 'A'
}
static onStop(ProcessContext context){
println 'onStop invoked successfully'
}
def flowFile = session.get()
if (!flowFile) {
return
}
flowFile.'a' = OnStart.a
REL_SUCCESS << flowFile

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.nifi.processor.ProcessContext
static onUnscheduled(ProcessContext context){
println 'onUnscheduled invoked successfully'
}