NIFI-3288: Add Clojure support for ExecuteScript

This closes: #1402.

Signed-off-by: Andre F de Miranda <trixpan@users.noreply.github.com>
This commit is contained in:
Matt Burgess 2017-01-06 15:06:48 -05:00 committed by Andre F de Miranda
parent 690130b063
commit 7963df89eb
14 changed files with 546 additions and 3 deletions

View File

@ -1283,6 +1283,7 @@ The following binary components are provided under the Eclipse Public License 1.
(EPL 1.0) Common Service Data Objects (org.eclipse.persistence:commonj.sdo:2.1.1 - http://www.eclipse.org/eclipselink/)
(EPL 1.0) Java Persistence API (org.eclipse.persistence:javax.persistence:2.1.0 - http://www.eclipse.org/eclipselink/)
(EPL 1.0) JaCoCo Java Code Coverage Library ( org.jacoco ) http://www.eclemma.org/jacoco
(EPLv1.0) Clojure (org.clojure:clojure:1.8.0 - http://clojure.org)
*****************
Mozilla Public License v2.0

View File

@ -271,4 +271,14 @@ licenses.
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
The binary distribution of this produce bundles 'clojure' under a EPL 1.0 license.
* Copyright (c) Rich Hickey. All rights reserved.
* The use and distribution terms for this software are covered by the
* Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
* which can be found in the file epl-v10.html at the root of this distribution.
* By using this software in any fashion, you are agreeing to be bound by
* the terms of this license.
* You must not remove this notice, or any other, from this software.

View File

@ -58,4 +58,6 @@ The following binary components are provided under a EPL v1.0 license
JRuby is licensed under three licenses - the EPL 1.0, GPL 2 and LGPL 2.1. Apache NiFi uses the EPL v1.0 license.
The following NOTICE information applies:
Copyright (c) 2007-2015 The JRuby project
Copyright (c) 2007-2015 The JRuby project
(EPLv1.0) Clojure (org.clojure:clojure:1.8.0 - http://clojure.org)

View File

@ -57,6 +57,11 @@
<artifactId>jruby-complete</artifactId>
<version>9.0.4.0</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>

View File

@ -54,7 +54,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
@Tags({"script", "execute", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj", "restricted"})
@Tags({"script", "execute", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj", "clojure", "restricted"})
@CapabilityDescription("Experimental - Executes a script given the flow file and a process session. The script is responsible for "
+ "handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as any flow files created by "
+ "the script. If the handling is incomplete or incorrect, the session will be rolled back. Experimental: "

View File

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script.engine;
import clojure.lang.Compiler;
import clojure.lang.LineNumberingPushbackReader;
import clojure.lang.Namespace;
import clojure.lang.RT;
import clojure.lang.Symbol;
import clojure.lang.Var;
import javax.script.AbstractScriptEngine;
import javax.script.Bindings;
import javax.script.ScriptContext;
import javax.script.ScriptEngineFactory;
import javax.script.ScriptException;
import javax.script.SimpleBindings;
import java.io.Reader;
import java.io.StringReader;
import java.util.UUID;
/**
* A ScriptEngine implementation for the Clojure language
*/
public class ClojureScriptEngine extends AbstractScriptEngine {
public static final String ENGINE_NAME = "Clojure";
public static final String ENGINE_VERSION = "1.8.0";
private volatile ScriptEngineFactory scriptEngineFactory;
private final String uuid = "ns-" + UUID.randomUUID().toString();
private final Symbol NAMESPACE_SYMBOL = Symbol.create(uuid);
protected ClojureScriptEngine(ScriptEngineFactory scriptEngineFactory) {
this.scriptEngineFactory = scriptEngineFactory;
// Set up the engine bindings
Bindings engineScope = getBindings(ScriptContext.ENGINE_SCOPE);
engineScope.put(ENGINE, ENGINE_NAME);
engineScope.put(ENGINE_VERSION, ENGINE_VERSION);
engineScope.put(NAME, ENGINE_NAME);
engineScope.put(LANGUAGE, ENGINE_NAME);
engineScope.put(LANGUAGE_VERSION, ENGINE_VERSION);
}
@Override
public Object eval(String script, ScriptContext context) throws ScriptException {
if (script == null) {
throw new NullPointerException("script is null");
}
return eval(new StringReader(script), context);
}
@Override
public Object eval(Reader reader, ScriptContext context) throws ScriptException {
try {
// Get engine bindings and send them to Clojure
Bindings engineBindings = context.getBindings(ScriptContext.ENGINE_SCOPE);
engineBindings.entrySet().forEach((entry) -> Var.intern(Namespace.findOrCreate(NAMESPACE_SYMBOL), Symbol.create(entry.getKey().intern()), entry.getValue(), true));
Var.pushThreadBindings(
RT.map(RT.CURRENT_NS, RT.CURRENT_NS.deref(),
RT.IN, new LineNumberingPushbackReader(context.getReader()),
RT.OUT, context.getWriter(),
RT.ERR, context.getErrorWriter()));
Object result = Compiler.load(reader);
return result;
} catch (Exception e) {
throw new ScriptException(e);
} finally {
Namespace.remove(NAMESPACE_SYMBOL);
}
}
@Override
public Bindings createBindings() {
return new SimpleBindings();
}
@Override
public ScriptEngineFactory getFactory() {
return scriptEngineFactory;
}
public String getNamespace() {
return uuid;
}
}

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script.engine;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
* A ScriptEngineFactory implementation for the Clojure language
*/
public class ClojureScriptEngineFactory implements ScriptEngineFactory {
public static final List<String> EXTENSIONS = Collections.unmodifiableList(Collections.singletonList("clj"));
public static final List<String> MIME_TYPES = Collections.unmodifiableList(Arrays.asList("application/clojure", "text/clojure"));
public static final List<String> NAMES = Collections.unmodifiableList(Collections.singletonList("Clojure"));
// This engine provides constants, global engine properties, etc. to be returned by the ScriptEngineFactory interface methods,
// and is not used to execute scripts. A new ClojureScriptEngine will be returned by each call to getScriptEngine()
private static ScriptEngine scriptEngine;
public ClojureScriptEngineFactory() {
scriptEngine = getScriptEngine();
}
@Override
public String getEngineName() {
return (String) scriptEngine.get(ScriptEngine.ENGINE);
}
@Override
public String getEngineVersion() {
return (String) scriptEngine.get(ScriptEngine.ENGINE_VERSION);
}
@Override
public List<String> getExtensions() {
return EXTENSIONS;
}
@Override
public List<String> getMimeTypes() {
return MIME_TYPES;
}
@Override
public List<String> getNames() {
return NAMES;
}
@Override
public String getLanguageName() {
return (String) scriptEngine.get(ScriptEngine.LANGUAGE);
}
@Override
public String getLanguageVersion() {
return (String) scriptEngine.get(ScriptEngine.LANGUAGE_VERSION);
}
@Override
public Object getParameter(String key) {
return key == null ? null : scriptEngine.get(key);
}
@Override
public String getMethodCallSyntax(String object, String method, String... args) {
// construct a statement like (.method object arg1 arg2 ...). This works for instance methods as well as statics
List<String> params = Arrays.asList("(." + method, object);
params.addAll(Arrays.asList(args));
return params.stream().collect(Collectors.joining(" ")).concat(")");
}
@Override
public String getOutputStatement(String toDisplay) {
return toDisplay == null ? null : "(println \"" + toDisplay + "\")";
}
@Override
public String getProgram(String... statements) {
if (statements == null) {
return null;
}
return Arrays.stream(statements).collect(Collectors.joining("\""));
}
@Override
public ScriptEngine getScriptEngine() {
return new ClojureScriptEngine(this);
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script.impl;
import org.apache.nifi.processors.script.engine.ClojureScriptEngine;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
public class ClojureScriptEngineConfigurator extends AbstractModuleClassloaderConfigurator {
private static final String PRELOADS =
"(:import \n"
+ "[org.apache.nifi.components "
+ "AbstractConfigurableComponent AllowableValue ConfigurableComponent PropertyDescriptor PropertyValue ValidationContext ValidationResult Validator"
+ "]\n"
+ "[org.apache.nifi.components.state Scope StateManager StateMap]\n"
+ "[org.apache.nifi.flowfile FlowFile]\n"
+ "[org.apache.nifi.processor "
+ "AbstractProcessor AbstractSessionFactoryProcessor DataUnit FlowFileFilter ProcessContext Processor "
+ "ProcessorInitializationContext ProcessSession ProcessSessionFactory Relationship SchedulingContext"
+ "]\n"
+ "[org.apache.nifi.processor.exception FlowFileAccessException FlowFileHandlingException MissingFlowFileException ProcessException]\n"
+ "[org.apache.nifi.processor.io InputStreamCallback OutputStreamCallback StreamCallback]\n"
+ "[org.apache.nifi.processor.util FlowFileFilters StandardValidators]\n"
+ "[org.apache.nifi.processors.script ScriptingComponentHelper ScriptingComponentUtils ExecuteScript InvokeScriptedProcessor ScriptEngineConfigurator]\n"
+ "[org.apache.nifi.logging ComponentLog]\n"
+ ")\n";
private ScriptEngine scriptEngine;
@Override
public String getScriptEngineName() {
return "Clojure";
}
@Override
public Object init(ScriptEngine engine, String[] modulePaths) throws ScriptException {
scriptEngine = engine;
return scriptEngine;
}
@Override
public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
scriptEngine = engine;
StringBuilder sb = new StringBuilder("(ns ");
sb.append(((ClojureScriptEngine) scriptEngine).getNamespace());
sb.append(" ");
sb.append(PRELOADS);
sb.append(")\n");
sb.append(scriptBody);
return engine.eval(sb.toString());
}
}

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.script.engine.ClojureScriptEngineFactory

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.script.impl.ClojureScriptEngineConfigurator
org.apache.nifi.processors.script.impl.JythonScriptEngineConfigurator
org.apache.nifi.processors.script.impl.GroovyScriptEngineConfigurator
org.apache.nifi.processors.script.impl.JavascriptScriptEngineConfigurator

View File

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Before;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class TestExecuteClojure extends BaseScriptTest {
public final String TEST_CSV_DATA = "gender,title,first,last\n"
+ "female,miss,marlene,shaw\n"
+ "male,mr,todd,graham";
@Before
public void setup() throws Exception {
super.setupExecuteScript();
}
/**
* Tests a script file that has provides the body of an onTrigger() function.
*
* @throws Exception Any error encountered while testing
*/
@Test
public void testReadFlowFileContentAndStoreInFlowFileAttributeWithScriptFile() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Clojure");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "clojure/test_onTrigger.clj");
runner.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/clojure");
runner.assertValid();
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS);
result.get(0).assertAttributeEquals("from-content", "test content");
}
/**
* Tests a script file that has provides the body of an onTrigger() function.
*
* @throws Exception Any error encountered while testing
*/
@Test
public void testNoIncomingFlowFile() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Clojure");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "clojure/test_onTrigger.clj");
runner.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/clojure");
runner.assertValid();
runner.run();
runner.assertTransferCount(ExecuteScript.REL_SUCCESS, 0);
runner.assertTransferCount(ExecuteScript.REL_FAILURE, 0);
}
/**
* Tests a script file that creates and transfers a new flow file.
*
* @throws Exception Any error encountered while testing
*/
@Test
public void testCreateNewFlowFileWithScriptFile() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Clojure");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "clojure/test_onTrigger_newFlowFile.clj");
runner.setProperty(ScriptingComponentUtils.MODULES, TEST_RESOURCE_LOCATION + "clojure");
runner.assertValid();
runner.enqueue(TEST_CSV_DATA.getBytes(StandardCharsets.UTF_8));
runner.run();
// The script removes the original file and transfers only the new one
assertEquals(1, runner.getRemovedCount());
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS);
result.get(0).assertAttributeEquals("selected.columns", "title,first");
result.get(0).assertAttributeEquals("filename", "split_cols.txt");
}
/**
* Tests a script file that uses dynamic properties defined on the processor.
*
* @throws Exception Any error encountered while testing
*/
@Test
public void testDynamicProperties() throws Exception {
runner.setValidateExpressionUsage(true);
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Clojure");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "clojure/test_dynamicProperties.clj");
runner.setProperty("myProp", "${myAttr}");
runner.assertValid();
runner.enqueue(TEST_CSV_DATA.getBytes(StandardCharsets.UTF_8),
new HashMap<String, String>(1) {{
put("myAttr", "testValue");
}});
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS);
result.get(0).assertAttributeEquals("from-content", "testValue");
}
}

View File

@ -0,0 +1,25 @@
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(let [flowFile (.get session)]
(if (some? flowFile)
(do
(.transfer session
(.putAttribute session flowFile "from-content" (.getValue (.evaluateAttributeExpressions myProp flowFile)))
REL_SUCCESS
)
)
)
)

View File

@ -0,0 +1,18 @@
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(let [flowFile (.get session)]
(if (some? flowFile) (do (.transfer session (.putAttribute session flowFile "from-content" "test content") REL_SUCCESS)))
)

View File

@ -0,0 +1,53 @@
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(import org.apache.commons.io.IOUtils)
(import [java.io BufferedReader InputStreamReader])
(let [flowFile (.get session)]
(if (some? flowFile)
(do
(let [ inReader (BufferedReader. (InputStreamReader. (.read session flowFile) "UTF-8"))
header (.split (.readLine inReader) ",")]
(.transfer session
(.write session
(.putAttribute session
(.putAttribute session (.create session) "selected.columns" (str (aget header 1) "," (aget header 2)))
"filename" "split_cols.txt"
)
(reify OutputStreamCallback
(process [this outputStream]
(loop []
(let [line (.readLine inReader)]
(if (some? line)
(let [cols (.split line ",")]
(.write outputStream (.getBytes (str (aget cols 3) ", " (aget cols 2) "\n")))
(recur)
)
)
)
)
)
)
) REL_SUCCESS
)
(.close inReader)
)
(.remove session flowFile)
)
)
)