NIFI-3688

- removed `require flowfile` property
- fixed test cases according to deprecated property
This commit is contained in:
dlukyanov 2017-07-16 18:17:05 +03:00
parent 5beb4f7dbf
commit 647cdcde17
6 changed files with 25 additions and 49 deletions

View File

@ -31,6 +31,7 @@ import java.util.Set;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
@ -63,13 +64,14 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.ValidationContext;
@EventDriven
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@Tags({"script", "groovy", "groovyx"})
@CapabilityDescription(
"Experimental Extended Groovy script processor. The script is responsible for "
+ "handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as any flow files created by "
+ "the script. If the handling is incomplete or incorrect, the session will be rolled back.")
@Restricted("Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")
@SeeAlso({})
@SeeAlso(classNames={"org.apache.nifi.processors.script.ExecuteScript"})
@DynamicProperty(name = "A script engine property to update",
value = "The value to set it to",
supportsExpressionLanguage = true,
@ -90,20 +92,20 @@ public class ExecuteGroovyScript extends AbstractProcessor {
.description("Body of script to execute. Only one of Script File or Script Body may be used").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(false)
.build();
public static String[] VALID_BOOLEANS = {"true", "false"};
public static final PropertyDescriptor REQUIRE_FLOW = new PropertyDescriptor.Builder().name("Requires flow file")
.description("If `true` then flowFile variable initialized and validated. So developer don't need to do flowFile = session.get(). If `false` the flowFile variable not initialized.")
.required(true).expressionLanguageSupported(false).allowableValues(VALID_BOOLEANS).defaultValue("false").build();
public static String[] VALID_FAIL_STRATEGY = {"rollback", "transfer to failure"};
public static final PropertyDescriptor FAIL_STRATEGY = new PropertyDescriptor.Builder()
.name("Failure strategy")
.description("If `transfer to failure` used then all flowFiles received from incoming queues in this session "
+"in case of exception will be transferred to `failure` relationship with additional attributes set: ERROR_MESSAGE and ERROR_STACKTRACE.")
.description("What to do with unhandled exceptions. If you want to manage exception by code then keep the default value `rollback`."
+" If `transfer to failure` selected and unhandled exception occured then all flowFiles received from incoming queues in this session"
+" will be transferred to `failure` relationship with additional attributes set: ERROR_MESSAGE and ERROR_STACKTRACE."
+" If `rollback` selected and unhandled exception occured then all flowFiles received from incoming queues will be returned back"
+" with penalize flag."
+" If the processor has no incoming connections then this parameter has no affect."
)
.required(true).expressionLanguageSupported(false).allowableValues(VALID_FAIL_STRATEGY).defaultValue(VALID_FAIL_STRATEGY[0]).build();
public static final PropertyDescriptor ADD_CLASSPATH = new PropertyDescriptor.Builder().name("Additional classpath").required(false)
.description("Classpath list separated by semicolon. You can use masks like `*`, `*.jar` in file name. Please avoid using this parameter because of deploy complexity :)")
.description("Classpath list separated by semicolon. You can use masks like `*`, `*.jar` in file name.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build();
@ -127,7 +129,6 @@ public class ExecuteGroovyScript extends AbstractProcessor {
List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(SCRIPT_FILE);
descriptors.add(SCRIPT_BODY);
descriptors.add(REQUIRE_FLOW);
descriptors.add(FAIL_STRATEGY);
descriptors.add(ADD_CLASSPATH);
this.descriptors = Collections.unmodifiableList(descriptors);
@ -366,24 +367,11 @@ public class ExecuteGroovyScript extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession _session) throws ProcessException {
String requireFlow = context.getProperty(REQUIRE_FLOW).getValue();
boolean toFailureOnError = VALID_FAIL_STRATEGY[1].equals(context.getProperty(FAIL_STRATEGY).getValue());
//create wrapped session to control list of newly created and files got from this session.
//so transfer original input to failure will be possible
GroovyProcessSessionWrap session = new GroovyProcessSessionWrap(_session, toFailureOnError);
FlowFile flowFile = null;
if ("true".equals(requireFlow)) {
flowFile = session.get();
if (flowFile == null) {
return;
}
} else {
if (toFailureOnError) {
throw new ProcessException("The parameter `" + REQUIRE_FLOW.getName() + "` must be true when `" + FAIL_STRATEGY.getName() + "` is " + VALID_FAIL_STRATEGY[1]);
}
}
HashMap CTL = new HashMap() {
@Override
@ -424,9 +412,6 @@ public class ExecuteGroovyScript extends AbstractProcessor {
bindings.put("REL_SUCCESS", REL_SUCCESS);
bindings.put("REL_FAILURE", REL_FAILURE);
bindings.put("CTL", CTL);
if (flowFile != null) {
bindings.put("flowFile", flowFile);
}
script.run();
bindings.clear();

View File

@ -130,8 +130,7 @@ public class ExecuteGroovyScriptTest {
*/
@Test
public void testReadFlowFileContentAndStoreInFlowFileAttribute() throws Exception {
runner.setProperty(proc.SCRIPT_BODY, "flowFile.testAttr = flowFile.read().getText('UTF-8'); REL_SUCCESS << flowFile;");
runner.setProperty(proc.REQUIRE_FLOW, "true");
runner.setProperty(proc.SCRIPT_BODY, "def flowFile = session.get(); if(!flowFile)return; flowFile.testAttr = flowFile.read().getText('UTF-8'); REL_SUCCESS << flowFile;");
//runner.setProperty(proc.FAIL_STRATEGY, "rollback");
runner.assertValid();
@ -146,7 +145,6 @@ public class ExecuteGroovyScriptTest {
@Test
public void test_onTrigger_groovy() throws Exception {
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_onTrigger.groovy");
runner.setProperty(proc.REQUIRE_FLOW, "false");
//runner.setProperty(proc.FAIL_STRATEGY, "rollback");
runner.assertValid();
@ -160,7 +158,6 @@ public class ExecuteGroovyScriptTest {
@Test
public void test_onTriggerX_groovy() throws Exception {
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_onTriggerX.groovy");
runner.setProperty(proc.REQUIRE_FLOW, "true");
//runner.setProperty(proc.FAIL_STRATEGY, "rollback");
runner.assertValid();
@ -174,7 +171,6 @@ public class ExecuteGroovyScriptTest {
@Test
public void test_onTrigger_changeContent_groovy() throws Exception {
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_onTrigger_changeContent.groovy");
runner.setProperty(proc.REQUIRE_FLOW, "false");
//runner.setProperty(proc.FAIL_STRATEGY, "rollback");
runner.assertValid();
@ -191,7 +187,6 @@ public class ExecuteGroovyScriptTest {
@Test
public void test_onTrigger_changeContentX_groovy() throws Exception {
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_onTrigger_changeContentX.groovy");
runner.setProperty(proc.REQUIRE_FLOW, "true");
//runner.setProperty(proc.FAIL_STRATEGY, "rollback");
runner.assertValid();
@ -208,7 +203,6 @@ public class ExecuteGroovyScriptTest {
@Test
public void test_no_input_groovy() throws Exception {
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_no_input.groovy");
runner.setProperty(proc.REQUIRE_FLOW, "false");
//runner.setProperty(proc.FAIL_STRATEGY, "rollback");
runner.assertValid();
runner.run();
@ -222,22 +216,19 @@ public class ExecuteGroovyScriptTest {
@Test
public void test_good_script() throws Exception {
runner.setProperty(proc.SCRIPT_BODY, " REL_SUCCESS << flowFile ");
runner.setProperty(proc.REQUIRE_FLOW, "true");
runner.setProperty(proc.SCRIPT_BODY, " def ff = session.get(); if(!ff)return; REL_SUCCESS << ff ");
runner.assertValid();
}
@Test
public void test_bad_script() throws Exception {
runner.setProperty(proc.SCRIPT_BODY, " { { ");
runner.setProperty(proc.REQUIRE_FLOW, "true");
runner.assertNotValid();
}
@Test
public void test_sql_01_select() throws Exception {
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_01_select.groovy");
runner.setProperty(proc.REQUIRE_FLOW, "false");
runner.setProperty("CTL.sql", "dbcp");
runner.assertValid();
@ -253,7 +244,6 @@ public class ExecuteGroovyScriptTest {
@Test
public void test_sql_02_blob_write() throws Exception {
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_02_blob_write.groovy");
runner.setProperty(proc.REQUIRE_FLOW, "true");
runner.setProperty("CTL.sql", "dbcp");
//runner.setProperty("ID", "0");
runner.assertValid();
@ -273,7 +263,6 @@ public class ExecuteGroovyScriptTest {
public void test_sql_03_blob_read() throws Exception {
//read blob from database written at previous step and write to flow file
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_03_blob_read.groovy");
runner.setProperty(proc.REQUIRE_FLOW, "false");
runner.setProperty("CTL.sql", "dbcp");
runner.setProperty("ID", "0");
runner.setValidateExpressionUsage(false);
@ -291,7 +280,6 @@ public class ExecuteGroovyScriptTest {
public void test_sql_04_insert_and_json() throws Exception {
//read blob from database written at previous step and write to flow file
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_04_insert_and_json.groovy");
runner.setProperty(proc.REQUIRE_FLOW, "true");
runner.setProperty("CTL.sql", "dbcp");
runner.setValidateExpressionUsage(false);
runner.assertValid();
@ -310,7 +298,6 @@ public class ExecuteGroovyScriptTest {
@Test
public void test_filter_01() throws Exception {
runner.setProperty(proc.SCRIPT_BODY, "def ff = session.get{it.FILTER=='3'}; if(!ff)return; REL_SUCCESS << ff;");
runner.setProperty(proc.REQUIRE_FLOW, "false");
//runner.setProperty(proc.FAIL_STRATEGY, "rollback");
runner.assertValid();
@ -330,8 +317,7 @@ public class ExecuteGroovyScriptTest {
@Test
public void test_read_01() throws Exception {
runner.setProperty(proc.SCRIPT_BODY, "assert flowFile.read().getText('UTF-8')=='1234'; REL_SUCCESS << flowFile;");
runner.setProperty(proc.REQUIRE_FLOW, "true");
runner.setProperty(proc.SCRIPT_BODY, "def ff = session.get(); if(!ff)return; assert ff.read().getText('UTF-8')=='1234'; REL_SUCCESS << ff; ");
runner.assertValid();
@ -343,8 +329,7 @@ public class ExecuteGroovyScriptTest {
@Test
public void test_read_02() throws Exception {
runner.setProperty(proc.SCRIPT_BODY, "flowFile.read{s-> assert s.getText('UTF-8')=='1234' }; REL_SUCCESS << flowFile;");
runner.setProperty(proc.REQUIRE_FLOW, "true");
runner.setProperty(proc.SCRIPT_BODY, "def ff = session.get(); if(!ff)return; ff.read{s-> assert s.getText('UTF-8')=='1234' }; REL_SUCCESS << ff; ");
runner.assertValid();
@ -356,8 +341,7 @@ public class ExecuteGroovyScriptTest {
@Test
public void test_read_03() throws Exception {
runner.setProperty(proc.SCRIPT_BODY, "flowFile.read('UTF-8'){r-> assert r.getText()=='1234' }; REL_SUCCESS << flowFile;");
runner.setProperty(proc.REQUIRE_FLOW, "true");
runner.setProperty(proc.SCRIPT_BODY, "def ff = session.get(); if(!ff)return; ff.read('UTF-8'){r-> assert r.getText()=='1234' }; REL_SUCCESS << ff; ");
runner.assertValid();

View File

@ -14,5 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
def flowFile = session.get()
if(!flowFile)return
flowFile."from-content" = "test content"
REL_SUCCESS << flowFile

View File

@ -14,6 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
def flowFile = session.get()
if(!flowFile)return
def selectedColumns = ''
flowFile.write{inputStream, outputStream->
String[] header = null

View File

@ -17,8 +17,8 @@
import groovy.sql.Sql
//the next line not needed because require flowFile property set to true
//def flowFile = session.get()
def flowFile = session.get()
if(!flowFile)return
//write content of the flow file into database blob
flowFile.read{ rawIn->

View File

@ -30,6 +30,9 @@ import groovy.json.JsonOutput
...
]
*/
def flowFile = session.get()
if(!flowFile)return
def outFiles = [] //list for new flow files
def rows = new JsonSlurper().parse( flowFile.read() )