From 9228036e1db4624b3a90e5a0db607d2673191c59 Mon Sep 17 00:00:00 2001 From: levilentz Date: Sat, 23 Sep 2023 15:20:20 -0700 Subject: [PATCH] NIFI-7355: This closes #7677. TinkerpopClientService Signed-off-by: Joseph Witt --- .../src/main/resources/META-INF/NOTICE | 8 +- .../nifi-other-graph-services/pom.xml | 32 +- .../graph/AbstractTinkerpopClientService.java | 116 ---- .../nifi/graph/GremlinClientService.java | 107 ---- .../nifi/graph/TinkerpopClientService.java | 526 ++++++++++++++++++ .../nifi/graph/gremlin/SimpleEntry.java | 52 ++ ...g.apache.nifi.controller.ControllerService | 2 +- .../additionalDetails.html | 54 -- .../additionalDetails.html | 93 ++++ ...linClientServiceControllerSettingsIT.java} | 14 +- ...lientServiceYamlSettingsAndBytecodeIT.java | 133 +++++ .../graph/TestableGremlinClientService.java | 8 +- .../src/test/resources/gremlin.yml | 19 + 13 files changed, 870 insertions(+), 294 deletions(-) delete mode 100644 nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/java/org/apache/nifi/graph/AbstractTinkerpopClientService.java delete mode 100644 nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/java/org/apache/nifi/graph/GremlinClientService.java create mode 100644 nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/java/org/apache/nifi/graph/TinkerpopClientService.java create mode 100644 nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/java/org/apache/nifi/graph/gremlin/SimpleEntry.java delete mode 100644 nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/resources/docs/org.apache.nifi.graph.GremlinClientService/additionalDetails.html create mode 100644 nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/resources/docs/org.apache.nifi.graph.TinkerpopClientService/additionalDetails.html rename nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/test/java/org/apache/nifi/graph/{GremlinClientServiceIT.java => 
GremlinClientServiceControllerSettingsIT.java} (84%) create mode 100644 nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/test/java/org/apache/nifi/graph/GremlinClientServiceYamlSettingsAndBytecodeIT.java create mode 100644 nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/test/resources/gremlin.yml diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services-nar/src/main/resources/META-INF/NOTICE index 6b3e063ae4..df05408b23 100644 --- a/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services-nar/src/main/resources/META-INF/NOTICE +++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services-nar/src/main/resources/META-INF/NOTICE @@ -108,13 +108,11 @@ The following binary components are provided under the Apache Software License v (ASLv2) Exp4j Copyright 2017 - (ASLv2) Groovy 2.4.16 (http://www.groovy-lang.org) - groovy-2.4.16-indy - groovy-json-2.4.16-indy - groovy-sql-2.4.16-indy + (ASLv2) Groovy 4.0.15 (http://www.groovy-lang.org) + groovy-4.0.15 The following NOTICE information applies: Apache Groovy - Copyright 2003-2018 The Apache Software Foundation + Copyright 2003-2020 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). 
diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/pom.xml b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/pom.xml index 005b9b8919..8e28435e91 100644 --- a/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/pom.xml +++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/pom.xml @@ -21,6 +21,7 @@ 3.7.0 + org.apache.nifi @@ -34,7 +35,7 @@ org.apache.nifi nifi-graph-client-service-api - ${project.version} + 2.0.0-SNAPSHOT provided @@ -54,7 +55,6 @@ org.apache.nifi nifi-mock - 2.0.0-SNAPSHOT test @@ -90,6 +90,34 @@ gremlin-driver ${gremlin.version} + + + org.apache.groovy + groovy + ${nifi.groovy.version} + + + org.apache.groovy + groovy-dateutil + ${nifi.groovy.version} + + + commons-codec + commons-codec + + + + org.testcontainers + testcontainers + test + + + + org.testcontainers + junit-jupiter + test + + diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/java/org/apache/nifi/graph/AbstractTinkerpopClientService.java b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/java/org/apache/nifi/graph/AbstractTinkerpopClientService.java deleted file mode 100644 index f85f2f6c4d..0000000000 --- a/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/java/org/apache/nifi/graph/AbstractTinkerpopClientService.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.graph; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.ssl.SSLContextService; -import org.apache.tinkerpop.gremlin.driver.Cluster; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -public abstract class AbstractTinkerpopClientService extends AbstractControllerService { - public static final PropertyDescriptor CONTACT_POINTS = new PropertyDescriptor.Builder() - .name("tinkerpop-contact-points") - .displayName("Contact Points") - .description("A comma-separated list of hostnames or IP addresses where an OpenCypher-enabled server can be found.") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) - .build(); - public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() - .name("tinkerpop-port") - .displayName("Port") - .description("The port where Gremlin Server is running on each host listed as a contact point.") - .required(true) - .defaultValue("8182") - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) - .build(); - public static final PropertyDescriptor PATH = new PropertyDescriptor.Builder() - .name("tinkerpop-path") - 
.displayName("Path") - .description("The URL path where Gremlin Server is running on each host listed as a contact point.") - .required(true) - .defaultValue("/gremlin") - .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) - .build(); - - public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() - .name("tinkerpop-ssl-context-service") - .displayName("SSL Context Service") - .description("The SSL Context Service used to provide client certificate information for TLS/SSL " - + "connections.") - .required(false) - .identifiesControllerService(SSLContextService.class) - .build(); - - public static final List DESCRIPTORS = Collections.unmodifiableList(Arrays.asList( - CONTACT_POINTS, PORT, PATH, SSL_CONTEXT_SERVICE - )); - - @Override - public List getSupportedPropertyDescriptors() { - return DESCRIPTORS; - } - - protected Cluster.Builder setupSSL(ConfigurationContext context, Cluster.Builder builder) { - if (context.getProperty(SSL_CONTEXT_SERVICE).isSet()) { - SSLContextService service = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - builder - .enableSsl(true) - .keyStore(service.getKeyStoreFile()) - .keyStorePassword(service.getKeyStorePassword()) - .keyStoreType(service.getKeyStoreType()) - .trustStore(service.getTrustStoreFile()) - .trustStorePassword(service.getTrustStorePassword()); - usesSSL = true; - } - - return builder; - } - - boolean usesSSL; - protected String transitUrl; - - protected Cluster buildCluster(ConfigurationContext context) { - String contactProp = context.getProperty(CONTACT_POINTS).evaluateAttributeExpressions().getValue(); - int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger(); - String path = context.getProperty(PATH).evaluateAttributeExpressions().getValue(); - String[] contactPoints = contactProp.split(",[\\s]*"); - Cluster.Builder builder = Cluster.build(); - 
for (String contactPoint : contactPoints) { - builder.addContactPoint(contactPoint.trim()); - } - - builder.port(port).path(path); - - builder = setupSSL(context, builder); - - transitUrl = String.format("gremlin%s://%s:%s%s", usesSSL ? "+ssl" : "", - contactProp, port, path); - - return builder.create(); - } -} diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/java/org/apache/nifi/graph/GremlinClientService.java b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/java/org/apache/nifi/graph/GremlinClientService.java deleted file mode 100644 index e001fd0493..0000000000 --- a/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/java/org/apache/nifi/graph/GremlinClientService.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.nifi.graph; - -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnDisabled; -import org.apache.nifi.annotation.lifecycle.OnEnabled; -import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.tinkerpop.gremlin.driver.Client; -import org.apache.tinkerpop.gremlin.driver.Cluster; -import org.apache.tinkerpop.gremlin.driver.Result; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -@CapabilityDescription("A client service that connects to a graph database that can accept queries in the Tinkerpop Gremlin DSL.") -@Tags({ "graph", "database", "gremlin", "tinkerpop", }) -public class GremlinClientService extends AbstractTinkerpopClientService implements TinkerPopClientService { - private Cluster cluster; - protected Client client; - public static final String NOT_SUPPORTED = "NOT_SUPPORTED"; - private ConfigurationContext context; - - @OnEnabled - public void onEnabled(ConfigurationContext context) { - this.context = context; - cluster = buildCluster(context); - client = cluster.connect(); - } - - @OnDisabled - public void onDisabled() { - client.close(); - cluster.close(); - client = null; - cluster = null; - } - - public Map doQuery(String query, Map parameters, GraphQueryResultCallback handler) { - try { - Iterator iterator = client.submit(query, parameters).iterator(); - long count = 0; - while (iterator.hasNext()) { - Result result = iterator.next(); - Object obj = result.getObject(); - if (obj instanceof Map) { - handler.process((Map)obj, iterator.hasNext()); - } else { - handler.process(new HashMap(){{ - put("result", obj); - }}, iterator.hasNext()); - } - count++; - } - - Map resultAttributes = new HashMap<>(); - resultAttributes.put(NODES_CREATED, NOT_SUPPORTED); - resultAttributes.put(RELATIONS_CREATED, 
NOT_SUPPORTED); - resultAttributes.put(LABELS_ADDED, NOT_SUPPORTED); - resultAttributes.put(NODES_DELETED, NOT_SUPPORTED); - resultAttributes.put(RELATIONS_DELETED, NOT_SUPPORTED); - resultAttributes.put(PROPERTIES_SET, NOT_SUPPORTED); - resultAttributes.put(ROWS_RETURNED, String.valueOf(count)); - - return resultAttributes; - - } catch (Exception ex) { - throw new ProcessException(ex); - } - } - - @Override - public Map executeQuery(String query, Map parameters, GraphQueryResultCallback handler) { - try { - return doQuery(query, parameters, handler); - } catch (Exception ex) { - cluster.close(); - client.close(); - cluster = buildCluster(context); - client = cluster.connect(); - return doQuery(query, parameters, handler); - } - } - - @Override - public String getTransitUrl() { - return transitUrl; - } -} diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/java/org/apache/nifi/graph/TinkerpopClientService.java b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/java/org/apache/nifi/graph/TinkerpopClientService.java new file mode 100644 index 0000000000..f94a5ba0c5 --- /dev/null +++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/java/org/apache/nifi/graph/TinkerpopClientService.java @@ -0,0 +1,526 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.graph; + +import groovy.lang.Binding; +import groovy.lang.GroovyClassLoader; +import groovy.lang.GroovyShell; +import groovy.lang.Script; +import io.netty.handler.ssl.ClientAuth; +import io.netty.handler.ssl.JdkSslContext; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.components.resource.ResourceCardinality; +import org.apache.nifi.components.resource.ResourceType; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.graph.gremlin.SimpleEntry; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.util.StringUtils; +import org.apache.nifi.util.file.classloader.ClassLoaderUtils; +import org.apache.tinkerpop.gremlin.driver.Client; +import org.apache.tinkerpop.gremlin.driver.Cluster; +import org.apache.tinkerpop.gremlin.driver.Result; +import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; +import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource; +import 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +@Tags({"graph", "gremlin"}) +@CapabilityDescription("This service interacts with a tinkerpop-compliant graph service, providing both script submission and bytecode submission capabilities. " + + "Script submission is the default, with the script command being sent to the gremlin server as text. This should only be used for simple interactions with a tinkerpop-compliant server " + + "such as counts or other operations that do not require the injection of custom classes. " + + "Bytecode submission allows much more flexibility. When providing a jar, custom serializers can be used and pre-compiled graph logic can be utilized by groovy scripts " + + "provided by processors such as the ExecuteGraphQueryRecord.") +@RequiresInstanceClassLoading +public class TinkerpopClientService extends AbstractControllerService implements GraphClientService { + public static final String NOT_SUPPORTED = "NOT_SUPPORTED"; + private static final AllowableValue BYTECODE_SUBMISSION = new AllowableValue("bytecode-submission", "ByteCode Submission", + "Groovy scripts are compiled within NiFi, with the GraphTraversalSource injected as a variable 'g'. Effectively allowing " + + "your logic to directly manipulate the graph without string serialization overhead." + ); + + private static final AllowableValue SCRIPT_SUBMISSION = new AllowableValue("script-submission", "Script Submission", + "Script is sent to the gremlin server as a submission. Similar to a rest request. 
" + ); + + private static final AllowableValue YAML_SETTINGS = new AllowableValue("yaml-settings", "Yaml Settings", + "Connection to the gremlin server will be specified via a YAML file (very flexible)"); + + private static final AllowableValue SERVICE_SETTINGS = new AllowableValue("service-settings", "Service-Defined Settings", + "Connection to the gremlin server will be specified via values on this controller (simpler). " + + "Only recommended for testing and development with a simple graph instance. "); + + public static final PropertyDescriptor SUBMISSION_TYPE = new PropertyDescriptor.Builder() + .name("submission-type") + .displayName("Script Submission Type") + .description("A selection that toggles between script submission and bytecode submission") + .allowableValues(SCRIPT_SUBMISSION, BYTECODE_SUBMISSION) + .defaultValue("script-submission") + .required(true) + .build(); + + public static final PropertyDescriptor CONNECTION_SETTINGS = new PropertyDescriptor.Builder() + .name("connection-settings") + .displayName("Settings Specification") + .description("Selecting \"Service-Defined Settings\" connects using the setting on this service. Selecting \"Yaml Settings\" uses the specified YAML file for connection settings. 
") + .allowableValues(SERVICE_SETTINGS, YAML_SETTINGS) + .defaultValue("service-settings") + .required(true) + .build(); + + public static final PropertyDescriptor CONTACT_POINTS = new PropertyDescriptor.Builder() + .name("tinkerpop-contact-points") + .displayName("Contact Points") + .description("A comma-separated list of hostnames or IP addresses where an Gremlin-enabled server can be found.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .dependsOn(CONNECTION_SETTINGS, SERVICE_SETTINGS) + .build(); + + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .name("tinkerpop-port") + .displayName("Port") + .description("The port where Gremlin Server is running on each host listed as a contact point.") + .required(true) + .defaultValue("8182") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .dependsOn(CONNECTION_SETTINGS, SERVICE_SETTINGS) + .build(); + + public static final PropertyDescriptor PATH = new PropertyDescriptor.Builder() + .name("tinkerpop-path") + .displayName("Path") + .description("The URL path where Gremlin Server is running on each host listed as a contact point.") + .required(true) + .defaultValue("/gremlin") + .addValidator(Validator.VALID) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .dependsOn(CONNECTION_SETTINGS, SERVICE_SETTINGS) + .build(); + + public static final PropertyDescriptor TRAVERSAL_SOURCE_NAME = new PropertyDescriptor.Builder() + .name("gremlin-traversal-source-name") + .displayName("Traversal Source Name") + .description("An optional property that lets you set the name of the remote traversal instance. 
" + + "This can be really important when working with databases like JanusGraph that support " + + "multiple backend traversal configurations simultaneously.") + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .addValidator(Validator.VALID) + .build(); + + public static final PropertyDescriptor REMOTE_OBJECTS_FILE = new PropertyDescriptor.Builder() + .name("remote-objects-file") + .displayName("Remote Objects File") + .description("The remote-objects file YAML used for connecting to the gremlin server.") + .required(true) + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .dependsOn(CONNECTION_SETTINGS, YAML_SETTINGS) + .build(); + + public static final PropertyDescriptor USER_NAME = new PropertyDescriptor.Builder() + .name("user-name") + .displayName("Username") + .description("The username used to authenticate with the gremlin server." + + " Note: when using a remote.yaml file, this username value (if set) will overload any " + + "username set in the YAML file.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .name("password") + .displayName("Password") + .description("The password used to authenticate with the gremlin server." + + " Note: when using a remote.yaml file, this password setting (if set) will override any " + + "password set in the YAML file") + .required(false) + .sensitive(true) + .build(); + + public static final PropertyDescriptor EXTRA_RESOURCE = new PropertyDescriptor.Builder() + .name("extension") + .displayName("Extension JARs") + .description("A comma-separated list of Java JAR files to be loaded. This should contain any Serializers or other " + + "classes specified in the YAML file. 
Additionally, any custom classes required for the groovy script to " + + "work in the bytecode submission setting should also be contained in these JAR files.") + .dependsOn(CONNECTION_SETTINGS, YAML_SETTINGS) + .defaultValue(null) + .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY, ResourceType.URL) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .dynamicallyModifiesClasspath(true) + .build(); + + public static final PropertyDescriptor EXTENSION_CLASSES = new PropertyDescriptor.Builder() + .name("extension-classes") + .displayName("Extension Classes") + .addValidator(Validator.VALID) + .description("A comma-separated list of fully qualified Java class names that correspond to classes to implement. This " + + "is useful for services such as JanusGraph that need specific serialization classes. " + + "This configuration property has no effect unless a value for the Extension JAR field is " + + "also provided.") + .dependsOn(EXTRA_RESOURCE) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .dependsOn(CONNECTION_SETTINGS, YAML_SETTINGS) + .required(false) + .build(); + + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("ssl-context-service") + .displayName("SSL Context Service") + .description("The SSL Context Service used to provide client certificate information for TLS/SSL " + + "connections.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + + public static final List DESCRIPTORS = Collections.unmodifiableList(Arrays.asList( + SUBMISSION_TYPE, + CONNECTION_SETTINGS, + REMOTE_OBJECTS_FILE, + EXTRA_RESOURCE, + EXTENSION_CLASSES, + CONTACT_POINTS, + PORT, + PATH, + TRAVERSAL_SOURCE_NAME, + USER_NAME, + PASSWORD, + SSL_CONTEXT_SERVICE + )); + + private GroovyShell groovyShell; + private Map compiledCode; + protected Cluster cluster; + private String traversalSourceName; + private 
GraphTraversalSource traversalSource; + private boolean scriptSubmission = true; + boolean usesSSL; + protected String transitUrl; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException { + loadClasses(context); + GroovyClassLoader loader = new GroovyClassLoader(this.getClass().getClassLoader()); + groovyShell = new GroovyShell(loader); + compiledCode = new ConcurrentHashMap<>(); + + if (context.getProperty(TRAVERSAL_SOURCE_NAME).isSet()) { + traversalSourceName = context.getProperty(TRAVERSAL_SOURCE_NAME).evaluateAttributeExpressions() + .getValue(); + } + + scriptSubmission = context.getProperty(SUBMISSION_TYPE).getValue().equals(SCRIPT_SUBMISSION.getValue()); + + cluster = buildCluster(context); + } + + @OnDisabled + public void shutdown() { + try { + compiledCode = null; + if (traversalSource != null) { + traversalSource.close(); + } + } catch (Exception e) { + throw new ProcessException(e); + } finally { + if (cluster != null) { + cluster.close(); + } + cluster = null; + traversalSource = null; + } + } + + @Override + public Map executeQuery(String s, Map map, GraphQueryResultCallback graphQueryResultCallback) { + try { + if (scriptSubmission) { + return scriptSubmission(s, map, graphQueryResultCallback); + } else { + return bytecodeSubmission(s, map, graphQueryResultCallback); + } + } catch (Exception ex) { + throw new ProcessException(ex); + } + } + + @Override + public String getTransitUrl() { + return this.transitUrl; + } + + @Override + public List getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + @Override + public Collection customValidate(ValidationContext context) { + Collection results = new ArrayList<>(); + boolean jarsIsSet = !StringUtils.isEmpty(context.getProperty(EXTRA_RESOURCE).getValue()); + boolean clzIsSet = !StringUtils.isEmpty(context.getProperty(EXTENSION_CLASSES).getValue()); + + if (jarsIsSet && clzIsSet) { + try { + final ClassLoader loader = ClassLoaderUtils + 
.getCustomClassLoader(context.getProperty(EXTRA_RESOURCE).getValue(), this.getClass().getClassLoader(), null); + String[] classes = context.getProperty(EXTENSION_CLASSES).evaluateAttributeExpressions().getValue().split(",[\\s]*"); + for (String clz : classes) { + Class.forName(clz.trim(), true, loader); /* trim like loadClasses() does, so validation and enablement resolve the same class names */ + } + } catch (Exception ex) { + results.add(new ValidationResult.Builder().subject(EXTENSION_CLASSES.getDisplayName()).valid(false).explanation(ex.toString()).build()); + } + } + + if (context.getProperty(USER_NAME).isSet() && !context.getProperty(PASSWORD).isSet()) { + results.add(new ValidationResult.Builder() + .explanation("When specifying a username, the password must also be set").valid(false).build() + ); + } + if (context.getProperty(PASSWORD).isSet() && !context.getProperty(USER_NAME).isSet()) { /* password given without a username */ + results.add(new ValidationResult.Builder() + .explanation("When specifying a password, the username must also be set").valid(false).build() + ); + } + return results; + } + + protected Cluster.Builder setupSSL(ConfigurationContext context, Cluster.Builder builder) { + if (context.getProperty(SSL_CONTEXT_SERVICE).isSet()) { + SSLContextService service = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + builder + .enableSsl(true) + .sslContext(new JdkSslContext(service.createContext(), true, ClientAuth.NONE)); + usesSSL = true; + } + + return builder; + } + + + public void loadClasses(ConfigurationContext context) { + String path = context.getProperty(EXTRA_RESOURCE).getValue(); + String classList = context.getProperty(EXTENSION_CLASSES).getValue(); + if (path != null && classList != null && !path.isEmpty() && !classList.isEmpty()) { + try { + ClassLoader loader = ClassLoaderUtils.getCustomClassLoader(path, this.getClass().getClassLoader(), null); + String[] classes = context.getProperty(EXTENSION_CLASSES).evaluateAttributeExpressions().getValue().split(",[\\s]*"); + for (String cls : classes) { + Class clz = 
Class.forName(cls.trim(), true, loader); + if (getLogger().isDebugEnabled()) { + getLogger().debug(clz.getName()); + } + } + } catch (Exception e) { + throw new ProcessException(e); + } + } + } + + + protected Cluster buildCluster(ConfigurationContext context) { + + Cluster.Builder builder = Cluster.build(); + List hosts = new ArrayList<>(); + if (!context.getProperty(REMOTE_OBJECTS_FILE).isSet()) { + String contactProp = context.getProperty(CONTACT_POINTS).evaluateAttributeExpressions().getValue(); + int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger(); + String path = context.getProperty(PATH).evaluateAttributeExpressions().getValue(); + String[] contactPoints = contactProp.split(",[\\s]*"); + for (String contactPoint : contactPoints) { + builder.addContactPoint(contactPoint.trim()); + hosts.add(contactPoint.trim()); + } + builder.port(port); + if (path != null && !path.isEmpty()) { + builder.path(path); + } + } else { + //ToDo: there is a bug in getting the hostname from the builder, therefore when doing + // bytecode submission, the transitUrl ends up being effectively useless. Need to extract it + // from the yaml to get this to work as expected. + File yamlFile = new File(context.getProperty(REMOTE_OBJECTS_FILE).evaluateAttributeExpressions().getValue()); + try { + builder = Cluster.build(yamlFile); + } catch (Exception ex) { + throw new ProcessException(ex); + } + } + builder = setupSSL(context, builder); + + if (context.getProperty(USER_NAME).isSet() && context.getProperty(PASSWORD).isSet()) { + String username = context.getProperty(USER_NAME).evaluateAttributeExpressions().getValue(); + String password = context.getProperty(PASSWORD).getValue(); + builder.credentials(username, password); + } + + Cluster cluster = builder.create(); + + transitUrl = String.format("gremlin%s://%s:%s%s", usesSSL ? 
"+ssl" : "", + String.join(",", hosts), cluster.getPort(), cluster.getPath()); + + return cluster; + } + + protected Map scriptSubmission(String query, Map parameters, GraphQueryResultCallback handler) { + try { + Client client = cluster.connect(); /* NOTE(review): a new Client is opened on every call and is not closed in this method - confirm whether it should be reused or closed */ + Iterator iterator = client.submit(query, parameters).iterator(); + long count = 0; + while (iterator.hasNext()) { + Result result = iterator.next(); + Object obj = result.getObject(); + if (obj instanceof Map) { + handler.process((Map)obj, iterator.hasNext()); + } else { + handler.process(new HashMap<>() {{ + put("result", obj); + }}, iterator.hasNext()); + } + count++; + } + + Map resultAttributes = new HashMap<>(); + resultAttributes.put(NODES_CREATED, NOT_SUPPORTED); + resultAttributes.put(RELATIONS_CREATED, NOT_SUPPORTED); + resultAttributes.put(LABELS_ADDED, NOT_SUPPORTED); + resultAttributes.put(NODES_DELETED, NOT_SUPPORTED); + resultAttributes.put(RELATIONS_DELETED, NOT_SUPPORTED); + resultAttributes.put(PROPERTIES_SET, NOT_SUPPORTED); + resultAttributes.put(ROWS_RETURNED, String.valueOf(count)); + + return resultAttributes; + + } catch (Exception ex) { + throw new ProcessException(ex); + } + } + + protected Map bytecodeSubmission(String s, Map map, GraphQueryResultCallback graphQueryResultCallback) { + String hash = DigestUtils.sha256Hex(s); + Script compiled; + + if (this.traversalSource == null) { + this.traversalSource = createTraversal(); + } + int rowsReturned = 0; + + if (compiledCode.containsKey(hash)) { + compiled = compiledCode.get(hash); + } else { + compiled = groovyShell.parse(s); + compiledCode.put(hash, compiled); /* store under the SHA-256 hash so the containsKey(hash) lookup above can hit; keying by the script text meant every call re-parsed */ + } + + if (getLogger().isDebugEnabled()) { + getLogger().debug(map.toString()); + } + + Binding bindings = new Binding(); + map.forEach(bindings::setProperty); + bindings.setProperty("g", traversalSource); + bindings.setProperty("log", getLogger()); + try { + compiled.setBinding(bindings); + Object result = compiled.run(); + if (result instanceof Map) { + Map resultMap = (Map) result; + if 
(!resultMap.isEmpty()) { + Iterator outerResultSet = resultMap.entrySet().iterator(); + while (outerResultSet.hasNext()) { + Map.Entry innerResultSet = (Map.Entry) outerResultSet.next(); + if (innerResultSet.getValue() instanceof Map) { + Iterator resultSet = ((Map) innerResultSet.getValue()).entrySet().iterator(); + while (resultSet.hasNext()) { + Map.Entry tempResult = (Map.Entry) resultSet.next(); + Map tempRetObject = new HashMap<>(); + tempRetObject.put(tempResult.getKey(), tempResult.getValue()); + SimpleEntry returnObject = new SimpleEntry(tempResult.getKey(), tempRetObject); + Map resultReturnMap = new HashMap<>(); + resultReturnMap.put(innerResultSet.getKey(), returnObject); + if (getLogger().isDebugEnabled()) { + getLogger().debug(resultReturnMap.toString()); + } + graphQueryResultCallback.process(resultReturnMap, resultSet.hasNext()); + } + } else { + Map resultReturnMap = new HashMap<>(); + resultReturnMap.put(innerResultSet.getKey(), innerResultSet.getValue()); + graphQueryResultCallback.process(resultReturnMap, false); + } + rowsReturned++; + } + + } + } + } catch (Exception e) { + throw new ProcessException(e); + } + + Map resultAttributes = new HashMap<>(); + resultAttributes.put(ROWS_RETURNED, String.valueOf(rowsReturned)); + + return resultAttributes; + } + + protected GraphTraversalSource createTraversal() { + GraphTraversalSource traversal; + try { + if (StringUtils.isEmpty(traversalSourceName)) { + traversal = AnonymousTraversalSource.traversal().withRemote(DriverRemoteConnection.using(cluster)); + } else { + traversal = AnonymousTraversalSource.traversal().withRemote(DriverRemoteConnection.using(cluster, traversalSourceName)); + } + } catch (Exception e) { + throw new ProcessException(e); + } + + + return traversal; + } +} diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/java/org/apache/nifi/graph/gremlin/SimpleEntry.java 
b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/java/org/apache/nifi/graph/gremlin/SimpleEntry.java new file mode 100644 index 0000000000..21aba0125e --- /dev/null +++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/java/org/apache/nifi/graph/gremlin/SimpleEntry.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.nifi.graph.gremlin;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Minimal mutable-value {@link Map.Entry} implementation with a fixed key.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class SimpleEntry<K, V> implements Map.Entry<K, V> {
+    private final K key;
+    private V value;
+
+    /**
+     * @param key the entry key; never changes after construction
+     * @param value the initial entry value
+     */
+    public SimpleEntry(K key, V value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    @Override
+    public K getKey() {
+        return key;
+    }
+
+    @Override
+    public V getValue() {
+        return value;
+    }
+
+    /**
+     * Replaces the value.
+     *
+     * @return the value held before the call
+     */
+    @Override
+    public V setValue(V value) {
+        V old = this.value;
+        this.value = value;
+        return old;
+    }
+
+    /**
+     * Equality as specified by {@link Map.Entry#equals(Object)}: two entries are equal when
+     * both their keys and their values are equal, regardless of the implementing class.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof Map.Entry)) {
+            return false;
+        }
+        Map.Entry<?, ?> other = (Map.Entry<?, ?>) o;
+        return Objects.equals(key, other.getKey()) && Objects.equals(value, other.getValue());
+    }
+
+    /**
+     * Hash code as specified by {@link Map.Entry#hashCode()}: key hash XOR value hash, with null hashing to 0.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(key) ^ Objects.hashCode(value);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s:%s", this.key, this.value);
+    }
+
+}
\ No newline at end of file diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 685741c262..dfb8a8e59a 100644 --- a/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License.
-org.apache.nifi.graph.GremlinClientService \ No newline at end of file +org.apache.nifi.graph.TinkerpopClientService diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/resources/docs/org.apache.nifi.graph.GremlinClientService/additionalDetails.html b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/resources/docs/org.apache.nifi.graph.GremlinClientService/additionalDetails.html deleted file mode 100644 index 33bca23c16..0000000000 --- a/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/resources/docs/org.apache.nifi.graph.GremlinClientService/additionalDetails.html +++ /dev/null @@ -1,54 +0,0 @@ - - - - - - GremlinClientService - - - - - -

Description:

-

- This client service configures a connection to a Gremlin Server and allows Gremlin queries to be executed against - the Gremlin Server. For more information on Gremlin and Gremlin Server, see the Apache Tinkerpop project. -

-

Warning for New Users

-

- A common issue when creating Gremlin scripts for first time users is to accidentally return an unserializable object. Gremlin - is a Groovy DSL and so it behaves like compiled Groovy including returning the last statement in the script. This is an example - of a Gremlin script that could cause unexpected failures: -

-
-    g.V().hasLabel("person").has("name", "John Smith").valueMap()
-
-

- The valueMap() step is not directly serializable and will fail. To fix that you have two potential options: -

-
-    //Return a Map
-    g.V().hasLabel("person").has("name", "John Smith").valueMap().next()
-
-

- Alternative: -

-
-    g.V().hasLabel("person").has("name", "John Smith").valueMap()
-    true //Return boolean literal
-
- - \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/resources/docs/org.apache.nifi.graph.TinkerpopClientService/additionalDetails.html b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/resources/docs/org.apache.nifi.graph.TinkerpopClientService/additionalDetails.html new file mode 100644 index 0000000000..e14c372edf --- /dev/null +++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/main/resources/docs/org.apache.nifi.graph.TinkerpopClientService/additionalDetails.html @@ -0,0 +1,93 @@ + + + + + + GremlinClientService + + + + + +

Description:

+

+ This client service configures a connection to a Gremlin Server and allows Gremlin queries to be executed against + the Gremlin Server. For more information on Gremlin and Gremlin Server, see the Apache Tinkerpop project. +

+ +

+ This client service supports two different modes of operation: Script Submission and Bytecode Submission, described below. +

+ +

Script Submission

+

+ Script submission is the default way to interact with the gremlin server. This takes the input script and uses Script Submission + to interact with the gremlin server. Because the script is shipped to the gremlin server as a string, only simple queries are recommended (count, path, etc.) + as there are no complex serializers available in this operation. This also means that NiFi will not be opinionated about what is returned: whatever the response from + the TinkerPop server is, it will be deserialized assuming common Java types. In the case of a Map return, the values + will be returned as a record in the FlowFile response; in all other cases, the return of the query will be coerced into a + Map with key "result" and value being the result of your script submission for that specific response. +

+ +

Serialization Issues in Script Submission

+

+ A common issue when creating Gremlin scripts for first time users is to accidentally return an unserializable object. Gremlin + is a Groovy DSL and so it behaves like compiled Groovy including returning the last statement in the script. This is an example + of a Gremlin script that could cause unexpected failures: +

+
+    g.V().hasLabel("person").has("name", "John Smith").valueMap()
+
+

+ The valueMap() step is not directly serializable and will fail. To fix that you have two potential options: +

+
+    //Return a Map
+    g.V().hasLabel("person").has("name", "John Smith").valueMap().next()
+
+

+ Alternative: +

+
+    g.V().hasLabel("person").has("name", "John Smith").valueMap()
+    true //Return boolean literal
+
+

Bytecode Submission

+

+ Bytecode submission is the more flexible of the two submission methods and will be much more performant in a production + system. When combined with the YAML connection settings and a custom jar, very complex graph queries can be run directly + within the NiFi JVM, leveraging custom serializers to decrease serialization overhead. +

+

+ Instead of submitting a script to the gremlin server, requiring string serialization on both sides of the string result + set, the groovy script is compiled within the NiFi JVM. This compiled script has the bindings of g (the GraphTraversalSource) + and log (the NiFi logger) injected into the compiled code. Utilizing g, your result set is contained within NiFi and serialization + should take care of the overhead of your responses, drastically decreasing the likelihood of serialization errors. +

+

+ As the result returned cannot be known by NiFi to be a specific type, your groovy script must return a Map<String, Object>, + otherwise the response will be ignored. Here is an example: +

+
+    Object results = g.V().hasLabel("person").has("name", "John Smith").valueMap().collect()
+    [result: results]
+
+

+ This will break up your response objects into an array within your result key, allowing further processing within NiFi + if necessary. +

+ + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/test/java/org/apache/nifi/graph/GremlinClientServiceIT.java b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/test/java/org/apache/nifi/graph/GremlinClientServiceControllerSettingsIT.java similarity index 84% rename from nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/test/java/org/apache/nifi/graph/GremlinClientServiceIT.java rename to nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/test/java/org/apache/nifi/graph/GremlinClientServiceControllerSettingsIT.java index 15e8a87e83..92221731e1 100644 --- a/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/test/java/org/apache/nifi/graph/GremlinClientServiceIT.java +++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/test/java/org/apache/nifi/graph/GremlinClientServiceControllerSettingsIT.java @@ -33,21 +33,25 @@ import static org.junit.jupiter.api.Assertions.assertEquals; /* * As of JanusGraph 0.3.X these tests can be a little inconsistent for a few runs at first. 
*/ -public class GremlinClientServiceIT { +public class GremlinClientServiceControllerSettingsIT { private TestRunner runner; private TestableGremlinClientService clientService; + @BeforeEach public void setup() throws Exception { clientService = new TestableGremlinClientService(); runner = TestRunners.newTestRunner(NoOpProcessor.class); runner.addControllerService("gremlinService", clientService); - runner.setProperty(clientService, AbstractTinkerpopClientService.CONTACT_POINTS, "localhost"); + runner.setProperty(clientService, TinkerpopClientService.CONTACT_POINTS, "localhost"); + runner.setProperty(clientService, TinkerpopClientService.PORT, "8182"); runner.enableControllerService(clientService); runner.assertValid(); + String teardown = IOUtils.toString(getClass().getResourceAsStream("/teardown.gremlin"), "UTF-8"); + clientService.getCluster().connect().submit(teardown); String setup = IOUtils.toString(getClass().getResourceAsStream("/setup.gremlin"), "UTF-8"); - clientService.getClient().submit(setup); + clientService.getCluster().connect().submit(setup); assertEquals("gremlin://localhost:8182/gremlin", clientService.getTransitUrl()); } @@ -55,7 +59,7 @@ public class GremlinClientServiceIT { @AfterEach public void tearDown() throws Exception { String teardown = IOUtils.toString(getClass().getResourceAsStream("/teardown.gremlin"), "UTF-8"); - clientService.getClient().submit(teardown); + clientService.getCluster().connect().submit(teardown); } @Test @@ -74,4 +78,4 @@ public class GremlinClientServiceIT { Map result = clientService.executeQuery(gremlin, new HashMap<>(), (record, isMore) -> integer.incrementAndGet()); assertEquals(1, integer.get()); } -} +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/test/java/org/apache/nifi/graph/GremlinClientServiceYamlSettingsAndBytecodeIT.java 
b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/test/java/org/apache/nifi/graph/GremlinClientServiceYamlSettingsAndBytecodeIT.java new file mode 100644 index 0000000000..6c6ad4e865 --- /dev/null +++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/test/java/org/apache/nifi/graph/GremlinClientServiceYamlSettingsAndBytecodeIT.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.nifi.graph;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/*
+ * Note: this integration test does not work out of the box
+ * because nifi needs to understand/load specific serializers
+ * to use when connecting via yaml. If using the yaml in the
+ * resources folder, please point customJarFile in setup() at a jar
+ * that contains org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV1
+ * (gremlin-util generally has this available).
+ */
+@Testcontainers
+public class GremlinClientServiceYamlSettingsAndBytecodeIT {
+    private TestRunner runner;
+    private TestableGremlinClientService clientService;
+
+    /*
+     * Configures the service for YAML connection settings plus bytecode submission,
+     * then resets the graph by running the teardown script followed by the setup script.
+     */
+    @BeforeEach
+    public void setup() throws Exception {
+
+        String remoteYamlFile = "src/test/resources/gremlin.yml";
+        String customJarFile = "src/test/resources/gremlin-util-3.7.0.jar";
+        clientService = new TestableGremlinClientService();
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        runner.addControllerService("gremlinService", clientService);
+        runner.setProperty(clientService, TinkerpopClientService.CONNECTION_SETTINGS, "yaml-settings");
+        runner.setProperty(clientService, TinkerpopClientService.SUBMISSION_TYPE, "bytecode-submission");
+        runner.setProperty(clientService, TinkerpopClientService.REMOTE_OBJECTS_FILE, remoteYamlFile);
+        runner.setProperty(clientService, TinkerpopClientService.EXTRA_RESOURCE, customJarFile);
+        /*Note: This does not seem to have much of an effect. As long as EXTRA_RESOURCE has the
+            classes of interest, both the gremlin driver and ExecuteGraphQueryRecord will properly execute.
+            However, something like this will be needed if we ever move away from groovy.
+         */
+//        runner.setProperty(clientService, TinkerpopClientService.EXTENSION_CLASSES, "org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV1");
+        runner.enableControllerService(clientService);
+        runner.assertValid();
+
+        String teardown = IOUtils.toString(getClass().getResourceAsStream("/teardown.gremlin"), "UTF-8");
+        clientService.getCluster().connect().submit(teardown);
+        String setup = IOUtils.toString(getClass().getResourceAsStream("/setup.gremlin"), "UTF-8");
+        clientService.getCluster().connect().submit(setup);
+
+    }
+
+    /* Runs the teardown script again after each test. */
+    @AfterEach
+    public void tearDown() throws Exception {
+        String teardown = IOUtils.toString(getClass().getResourceAsStream("/teardown.gremlin"), "UTF-8");
+        clientService.getCluster().connect().submit(teardown);
+    }
+
+    /* The "result" entry must arrive as a List holding one value map per 'dog' vertex. */
+    @Test
+    public void testValueMap() {
+        String gremlin = "[result: g.V().hasLabel('dog').valueMap().collect()]";
+        AtomicInteger integer = new AtomicInteger();
+        Map result = clientService.executeQuery(gremlin, new HashMap<>(), (record, isMore) -> {
+            Assertions.assertTrue(record.containsKey("result"));
+            Assertions.assertTrue(record.get("result") instanceof List);
+            ((List) record.get("result")).forEach(it -> {
+                integer.incrementAndGet();
+            });
+        });
+        // Same fixture as testCount, which expects two 'dog' vertices; without this
+        // assertion the counter was collected but never checked.
+        assertEquals(2, integer.get());
+    }
+
+    /* A scalar result is delivered under the "result" key as a Long. */
+    @Test
+    public void testCount() {
+        String gremlin = "[result: g.V().hasLabel('dog').count().next()]";
+        AtomicLong dogCount = new AtomicLong();
+        Map result = clientService.executeQuery(gremlin, new HashMap<>(), (record, isMore) -> {
+            Assertions.assertTrue(record.containsKey("result"));
+            dogCount.set((Long) record.get("result"));
+        });
+        assertEquals(2, dogCount.get());
+    }
+
+    /* Every top-level key of the returned map must show up as its own record. */
+    @Test
+    public void testSubGraph() {
+        String gremlin = "[dogInE: g.V().hasLabel('dog').inE().count().next(), dogOutE: g.V().hasLabel('dog').outE().count().next(), " +
+                "dogProps: g.V().hasLabel('dog').valueMap().collect()]";
+        List<Map<String, Object>> recordSet = new ArrayList<>();
+        Map result = clientService.executeQuery(gremlin, new HashMap<>(), (record, isMore) -> {
+            recordSet.add(record);
+        });
+        Assertions.assertEquals(3, recordSet.size());
+        List<Map<String, Object>> dogInE = recordSet.stream().filter(it -> it.containsKey("dogInE")).toList();
+        List<Map<String, Object>> dogOutE = recordSet.stream().filter(it -> it.containsKey("dogOutE")).toList();
+        List<Map<String, Object>> dogProps = recordSet.stream().filter(it -> it.containsKey("dogProps")).toList();
+
+        Assertions.assertFalse(dogInE.isEmpty());
+        Assertions.assertFalse(dogOutE.isEmpty());
+        Assertions.assertFalse(dogProps.isEmpty());
+
+        Assertions.assertEquals(2, (Long) dogOutE.get(0).get("dogOutE"));
+        Assertions.assertEquals(4, (Long) dogInE.get(0).get("dogInE"));
+        Assertions.assertTrue(dogProps.get(0).get("dogProps") instanceof List);
+        Map dogPropsMap = (Map) ((List) dogProps.get(0).get("dogProps")).get(0);
+        Assertions.assertTrue(dogPropsMap.containsKey("name"));
+        Assertions.assertTrue(dogPropsMap.containsKey("age"));
+    }
+}
\ No newline at end of file diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/test/java/org/apache/nifi/graph/TestableGremlinClientService.java b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/test/java/org/apache/nifi/graph/TestableGremlinClientService.java index 5006e4496f..b3b8d13e24 100644 --- a/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/test/java/org/apache/nifi/graph/TestableGremlinClientService.java +++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/test/java/org/apache/nifi/graph/TestableGremlinClientService.java @@ -17,10 +17,10 @@ package org.apache.nifi.graph; -import org.apache.tinkerpop.gremlin.driver.Client; +import org.apache.tinkerpop.gremlin.driver.Cluster; -public class TestableGremlinClientService extends
GremlinClientService { - public Client getClient() { - return client; +public class TestableGremlinClientService extends TinkerpopClientService { + public Cluster getCluster() { + return cluster; } } diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/test/resources/gremlin.yml b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/test/resources/gremlin.yml new file mode 100644 index 0000000000..0d9764e182 --- /dev/null +++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-other-graph-services/src/test/resources/gremlin.yml @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +hosts: [localhost] +port: 8182 +serializer: { className: org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV1, + config: { serializeResultToString: false }}