groovyx initial version

NIFI-3688 license update to ASF
NIFI-3688 add groovyx dependency
NIFI-3688 comments from @alopresto
- reformat code https://cwiki.apache.org/confluence/display/NIFI/Contributor+Guide#ContributorGuide-EclipseUsers
- enabled java-doc
NIFI-3688
- try to fix license issues
- remove commented import
- remove unused var `sql`
NIFI-3688 add properties to skip javadoc/source generation
NIFI-3688 remove unused sql var
NIFI-3688
- first test case
NIFI-3688
+ 2 more tests based on current groovy tests
NIFI-3688 comment from @mattyb149
- processor renamed to ExecuteGroovyScript
NIFI-3688 fix script path validation
NIFI-3688
- refactor to compile on validation
- prepare 4 test cases with database
NIFI-3688 new test cases with groovy and sql
NIFI-3688 documentation
NIFI-3688
- refactor groovy extended methods
- add more test cases
NIFI-3688
- codestyle
- javadoc
- refactor flowfile voids to self-reference
- fix test cases
NIFI-3688 minor comment changes
NIFI-3688 rename additional documentation according to processor classname
NIFI-3688 exclude json test file from rat check
NIFI-3688 codestyle
NIFI-3688 add nar bundle to root pom.xml and to nifi-assembly/pom.xml
NIFI-3688
- fix & extend additional documentation
- fix pom.xml to bundle groovy into nar
NIFI-3688 add examples into additional processor documentation
NIFI-3688 fix pom.xml to exclude unnecessary libs from nar bundle
NIFI-3688 add restricted annotation
NIFI-3688 change version from 1.2.0-SNAPSHOT to 1.3.0-SNAPSHOT after rebase
NIFI-3688 new method in ProcessSession: public OutputStream write(FlowFile source)

NIFI-3688 change version from 1.3.0-SNAPSHOT to 1.4.0-SNAPSHOT after rebase

NIFI-3688 fix for @mattyb149 comment: The bundles referred to here and below are not included with this NAR and should be removed.

NIFI-3688 fix for @mattyb149 comment: This unused line can be removed

NIFI-3688
- removed `require flowfile` property
- fixed test cases after removing the deprecated property

change version to 1.5.0-SNAPSHOT

[NIFI-3688] Commented by mistake

[NIFI-3688] remove unused class

[NIFI-3688] fix javadoc comments

[NIFI-3688] refactor CTL & SQL properties

b3eecec901

2916ce1ec8

8e15392e2f

NIFI-3688: Checkstyle and typo fixes

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #1662
dlukyanov 2017-04-02 10:55:22 +03:00 committed by Matthew Burgess
parent 2d3e5abf81
commit c7a5a09b8a
34 changed files with 3876 additions and 0 deletions

View File

@@ -351,6 +351,11 @@
<artifactId>nifi-scripting-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-groovyx-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-elasticsearch-nar</artifactId>

View File

@@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-groovyx-bundle</artifactId>
<version>1.5.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-groovyx-nar</artifactId>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-groovyx-processors</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,211 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
nifi-groovyx-bundle includes subcomponents with separate copyright notices and
license terms. Your use of these subcomponents is subject to the terms
and conditions of the following licenses:
The binary distribution of this product bundles 'Apache Groovy Language'
under an Apache License Version 2.0, January 2004 http://www.apache.org/licenses/

View File

@@ -0,0 +1,14 @@
nifi-groovyx-nar
Copyright 2014-2017 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
The following binary components are provided under the Apache Software License v2
Apache Groovy
Copyright 2003-2015 The Apache Software Foundation

View File

@@ -0,0 +1,75 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-groovyx-bundle</artifactId>
<version>1.5.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-groovyx-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
<version>2.4.12</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.12.1.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes combine.children="append">
<exclude>src/test/resources/groovy/test_sql_04_insert_and_json.json</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,508 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.groovyx;
import java.io.File;
import java.lang.reflect.Method;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.codehaus.groovy.control.CompilerConfiguration;
import org.codehaus.groovy.runtime.ResourceGroovyMethods;
import org.codehaus.groovy.runtime.StackTraceUtils;
import org.apache.nifi.processors.groovyx.sql.OSql;
import org.apache.nifi.processors.groovyx.util.Files;
import org.apache.nifi.processors.groovyx.util.Validators;
import org.apache.nifi.processors.groovyx.flow.GroovyProcessSessionWrap;
import groovy.lang.GroovyShell;
import groovy.lang.Script;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.ValidationContext;
@EventDriven
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@Tags({"script", "groovy", "groovyx"})
@CapabilityDescription(
"Experimental Extended Groovy script processor. The script is responsible for "
+ "handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as any flow files created by "
+ "the script. If the handling is incomplete or incorrect, the session will be rolled back.")
@Restricted("Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")
@SeeAlso(classNames={"org.apache.nifi.processors.script.ExecuteScript"})
@DynamicProperty(name = "A script engine property to update",
value = "The value to set it to",
supportsExpressionLanguage = true,
description = "Updates a script engine property specified by the Dynamic Property's key with the value "
+ "specified by the Dynamic Property's value. Use `CTL.` to access any controller services.")
public class ExecuteGroovyScript extends AbstractProcessor {
public static final String GROOVY_CLASSPATH = "${groovy.classes.path}";
private static final String PRELOADS = "import org.apache.nifi.components.*;" + "import org.apache.nifi.flowfile.FlowFile;" + "import org.apache.nifi.processor.*;"
+ "import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;" + "import org.apache.nifi.processor.exception.*;" + "import org.apache.nifi.processor.io.*;"
+ "import org.apache.nifi.processor.util.*;" + "import org.apache.nifi.processors.script.*;" + "import org.apache.nifi.logging.ComponentLog;";
public static final PropertyDescriptor SCRIPT_FILE = new PropertyDescriptor.Builder()
.name("groovyx-script-file")
.displayName("Script File")
.required(false)
.description("Path to script file to execute. Only one of Script File or Script Body may be used")
.addValidator(Validators.createFileExistsAndReadableValidator())
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor SCRIPT_BODY = new PropertyDescriptor.Builder()
.name("groovyx-script-body")
.displayName("Script Body")
.required(false)
.description("Body of script to execute. Only one of Script File or Script Body may be used")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.build();
public static String[] VALID_FAIL_STRATEGY = {"rollback", "transfer to failure"};
public static final PropertyDescriptor FAIL_STRATEGY = new PropertyDescriptor.Builder()
.name("groovyx-failure-strategy")
.displayName("Failure strategy")
.description("What to do with unhandled exceptions. If you want to manage exception by code then keep the default value `rollback`."
+" If `transfer to failure` selected and unhandled exception occurred then all flowFiles received from incoming queues in this session"
+" will be transferred to `failure` relationship with additional attributes set: ERROR_MESSAGE and ERROR_STACKTRACE."
+" If `rollback` selected and unhandled exception occurred then all flowFiles received from incoming queues will be penalized and returned."
+" If the processor has no incoming connections then this parameter has no effect."
)
.required(true).expressionLanguageSupported(false).allowableValues(VALID_FAIL_STRATEGY).defaultValue(VALID_FAIL_STRATEGY[0]).build();
public static final PropertyDescriptor ADD_CLASSPATH = new PropertyDescriptor.Builder()
.name("groovyx-additional-classpath")
.displayName("Additional classpath")
.required(false)
.description("Classpath list separated by semicolon. You can use masks like `*`, `*.jar` in file name.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles that failed to be processed").build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
//parameters evaluated on Start or on Validate
File scriptFile = null; //SCRIPT_FILE
String scriptBody = null; //SCRIPT_BODY
String addClasspath = null; //ADD_CLASSPATH
String groovyClasspath = null; //evaluated from GROOVY_CLASSPATH = ${groovy.classes.path} global property
//compiled script
volatile GroovyShell shell = null; //new GroovyShell();
volatile Class<Script> compiled = null; //compiled script
volatile long scriptLastModified = 0; //last scriptFile modification to check if recompile required
@Override
protected void init(final ProcessorInitializationContext context) {
List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(SCRIPT_FILE);
descriptors.add(SCRIPT_BODY);
descriptors.add(FAIL_STRATEGY);
descriptors.add(ADD_CLASSPATH);
this.descriptors = Collections.unmodifiableList(descriptors);
HashSet<Relationship> relationshipSet = new HashSet<>();
relationshipSet.add(REL_SUCCESS);
relationshipSet.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(relationshipSet);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
private File asFile(String f) {
if (f == null || f.length() == 0) {
return null;
}
return new File(f);
}
private void callScriptStatic(String method, final ProcessContext context) throws IllegalAccessException, java.lang.reflect.InvocationTargetException {
if (compiled != null) {
Method m = null;
try {
m = compiled.getDeclaredMethod(method, ProcessContext.class);
} catch (NoSuchMethodException e) {
// The method will not be invoked if it does not exist
}
if (m == null) {
try {
m = compiled.getDeclaredMethod(method, Object.class);
} catch (NoSuchMethodException e) {
// The method will not be invoked if it does not exist
}
}
if (m != null) {
m.invoke(null, context);
}
}
}
/**
* Performs validation by compiling the script at this point.
*
* @param context provides a mechanism for obtaining externally managed values, such as property values and supplies convenience methods for operating on those values
* @return Collection of ValidationResult objects that will be added to any other validation findings - may be null
*/
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
this.scriptFile = asFile(context.getProperty(SCRIPT_FILE).evaluateAttributeExpressions().getValue()); //SCRIPT_FILE
this.scriptBody = context.getProperty(SCRIPT_BODY).getValue(); //SCRIPT_BODY
this.addClasspath = context.getProperty(ADD_CLASSPATH).evaluateAttributeExpressions().getValue(); //ADD_CLASSPATH
this.groovyClasspath = context.newPropertyValue(GROOVY_CLASSPATH).evaluateAttributeExpressions().getValue(); //evaluated from ${groovy.classes.path} global property
final Collection<ValidationResult> results = new HashSet<>();
try {
getGroovyScript();
} catch (Throwable t) {
results.add(new ValidationResult.Builder().subject("GroovyScript").input(this.scriptFile != null ? this.scriptFile.toString() : null).valid(false).explanation(t.toString()).build());
}
return results;
}
/**
* Hook method allowing subclasses to eagerly react to a configuration
* change for the given property descriptor. As an alternative to using this
* method a processor may simply get the latest value whenever it needs it
* and if necessary lazily evaluate it.
*
* @param descriptor of the modified property
* @param oldValue non-null property value (previous)
* @param newValue the new property value or if null indicates the property was removed
*/
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
this.shell = null;
this.compiled = null;
this.scriptLastModified = 0;
}
/**
* Performs setup operations when the processor is scheduled to run. This includes evaluating the processor's
* properties, as well as reloading the script (from file or the "Script Body" property)
*
* @param context the context in which to perform the setup operations
*/
@OnScheduled
public void onScheduled(final ProcessContext context) {
this.scriptFile = asFile(context.getProperty(SCRIPT_FILE).evaluateAttributeExpressions().getValue()); //SCRIPT_FILE
this.scriptBody = context.getProperty(SCRIPT_BODY).getValue(); //SCRIPT_BODY
this.addClasspath = context.getProperty(ADD_CLASSPATH).evaluateAttributeExpressions().getValue(); //ADD_CLASSPATH
this.groovyClasspath = context.newPropertyValue(GROOVY_CLASSPATH).evaluateAttributeExpressions().getValue(); //evaluated from ${groovy.classes.path} global property
try {
//compile if needed
getGroovyScript();
} catch (Throwable t) {
getLogger().error("Load script failed: " + t);
throw new ProcessException("Load script failed: " + t, t);
}
try {
callScriptStatic("onStart", context);
} catch (Throwable t) {
getLogger().error("onStart failed: " + t);
throw new ProcessException("onStart failed: " + t, t);
}
}
@OnStopped
public void onStopped(final ProcessContext context) {
try {
callScriptStatic("onStop", context);
} catch (Throwable t) {
throw new ProcessException("Failed to finalize groovy script:\n" + t, t);
}
//resetting the compiled script is not needed here because it was done in onPropertyModified
}
// used in validation and processing
@SuppressWarnings("unchecked")
Script getGroovyScript() throws Throwable {
GroovyMethods.init();
if (scriptBody != null && scriptFile != null) {
throw new ProcessException("Only one parameter accepted: `" + SCRIPT_BODY.getDisplayName() + "` or `" + SCRIPT_FILE.getDisplayName() + "`");
}
if (scriptBody == null && scriptFile == null) {
throw new ProcessException("At least one parameter required: `" + SCRIPT_BODY.getDisplayName() + "` or `" + SCRIPT_FILE.getDisplayName() + "`");
}
if (shell == null) {
CompilerConfiguration conf = new CompilerConfiguration();
conf.setDebug(true);
shell = new GroovyShell(conf);
if (addClasspath != null && addClasspath.length() > 0) {
for (File fcp : Files.listPathsFiles(addClasspath)) {
if (!fcp.exists()) {
throw new ProcessException("Path not found `" + fcp + "` for `" + ADD_CLASSPATH.getDisplayName() + "`");
}
shell.getClassLoader().addClasspath(fcp.toString());
}
}
//try to add classpath with groovy classes
if (groovyClasspath != null && groovyClasspath.length() > 0) {
shell.getClassLoader().addClasspath(groovyClasspath);
}
}
Script script = null;
if (compiled != null && scriptFile != null && scriptLastModified != scriptFile.lastModified() && System.currentTimeMillis() - scriptFile.lastModified() > 3000) {
//force recompile if script file has been changed
compiled = null;
}
if (compiled == null) {
String scriptName;
String scriptText;
if (scriptFile != null) {
scriptName = scriptFile.getName();
scriptLastModified = scriptFile.lastModified();
scriptText = ResourceGroovyMethods.getText(scriptFile, "UTF-8");
} else {
scriptName = "Script" + Long.toHexString(scriptBody.hashCode()) + ".groovy";
scriptText = scriptBody;
}
script = shell.parse(PRELOADS + scriptText, scriptName);
compiled = (Class<Script>) script.getClass();
}
if (script == null) {
script = compiled.newInstance();
}
Thread.currentThread().setContextClassLoader(shell.getClassLoader());
return script;
}
/**
* initializes SQL variables from DBCP services
*/
@SuppressWarnings("unchecked")
private void onInitSQL(HashMap SQL) throws SQLException {
for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) {
DBCPService s = (DBCPService) e.getValue();
OSql sql = new OSql(s.getConnection());
//try to set autocommit to false
try {
if (sql.getConnection().getAutoCommit()) {
sql.getConnection().setAutoCommit(false);
}
} catch (Throwable ei) {
getLogger().warn("Failed to set autocommit=false for `" + e.getKey() + "`", ei);
}
e.setValue(sql);
}
}
/**
* commits SQL services before the session is committed
*/
@SuppressWarnings("unchecked")
private void onCommitSQL(HashMap SQL) throws SQLException {
for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) {
OSql sql = (OSql) e.getValue();
if (!sql.getConnection().getAutoCommit()) {
sql.commit();
}
}
}
/**
* finalizes SQL services; no exceptions should be thrown.
*/
@SuppressWarnings("unchecked")
private void onFinitSQL(HashMap SQL) {
for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) {
OSql sql = (OSql) e.getValue();
try {
if (!sql.getConnection().getAutoCommit()) {
sql.getConnection().setAutoCommit(true); //default autocommit value in nifi
}
} catch (Throwable ei) {
getLogger().warn("Failed to set autocommit=true for `" + e.getKey() + "`", ei);
}
try {
sql.close();
sql = null;
} catch (Throwable ei) {
// Nothing to do
}
}
}
/**
* rolls back SQL services after an exception
*/
@SuppressWarnings("unchecked")
private void onFailSQL(HashMap SQL) {
for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) {
OSql sql = (OSql) e.getValue();
try {
if (!sql.getConnection().getAutoCommit()) {
sql.rollback();
}
} catch (Throwable ei) {
//the rollback error is usually not important, rather it is the DML error that is really important
}
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession _session) throws ProcessException {
boolean toFailureOnError = VALID_FAIL_STRATEGY[1].equals(context.getProperty(FAIL_STRATEGY).getValue());
//create a wrapped session to track both newly created flow files and flow files obtained from this session,
//so that transferring the original input to failure is possible
GroovyProcessSessionWrap session = new GroovyProcessSessionWrap(_session, toFailureOnError);
HashMap CTL = new AccessMap("CTL");
HashMap SQL = new AccessMap("SQL");
try {
Script script = getGroovyScript(); //compilation must be moved to validation
Map bindings = script.getBinding().getVariables();
bindings.clear();
// Find the user-added properties and bind them for the script
for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
if (property.getKey().isDynamic()) {
if (property.getKey().getName().startsWith("CTL.")) {
//get controller service
ControllerService ctl = context.getProperty(property.getKey()).asControllerService(ControllerService.class);
CTL.put(property.getKey().getName().substring(4), ctl);
} else if (property.getKey().getName().startsWith("SQL.")) {
DBCPService dbcp = context.getProperty(property.getKey()).asControllerService(DBCPService.class);
SQL.put(property.getKey().getName().substring(4), dbcp);
} else {
// Add the dynamic property bound to its full PropertyValue to the script engine
if (property.getValue() != null) {
bindings.put(property.getKey().getName(), context.getProperty(property.getKey()));
}
}
}
}
onInitSQL(SQL);
bindings.put("session", session);
bindings.put("context", context);
bindings.put("log", getLogger());
bindings.put("REL_SUCCESS", REL_SUCCESS);
bindings.put("REL_FAILURE", REL_FAILURE);
bindings.put("CTL", CTL);
bindings.put("SQL", SQL);
script.run();
bindings.clear();
onCommitSQL(SQL);
session.commit();
} catch (Throwable t) {
getLogger().error(t.toString(), t);
onFailSQL(SQL);
if (toFailureOnError) {
//transfer all received to failure with two new attributes: ERROR_MESSAGE and ERROR_STACKTRACE.
session.revertReceivedTo(REL_FAILURE, StackTraceUtils.deepSanitize(t));
} else {
session.rollback(true);
}
} finally {
onFinitSQL(SQL);
}
}
/**
* Returns a PropertyDescriptor for the given name. This is for the user to be able to define their own properties
* which will be available as variables in the script
*
* @param propertyDescriptorName used to lookup if any property descriptors exist for that name
* @return a PropertyDescriptor object corresponding to the specified dynamic property name
*/
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
if (propertyDescriptorName.startsWith("CTL.")) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.description("Controller service accessible from code as `" + propertyDescriptorName + "`")
.dynamic(true)
.identifiesControllerService(ControllerService.class)
.build();
}
if (propertyDescriptorName.startsWith("SQL.")) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.description("The `groovy.sql.Sql` object created from DBCP Controller service and accessible from code as `" + propertyDescriptorName + "`")
.dynamic(true)
.identifiesControllerService(DBCPService.class)
.build();
}
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.dynamic(true)
.build();
}
/** simple HashMap with exception on access of non-existent key */
private class AccessMap extends HashMap {
private String parentKey;
AccessMap(String parentKey){
this.parentKey=parentKey;
}
@Override
public Object get(Object key) {
if (!containsKey(key)) {
throw new RuntimeException("The `" + parentKey + "." + key + "` not defined in processor properties");
}
return super.get(key);
}
}
}
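
A minimal sketch of a script body this processor could run, using the bindings set up in onTrigger above (session, log, REL_SUCCESS, CTL, SQL). The dynamic property name SQL.db, the table name, and the attribute key are assumptions for illustration only:

// optional static hook looked up by callScriptStatic when the processor is scheduled
static def onStart(context) { }

def flowFile = session.get()                      // GroovySessionFile, or null if the incoming queue is empty
if (!flowFile) return
flowFile.'example.attribute' = 'set from groovy'  // stored via putAttribute
def rows = SQL.db.rows('SELECT * FROM my_table')  // assumes a dynamic property `SQL.db` bound to a DBCPService; my_table is hypothetical
log.info("query returned ${rows.size()} row(s)")
REL_SUCCESS << flowFile                           // transfer operator registered by GroovyMethods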

View File

@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.groovyx;
import groovy.lang.DelegatingMetaClass;
import groovy.lang.GroovySystem;
import org.apache.nifi.processors.groovyx.flow.ProcessSessionWrap;
import org.apache.nifi.processors.groovyx.flow.SessionFile;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.flowfile.FlowFile;
import java.util.Collection;
import java.util.List;
/**
* Class that registers additional Groovy methods to make working with SessionFile, Relationship, and sessions easier
*/
class GroovyMethods {
private static boolean initialized = false;
static void init() {
if (!initialized) {
synchronized (GroovyMethods.class) {
if (!initialized) {
initialized = metaRelationship();
}
}
}
}
private static boolean metaRelationship() {
GroovySystem.getMetaClassRegistry().setMetaClass(Relationship.class, new DelegatingMetaClass(Relationship.class) {
@Override
public Object invokeMethod(Object object, String methodName, Object[] args) {
if (object instanceof Relationship) {
if ("leftShift".equals(methodName) && args.length == 1) {
if (args[0] instanceof SessionFile) {
return this.leftShift((Relationship) object, (SessionFile) args[0]);
} else if (args[0] instanceof Collection) {
return this.leftShift((Relationship) object, (Collection) args[0]);
}
}
}
return super.invokeMethod(object, methodName, args);
}
/** to support: REL_SUCCESS << sessionFile */
private Relationship leftShift(Relationship r, SessionFile f) {
f.transfer(r);
return r;
}
/** to support: REL_SUCCESS << sessionFileCollection */
@SuppressWarnings("unchecked")
private Relationship leftShift(Relationship r, Collection sfl) {
if (sfl != null && sfl.size() > 0) {
ProcessSessionWrap session = ((SessionFile) sfl.iterator().next()).session();
List<FlowFile> ffl = session.unwrap(sfl);
//assume all files have the same session
session.transfer(ffl, r);
}
return r;
}
});
return true;
}
}
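
As a usage sketch (not part of the diff), the delegating metaclass above is what enables the left-shift transfer style inside a script; flowFile and session are assumed to be the ExecuteGroovyScript bindings:

REL_SUCCESS << flowFile        // single wrapped SessionFile: transferred to success
def batch = session.get(10)    // up to 10 wrapped flow files from the incoming queues
REL_FAILURE << batch           // collection form: all files transferred in one call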

View File

@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.groovyx.flow;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.FlowFileFilter;
import groovy.lang.Closure;
import java.util.List;
/**
* Wrapped session that produces groovy wrapped session-file.
*/
@SuppressWarnings("unused")
public class GroovyProcessSessionWrap extends ProcessSessionWrap {
public GroovyProcessSessionWrap(ProcessSession s, boolean toFailureOnError) {
super(s, toFailureOnError);
}
/**
* Returns the flow file wrapped with this session for simplified script access.
*/
public SessionFile wrap(FlowFile f) {
if (f == null) {
return null;
}
if (f instanceof SessionFile) {
return ((SessionFile) f);
}
return new GroovySessionFile(this, f);
}
/**
* Returns a filtered list of input files. The closure receives each file from the input queue and should return one of the following values:
* true - accept and continue, false - reject and continue, null - reject and stop, or any FlowFileFilterResult value.
*/
public List<FlowFile> get(Closure filter) {
return this.get(new FlowFileFilter() {
@SuppressWarnings("ConstantConditions")
public FlowFileFilterResult filter(FlowFile flowFile) {
Object res = filter.call(wrap(flowFile));
if (res == null) {
return FlowFileFilterResult.REJECT_AND_TERMINATE;
}
if (res instanceof Boolean) {
return ((Boolean) res ? FlowFileFilterResult.ACCEPT_AND_CONTINUE : FlowFileFilterResult.REJECT_AND_CONTINUE);
}
if (res instanceof FlowFileFilterResult) {
return (FlowFileFilterResult) res;
}
return (org.codehaus.groovy.runtime.DefaultGroovyMethods.asBoolean(res) ? FlowFileFilterResult.ACCEPT_AND_CONTINUE : FlowFileFilterResult.REJECT_AND_CONTINUE);
}
});
}
}
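
A hedged example of the closure-based filter above, as it might appear in a script; the attribute name and size threshold are illustrative:

// accept small files and stop polling at the first large one (returning null terminates)
def smallFiles = session.get { it.size < 1024 ? true : null }
// boolean result: accept JSON files, skip the rest but keep scanning the queue
def jsonFiles = session.get { it.'mime.type' == 'application/json' }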

View File

@@ -0,0 +1,284 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.groovyx.flow;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.io.InputStreamCallback;
import groovy.lang.Writable;
import groovy.lang.Closure;
import groovy.lang.MetaClass;
import groovy.lang.GroovyObject;
import org.codehaus.groovy.runtime.InvokerHelper;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
/**
* SessionFile with groovy specific methods.
*/
@SuppressWarnings("unused")
public class GroovySessionFile extends SessionFile implements GroovyObject {
private transient MetaClass metaClass;
protected GroovySessionFile(ProcessSessionWrap session, FlowFile f) {
super(session, f);
setMetaClass(null); //set default meta-class
}
/*----------------------GroovyObject methods >>---------------------------*/
/**
* Alias for getAttribute so that attributes act as Groovy properties, except for the `size` and `attributes` keys
*/
@Override
public Object getProperty(String key) {
if ("size".equals(key)) return getSize();
if ("attributes".equals(key)) return getAttributes();
return this.getAttribute(key);
}
/**
* Calls putAttribute if the value is defined and removeAttribute if the value is null
*/
@Override
public void setProperty(String key, Object value) {
if (value == null) {
this.removeAttribute(key);
} else if (value instanceof String) {
this.putAttribute(key, (String) value);
} else {
this.putAttribute(key, value.toString());
}
}
/**
* GroovyObject support method
*/
@Override
public MetaClass getMetaClass() {
return this.metaClass;
}
/**
* GroovyObject support method
*/
@Override
public void setMetaClass(MetaClass metaClass) {
this.metaClass = metaClass == null ? InvokerHelper.getMetaClass(this.getClass()) : metaClass;
}
/**
* GroovyObject support method
*/
@Override
public Object invokeMethod(String name, Object args) {
return this.metaClass.invokeMethod(this, name, args);
}
/*----------------------<< GroovyObject methods---------------------------*/
/*----------------------Extended Groovy methods >>------------------------*/
/**
* Write flow file contents through writer with defined charset.
*
* @param charset charset to use for writer
* @param c Closure that will receive writer as a parameter to write file content
* @return reference to self
*/
public GroovySessionFile write(String charset, Closure c) {
this.write(new OutputStreamCallback() {
public void process(OutputStream out) throws IOException {
Writer w = new OutputStreamWriter(out, charset);
c.call(w);
w.flush();
w.close();
}
});
return this;
}
/**
* Writes the character sequence (string) directly into the flow file content.
*
* @param charset charset to use for writer
* @param c content
* @return reference to self
*/
public GroovySessionFile write(String charset, CharSequence c) {
this.write(new OutputStreamCallback() {
public void process(OutputStream out) throws IOException {
Writer w = new OutputStreamWriter(out, charset);
w.append(c);
w.flush();
w.close();
}
});
return this;
}
/**
* Write flow file contents through writer with defined charset.
*
* @param charset charset to use for writer
* @param c content defined as writable
* @return reference to self
*/
public GroovySessionFile write(String charset, Writable c) {
this.write(new OutputStreamCallback() {
public void process(OutputStream out) throws IOException {
Writer w = new OutputStreamWriter(out, charset);
c.writeTo(w);
w.flush();
w.close();
}
});
return this;
}
/**
* Write or read+write flow file contents through streams.
*
* @param c Closure that could receive one parameter OutputStream to perform write,
* or two parameters InputStream and OutputStream to perform read and write.
* @return reference to self
*/
public GroovySessionFile write(Closure c) {
if (c.getMaximumNumberOfParameters() == 1) {
this.write(new OutputStreamCallback() {
public void process(OutputStream out) throws IOException {
c.call(out);
}
});
} else {
this.write(new StreamCallback() {
public void process(InputStream in, OutputStream out) throws IOException {
c.call(in, out);
}
});
}
return this;
}
/**
* Appends to the existing content of the flow file.
*
* @param c Closure that receives one parameter OutputStream to perform append.
* @return reference to self
*/
public GroovySessionFile append(Closure c) {
this.append(new OutputStreamCallback() {
public void process(OutputStream out) throws IOException {
c.call(out);
}
});
return this;
}
/**
* Appends to the existing content of the flow file through a Writer with the defined charset.
*
* @param charset charset to use for writer
* @param c content to append.
* @return reference to self
*/
public GroovySessionFile append(String charset, Writable c) {
this.append(new OutputStreamCallback() {
public void process(OutputStream out) throws IOException {
Writer w = new OutputStreamWriter(out, charset);
c.writeTo(w);
w.flush();
w.close();
}
});
return this;
}
/**
* Appends to the existing content of the flow file through a Writer with the defined charset.
*
* @param charset charset to use for writer
* @param c Closure with one parameter - Writer.
* @return reference to self
*/
public GroovySessionFile append(String charset, Closure c) {
this.append(new OutputStreamCallback() {
public void process(OutputStream out) throws IOException {
Writer w = new OutputStreamWriter(out, charset);
c.call(w);
w.flush();
w.close();
}
});
return this;
}
/**
* Appends to the existing content of the flow file through a Writer with the defined charset.
*
* @param charset charset to use for writer
* @param c content to append.
* @return reference to self
*/
public GroovySessionFile append(String charset, CharSequence c) {
this.append(new OutputStreamCallback() {
public void process(OutputStream out) throws IOException {
Writer w = new OutputStreamWriter(out, charset);
w.append(c);
w.flush();
w.close();
}
});
return this;
}
/**
* Reads the content of the flow file and closes the input stream.
*
* @param c Closure with one parameter InputStream.
*/
public void read(Closure c) {
this.read(new InputStreamCallback() {
public void process(InputStream in) throws IOException {
c.call(in);
}
});
}
/**
* Reads the content of the flow file through a Reader and closes the stream.
*
* @param charset charset to use for Reader
* @param c Closure with one parameter Reader.
*/
public void read(String charset, Closure c) {
this.read(new InputStreamCallback() {
public void process(InputStream in) throws IOException {
InputStreamReader r = new InputStreamReader(in, charset);
c.call(r);
r.close();
}
});
}
}
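
For context, a short sketch of how these write/read helpers look from a script, assuming flowFile and log are the ExecuteGroovyScript bindings:

flowFile.write('UTF-8', 'hello world')     // overwrite the content with a string
flowFile.write { input, output ->          // two-argument closure: read the old content, write the new content
    output << input.text.toUpperCase().bytes
}
flowFile.read('UTF-8') { reader ->         // read the content back through a Reader
    log.info('starts with: ' + reader.text.take(10))
}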

View File

@@ -0,0 +1,969 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.groovyx.flow;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.FlowFileHandlingException;
import org.apache.nifi.processor.exception.MissingFlowFileException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.processors.groovyx.util.Throwables;
/**
* Wrapped session that collects all created/modified files when constructed with a special flag
* and is able to execute the method revertReceivedTo(Relationship r, Throwable t).
* It would be useful to have created-file and received-file lists in a standard session;
* such lists would simplify error management.
*/
public abstract class ProcessSessionWrap implements ProcessSession {
public static final String ERROR_STACKTRACE = "ERROR_STACKTRACE";
public static final String ERROR_MESSAGE = "ERROR_MESSAGE";
private ProcessSession s;
private boolean foe;
/*
list of files to be sent to failure on error
on get() we store a clone here
*/
private List<FlowFile> toFail = new ArrayList<>();
/*
list of files to be dropped on error
on get(), create(), write(), ... we store the last version of each file by id
*/
private Map<String, FlowFile> toDrop = new HashMap<>();
public ProcessSessionWrap(ProcessSession s, boolean toFailureOnError) {
if (s instanceof ProcessSessionWrap) {
throw new RuntimeException("session could be instanceof ProcessSessionWrap");
}
if (s == null) {
throw new NullPointerException("Session is mandatory session=null");
}
this.s = s;
foe = toFailureOnError;
}
/**
* Returns the flow file wrapped with this session for simplified script access.
* A sample implementation: <code>
* public SessionFile wrap(FlowFile f) {
* if (f == null) {
* return null;
* }
* if (f instanceof SessionFile) {
* return ((SessionFile) f);
* }
* return new SessionFile(this, f);
* }</code>
*/
public abstract SessionFile wrap(FlowFile f);
public List<FlowFile> wrap(List ff) {
if (ff == null) {
return null;
}
for (int i = 0; i < ff.size(); i++) {
ff.set(i, wrap((FlowFile) ff.get(i)));
}
return ff;
}
public FlowFile unwrap(FlowFile f) {
if (f == null) {
return null;
}
if (f instanceof SessionFile) {
return ((SessionFile) f).flowFile;
}
return f;
}
public List<FlowFile> unwrap(Collection<FlowFile> _ff) {
if (_ff == null) {
return null;
}
List<FlowFile> ff = new ArrayList(_ff);
for (int i = 0; i < ff.size(); i++) {
ff.set(i, unwrap(ff.get(i)));
}
return ff;
}
private void assertNotSessionFile(FlowFile f) {
if (f instanceof SessionFile) {
throw new RuntimeException("SessionFile not accepted at this point. " + this.getClass() + " developer failure.");
}
}
/**
* called when a file is created or modified
*/
private FlowFile onMod(FlowFile f) {
assertNotSessionFile(f);
if (foe) {
toDrop.put(f.getAttribute("uuid"), f);
}
return f;
}
/**
* called when a file is obtained from the incoming queue
*/
private FlowFile onGet(FlowFile f) {
assertNotSessionFile(f);
if (f == null) {
return null;
}
if (foe) {
toFail.add(s.clone(f));
onMod(f);
}
return f;
}
private List<FlowFile> onGet(List<FlowFile> ff) {
if (ff == null) {
return null;
}
if (foe) {
for (FlowFile f : ff) {
onGet(f);
}
}
return ff;
}
/**
* called when a file is removed
*/
private void onDrop(FlowFile f) {
assertNotSessionFile(f);
if (foe) {
toDrop.remove(f.getAttribute("uuid"));
}
}
private void onDrop(Collection<FlowFile> ff) {
if (foe) {
for (FlowFile f : ff) {
onDrop(f);
}
}
}
private void onClear() {
if (foe) {
toDrop.clear();
toFail.clear();
}
}
/**
* Transfers all input files to the given relationship and drops the other files.
*
* @param r where to transfer the flow files; when null, the flow files are penalized and returned to the input queue.
* @param t the cause of this transfer; when a relationship is specified, the additional attributes ERROR_MESSAGE and ERROR_STACKTRACE are populated.
*/
public void revertReceivedTo(Relationship r, Throwable t) {
for (FlowFile f : toDrop.values()) {
s.remove(f);
}
String errorMessage = Throwables.getMessage(t, null, 950);
String stackTrace = Throwables.stringStackTrace(t);
for (FlowFile f : toFail) {
if (t != null && r != null) {
f = s.putAttribute(f, ERROR_MESSAGE, errorMessage);
f = s.putAttribute(f, ERROR_STACKTRACE, stackTrace);
}
if (r != null) {
s.transfer(f, r);
} else {
f = s.penalize(f);
s.transfer(f);
}
}
s.commit();
onClear();
}
/*============================================= NATIVE METHODS ================================================*/
/**
* <p>
* Commits the current session ensuring all operations against FlowFiles
* within this session are atomically persisted. All FlowFiles operated on
* within this session must be accounted for by transfer or removal or the
* commit will fail.
* </p>
* <p>
* As soon as the commit completes the session is again ready to be used
* </p>
*
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session.
* @throws FlowFileHandlingException if not all FlowFiles acted upon within this session are accounted for by user code such that they have a transfer identified or where marked for removal.
* Automated rollback occurs.
* @throws ProcessException if some general fault occurs while persisting the session. Initiates automatic rollback. The root cause can be obtained via <code>Exception.getCause()</code>
*/
@Override
public void commit() {
for (FlowFile f : toFail) {
s.remove(f);
}
s.commit();
onClear();
}
/**
* Reverts any changes made during this session. All FlowFiles are restored
* back to their initial session state and back to their original queues. If
* this session is already committed or rolled back then no changes will
* occur. This method can be called any number of times. Calling this method
* is identical to calling {@link #rollback(boolean)} passing
* <code>false</code> as the parameter.
*/
@Override
public void rollback() {
s.rollback();
onClear();
}
/**
* Reverts any changes made during this session. All FlowFiles are restored
* back to their initial session state and back to their original queues,
* after optionally being penalized. If this session is already committed or
* rolled back then no changes will occur. This method can be called any
* number of times.
*
* @param penalize whether or not the FlowFiles that are being restored back to their queues should be penalized
*/
@Override
public void rollback(boolean penalize) {
s.rollback(penalize);
onClear();
}
/**
* Adjusts counter data for the given counter name and takes care of
* registering the counter if not already present. The adjustment occurs
* only if and when the ProcessSession is committed.
*
* @param name the name of the counter
* @param delta the delta by which to modify the counter (+ or -)
* @param immediate if true, the counter will be updated immediately, without regard to whether the ProcessSession is committed or rolled back; otherwise, the counter will be incremented only if and
* when the ProcessSession is committed.
*/
@Override
public void adjustCounter(String name, long delta, boolean immediate) {
s.adjustCounter(name, delta, immediate);
}
/**
* @return FlowFile that is next highest priority FlowFile to process. Otherwise returns null.
*/
@Override
public SessionFile get() {
return wrap(onGet(s.get()));
}
/**
* Returns up to <code>maxResults</code> FlowFiles from the work queue. If
* no FlowFiles are available, returns an empty list. Will not return null.
* If multiple incoming queues are present, the behavior is unspecified in
* terms of whether all queues or only a single queue will be polled in a
* single call.
*
* @param maxResults the maximum number of FlowFiles to return
* @return up to <code>maxResults</code> FlowFiles from the work queue. If no FlowFiles are available, returns an empty list. Will not return null.
* @throws IllegalArgumentException if <code>maxResults</code> is less than 0
*/
@Override
public List<FlowFile> get(int maxResults) {
return wrap(onGet(s.get(maxResults)));
}
/**
* <p>
* Returns all FlowFiles from all of the incoming queues for which the given
* {@link FlowFileFilter} indicates should be accepted. Calls to this method
* provide exclusive access to the underlying queues. I.e., no other thread
* will be permitted to pull FlowFiles from this Processor's queues or add
* FlowFiles to this Processor's incoming queues until this method call has
* returned.
* </p>
*
* @param filter to limit which flow files are returned
* @return all FlowFiles from all of the incoming queues for which the given {@link FlowFileFilter} indicates should be accepted.
*/
@Override
public List<FlowFile> get(FlowFileFilter filter) {
return wrap(onGet(s.get(filter)));
}
/**
* @return the QueueSize that represents the number of FlowFiles and their combined data size for all FlowFiles waiting to be processed by the Processor that owns this ProcessSession, regardless
* of which Connection the FlowFiles live on
*/
@Override
public QueueSize getQueueSize() {
return s.getQueueSize();
}
/**
* Creates a new FlowFile in the repository with no content and without any
* linkage to a parent FlowFile. This method is appropriate only when data
* is received or created from an external system. Otherwise, this method
* should be avoided and should instead use {@link #create(FlowFile)} or
* {@see #create(Collection)}.
* <p>
* When this method is used, a Provenance CREATE or RECEIVE Event should be
* generated. See the {@link #getProvenanceReporter()} method and
* {@link ProvenanceReporter} class for more information
*
* @return newly created FlowFile
*/
@Override
public SessionFile create() {
return wrap(onMod(s.create()));
}
/**
* Creates a new FlowFile in the repository with no content but with a
* parent linkage to <code>parent</code>. The newly created FlowFile will
* inherit all of the parent's attributes except for the UUID. This method
* will automatically generate a Provenance FORK event or a Provenance JOIN
* event, depending on whether or not other FlowFiles are generated from the
* same parent before the ProcessSession is committed.
*
* @param parent to base the new flowfile on
* @return newly created flowfile
*/
@Override
public SessionFile create(FlowFile parent) {
return wrap(onMod(s.create(unwrap(parent))));
}
/**
* Creates a new FlowFile in the repository with no content but with a
* parent linkage to the FlowFiles specified by the parents Collection. The
* newly created FlowFile will inherit all of the attributes that are in
* common to all parents (except for the UUID, which will be in common if
* only a single parent exists). This method will automatically generate a
* Provenance JOIN event.
*
* @param parents which the new flowfile should inherit shared attributes from
* @return new flowfile
*/
@Override
public SessionFile create(Collection<FlowFile> parents) {
return wrap(onMod(s.create(unwrap(parents))));
}
/**
* Creates a new FlowFile that is a clone of the given FlowFile as of the
* time this is called, both in content and attributes. This method
* automatically emits a Provenance CLONE Event.
*
* @param example FlowFile to be the source of cloning - given FlowFile must be a part of the given session
* @return FlowFile that is a clone of the given example
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session and for the given FlowFile(s)
* @throws FlowFileHandlingException if the given FlowFile is already transferred or removed or doesn't belong to this session. Automatic rollback will occur.
* @throws MissingFlowFileException if the given FlowFile content cannot be found. The FlowFile should no longer be referenced, will be internally destroyed, and the session is automatically
* rolled back and what is left of the FlowFile is destroyed.
* @throws FlowFileAccessException if some IO problem occurs accessing FlowFile content
* @throws NullPointerException if the argument is null
*/
@Override
public SessionFile clone(FlowFile example) {
return wrap(onMod(s.clone(unwrap(example))));
}
/**
* Creates a new FlowFile whose parent is the given FlowFile. The content of
* the new FlowFile will be a subset of the byte sequence of the given
* FlowFile starting at the specified offset and with the length specified.
* The new FlowFile will contain all of the attributes of the original. This
* method automatically emits a Provenance FORK Event (or a Provenance CLONE
* Event, if the offset is 0 and the size is exactly equal to the size of
* the example FlowFile).
*
* @param parent to base the new flowfile attributes on
* @param offset of the parent flowfile to base the child flowfile content on
* @param size of the new flowfile from the offset
* @return a FlowFile with the specified size whose parent is first argument to this function
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session and for the given FlowFile
* @throws FlowFileHandlingException if the given FlowFile is already transferred or removed or doesn't belong to this session, or if the specified offset + size exceeds that of the size of the
* parent FlowFile. Automatic rollback will occur.
* @throws MissingFlowFileException if the given FlowFile content cannot be found. The FlowFile should no longer be referenced, will be internally destroyed, and the session is automatically
* rolled back and what is left of the FlowFile is destroyed.
*/
@Override
public SessionFile clone(FlowFile parent, long offset, long size) {
return wrap(onMod(s.clone(unwrap(parent), offset, size)));
}
/**
* Sets a penalty for the given FlowFile which will make it unavailable to
* be operated on any further during the penalty period.
*
* @param flowFile to penalize
* @return FlowFile the new FlowFile reference to use
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session and for the given FlowFile(s)
* @throws FlowFileHandlingException if the given FlowFile is already transferred or removed or doesn't belong to this session. Automatic rollback will occur.
* @throws NullPointerException if the argument is null
*/
@Override
public SessionFile penalize(FlowFile flowFile) {
SessionFile sf = wrap(flowFile);
sf.flowFile = onMod(s.penalize(sf.flowFile));
return sf;
}
/**
* Updates the given FlowFile's attributes with the given key/value pair. If
* the key is named {@code uuid}, this attribute will be ignored.
*
* @param flowFile to update
* @param key of attribute
* @param value of attribute
* @return FlowFile the updated FlowFile
* @throws FlowFileHandlingException if the given FlowFile is already transferred or removed or doesn't belong to this session. Automatic rollback will occur.
* @throws NullPointerException if an argument is null
*/
@Override
public SessionFile putAttribute(FlowFile flowFile, String key, String value) {
SessionFile sf = wrap(flowFile);
sf.flowFile = onMod(s.putAttribute(sf.flowFile, key, value));
return sf;
}
/**
* Updates the given FlowFile's attributes with the given key/value pairs. If
* the map contains a key named {@code uuid}, this attribute will be
* ignored.
*
* @param flowFile to update
* @param attributes the attributes to add to the given FlowFile
* @return FlowFile the updated FlowFile
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session and for the given FlowFile(s)
* @throws FlowFileHandlingException if the given FlowFile is already transferred or removed or doesn't belong to this session. Automatic rollback will occur.
* @throws NullPointerException if an argument is null
*/
@Override
public SessionFile putAllAttributes(FlowFile flowFile, Map<String, String> attributes) {
SessionFile sf = wrap(flowFile);
sf.flowFile = onMod(s.putAllAttributes(sf.flowFile, attributes));
return sf;
}
/**
* Removes the given FlowFile attribute with the given key. If the key is
* named {@code uuid}, this method will return the same FlowFile without
* removing any attribute.
*
* @param flowFile to update
* @param key of attribute
* @return FlowFile the updated FlowFile
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session and for the given FlowFile(s)
* @throws FlowFileHandlingException if the given FlowFile is already transferred or removed or doesn't belong to this session. Automatic rollback will occur.
* @throws NullPointerException if the argument is null
*/
@Override
public SessionFile removeAttribute(FlowFile flowFile, String key) {
SessionFile sf = wrap(flowFile);
sf.flowFile = onMod(s.removeAttribute(sf.flowFile, key));
return sf;
}
/**
* Removes the attributes with the given keys from the given FlowFile. If
* the set of keys contains the value {@code uuid}, this key will be ignored
*
* @param flowFile to update
* @param keys of attribute
* @return FlowFile the updated FlowFile
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session and for the given FlowFile(s)
* @throws FlowFileHandlingException if the given FlowFile is already transferred or removed or doesn't belong to this session. Automatic rollback will occur.
* @throws NullPointerException if the argument is null
*/
@Override
public SessionFile removeAllAttributes(FlowFile flowFile, Set<String> keys) {
SessionFile sf = wrap(flowFile);
sf.flowFile = onMod(s.removeAllAttributes(sf.flowFile, keys));
return sf;
}
/**
* Remove all attributes from the given FlowFile that have keys which match
* the given pattern. If the pattern matches the key {@code uuid}, this key
* will not be removed.
*
* @param flowFile to update
* @param keyPattern may be null; if supplied is matched against each of the FlowFile attribute keys
* @return FlowFile containing only attributes which did not meet the key pattern
*/
@Override
public SessionFile removeAllAttributes(FlowFile flowFile, Pattern keyPattern) {
SessionFile sf = wrap(flowFile);
sf.flowFile = onMod(s.removeAllAttributes(sf.flowFile, keyPattern));
return sf;
}
/**
* Transfers the given FlowFile to the appropriate destination processor
* work queue(s) based on the given relationship. If the relationship leads
* to more than one destination the state of the FlowFile is replicated such
* that each destination receives an exact copy of the FlowFile though each
* will have its own unique identity. The destination processors will not be
* able to operate on the given FlowFile until this session is committed or
* until the ownership of the session is migrated to another processor. If
* ownership of the session is passed to a destination processor then that
* destination processor will have immediate visibility of the transferred
* FlowFiles within the session.
*
* @param flowFile to transfer
* @param relationship to transfer to
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session and for the given FlowFile(s)
* @throws FlowFileHandlingException if the given FlowFile is already transferred or removed or doesn't belong to this session. Automatic rollback will occur.
* @throws NullPointerException if the argument is null
* @throws IllegalArgumentException if given relationship is not a known or registered relationship
*/
@Override
public void transfer(FlowFile flowFile, Relationship relationship) {
flowFile = unwrap(flowFile);
s.transfer(flowFile, relationship);
}
/**
* Transfers the given FlowFile back to the work queue from which it was
* pulled. The processor will not be able to operate on the given FlowFile
* until this session is committed. Any modifications that have been made to
* the FlowFile will be maintained. FlowFiles that are created by the
* processor cannot be transferred back to themselves via this method.
*
* @param flowFile to transfer
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session and for the given FlowFile(s)
* @throws FlowFileHandlingException if the given FlowFile is already transferred or removed or doesn't belong to this session. Automatic rollback will occur.
* @throws IllegalArgumentException if the FlowFile was created by this processor
* @throws NullPointerException if the argument is null
*/
@Override
public void transfer(FlowFile flowFile) {
flowFile = unwrap(flowFile);
s.transfer(flowFile);
}
/**
* Transfers the given FlowFiles back to the work queues from which the
* FlowFiles were pulled. The processor will not be able to operate on the
* given FlowFile until this session is committed. Any modifications that
* have been made to the FlowFile will be maintained. FlowFiles that are
* created by the processor cannot be transferred back to themselves via
* this method.
*
* @param flowFiles to transfer
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session and for the given FlowFile(s)
* @throws FlowFileHandlingException if the given FlowFiles are already transferred or removed or don't belong to this session. Automatic rollback will occur.
* @throws IllegalArgumentException if the FlowFile was created by this processor
* @throws NullPointerException if the argument is null
*/
@Override
public void transfer(Collection<FlowFile> flowFiles) {
flowFiles = unwrap(flowFiles);
s.transfer(flowFiles);
}
/**
* Transfers the given FlowFile to the appropriate destination processor
* work queue(s) based on the given relationship. If the relationship leads
* to more than one destination the state of the FlowFile is replicated such
* that each destination receives an exact copy of the FlowFile though each
* will have its own unique identity. The destination processors will not be
* able to operate on the given FlowFile until this session is committed or
* until the ownership of the session is migrated to another processor. If
* ownership of the session is passed to a destination processor then that
* destination processor will have immediate visibility of the transferred
* FlowFiles within the session.
*
* @param flowFiles to transfer
* @param relationship to transfer to
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session and for the given FlowFile(s)
* @throws FlowFileHandlingException if the given FlowFile is already transferred or removed or doesn't belong to this session. Automatic rollback will occur.
* @throws NullPointerException if the argument is null
* @throws IllegalArgumentException if given relationship is not a known or registered relationship
*/
@Override
public void transfer(Collection<FlowFile> flowFiles, Relationship relationship) {
flowFiles = unwrap(flowFiles);
s.transfer(flowFiles, relationship);
}
/**
* Ends the managed persistence for the given FlowFile. The persistent
* attributes for the FlowFile are deleted and so is the content assuming
* nothing else references it and this FlowFile will no longer be available
* for further operation.
*
* @param flowFile to remove
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session and for the given FlowFile(s)
* @throws FlowFileHandlingException if the given FlowFile is already transferred or removed or doesn't belong to this session. Automatic rollback will occur.
*/
@Override
public void remove(FlowFile flowFile) {
flowFile = unwrap(flowFile);
s.remove(flowFile);
onDrop(flowFile);
}
/**
* Ends the managed persistence for the given FlowFiles. The persistent
* attributes for the FlowFile are deleted and so is the content assuming
* nothing else references it and this FlowFile will no longer be available
* for further operation.
*
* @param flowFiles to remove
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session and for the given FlowFile(s)
* @throws FlowFileHandlingException if any of the given FlowFile is already transferred or removed or doesn't belong to this session. Automatic rollback will occur.
*/
@Override
public void remove(Collection<FlowFile> flowFiles) {
flowFiles = unwrap(flowFiles);
s.remove(flowFiles);
onDrop(flowFiles);
}
/**
* Executes the given callback against the contents corresponding to the
* given FlowFile.
*
* @param flowFile flow file to retrieve content of
* @param reader callback that will be called to read the flow file content
* @throws FlowFileAccessException if some IO problem occurs accessing FlowFile content; if an attempt is made to access the InputStream provided to the given InputStreamCallback after
* this method completed its execution
*/
@Override
public void read(FlowFile flowFile, InputStreamCallback reader) throws FlowFileAccessException {
flowFile = unwrap(flowFile);
s.read(flowFile, reader);
}
/**
* Provides an InputStream that can be used to read the contents of the given FlowFile.
* This method differs from those that make use of callbacks in that this method returns
* an InputStream and expects the caller to properly handle the lifecycle of the InputStream
* (i.e., the caller is responsible for ensuring that the InputStream is closed appropriately).
* The Process Session may or may not handle closing the stream when {@link #commit()} or {@link #rollback()}
* is called, but the responsibility of doing so belongs to the caller. The InputStream will throw
* an IOException if an attempt is made to read from the stream after the session is committed or
* rolled back.
*
* @param flowFile the FlowFile to read
* @return an InputStream that can be used to read the contents of the FlowFile
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session and for the given FlowFile(s)
* @throws FlowFileHandlingException if the given FlowFile is already transferred or removed or doesn't belong to this session. Automatic rollback will occur.
* @throws MissingFlowFileException if the given FlowFile content cannot be found. The FlowFile should no longer be referenced, will be internally destroyed, and the session is automatically
* rolled back and what is left of the FlowFile is destroyed.
*/
@Override
public InputStream read(FlowFile flowFile) {
flowFile = unwrap(flowFile);
return s.read(flowFile);
}
/**
* Executes the given callback against the contents corresponding to the
* given FlowFile.
* <p>
* <i>Note</i>: The InputStream provided to the given InputStreamCallback
* will not be accessible once this method has completed its execution.
*
* @param flowFile flow file to retrieve content of
* @param allowSessionStreamManagement allow session to hold the stream open for performance reasons
* @param reader callback that will be called to read the flow file content
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session and for the given FlowFile(s)
* @throws FlowFileHandlingException if the given FlowFile is already transferred or removed or doesn't belong to this session. Automatic rollback will occur.
* @throws MissingFlowFileException if the given FlowFile content cannot be found. The FlowFile should no longer be referenced, will be internally destroyed, and the session is automatically
* rolled back and what is left of the FlowFile is destroyed.
* @throws FlowFileAccessException if some IO problem occurs accessing FlowFile content; if an attempt is made to access the InputStream provided to the given InputStreamCallback after this
* method completed its execution
*/
@Override
public void read(FlowFile flowFile, boolean allowSessionStreamManagement, InputStreamCallback reader) throws FlowFileAccessException {
flowFile = unwrap(flowFile);
s.read(flowFile, allowSessionStreamManagement, reader);
}
/**
* Combines the content of all given source FlowFiles into a single given
* destination FlowFile.
*
* @param sources the flowfiles to merge
* @param destination the flowfile to use as the merged result
* @return updated destination FlowFile (new size, etc...)
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session and for the given FlowFile(s)
* @throws IllegalArgumentException if the given destination is contained within the sources
* @throws FlowFileHandlingException if the given FlowFile is already transferred or removed or doesn't belong to this session. Automatic rollback will occur.
* @throws MissingFlowFileException if the given FlowFile content cannot be found. The FlowFile should no longer be referenced, will be internally destroyed, and the session is automatically
* rolled back and what is left of the FlowFile is destroyed.
* @throws FlowFileAccessException if some IO problem occurs accessing FlowFile content. The state of the destination will be as it was prior to this call.
*/
@Override
public SessionFile merge(Collection<FlowFile> sources, FlowFile destination) {
SessionFile sfDestination = wrap(destination);
sources = unwrap(sources);
sfDestination.flowFile = onMod(s.merge(sources, sfDestination.flowFile));
return sfDestination;
}
/**
* Combines the content of all given source FlowFiles into a single given
* destination FlowFile.
*
* @param sources to merge together
* @param destination to merge to
* @param header bytes that will be added to the beginning of the merged output. May be null or empty.
* @param footer bytes that will be added to the end of the merged output. May be null or empty.
* @param demarcator bytes that will be placed in between each object merged together. May be null or empty.
* @return updated destination FlowFile (new size, etc...)
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session and for the given FlowFile(s)
* @throws IllegalArgumentException if the given destination is contained within the sources
* @throws FlowFileHandlingException if the given FlowFile is already transferred or removed or doesn't belong to this session. Automatic rollback will occur.
* @throws MissingFlowFileException if the given FlowFile content cannot be found. The FlowFile should no longer be referenced, will be internally destroyed, and the session is automatically
* rolled back and what is left of the FlowFile is destroyed.
* @throws FlowFileAccessException if some IO problem occurs accessing FlowFile content. The state of the destination will be as it was prior to this call.
*/
@Override
public SessionFile merge(Collection<FlowFile> sources, FlowFile destination, byte[] header, byte[] footer, byte[] demarcator) {
SessionFile sfDestination = wrap(destination);
sources = unwrap(sources);
sfDestination.flowFile = onMod(s.merge(sources, sfDestination.flowFile, header, footer, demarcator));
return sfDestination;
}
/**
* Executes the given callback against the content corresponding to the
* given FlowFile.
* <p>
* <i>Note</i>: The OutputStream provided to the given OutputStreamCallback
* will not be accessible once this method has completed its execution.
*
* @param flowFile to write to
* @param writer used to write new content
* @return updated FlowFile
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session and for the given FlowFile(s)
* @throws FlowFileHandlingException if the given FlowFile is already transferred or removed or doesn't belong to this session. Automatic rollback will occur.
* @throws MissingFlowFileException if the given FlowFile content cannot be found. The FlowFile should no longer be referenced, will be internally destroyed, and the session is automatically
* rolled back and what is left of the FlowFile is destroyed.
* @throws FlowFileAccessException if some IO problem occurs accessing FlowFile content; if an attempt is made to access the OutputStream provided to the given OutputStreamCallback after this
* method completed its execution
*/
@Override
public SessionFile write(FlowFile flowFile, OutputStreamCallback writer) throws FlowFileAccessException {
SessionFile sf = wrap(flowFile);
sf.flowFile = onMod(s.write(sf.flowFile, writer));
return sf;
}
/**
* Executes the given callback against the content corresponding to the
* given flow file.
* <p>
* <i>Note</i>: The InputStream & OutputStream provided to the given
* StreamCallback will not be accessible once this method has completed its
* execution.
*
* @param flowFile to read from and write to
* @param writer used to read the old content and write new content
* @return updated FlowFile
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session and for the given FlowFile(s)
* @throws FlowFileHandlingException if the given FlowFile is already transferred or removed or doesn't belong to this session. Automatic rollback will occur.
* @throws MissingFlowFileException if the given FlowFile content cannot be found. The FlowFile should no longer be referenced, will be internally destroyed, and the session is automatically
* rolled back and what is left of the FlowFile is destroyed.
* @throws FlowFileAccessException if some IO problem occurs accessing FlowFile content; if an attempt is made to access the InputStream or OutputStream provided to the given StreamCallback
* after this method completed its execution
*/
@Override
public SessionFile write(FlowFile flowFile, StreamCallback writer) throws FlowFileAccessException {
SessionFile sf = wrap(flowFile);
sf.flowFile = onMod(s.write(sf.flowFile, writer));
return sf;
}
/**
* Executes the given callback against the content corresponding to the
* given FlowFile, such that any data written to the OutputStream of the
* content will be appended to the end of FlowFile.
* <p>
* <i>Note</i>: The OutputStream provided to the given OutputStreamCallback
* will not be accessible once this method has completed its execution.
*
* @param flowFile the flowfile for which content should be appended
* @param writer used to write new bytes to the flowfile content
* @return the updated flowfile reference for the new content
* @throws FlowFileAccessException if an attempt is made to access the OutputStream provided to the given OutputStreamCallback after this method completed its execution
*/
@Override
public SessionFile append(FlowFile flowFile, OutputStreamCallback writer) throws FlowFileAccessException {
SessionFile sf = wrap(flowFile);
sf.flowFile = onMod(s.append(sf.flowFile, writer));
return sf;
}
/**
* Writes to the given FlowFile all content from the given content path.
*
* @param source the file from which content will be obtained
* @param keepSourceFile if true the content is simply copied; if false the original content might be used in a destructive way for efficiency such that the repository will have the data but the
* original data will be gone. If false the source object will be removed or gone once imported. It will not be restored if the session is rolled back
* so this must be used with caution. In some cases it can result in tremendous efficiency gains but is also dangerous.
* @param flowFile the FlowFile whose content will be updated
* @return the updated destination FlowFile (new size)
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session and for the given FlowFile(s)
* @throws FlowFileHandlingException if the given FlowFile is already transferred or removed or doesn't belong to this session. Automatic rollback will occur.
* @throws MissingFlowFileException if the given FlowFile content cannot be found. The FlowFile should no longer be referenced, will be internally destroyed, and the session is automatically
* rolled back and what is left of the FlowFile is destroyed.
* @throws FlowFileAccessException if some IO problem occurs accessing FlowFile content
*/
@Override
public SessionFile importFrom(Path source, boolean keepSourceFile, FlowFile flowFile) {
SessionFile sf = wrap(flowFile);
sf.flowFile = onMod(s.importFrom(source, keepSourceFile, sf.flowFile));
return sf;
}
/**
* Writes to the given FlowFile all content from the given content path.
*
* @param source the file from which content will be obtained
* @param flowFile the FlowFile whose content will be updated
* @return the updated destination FlowFile (new size)
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session and for the given FlowFile(s)
* @throws FlowFileHandlingException if the given FlowFile is already transferred or removed or doesn't belong to this session. Automatic rollback will occur.
* @throws MissingFlowFileException if the given FlowFile content cannot be found. The FlowFile should no longer be referenced, will be internally destroyed, and the session is automatically
* rolled back and what is left of the FlowFile is destroyed.
* @throws FlowFileAccessException if some IO problem occurs accessing FlowFile content
*/
@Override
public SessionFile importFrom(InputStream source, FlowFile flowFile) {
SessionFile sf = wrap(flowFile);
sf.flowFile = onMod(s.importFrom(source, sf.flowFile));
return sf;
}
/**
* Writes the content of the given FlowFile to the given destination path.
*
* @param flowFile to export the content of
* @param destination to export the content to
* @param append if true will append to the current content at the given path; if false will replace any current content
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session and for the given FlowFile(s)
* @throws FlowFileHandlingException if the given FlowFile is already transferred or removed or doesn't belong to this session. Automatic rollback will occur.
* @throws MissingFlowFileException if the given FlowFile content cannot be found. The FlowFile should no longer be referenced, will be internally destroyed, and the session is automatically
* rolled back and what is left of the FlowFile is destroyed.
* @throws FlowFileAccessException if some IO problem occurs accessing FlowFile content
*/
@Override
public void exportTo(FlowFile flowFile, Path destination, boolean append) {
flowFile = unwrap(flowFile);
s.exportTo(flowFile, destination, append);
}
/**
* Writes the content of the given FlowFile to the given destination stream
*
* @param flowFile to export the content of
* @param destination to export the content to
* @throws IllegalStateException if detected that this method is being called from within a callback of another method in this session and for the given FlowFile(s)
* @throws FlowFileHandlingException if the given FlowFile is already transferred or removed or doesn't belong to this session. Automatic rollback will occur.
* @throws MissingFlowFileException if the given FlowFile content cannot be found. The FlowFile should no longer be referenced, will be internally destroyed, and the session is automatically
* rolled back and what is left of the FlowFile is destroyed.
* @throws FlowFileAccessException if some IO problem occurs accessing FlowFile content
*/
@Override
public void exportTo(FlowFile flowFile, OutputStream destination) {
flowFile = unwrap(flowFile);
s.exportTo(flowFile, destination);
}
/**
* Returns a ProvenanceReporter that is tied to this ProcessSession.
*
* @return the provenance reporter
*/
@Override
public ProvenanceReporter getProvenanceReporter() {
return s.getProvenanceReporter();
}
@Override
public void migrate(ProcessSession newOwner, Collection<FlowFile> flowFiles) {
flowFiles = unwrap(flowFiles);
s.migrate(newOwner, flowFiles);
}
/**
* Provides an OutputStream that can be used to write to the contents of the
* given FlowFile.
*
* @param source to write to
*
* @return an OutputStream that can be used to write to the contents of the FlowFile
*
* @throws IllegalStateException if detected that this method is being
* called from within a callback of another method in this session and for
* the given FlowFile(s), or if there is an open InputStream or OutputStream for the FlowFile's content
* (see {@link #read(FlowFile)}).
* @throws FlowFileHandlingException if the given FlowFile is already
* transferred or removed or doesn't belong to this session. Automatic
* rollback will occur.
* @throws MissingFlowFileException if the given FlowFile content cannot be
* found. The FlowFile should no longer be referenced, will be internally
* destroyed, and the session is automatically rolled back and what is left
* of the FlowFile is destroyed.
* @throws FlowFileAccessException if some IO problem occurs accessing
* FlowFile content; if an attempt is made to access the OutputStream
* provided to the given OutputStreamCallback after this method completed
* its execution
*/
@Override
public OutputStream write(FlowFile source) {
source = unwrap(source);
return s.write(source);
}
}
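//Usage sketch (editorial, not part of this file): inside ExecuteGroovyScript the `session` binding is
//this wrapper, so get/putAttribute/transfer calls work on SessionFile instances and unwrap() is applied
//internally before the native session is invoked. The attribute name below is illustrative.
def sf = session.get()                               //SessionFile, not a plain FlowFile
if (sf != null) {
    sf = session.putAttribute(sf, 'checked', 'true') //still the same wrapped reference
    session.transfer(sf, REL_SUCCESS)                //unwrapped automatically before delegation
}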

View File

@ -0,0 +1,243 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.groovyx.flow;
import java.io.InputStream;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.Collection;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.io.InputStreamCallback;
/**
* A FlowFile implementation that keeps a reference to its session,
* so common operations become one-liners. Example:
* <code>flowFile.putAttribute("AttrName", "AttrValue");</code>
*/
@SuppressWarnings("unused")
public abstract class SessionFile implements FlowFile {
FlowFile flowFile;
ProcessSessionWrap session;
protected SessionFile(ProcessSessionWrap session, FlowFile f) {
if (f == null || session == null) {
throw new NullPointerException("Session and FlowFile are mandatory session=" + session + " file=" + f);
}
if (f instanceof SessionFile) {
throw new RuntimeException("file could be instanceof SessionFile");
}
this.flowFile = f;
this.session = session;
}
/**
* Returns original session.
*/
public ProcessSessionWrap session() {
return session;
}
/**
* Clone flowfile with or without content.
*
* @param cloneContent whether to clone the content; attributes are cloned in either case.
* @return new flow file
*/
public SessionFile clone(boolean cloneContent) {
if (cloneContent) {
return session.clone(flowFile); //new SessionFile(session, session.clone(flowFile));
}
return session.create(flowFile); //session.wrap( session.create(flowFile) );
}
/**
* Returns content of the flow file as InputStream.
*/
public InputStream read() {
return session.read(flowFile);
}
/**
* read flowfile content.
*/
public void read(InputStreamCallback c) {
session.read(flowFile, c);
}
/**
* write flowfile content.
*
* @return reference to self
*/
public SessionFile write(StreamCallback c) {
session.write(this, c);
return this;
}
/**
* write flowfile content.
*
* @return reference to self
*/
public SessionFile write(OutputStreamCallback c) {
session.write(this, c);
return this;
}
/**
* append flowfile content.
*
* @return reference to self
*/
public SessionFile append(OutputStreamCallback c) {
session.append(this, c);
return this;
}
/**
* set attribute value.
*
* @return reference to self
*/
public SessionFile putAttribute(String key, String value) {
session.putAttribute(this, key, value);
return this;
}
/**
* Copy attributes from map into flowfile.
*
* @return reference to self
*/
public SessionFile putAllAttributes(Map<String,String> m) {
session.putAllAttributes(this, m);
return this;
}
/**
* Removes one attribute.
*
* @return reference to self
*/
public SessionFile removeAttribute(String key) {
session.removeAttribute(this, key);
return this;
}
/**
* Removes attributes by list.
*
* @return reference to self
*/
public SessionFile removeAllAttributes(Collection<String> keys) {
Set<String> keySet = (Set<String>) (keys instanceof Set ? keys : new HashSet<>(keys));
session.removeAllAttributes(this, keySet);
return this;
}
/**
* Transfers to the given relationship, or back to the incoming queue if the parameter is null.
*/
public void transfer(Relationship r) {
if (r == null) {
session.transfer(this);
} else {
session.transfer(this, r);
}
}
/**
* Drops this flow file from session.
*/
public void remove() {
session.remove(this);
}
//OVERRIDE
@Override
public long getId() {
return flowFile.getId();
}
@Override
public long getEntryDate() {
return flowFile.getEntryDate();
}
@Override
public long getLineageStartDate() {
return flowFile.getLineageStartDate();
}
@Override
public long getLineageStartIndex() {
return flowFile.getLineageStartIndex();
}
@Override
public Long getLastQueueDate() {
return flowFile.getLastQueueDate();
}
@Override
public long getQueueDateIndex() {
return flowFile.getQueueDateIndex();
}
@Override
public boolean isPenalized() {
return flowFile.isPenalized();
}
@Override
public String getAttribute(String key) {
return flowFile.getAttribute(key);
}
@Override
public long getSize() {
return flowFile.getSize();
}
/**
* @return an unmodifiable map of the flow file attributes
*/
@Override
public Map<String, String> getAttributes() {
return flowFile.getAttributes();
}
@SuppressWarnings("NullableProblems")
@Override
public int compareTo(FlowFile other) {
if (other instanceof SessionFile) {
other = ((SessionFile) other).flowFile;
}
return flowFile.compareTo(other);
}
@Override
public String toString() {
return "WRAP[" + flowFile.toString() + "]";
}
}
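//Usage sketch (editorial, not part of this file): the mutating helpers return the SessionFile itself,
//so a script can chain calls; attribute names and the relationship are illustrative.
flowFile.putAttribute('filename', 'out.txt').putAllAttributes([source: 'groovyx']).removeAttribute('tmp.marker')
flowFile.transfer(REL_SUCCESS)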

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.groovyx.sql;
import groovy.sql.Sql;
import groovy.sql.InParameter;
import groovy.lang.GString;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Types;
import java.sql.SQLException;
import java.io.InputStream;
import java.io.Reader;
/**
 * Sql subclass that simplifies work with CLOB, BLOB, DATE, and TIMESTAMP parameter types.
 * It allows the following parameters to be set correctly: Sql.BLOB(InputStream), Sql.CLOB(Reader), Sql.DATE(java.util.Date), Sql.TIMESTAMP(java.util.Date).
*/
public class OSql extends Sql {
public OSql(Connection connection) {
super(connection);
}
@Override
protected void setObject(PreparedStatement statement, int i, Object value) throws SQLException {
try {
if (value instanceof InParameter) {
InParameter p = (InParameter) value;
if (p.getType() == Types.BLOB && p.getValue() instanceof InputStream) {
statement.setBlob(i, (InputStream) p.getValue());
return;
}
if (p.getType() == Types.CLOB && p.getValue() instanceof Reader) {
statement.setClob(i, (Reader) p.getValue());
return;
}
if (p.getType() == Types.DATE && p.getValue() instanceof java.util.Date && !(p.getValue() instanceof java.sql.Date)) {
statement.setDate(i, new java.sql.Date(((java.util.Date) p.getValue()).getTime()));
return;
}
if (p.getType() == Types.TIMESTAMP && p.getValue() instanceof java.util.Date && !(p.getValue() instanceof java.sql.Timestamp)) {
statement.setTimestamp(i, new java.sql.Timestamp(((java.util.Date) p.getValue()).getTime()));
return;
}
}
if (value instanceof GString) {
value = value.toString();
}
super.setObject(statement, i, value);
} catch (Exception e) {
throw new SQLException("Can't set a parameter #" + i + " to value type " + (value == null ? "null" : value.getClass().getName()) + ": " + e.getMessage(), e);
}
}
}
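//Usage sketch (editorial, not part of this file): OSql accepts groovy.sql.Sql in-parameters such as
//Sql.CLOB/Sql.BLOB/Sql.DATE/Sql.TIMESTAMP and maps them to the matching PreparedStatement setters.
//`connection` is assumed to be an already opened java.sql.Connection; the table is illustrative.
import groovy.sql.Sql
import org.apache.nifi.processors.groovyx.sql.OSql

def sql = new OSql(connection)
sql.executeUpdate("insert into docs(id, created, body) values(?, ?, ?)",
        [42, Sql.TIMESTAMP(new Date()), Sql.CLOB(new StringReader("hello"))])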

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.groovyx.util;
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
/**
* Helpers to work with files
*/
public class Files {
/**
* Returns the file list defined by a classpath-like parameter.
*
* @param classpath semicolon-separated list of paths; file names may contain masks like `*` or `*.jar`
* @return file list defined by the classpath parameter
*/
public static Set<File> listPathsFiles(String classpath) {
if (classpath == null || classpath.length() == 0) {
return Collections.emptySet();
}
Set<File> files = new HashSet<>();
for (String cp : classpath.split("\\s*;\\s*")) {
files.addAll(listPathFiles(cp));
}
return files;
}
/**
* Returns the file list for one path. The path can be an exact file name (one file returned), an exact directory (all files from the directory returned),
* or a directory with a masked file name like ./dir/*.jar (all matching jars returned).
*/
public static List<File> listPathFiles(String path) {
File f = new File(path);
String fname = f.getName();
if (fname.contains("?") || fname.contains("*")) {
Pattern pattern = Pattern.compile(fname.replace(".", "\\.").replace("?", ".?").replace("*", ".*?"));
File[] list = f.getParentFile().listFiles((dir, name) -> pattern.matcher(name).find());
return list==null ? Collections.emptyList() : Arrays.asList(list);
}
if (!f.exists()) {
System.err.println("WARN: path not found for: " + f);
}
return Arrays.asList(f);
}
}
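//Usage sketch (editorial, not part of this file): entries are split on `;` and a masked file name
//expands to the matching files of that directory. The paths below are hypothetical.
import org.apache.nifi.processors.groovyx.util.Files

def libs = Files.listPathsFiles("/opt/nifi/extra/extra.jar; /opt/nifi/extra/lib/*.jar")
libs.each { println it.absolutePath }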

View File

@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.groovyx.util;
import java.lang.reflect.InvocationTargetException;
import java.io.StringWriter;
import java.io.PrintWriter;
/**
* Helper class that builds a simplified, human-readable error message pointing at the single most relevant `at` code position.
*/
public class Throwables {
/** returns stacktrace as a String */
public static String stringStackTrace(Throwable e) {
StringWriter sw = new StringWriter(500);
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
pw.flush();
sw.flush();
return sw.toString();
}
/**
* returns error message with one main line from stacktrace
*/
public static String getMessage(Throwable e) {
return getMessage(e, null, -1);
}
/**
* Returns a human-readable error message with only one element from the stack trace.
* The `priority` parameter helps to choose that element:
* the first frame matching `priority` is used, otherwise
* the first frame that does not belong to a standard Java/Groovy class.
* @param e throwable to convert to message
* @param priority package name, class, or object that could identify the stacktrace element
* @param maxlen the max length of returned string or -1 for unlimited
*/
public static String getMessage(Throwable e, Object priority, int maxlen) {
if (e == null) {
return null;
}
e = getRootException(e);
StackTraceElement[] trace = e.getStackTrace();
int traceIndex = -1;
if (priority != null) {
if (priority instanceof String) {
for (int i = 0; i < trace.length; i++) {
if (trace[i].getClassName().startsWith((String) priority)) {
traceIndex = i;
break;
}
}
} else {
if (!(priority instanceof Class)) {
priority = priority.getClass();
}
String cl = ((Class) priority).getName();
for (int i = 0; i < trace.length; i++) {
if (trace[i].getClassName().startsWith(cl)) {
traceIndex = i;
break;
}
}
if (traceIndex == -1) {
cl = ((Class) priority).getPackage().getName();
for (int i = 0; i < trace.length; i++) {
if (trace[i].getClassName().startsWith(cl)) {
traceIndex = i;
break;
}
}
}
}
}
if (traceIndex == -1) {
for (int i = 0; i < trace.length; i++) {
String cl = trace[i].getClassName();
if (cl.startsWith("java.") || cl.startsWith("javax.") || cl.startsWith("org.omg.") || cl.startsWith("org.w3c.") || cl.startsWith("org.xml.") || cl.startsWith("groovy.lang.") || cl
.startsWith("groovy.util.") || cl.startsWith("org.codehaus.") || cl.startsWith("com.springsource.") || cl.startsWith("org.springframework.") || cl.startsWith("org.apache.")
|| cl.startsWith("sun.") || cl.startsWith("com.sun.") || cl.startsWith("org.junit.") || cl.startsWith("junit.framework.")
) {
//skip standard classes
} else {
traceIndex = i;
break;
}
}
}
if (traceIndex == -1) {
traceIndex = 0;
}
//build message text
String msg = e.getMessage();
if (msg == null) {
msg = "";
}
msg = msg.trim();
//append dot at the end if no others
if (msg.length() > 0 && ".!:,;?".indexOf(msg.substring(msg.length() - 1)) == -1) {
msg += ".";
}
//exception class name without package
String msgSuffix = " " + e.getClass().getName().replaceAll("^.*\\.(\\w+)$", "$1") + " at ";
//append callers line
if (traceIndex < 0 || traceIndex >= trace.length) {
System.err.println("Error formatting exception: " + e);
e.printStackTrace(System.err);
msgSuffix = e.getClass().getName();
} else {
msgSuffix += trace[traceIndex].toString();
}
if (maxlen > 0 && msgSuffix.length() + msg.length() > maxlen) {
if (maxlen > msgSuffix.length() + 2) {
int newlen = maxlen - msgSuffix.length() - 2;
if (newlen < msg.length()) {
msg = msg.substring(0, newlen);
}
msg = msg + ".." + msgSuffix;
} else if (msg.length() > maxlen) {
msg = msg.substring(0, maxlen);
}
} else {
msg = msg + msgSuffix;
}
return msg;
}
private static Throwable getRootException(Throwable e) {
Throwable t;
if (e instanceof InvocationTargetException) {
t = ((InvocationTargetException) e).getTargetException();
} else if (e instanceof RuntimeException) {
t = e.getCause();
} else if (e.getCause() != null && e.getClass().getName().equals(e.getCause().getClass().getName())) {
t = e.getCause();
/*
}else if(e instanceof UserError){
return e;
*/
} else {
return e;
}
if (t != null) {
return getRootException(t);
}
return e;
}
}
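//Usage sketch (editorial, not part of this file): getMessage() trims the chain to its root cause plus one
//relevant stack frame, while stringStackTrace() keeps the full trace. riskyWork() is a placeholder and
//`log` is assumed to be the ComponentLog bound by ExecuteGroovyScript.
import org.apache.nifi.processors.groovyx.util.Throwables

try {
    riskyWork()
} catch (Exception e) {
    log.error(Throwables.getMessage(e, "org.apache.nifi.processors.groovyx", 950))
    log.debug(Throwables.stringStackTrace(e))
}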

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.groovyx.util;
import java.io.File;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
/**
 * Class with custom property validators.
*/
public class Validators {
/**
* Differs from the standard file-exists validator by evaluating Expression Language in the value first. TODO: check whether the standard validator has a bug here.
*/
public static Validator createFileExistsAndReadableValidator() {
return (subject, input, context) -> {
final String substituted;
try {
substituted = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
} catch (final Exception e) {
return new ValidationResult.Builder()
.subject(subject)
.input(input)
.valid(false)
.explanation("Not a valid Expression Language value: " + e.getMessage())
.build();
}
final File file = new File(substituted);
final boolean valid = file.exists() && file.canRead();
final String explanation = valid ? null : "File " + file + " does not exist or cannot be read";
return new ValidationResult.Builder()
.subject(subject)
.input(input)
.valid(valid)
.explanation(explanation)
.build();
};
}
}
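//Usage sketch (editorial, not part of this file): how the validator would typically be attached to a
//processor property; the property names are illustrative.
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.processors.groovyx.util.Validators

def SCRIPT_FILE = new PropertyDescriptor.Builder().
        name("groovyx-script-file").
        displayName("Script File").
        required(false).
        expressionLanguageSupported(true).
        addValidator(Validators.createFileExistsAndReadableValidator()).
        build()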

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.groovyx.ExecuteGroovyScript

View File

@ -0,0 +1,203 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>Groovy</title>
<!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /-->
<link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
</head>
<body>
<h2>Summary</h2>
<p>This is the grooviest Groovy script :)</p>
<h2>Script Bindings:</h2>
<table>
<tr><th>variable</th><th>type</th><th>description</th></tr>
<tr>
<td>session</td>
<td>org.apache.nifi.processor.ProcessSession</td>
<td>the session that is used to get, change, and transfer input files</td>
</tr>
<tr>
<td>context</td>
<td>org.apache.nifi.processor.ProcessContext</td>
<td>the process context (rarely needed directly)</td>
</tr>
<tr>
<td>log</td>
<td>org.apache.nifi.logging.ComponentLog</td>
<td>the logger for this processor instance</td>
</tr>
<tr>
<td>REL_SUCCESS</td>
<td>org.apache.nifi.processor.Relationship</td>
<td>the success relationship</td>
</tr>
<tr>
<td>REL_FAILURE</td>
<td>org.apache.nifi.processor.Relationship</td>
<td>the failure relationship</td>
</tr>
<tr>
<td>CTL</td>
<td>java.util.HashMap&lt;String,<a href="https://github.com/apache/nifi/blob/master/nifi-api/src/main/java/org/apache/nifi/controller/ControllerService.java">ControllerService</a>&gt;</td>
<td>Map populated with the controller services defined by `CTL.*` processor properties.
<br/>A `CTL.`-prefixed property can be linked to a controller service and gives the script access to that service without additional code (see the example below the table).</td>
</tr>
<tr>
<td>SQL</td>
<td>java.util.HashMap&lt;String, <a href="http://docs.groovy-lang.org/latest/html/api/groovy/sql/Sql.html">groovy.sql.Sql</a>&gt;</td>
<td>Map populated with `groovy.sql.Sql` objects connected to the corresponding databases defined by `SQL.*` processor properties.
<br/>A `SQL.`-prefixed property can only be linked to a DBCPService.</td>
</tr>
<tr>
<td>Dynamic processor properties</td>
<td>org.apache.nifi.components.PropertyDescriptor</td>
<td>All processor properties that do not start with `CTL.` or `SQL.` are bound to script variables</td>
</tr>
</table>
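<p><i>CTL example (illustrative): a property named <code>`CTL.cache`</code> linked to any controller service exposes the resolved service instance to the script</i></p>
<pre>
def service = CTL.cache                              //the ControllerService linked to the `CTL.cache` property
flowFile.'ctl.service.id' = service.getIdentifier()  //attribute name is illustrative
</pre>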
<h2>SQL map details</h2>
<p>
<b>Example:</b> if you define a property <code>`SQL.mydb`</code> and link it to any DBCPService,
then you can access it from code: <code>SQL.mydb.rows('select * from mytable')</code><br/>
<br/>The processor automatically takes a connection from the DBCP service before executing the script and tries to handle the transaction:
<br/>database transactions are automatically rolled back on script exception and committed on success.
<br/>Alternatively, you can manage the transaction manually (see the example below).
<br/>NOTE: The script must not disconnect the connection.
<br/><img src="SQL.gif"/>
<br/><img src="SQL2.gif"/>
</p>
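<p><b>Example:</b> a short sketch of querying and updating rows through the `SQL.mydb` binding
(assumes a property named `SQL.mydb` linked to a DBCPService; the table and column names are illustrative):</p>
<pre>
//query rows as a list of maps (groovy.sql.Sql API)
def users = SQL.mydb.rows('select id, name from mytable')
log.info("got ${users.size()} rows")
//parameterized update; the processor commits on success and rolls back if the script throws
SQL.mydb.executeUpdate([p_name: 'John Doe', p_id: 1], 'update mytable set name = :p_name where id = :p_id')
</pre>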
<h2>SessionFile - flow file extension</h2>
<p>
The org.apache.nifi.processors.groovyx.flow.SessionFile is the actual object returned by the session in the ExecuteGroovyScript processor.<br/>
This flow file is a container that references the session and the real flow file.<br/>
This allows simplified syntax for working with flow file attributes and content:
</p>
<p><i>set new attribute value</i></p>
<pre>
flowFile.ATTRIBUTE_NAME = ATTRIBUTE_VALUE
flowFile.'mime.type' = 'text/xml'
flowFile.putAttribute("ATTRIBUTE_NAME", ATTRIBUTE_VALUE)
//the same as
flowFile = session.putAttribute(flowFile, "ATTRIBUTE_NAME", ATTRIBUTE_VALUE)
</pre>
<p><i>remove attribute</i></p>
<pre>
flowFile.ATTRIBUTE_NAME = null
//equals to
flowFile = session.removeAttribute(flowFile, "ATTRIBUTE_NAME")
</pre>
<p><i>get attribute value</i></p>
<pre>
String a = flowFile.ATTRIBUTE_NAME
</pre>
<p><i>write content</i></p>
<pre>
flowFile.write("UTF-8", "THE CharSequence to write into flow file replacing current content")
flowFile.write("UTF-8"){writer->
do something with java.io.Writer...
}
flowFile.write{outStream->
do something with output stream...
}
flowFile.write{inStream, outStream->
do something with input and output streams...
}
</pre>
<p><i>get content</i></p>
<pre>
InputStream i = flowFile.read()
def json = new groovy.json.JsonSlurper().parse( flowFile.read() )
String text = flowFile.read().getText("UTF-8")
</pre>
<p><i>transfer flow file to success relation</i></p>
<pre>
REL_SUCCESS << flowFile
flowFile.transfer(REL_SUCCESS)
//the same as:
session.transfer(flowFile, REL_SUCCESS)
</pre>
<p><i>work with dbcp</i></p>
<pre>
import groovy.sql.Sql
//define property named `SQL.db` connected to a DBCPConnectionPool controller service
//for this case it's an H2 database example
//read value from the database with prepared statement
//and assign into flowfile attribute `db.yesterday`
def daysAdd = -1
def row = SQL.db.firstRow("select dateadd('DAY', ${daysAdd}, sysdate) as DB_DATE from dual")
flowFile.'db.yesterday' = row.DB_DATE
//to work with BLOBs and CLOBs in the database
//use parameter casting using groovy.sql.Sql.BLOB(Stream) and groovy.sql.Sql.CLOB(Reader)
//write content of the flow file into database blob
flowFile.read{ rawIn->
def parms = [
p_id : flowFile.ID as Long, //get flow file attribute named `ID`
p_data : Sql.BLOB( rawIn ), //use input stream as BLOB sql parameter
]
SQL.db.executeUpdate(parms, "update mytable set data = :p_data where id = :p_id")
}
</pre>
<h2>Handling processor start &amp; stop</h2>
<p>In ExecuteGroovyScript you can handle the `start` and `stop` events by providing the corresponding static methods in your script:</p>
<pre>
import org.apache.nifi.processor.ProcessContext
import java.util.concurrent.atomic.AtomicLong
class Const{
static Date startTime = null;
static AtomicLong triggerCount = null;
}
static onStart(ProcessContext context){
Const.startTime = new Date()
Const.triggerCount = new AtomicLong(0)
println "onStart $context ${Const.startTime}"
}
static onStop(ProcessContext context){
def alive = (System.currentTimeMillis() - Const.startTime.getTime()) / 1000
println "onStop $context executed ${ Const.triggerCount } times during ${ alive } seconds"
}
def flowFile = session.get()
if(!flowFile)return
flowFile.'trigger.count' = Const.triggerCount.incrementAndGet()
REL_SUCCESS << flowFile
</pre>
<br/>
<br/>
<br/>
<br/>
</body>
</html>

View File

@ -0,0 +1,392 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.groovyx;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockProcessorInitializationContext;
import org.apache.commons.io.FileUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.processor.exception.ProcessException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.FixMethodOrder;
import org.junit.runners.MethodSorters;
import java.io.File;
import java.io.FileInputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.HashMap;
import java.sql.DriverManager;
import java.sql.Connection;
import java.sql.Statement;
import static org.junit.Assert.assertNotNull;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.codehaus.groovy.runtime.ResourceGroovyMethods;
import groovy.json.JsonSlurper;
import groovy.json.JsonOutput;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ExecuteGroovyScriptTest {
private final static String DB_LOCATION = "target/db";
protected TestRunner runner;
protected static DBCPService dbcp = null; //to make single initialization
protected ExecuteGroovyScript proc;
public final String TEST_RESOURCE_LOCATION = "target/test/resources/groovy/";
private final String TEST_CSV_DATA = "gender,title,first,last\n"
+ "female,miss,marlene,shaw\n"
+ "male,mr,todd,graham";
@AfterClass
public static void cleanUpAfterClass() throws Exception {
try {
DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true");
} catch (Exception e) {
// Derby throws an exception on a successful shutdown; safe to ignore
}
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
FileUtils.deleteQuietly(dbLocation);
}
/**
* Copies all scripts to the target directory because when they are compiled they can leave unwanted .class files.
*
* @throws Exception Any error encountered while testing
*/
@BeforeClass
public static void setupBeforeClass() throws Exception {
FileUtils.copyDirectory(new File("src/test/resources"), new File("target/test/resources"));
//prepare database connection
System.setProperty("derby.stream.error.file", "target/derby.log");
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
FileUtils.deleteQuietly(dbLocation);
//insert some test data
dbcp = new DBCPServiceSimpleImpl();
Connection con = dbcp.getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table mytable");
} catch (Exception e) {
// the table may not exist on the first run; safe to ignore
}
stmt.execute("create table mytable (id integer not null, name varchar(100), scale float, created timestamp, data blob)");
stmt.execute("insert into mytable (id, name, scale, created, data) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234', null)");
stmt.execute("insert into mytable (id, name, scale, created, data) VALUES (1, 'Carrie Jones', 5.1, '2000-01-01 03:23:34.234', null)");
stmt.close();
con.commit();
con.close();
}
@Before
public void setup() throws Exception {
//init processor
proc = new ExecuteGroovyScript();
MockProcessContext context = new MockProcessContext(proc);
MockProcessorInitializationContext initContext = new MockProcessorInitializationContext(proc, context);
proc.initialize(initContext);
assertNotNull(proc.getSupportedPropertyDescriptors());
runner = TestRunners.newTestRunner(proc);
runner.addControllerService("dbcp", dbcp, new HashMap<>());
runner.enableControllerService(dbcp);
}
/**
* Tests a script that reads the content of the flow file and stores the value in an attribute of the outgoing flow file.
*
* @throws Exception Any error encountered while testing
*/
@Test
public void testReadFlowFileContentAndStoreInFlowFileAttribute() throws Exception {
runner.setProperty(proc.SCRIPT_BODY, "def flowFile = session.get(); if(!flowFile)return; flowFile.testAttr = flowFile.read().getText('UTF-8'); REL_SUCCESS << flowFile;");
//runner.setProperty(proc.FAIL_STRATEGY, "rollback");
runner.assertValid();
runner.enqueue("test content".getBytes("UTF-8"));
runner.run();
runner.assertAllFlowFilesTransferred(proc.REL_SUCCESS.getName(), 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(proc.REL_SUCCESS.getName());
result.get(0).assertAttributeEquals("testAttr", "test content");
}
@Test
public void test_onTrigger_groovy() throws Exception {
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_onTrigger.groovy");
//runner.setProperty(proc.FAIL_STRATEGY, "rollback");
runner.assertValid();
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(proc.REL_SUCCESS.getName(), 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(proc.REL_SUCCESS.getName());
result.get(0).assertAttributeEquals("from-content", "test content");
}
@Test
public void test_onTriggerX_groovy() throws Exception {
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_onTriggerX.groovy");
//runner.setProperty(proc.FAIL_STRATEGY, "rollback");
runner.assertValid();
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(proc.REL_SUCCESS.getName(), 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(proc.REL_SUCCESS.getName());
result.get(0).assertAttributeEquals("from-content", "test content");
}
@Test
public void test_onTrigger_changeContent_groovy() throws Exception {
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_onTrigger_changeContent.groovy");
//runner.setProperty(proc.FAIL_STRATEGY, "rollback");
runner.assertValid();
runner.enqueue(TEST_CSV_DATA.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(proc.REL_SUCCESS.getName(), 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(proc.REL_SUCCESS.getName());
MockFlowFile resultFile = result.get(0);
resultFile.assertAttributeEquals("selected.columns", "first,last");
resultFile.assertContentEquals("Marlene Shaw\nTodd Graham\n");
}
@Test
public void test_onTrigger_changeContentX_groovy() throws Exception {
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_onTrigger_changeContentX.groovy");
//runner.setProperty(proc.FAIL_STRATEGY, "rollback");
runner.assertValid();
runner.enqueue(TEST_CSV_DATA.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(proc.REL_SUCCESS.getName(), 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(proc.REL_SUCCESS.getName());
MockFlowFile resultFile = result.get(0);
resultFile.assertAttributeEquals("selected.columns", "first,last");
resultFile.assertContentEquals("Marlene Shaw\nTodd Graham\n");
}
@Test
public void test_no_input_groovy() throws Exception {
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_no_input.groovy");
//runner.setProperty(proc.FAIL_STRATEGY, "rollback");
runner.assertValid();
runner.run();
runner.assertAllFlowFilesTransferred(proc.REL_SUCCESS.getName(), 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(proc.REL_SUCCESS.getName());
MockFlowFile resultFile = result.get(0);
resultFile.assertAttributeEquals("filename", "test.txt");
resultFile.assertContentEquals("Test");
}
@Test
public void test_good_script() throws Exception {
runner.setProperty(proc.SCRIPT_BODY, " def ff = session.get(); if(!ff)return; REL_SUCCESS << ff ");
runner.assertValid();
}
@Test
public void test_bad_script() throws Exception {
runner.setProperty(proc.SCRIPT_BODY, " { { ");
runner.assertNotValid();
}
//---------------------------------------------------------
@Test
public void test_ctl_01_access() throws Exception {
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_ctl_01_access.groovy");
runner.setProperty("CTL.mydbcp", "dbcp"); //pass dbcp as a service to script
runner.assertValid();
runner.run();
runner.assertAllFlowFilesTransferred(proc.REL_SUCCESS.getName(), 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(proc.REL_SUCCESS.getName());
MockFlowFile resultFile = result.get(0);
resultFile.assertContentEquals("OK", "UTF-8");
}
@Test
public void test_sql_01_select() throws Exception {
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_01_select.groovy");
runner.setProperty("SQL.mydb", "dbcp");
runner.assertValid();
runner.run();
runner.assertAllFlowFilesTransferred(proc.REL_SUCCESS.getName(), 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(proc.REL_SUCCESS.getName());
MockFlowFile resultFile = result.get(0);
resultFile.assertAttributeEquals("filename", "test.txt");
resultFile.assertContentEquals("Joe Smith\nCarrie Jones\n", "UTF-8");
}
@Test
public void test_sql_02_blob_write() throws Exception {
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_02_blob_write.groovy");
runner.setProperty("SQL.mydb", "dbcp");
//runner.setProperty("ID", "0");
runner.assertValid();
runner.enqueue(TEST_CSV_DATA.getBytes(StandardCharsets.UTF_8), map("ID", "0"));
runner.run();
runner.assertAllFlowFilesTransferred(proc.REL_SUCCESS.getName(), 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(proc.REL_SUCCESS.getName());
MockFlowFile resultFile = result.get(0);
resultFile.assertContentEquals(TEST_CSV_DATA.getBytes(StandardCharsets.UTF_8));
//the database content is checked in the next test case
}
@Test
public void test_sql_03_blob_read() throws Exception {
//read blob from database written at previous step and write to flow file
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_03_blob_read.groovy");
runner.setProperty("SQL.mydb", "dbcp");
runner.setProperty("ID", "0");
runner.setValidateExpressionUsage(false);
runner.assertValid();
runner.run();
runner.assertAllFlowFilesTransferred(proc.REL_SUCCESS.getName(), 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(proc.REL_SUCCESS.getName());
MockFlowFile resultFile = result.get(0);
resultFile.assertContentEquals(TEST_CSV_DATA.getBytes(StandardCharsets.UTF_8));
}
@Test
public void test_sql_04_insert_and_json() throws Exception {
//insert each element of the input json array into the database and emit one flow file per inserted row
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_04_insert_and_json.groovy");
runner.setProperty("SQL.mydb", "dbcp");
runner.setValidateExpressionUsage(false);
runner.assertValid();
runner.enqueue(new FileInputStream(TEST_RESOURCE_LOCATION + "test_sql_04_insert_and_json.json"));
runner.run();
runner.assertAllFlowFilesTransferred(proc.REL_SUCCESS.getName(), 3); //number of inserted rows
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(proc.REL_SUCCESS.getName());
MockFlowFile resultFile = result.get(0);
List<String> lines = ResourceGroovyMethods.readLines(new File(TEST_RESOURCE_LOCATION + "test_sql_04_insert_and_json.json"), "UTF-8");
//pass through to&from json before compare
resultFile.assertContentEquals(JsonOutput.toJson(new JsonSlurper().parseText(lines.get(1))), "UTF-8");
}
@Test
public void test_filter_01() throws Exception {
runner.setProperty(proc.SCRIPT_BODY, "def ff = session.get{it.FILTER=='3'}; if(!ff)return; REL_SUCCESS << ff;");
//runner.setProperty(proc.FAIL_STRATEGY, "rollback");
runner.assertValid();
runner.enqueue("01".getBytes("UTF-8"), map("FILTER", "1"));
runner.enqueue("31".getBytes("UTF-8"), map("FILTER", "3"));
runner.enqueue("03".getBytes("UTF-8"), map("FILTER", "2"));
runner.enqueue("32".getBytes("UTF-8"), map("FILTER", "3"));
runner.run();
runner.assertAllFlowFilesTransferred(proc.REL_SUCCESS.getName(), 2);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(proc.REL_SUCCESS.getName());
result.get(0).assertContentEquals("31", "UTF-8");
result.get(1).assertContentEquals("32", "UTF-8");
}
@Test
public void test_read_01() throws Exception {
runner.setProperty(proc.SCRIPT_BODY, "def ff = session.get(); if(!ff)return; assert ff.read().getText('UTF-8')=='1234'; REL_SUCCESS << ff; ");
runner.assertValid();
runner.enqueue("1234".getBytes("UTF-8"));
runner.run();
runner.assertAllFlowFilesTransferred(proc.REL_SUCCESS.getName(), 1);
}
@Test
public void test_read_02() throws Exception {
runner.setProperty(proc.SCRIPT_BODY, "def ff = session.get(); if(!ff)return; ff.read{s-> assert s.getText('UTF-8')=='1234' }; REL_SUCCESS << ff; ");
runner.assertValid();
runner.enqueue("1234".getBytes("UTF-8"));
runner.run();
runner.assertAllFlowFilesTransferred(proc.REL_SUCCESS.getName(), 1);
}
@Test
public void test_read_03() throws Exception {
runner.setProperty(proc.SCRIPT_BODY, "def ff = session.get(); if(!ff)return; ff.read('UTF-8'){r-> assert r.getText()=='1234' }; REL_SUCCESS << ff; ");
runner.assertValid();
runner.enqueue("1234".getBytes("UTF-8"));
runner.run();
runner.assertAllFlowFilesTransferred(proc.REL_SUCCESS.getName(), 1);
}
private HashMap<String, String> map(String key, String value) {
HashMap<String, String> attrs = new HashMap<>();
attrs.put(key, value);
return attrs;
}
private static class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
@Override
public String getIdentifier() {
return "dbcp";
}
@Override
public Connection getConnection() throws ProcessException {
try {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
return DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
} catch (final Exception e) {
throw new ProcessException("getConnection failed: " + e);
}
}
}
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//just check that it's possible to access controller services
def ff=session.create()
def con=CTL.mydbcp.getConnection()
assert con instanceof java.sql.Connection
con.close();
ff.write('UTF-8', 'OK')
REL_SUCCESS<<ff

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
the original script taken from this article
http://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html
and refactored for ExecuteGroovyScript
*/
def flowFile = session.create()
//flowFile.write is defined in org/apache/nifi/processors/groovyx/GroovyMethods.java
flowFile.write{out ->
out.withWriter("UTF-8"){ it.append("Test") }
}
flowFile.'filename' = 'test.txt'
REL_SUCCESS << flowFile

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
def flowFile = session.get();
if (flowFile == null) {
context?.yield();
}
else {
flowFile = session.putAttribute(flowFile, "from-content", "test content")
session.transfer(flowFile, REL_SUCCESS)
}

View File

@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
def flowFile = session.get()
if(!flowFile)return
flowFile."from-content" = "test content"
REL_SUCCESS << flowFile

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.nifi.processor.io.StreamCallback
def flowFile = session.get();
if (flowFile == null) {
return;
}
def selectedColumns = ''
flowFile = session.write(flowFile,
{ inputStream, outputStream ->
String line
final BufferedReader inReader = new BufferedReader(new InputStreamReader(inputStream, 'UTF-8'))
line = inReader.readLine()
String[] header = line?.split(',')
selectedColumns = "${header[2]},${header[3]}"
while (line = inReader.readLine()) {
String[] cols = line.split(',')
// Select/project cols
outputStream.write("${cols[2].capitalize()} ${cols[3].capitalize()}\n".getBytes('UTF-8'))
}
} as StreamCallback)
flowFile = session?.putAttribute(flowFile, "selected.columns", selectedColumns)
flowFile = session?.putAttribute(flowFile, "filename", "split_cols.txt")
session.transfer(flowFile, /*ExecuteScript.*/ REL_SUCCESS)

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
def flowFile = session.get()
if(!flowFile)return
def selectedColumns = ''
flowFile.write{inputStream, outputStream->
String[] header = null
outputStream.withWriter("UTF-8"){outputWriter->
inputStream.eachLine("UTF-8"){line->
if(header==null){
header = line.split(',')
selectedColumns = "${header[2]},${header[3]}"
}else{
String[] cols = line.split(',')
outputWriter.write("${cols[2].capitalize()} ${cols[3].capitalize()}\n")
}
}
}
}
flowFile."selected.columns" = selectedColumns
flowFile."filename" = "split_cols.txt"
REL_SUCCESS << flowFile

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
the original script taken from this article
http://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html
and refactored and simplified for ExecuteGroovyScript
*/
def flowFile = session.create()
flowFile.write("UTF-8"){wout ->
//assume SQL.mydb property is linked to desired database connection pool
SQL.mydb.eachRow('select * from mytable'){ row->
wout << row.name << '\n'
}
}
//set filename attribute
flowFile.'filename' = 'test.txt'
REL_SUCCESS << flowFile

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import groovy.sql.Sql
def flowFile = session.get()
if(!flowFile)return
//write content of the flow file into database blob
flowFile.read{ rawIn->
def parms = [
p_id : flowFile.ID as Long,
p_data : Sql.BLOB( rawIn ),
]
assert 1==SQL.mydb.executeUpdate(parms, "update mytable set data = :p_data where id = :p_id")
}
//transfer original to output
REL_SUCCESS << flowFile

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
def flowFile = session.create()
//read blob into flowFile content
flowFile.write{out->
//get id from property with name ID
def row = SQL.mydb.firstRow("select data from mytable where id = ${ ID.value as Long }")
assert row : "row with id=`${ID}` not found"
//write blob stream to flowFile output stream
out << row.data.getBinaryStream()
}
//transfer new file to output
REL_SUCCESS << flowFile

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import groovy.sql.Sql
import groovy.json.JsonSlurper
import groovy.json.JsonOutput
//read json from the input file and insert each array element into the db
//return each input element as the content of a new output file
//drop the original file
//expected input:
/*
[
{"field":"value", "field2":"value2", ...},
...
]
*/
def flowFile = session.get()
if(!flowFile)return
def outFiles = [] //list for new flow files
def rows = new JsonSlurper().parse( flowFile.read() )
rows.each{row->
//at this point row is a map with keys corresponding to mytable column names.
//build query: insert into mytable(a,b,c,...) values(:a, :b, :c, ...)
//and pass row-map as an argument to this query
SQL.mydb.executeInsert(row, "insert into mytable( ${row.keySet().join(',')} ) values( :${row.keySet().join(', :')} )")
//create new flowfile based on original without copying content,
//write new content and add into outFiles list
outFiles << flowFile.clone(false).write( "UTF-8", JsonOutput.toJson(row) )
}
//just easier to assert sql here
assert 2+rows.size() == SQL.mydb.firstRow("select count(*) cnt from mytable").cnt
flowFile.remove()
//transfer all new files to success relationship
REL_SUCCESS << outFiles

View File

@ -0,0 +1,7 @@
[
{"id":"10","name":"name10","scale":"10.10","created":"2010-10-10 03:23:34.234"}
,
{"id":"11","name":"name11","scale":"11.11","created":"2011-11-11 03:23:34.234"}
,
{"id":"12","name":"name12","scale":"12.12","created":"2012-12-12 03:23:34.234"}
]

View File

@ -0,0 +1,48 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>1.5.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-groovyx-bundle</artifactId>
<packaging>pom</packaging>
<description>NiFi Groovy Extended Processor</description>
<modules>
<module>nifi-groovyx-processors</module>
<module>nifi-groovyx-nar</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-groovyx-processors</artifactId>
<version>1.5.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
<version>2.4.11</version>
<scope>compile</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@ -75,6 +75,7 @@
<module>nifi-ignite-bundle</module>
<module>nifi-rethinkdb-bundle</module>
<module>nifi-email-bundle</module>
<module>nifi-groovyx-bundle</module>
<module>nifi-ranger-bundle</module>
<module>nifi-websocket-bundle</module>
<module>nifi-tcp-bundle</module>

View File

@ -1269,6 +1269,12 @@
<version>1.5.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-groovyx-nar</artifactId>
<version>1.5.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-slack-nar</artifactId>