NIFI-7355: This closes #7677. TinkerpopClientService

Signed-off-by: Joseph Witt <joewitt@apache.org>
This commit is contained in:
levilentz 2023-09-23 15:20:20 -07:00 committed by Joseph Witt
parent 9b9dd4bae3
commit 9228036e1d
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
13 changed files with 870 additions and 294 deletions

View File

@ -108,13 +108,11 @@ The following binary components are provided under the Apache Software License v
(ASLv2) Exp4j
Copyright 2017
(ASLv2) Groovy 2.4.16 (http://www.groovy-lang.org)
groovy-2.4.16-indy
groovy-json-2.4.16-indy
groovy-sql-2.4.16-indy
(ASLv2) Groovy 4.0.15 (http://www.groovy-lang.org)
groovy-4.0.15
The following NOTICE information applies:
Apache Groovy
Copyright 2003-2018 The Apache Software Foundation
Copyright 2003-2020 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).

View File

@ -21,6 +21,7 @@
<properties>
<gremlin.version>3.7.0</gremlin.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
@ -34,7 +35,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-graph-client-service-api</artifactId>
<version>${project.version}</version>
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
@ -54,7 +55,6 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
@ -90,6 +90,34 @@
<artifactId>gremlin-driver</artifactId>
<version>${gremlin.version}</version>
</dependency>
<dependency>
<groupId>org.apache.groovy</groupId>
<artifactId>groovy</artifactId>
<version>${nifi.groovy.version}</version>
</dependency>
<dependency>
<groupId>org.apache.groovy</groupId>
<artifactId>groovy-dateutil</artifactId>
<version>${nifi.groovy.version}</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -1,116 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.graph;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
 * Shared base for controller services that connect to a Gremlin Server through the TinkerPop driver.
 * It declares the connection properties (contact points, port, path and an optional SSL context
 * service) and builds the driver {@link Cluster} from them.
 */
public abstract class AbstractTinkerpopClientService extends AbstractControllerService {
    public static final PropertyDescriptor CONTACT_POINTS = new PropertyDescriptor.Builder()
            .name("tinkerpop-contact-points")
            .displayName("Contact Points")
            .description("A comma-separated list of hostnames or IP addresses where a Gremlin-enabled server can be found.")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .build();
    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
            .name("tinkerpop-port")
            .displayName("Port")
            .description("The port where Gremlin Server is running on each host listed as a contact point.")
            .required(true)
            .defaultValue("8182")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .build();
    public static final PropertyDescriptor PATH = new PropertyDescriptor.Builder()
            .name("tinkerpop-path")
            .displayName("Path")
            .description("The URL path where Gremlin Server is running on each host listed as a contact point.")
            .required(true)
            .defaultValue("/gremlin")
            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .build();
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
            .name("tinkerpop-ssl-context-service")
            .displayName("SSL Context Service")
            .description("The SSL Context Service used to provide client certificate information for TLS/SSL "
                    + "connections.")
            .required(false)
            .identifiesControllerService(SSLContextService.class)
            .build();
    public static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
            CONTACT_POINTS, PORT, PATH, SSL_CONTEXT_SERVICE
    ));

    /** Whether the most recently built cluster was configured with an SSL context service. */
    boolean usesSSL;
    /** Transit URL derived from the last {@link #buildCluster(ConfigurationContext)} call. */
    protected String transitUrl;

    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return DESCRIPTORS;
    }

    /**
     * Applies key store and trust store settings to the builder when an SSL context service is configured.
     *
     * @param context the configuration to read {@link #SSL_CONTEXT_SERVICE} from
     * @param builder the cluster builder to configure
     * @return the same builder, for chaining
     */
    protected Cluster.Builder setupSSL(ConfigurationContext context, Cluster.Builder builder) {
        // Recompute the flag on every call: the service instance outlives a disable/enable cycle, so a
        // flag that is only ever set to true would keep producing a "+ssl" transit URL after SSL is removed.
        usesSSL = context.getProperty(SSL_CONTEXT_SERVICE).isSet();
        if (usesSSL) {
            SSLContextService service = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
            builder
                    .enableSsl(true)
                    .keyStore(service.getKeyStoreFile())
                    .keyStorePassword(service.getKeyStorePassword())
                    .keyStoreType(service.getKeyStoreType())
                    .trustStore(service.getTrustStoreFile())
                    .trustStorePassword(service.getTrustStorePassword());
        }
        return builder;
    }

    /**
     * Builds a driver cluster from the contact points, port and path properties and records the
     * matching transit URL in {@link #transitUrl}.
     *
     * @param context the configuration holding the connection properties
     * @return the created cluster
     */
    protected Cluster buildCluster(ConfigurationContext context) {
        String contactProp = context.getProperty(CONTACT_POINTS).evaluateAttributeExpressions().getValue();
        int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
        String path = context.getProperty(PATH).evaluateAttributeExpressions().getValue();

        Cluster.Builder builder = Cluster.build();
        for (String contactPoint : contactProp.split(",[\\s]*")) {
            builder.addContactPoint(contactPoint.trim());
        }
        builder.port(port).path(path);
        builder = setupSSL(context, builder);

        // The URL uses the raw property value, so multiple contact points appear comma-separated.
        transitUrl = String.format("gremlin%s://%s:%s%s", usesSSL ? "+ssl" : "",
                contactProp, port, path);
        return builder.create();
    }
}

View File

@ -1,107 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.graph;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.Result;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
 * Controller service that submits Gremlin scripts to a Gremlin Server using a single driver
 * {@link Client} that is opened when the service is enabled and closed when it is disabled.
 */
@CapabilityDescription("A client service that connects to a graph database that can accept queries in the Tinkerpop Gremlin DSL.")
@Tags({ "graph", "database", "gremlin", "tinkerpop", })
public class GremlinClientService extends AbstractTinkerpopClientService implements TinkerPopClientService {
    public static final String NOT_SUPPORTED = "NOT_SUPPORTED";

    private Cluster cluster;
    protected Client client;
    private ConfigurationContext context;

    /** Builds the cluster from the service configuration and opens the client. */
    @OnEnabled
    public void onEnabled(ConfigurationContext context) {
        this.context = context;
        cluster = buildCluster(context);
        client = cluster.connect();
    }

    /** Closes the client and the cluster; safe to call when enabling never completed. */
    @OnDisabled
    public void onDisabled() {
        closeConnections();
    }

    /**
     * Submits the query and feeds every result to the handler.
     *
     * @param query the Gremlin script to submit
     * @param parameters the script bindings
     * @param handler receives each result as a map together with a "more results follow" flag
     * @return result attributes; only {@code ROWS_RETURNED} carries a real value, the rest are {@link #NOT_SUPPORTED}
     * @throws ProcessException wrapping any failure, including failures raised by the handler
     */
    @SuppressWarnings("unchecked")
    public Map<String, String> doQuery(String query, Map<String, Object> parameters, GraphQueryResultCallback handler) {
        try {
            Iterator<Result> iterator = client.submit(query, parameters).iterator();
            long count = 0;
            while (iterator.hasNext()) {
                Object obj = iterator.next().getObject();
                Map<String, Object> row;
                if (obj instanceof Map) {
                    row = (Map<String, Object>) obj;
                } else {
                    // Non-map results are wrapped under a single "result" key.
                    row = new HashMap<>();
                    row.put("result", obj);
                }
                handler.process(row, iterator.hasNext());
                count++;
            }

            Map<String, String> resultAttributes = new HashMap<>();
            resultAttributes.put(NODES_CREATED, NOT_SUPPORTED);
            resultAttributes.put(RELATIONS_CREATED, NOT_SUPPORTED);
            resultAttributes.put(LABELS_ADDED, NOT_SUPPORTED);
            resultAttributes.put(NODES_DELETED, NOT_SUPPORTED);
            resultAttributes.put(RELATIONS_DELETED, NOT_SUPPORTED);
            resultAttributes.put(PROPERTIES_SET, NOT_SUPPORTED);
            resultAttributes.put(ROWS_RETURNED, String.valueOf(count));
            return resultAttributes;
        } catch (Exception ex) {
            throw new ProcessException(ex);
        }
    }

    /**
     * Runs the query, and on any failure rebuilds the connection and runs it exactly once more.
     * Because {@link #doQuery} also wraps handler failures, a handler exception triggers the retry
     * as well, so the handler may see the first results twice.
     */
    @Override
    public Map<String, String> executeQuery(String query, Map<String, Object> parameters, GraphQueryResultCallback handler) {
        try {
            return doQuery(query, parameters, handler);
        } catch (Exception ex) {
            try {
                closeConnections();
            } catch (Exception closeFailure) {
                // A connection that is already broken may fail to close; that must not block the reconnect.
                getLogger().warn("Failed to close the Gremlin connection before reconnecting", closeFailure);
            }
            cluster = buildCluster(context);
            client = cluster.connect();
            return doQuery(query, parameters, handler);
        }
    }

    @Override
    public String getTransitUrl() {
        return transitUrl;
    }

    /** Closes the client before the cluster that owns it and always clears both references. */
    private void closeConnections() {
        try {
            if (client != null) {
                client.close();
            }
        } finally {
            try {
                if (cluster != null) {
                    cluster.close();
                }
            } finally {
                client = null;
                cluster = null;
            }
        }
    }
}

View File

@ -0,0 +1,526 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.graph;
import groovy.lang.Binding;
import groovy.lang.GroovyClassLoader;
import groovy.lang.GroovyShell;
import groovy.lang.Script;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.JdkSslContext;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.graph.gremlin.SimpleEntry;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.Result;
import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Tags({"graph", "gremlin"})
// User-facing summary; every fragment ends with a space so the concatenated sentences do not run together.
@CapabilityDescription("This service interacts with a tinkerpop-compliant graph service, providing both script submission and bytecode submission capabilities. " +
        "Script submission is the default, with the script command being sent to the gremlin server as text. This should only be used for simple interactions with a tinkerpop-compliant server " +
        "such as counts or other operations that do not require the injection of custom classes. " +
        "Bytecode submission allows much more flexibility. When providing a jar, custom serializers can be used and pre-compiled graph logic can be utilized by groovy scripts " +
        "provided by processors such as the ExecuteGraphQueryRecord.")
@RequiresInstanceClassLoading
public class TinkerpopClientService extends AbstractControllerService implements GraphClientService {
/** Placeholder reported for result statistics that script submission cannot provide. */
public static final String NOT_SUPPORTED = "NOT_SUPPORTED";

// Choices for how a query is executed.
private static final AllowableValue BYTECODE_SUBMISSION = new AllowableValue("bytecode-submission", "ByteCode Submission",
        "Groovy scripts are compiled within NiFi, with the GraphTraversalSource injected as a variable 'g'. Effectively allowing " +
                "your logic to directly manipulate the graph without string serialization overhead."
);
private static final AllowableValue SCRIPT_SUBMISSION = new AllowableValue("script-submission", "Script Submission",
        "Script is sent to the gremlin server as a submission. Similar to a rest request. "
);

// Choices for where the connection settings come from.
private static final AllowableValue YAML_SETTINGS = new AllowableValue("yaml-settings", "Yaml Settings",
        "Connection to the gremlin server will be specified via a YAML file (very flexible)");
private static final AllowableValue SERVICE_SETTINGS = new AllowableValue("service-settings", "Service-Defined Settings",
        "Connection to the gremlin server will be specified via values on this controller (simpler). " +
                "Only recommended for testing and development with a simple graph instance. ");
// Chooses between script submission (the default) and bytecode submission.
public static final PropertyDescriptor SUBMISSION_TYPE = new PropertyDescriptor.Builder()
.name("submission-type")
.displayName("Script Submission Type")
.description("A selection that toggles for between script submission or as bytecode submission")
.allowableValues(SCRIPT_SUBMISSION, BYTECODE_SUBMISSION)
.defaultValue("script-submission")
.required(true)
.build();
// Chooses between settings declared on this service (the default) and a YAML file.
public static final PropertyDescriptor CONNECTION_SETTINGS = new PropertyDescriptor.Builder()
.name("connection-settings")
.displayName("Settings Specification")
.description("Selecting \"Service-Defined Settings\" connects using the setting on this service. Selecting \"Yaml Settings\" uses the specified YAML file for connection settings. ")
.allowableValues(SERVICE_SETTINGS, YAML_SETTINGS)
.defaultValue("service-settings")
.required(true)
.build();
// Contact points, port and path are declared dependent on the service-defined settings choice.
public static final PropertyDescriptor CONTACT_POINTS = new PropertyDescriptor.Builder()
.name("tinkerpop-contact-points")
.displayName("Contact Points")
.description("A comma-separated list of hostnames or IP addresses where an Gremlin-enabled server can be found.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.dependsOn(CONNECTION_SETTINGS, SERVICE_SETTINGS)
.build();
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
.name("tinkerpop-port")
.displayName("Port")
.description("The port where Gremlin Server is running on each host listed as a contact point.")
.required(true)
.defaultValue("8182")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.dependsOn(CONNECTION_SETTINGS, SERVICE_SETTINGS)
.build();
// Validated with Validator.VALID; buildCluster only applies the path when it is non-empty.
public static final PropertyDescriptor PATH = new PropertyDescriptor.Builder()
.name("tinkerpop-path")
.displayName("Path")
.description("The URL path where Gremlin Server is running on each host listed as a contact point.")
.required(true)
.defaultValue("/gremlin")
.addValidator(Validator.VALID)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.dependsOn(CONNECTION_SETTINGS, SERVICE_SETTINGS)
.build();
// Optional name of the remote traversal source; read in onEnabled and used by createTraversal.
public static final PropertyDescriptor TRAVERSAL_SOURCE_NAME = new PropertyDescriptor.Builder()
.name("gremlin-traversal-source-name")
.displayName("Traversal Source Name")
.description("An optional property that lets you set the name of the remote traversal instance. " +
"This can be really important when working with databases like JanusGraph that support " +
"multiple backend traversal configurations simultaneously.")
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(Validator.VALID)
.build();
// YAML connection file; declared dependent on the YAML settings choice.
public static final PropertyDescriptor REMOTE_OBJECTS_FILE = new PropertyDescriptor.Builder()
.name("remote-objects-file")
.displayName("Remote Objects File")
.description("The remote-objects file YAML used for connecting to the gremlin server.")
.required(true)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.dependsOn(CONNECTION_SETTINGS, YAML_SETTINGS)
.build();
// Optional credentials; customValidate requires username and password to be set together.
public static final PropertyDescriptor USER_NAME = new PropertyDescriptor.Builder()
.name("user-name")
.displayName("Username")
.description("The username used to authenticate with the gremlin server." +
" Note: when using a remote.yaml file, this username value (if set) will overload any " +
"username set in the YAML file.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("password")
.displayName("Password")
.description("The password used to authenticate with the gremlin server." +
" Note: when using a remote.yaml file, this password setting (if set) will override any " +
"password set in the YAML file")
.required(false)
.sensitive(true)
.build();
// Extra JARs, directories or URLs; flagged as dynamically modifying the classpath.
public static final PropertyDescriptor EXTRA_RESOURCE = new PropertyDescriptor.Builder()
.name("extension")
.displayName("Extension JARs")
.description("A comma-separated list of Java JAR files to be loaded. This should contain any Serializers or other " +
"classes specified in the YAML file. Additionally, any custom classes required for the groovy script to " +
"work in the bytecode submission setting should also be contained in these JAR files.")
.dependsOn(CONNECTION_SETTINGS, YAML_SETTINGS)
.defaultValue(null)
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY, ResourceType.URL)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.dynamicallyModifiesClasspath(true)
.build();
// Class names loaded from the extension resources by loadClasses and checked in customValidate.
// Declares two dependencies: on EXTRA_RESOURCE and on the YAML settings choice.
public static final PropertyDescriptor EXTENSION_CLASSES = new PropertyDescriptor.Builder()
.name("extension-classes")
.displayName("Extension Classes")
.addValidator(Validator.VALID)
.description("A comma-separated list of fully qualified Java class names that correspond to classes to implement. This " +
"is useful for services such as JanusGraph that need specific serialization classes. " +
"This configuration property has no effect unless a value for the Extension JAR field is " +
"also provided.")
.dependsOn(EXTRA_RESOURCE)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.dependsOn(CONNECTION_SETTINGS, YAML_SETTINGS)
.required(false)
.build();
// Optional SSL context service; when set, setupSSL enables SSL on the cluster builder.
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("ssl-context-service")
.displayName("SSL Context Service")
.description("The SSL Context Service used to provide client certificate information for TLS/SSL "
+ "connections.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
// Unmodifiable list returned by getSupportedPropertyDescriptors, in this order.
public static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
SUBMISSION_TYPE,
CONNECTION_SETTINGS,
REMOTE_OBJECTS_FILE,
EXTRA_RESOURCE,
EXTENSION_CLASSES,
CONTACT_POINTS,
PORT,
PATH,
TRAVERSAL_SOURCE_NAME,
USER_NAME,
PASSWORD,
SSL_CONTEXT_SERVICE
));
// Shell used by bytecodeSubmission to parse Groovy scripts; created in onEnabled.
private GroovyShell groovyShell;
// Cache of parsed scripts used by bytecodeSubmission; created in onEnabled, cleared in shutdown.
private Map<String, Script> compiledCode;
// Driver cluster built in onEnabled and closed in shutdown.
protected Cluster cluster;
// Value of the Traversal Source Name property, read in onEnabled.
private String traversalSourceName;
// Remote traversal source, created lazily on the first bytecode submission.
private GraphTraversalSource traversalSource;
// True when the Script Submission Type property selects script submission.
private boolean scriptSubmission = true;
// Set by setupSSL when an SSL context service is configured; feeds the "+ssl" transit URL scheme.
boolean usesSSL;
// Transit URL assembled by buildCluster and returned by getTransitUrl.
protected String transitUrl;
/**
 * Prepares the service for use: loads the configured extension classes, creates the Groovy shell
 * and script cache, reads the traversal source name and submission type, and builds the cluster.
 *
 * @param context the configuration of this service
 * @throws InitializationException declared for the lifecycle contract
 */
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException {
    loadClasses(context);

    groovyShell = new GroovyShell(new GroovyClassLoader(this.getClass().getClassLoader()));
    compiledCode = new ConcurrentHashMap<>();

    // Always assign, so a name configured for a previous enable does not survive once the property is cleared.
    traversalSourceName = context.getProperty(TRAVERSAL_SOURCE_NAME).isSet()
            ? context.getProperty(TRAVERSAL_SOURCE_NAME).evaluateAttributeExpressions().getValue()
            : null;

    scriptSubmission = context.getProperty(SUBMISSION_TYPE).getValue().equals(SCRIPT_SUBMISSION.getValue());
    cluster = buildCluster(context);
}
/**
 * Releases the script cache, the traversal source and the cluster when the service is disabled.
 * The cluster is closed even if closing the traversal source fails.
 *
 * @throws ProcessException wrapping a failure to close the traversal source
 */
@OnDisabled
public void shutdown() {
    compiledCode = null;
    try {
        final GraphTraversalSource openTraversal = traversalSource;
        if (openTraversal != null) {
            openTraversal.close();
        }
    } catch (final Exception closeFailure) {
        throw new ProcessException(closeFailure);
    } finally {
        final Cluster openCluster = cluster;
        if (openCluster != null) {
            openCluster.close();
        }
        cluster = null;
        traversalSource = null;
    }
}
/**
 * Runs the query through script submission or bytecode submission, depending on the mode
 * selected when the service was enabled.
 *
 * @throws ProcessException wrapping any failure raised by the selected submission path
 */
@Override
public Map<String, String> executeQuery(String s, Map<String, Object> map, GraphQueryResultCallback graphQueryResultCallback) {
    try {
        return scriptSubmission
                ? scriptSubmission(s, map, graphQueryResultCallback)
                : bytecodeSubmission(s, map, graphQueryResultCallback);
    } catch (final Exception failure) {
        throw new ProcessException(failure);
    }
}
/** Returns the transit URL recorded by the most recent {@code buildCluster} call. */
@Override
public String getTransitUrl() {
    return transitUrl;
}
/** Returns the fixed, unmodifiable list of properties this service supports. */
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    final List<PropertyDescriptor> supported = DESCRIPTORS;
    return supported;
}
/**
 * Cross-property validation: when both extension resources and extension classes are configured,
 * every listed class must be loadable from those resources; username and password must be set together.
 *
 * @param context the validation context giving access to the configured property values
 * @return the validation failures found, empty when the configuration is consistent
 */
@Override
public Collection<ValidationResult> customValidate(ValidationContext context) {
    final Collection<ValidationResult> results = new ArrayList<>();

    final boolean jarsIsSet = !StringUtils.isEmpty(context.getProperty(EXTRA_RESOURCE).getValue());
    final boolean clzIsSet = !StringUtils.isEmpty(context.getProperty(EXTENSION_CLASSES).getValue());
    if (jarsIsSet && clzIsSet) {
        try {
            final ClassLoader loader = ClassLoaderUtils
                    .getCustomClassLoader(context.getProperty(EXTRA_RESOURCE).getValue(), this.getClass().getClassLoader(), null);
            final String[] classes = context.getProperty(EXTENSION_CLASSES).evaluateAttributeExpressions().getValue().split(",[\\s]*");
            for (String clz : classes) {
                // Trim like loadClasses does, so leading/trailing whitespace does not fail validation
                // for a class list that would load fine when the service is enabled.
                Class.forName(clz.trim(), true, loader);
            }
        } catch (Exception ex) {
            results.add(new ValidationResult.Builder().subject(EXTENSION_CLASSES.getDisplayName()).valid(false).explanation(ex.toString()).build());
        }
    }

    final boolean userIsSet = context.getProperty(USER_NAME).isSet();
    final boolean passwordIsSet = context.getProperty(PASSWORD).isSet();
    if (userIsSet && !passwordIsSet) {
        results.add(new ValidationResult.Builder()
                .explanation("When specifying a username, the password must also be set").valid(false).build()
        );
    }
    if (passwordIsSet && !userIsSet) {
        results.add(new ValidationResult.Builder()
                .explanation("When specifying a password, the username must also be set").valid(false).build()
        );
    }
    return results;
}
/**
 * Enables SSL on the builder when an SSL context service is configured and records that choice
 * in {@code usesSSL}, which determines the scheme of the transit URL.
 *
 * @param context the configuration to read the SSL context service from
 * @param builder the cluster builder to configure
 * @return the same builder, for chaining
 */
protected Cluster.Builder setupSSL(ConfigurationContext context, Cluster.Builder builder) {
    // Recompute on every call: the service instance is reused across disable/enable cycles, so a flag
    // that is only ever set to true would keep reporting "+ssl" after the SSL context service is removed.
    usesSSL = context.getProperty(SSL_CONTEXT_SERVICE).isSet();
    if (usesSSL) {
        final SSLContextService service = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        builder
                .enableSsl(true)
                .sslContext(new JdkSslContext(service.createContext(), true, ClientAuth.NONE));
    }
    return builder;
}
/**
 * Loads and initializes every class named in the Extension Classes property from the configured
 * extension resources. Does nothing unless both properties have a non-empty value.
 *
 * @param context the configuration of this service
 * @throws ProcessException wrapping any failure to build the class loader or load a class
 */
public void loadClasses(ConfigurationContext context) {
    final String resourcePath = context.getProperty(EXTRA_RESOURCE).getValue();
    final String rawClassList = context.getProperty(EXTENSION_CLASSES).getValue();
    if (resourcePath == null || rawClassList == null || resourcePath.isEmpty() || rawClassList.isEmpty()) {
        return;
    }

    try {
        final ClassLoader extensionLoader = ClassLoaderUtils.getCustomClassLoader(resourcePath, this.getClass().getClassLoader(), null);
        final String evaluatedClassList = context.getProperty(EXTENSION_CLASSES).evaluateAttributeExpressions().getValue();
        for (final String className : evaluatedClassList.split(",[\\s]*")) {
            final Class<?> loaded = Class.forName(className.trim(), true, extensionLoader);
            if (getLogger().isDebugEnabled()) {
                getLogger().debug(loaded.getName());
            }
        }
    } catch (final Exception loadFailure) {
        throw new ProcessException(loadFailure);
    }
}
/**
 * Creates the driver cluster, either from the YAML file (when the Remote Objects File property is
 * set) or from the contact points, port and path properties. SSL and credentials are applied on
 * top of either source, and the transit URL is recorded as a side effect.
 *
 * @param context the configuration of this service
 * @return the created cluster
 * @throws ProcessException when the YAML file cannot be turned into a cluster builder
 */
protected Cluster buildCluster(ConfigurationContext context) {
    Cluster.Builder clusterBuilder = Cluster.build();
    final List<String> contactHosts = new ArrayList<>();

    if (context.getProperty(REMOTE_OBJECTS_FILE).isSet()) {
        //ToDo: there is a bug in getting the hostname from the builder, therefore when doing
        // bytecode submission, the transitUrl ends up being effectively useless. Need to extract it
        // from the yaml to get this to work as expected.
        final String yamlLocation = context.getProperty(REMOTE_OBJECTS_FILE).evaluateAttributeExpressions().getValue();
        final File yamlFile = new File(yamlLocation);
        try {
            clusterBuilder = Cluster.build(yamlFile);
        } catch (final Exception yamlFailure) {
            throw new ProcessException(yamlFailure);
        }
    } else {
        final String contactProp = context.getProperty(CONTACT_POINTS).evaluateAttributeExpressions().getValue();
        final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
        final String path = context.getProperty(PATH).evaluateAttributeExpressions().getValue();

        for (final String rawHost : contactProp.split(",[\\s]*")) {
            final String host = rawHost.trim();
            clusterBuilder.addContactPoint(host);
            contactHosts.add(host);
        }
        clusterBuilder.port(port);
        // An empty path leaves the builder's own default in place.
        if (path != null && !path.isEmpty()) {
            clusterBuilder.path(path);
        }
    }

    clusterBuilder = setupSSL(context, clusterBuilder);

    final boolean hasCredentials = context.getProperty(USER_NAME).isSet() && context.getProperty(PASSWORD).isSet();
    if (hasCredentials) {
        final String username = context.getProperty(USER_NAME).evaluateAttributeExpressions().getValue();
        final String password = context.getProperty(PASSWORD).getValue();
        clusterBuilder.credentials(username, password);
    }

    final Cluster built = clusterBuilder.create();
    final String scheme = usesSSL ? "+ssl" : "";
    // In the YAML case no hosts were collected, so the host part of the URL is empty.
    transitUrl = String.format("gremlin%s://%s:%s%s", scheme,
            String.join(",", contactHosts), built.getPort(), built.getPath());
    return built;
}
/**
 * Sends the query text to the server and feeds every result to the handler.
 *
 * @param query the Gremlin script to submit
 * @param parameters the script bindings
 * @param handler receives each result as a map together with a "more results follow" flag
 * @return result attributes; only {@code ROWS_RETURNED} carries a real value, the rest are {@link #NOT_SUPPORTED}
 * @throws ProcessException wrapping any failure, including failures raised by the handler
 */
@SuppressWarnings("unchecked")
protected Map<String, String> scriptSubmission(String query, Map<String, Object> parameters, GraphQueryResultCallback handler) {
    Client client = null;
    try {
        client = cluster.connect();
        Iterator<Result> iterator = client.submit(query, parameters).iterator();
        long count = 0;
        while (iterator.hasNext()) {
            Object obj = iterator.next().getObject();
            Map<String, Object> row;
            if (obj instanceof Map) {
                row = (Map<String, Object>) obj;
            } else {
                // Non-map results are wrapped under a single "result" key.
                row = new HashMap<>();
                row.put("result", obj);
            }
            handler.process(row, iterator.hasNext());
            count++;
        }

        Map<String, String> resultAttributes = new HashMap<>();
        resultAttributes.put(NODES_CREATED, NOT_SUPPORTED);
        resultAttributes.put(RELATIONS_CREATED, NOT_SUPPORTED);
        resultAttributes.put(LABELS_ADDED, NOT_SUPPORTED);
        resultAttributes.put(NODES_DELETED, NOT_SUPPORTED);
        resultAttributes.put(RELATIONS_DELETED, NOT_SUPPORTED);
        resultAttributes.put(PROPERTIES_SET, NOT_SUPPORTED);
        resultAttributes.put(ROWS_RETURNED, String.valueOf(count));
        return resultAttributes;
    } catch (Exception ex) {
        throw new ProcessException(ex);
    } finally {
        // A new client is opened for every call, so it has to be released here; otherwise each query
        // leaves a client behind until the whole cluster is closed.
        if (client != null) {
            try {
                client.close();
            } catch (Exception closeFailure) {
                getLogger().warn("Failed to close the Gremlin client after script submission", closeFailure);
            }
        }
    }
}
/**
 * Compiles the Groovy script locally (once per distinct script text) and runs it with the remote
 * traversal source bound as {@code g}, the component logger as {@code log}, and every entry of
 * {@code map} as an additional binding.
 *
 * <p>Only a {@link Map} result is reported to the callback. For each top-level entry whose value is
 * itself a map, every inner entry is reported separately; any other value is reported as a
 * single-entry map. {@code ROWS_RETURNED} counts the top-level entries.
 *
 * @param s the Groovy script text
 * @param map the bindings to expose to the script
 * @param graphQueryResultCallback receives the result rows
 * @return result attributes containing {@code ROWS_RETURNED}
 * @throws ProcessException wrapping any failure while running the script or processing its result
 */
@SuppressWarnings("unchecked")
protected Map<String, String> bytecodeSubmission(String s, Map<String, Object> map, GraphQueryResultCallback graphQueryResultCallback) {
    if (this.traversalSource == null) {
        this.traversalSource = createTraversal();
    }

    // Look up and store under the same key (the SHA-256 of the script text); storing under the raw
    // text while looking up by hash means the cache never hits and every call recompiles the script.
    final String hash = DigestUtils.sha256Hex(s);
    final Script compiled = compiledCode.computeIfAbsent(hash, key -> groovyShell.parse(s));

    if (getLogger().isDebugEnabled()) {
        getLogger().debug(map.toString());
    }

    final Binding bindings = new Binding();
    map.forEach(bindings::setProperty);
    bindings.setProperty("g", traversalSource);
    bindings.setProperty("log", getLogger());

    int rowsReturned = 0;
    try {
        final Script runnable = newScriptInstance(compiled);
        runnable.setBinding(bindings);
        final Object result = runnable.run();

        if (result instanceof Map) {
            final Map<String, Object> resultMap = (Map<String, Object>) result;
            for (final Map.Entry<String, Object> outerEntry : resultMap.entrySet()) {
                if (outerEntry.getValue() instanceof Map) {
                    final Iterator<Map.Entry<String, Object>> innerEntries =
                            ((Map<String, Object>) outerEntry.getValue()).entrySet().iterator();
                    while (innerEntries.hasNext()) {
                        final Map.Entry<String, Object> innerEntry = innerEntries.next();

                        final Map<String, Object> innerValue = new HashMap<>();
                        innerValue.put(innerEntry.getKey(), innerEntry.getValue());
                        final SimpleEntry<String, Object> returnObject = new SimpleEntry<>(innerEntry.getKey(), innerValue);

                        final Map<String, Object> resultReturnMap = new HashMap<>();
                        resultReturnMap.put(outerEntry.getKey(), returnObject);
                        if (getLogger().isDebugEnabled()) {
                            getLogger().debug(resultReturnMap.toString());
                        }
                        graphQueryResultCallback.process(resultReturnMap, innerEntries.hasNext());
                    }
                } else {
                    final Map<String, Object> resultReturnMap = new HashMap<>();
                    resultReturnMap.put(outerEntry.getKey(), outerEntry.getValue());
                    graphQueryResultCallback.process(resultReturnMap, false);
                }
                rowsReturned++;
            }
        }
    } catch (Exception e) {
        throw new ProcessException(e);
    }

    final Map<String, String> resultAttributes = new HashMap<>();
    resultAttributes.put(ROWS_RETURNED, String.valueOf(rowsReturned));
    return resultAttributes;
}

/**
 * Returns a script object to run for one invocation. A fresh instance of the cached script's class
 * is preferred so that concurrent invocations of the same script do not overwrite each other's
 * binding; if the class cannot be instantiated without arguments, the cached instance is used.
 */
private Script newScriptInstance(Script compiled) {
    try {
        return compiled.getClass().getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException noFreshInstance) {
        return compiled;
    }
}
/**
 * Opens a remote traversal source on the cluster, using the configured traversal source name
 * when one is present.
 *
 * @return the remote traversal source
 * @throws ProcessException wrapping any failure to create the remote connection
 */
protected GraphTraversalSource createTraversal() {
    try {
        final boolean useDefaultSource = StringUtils.isEmpty(traversalSourceName);
        final DriverRemoteConnection remoteConnection = useDefaultSource
                ? DriverRemoteConnection.using(cluster)
                : DriverRemoteConnection.using(cluster, traversalSourceName);
        return AnonymousTraversalSource.traversal().withRemote(remoteConnection);
    } catch (final Exception connectFailure) {
        throw new ProcessException(connectFailure);
    }
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.graph.gremlin;
import java.util.Map;
/**
 * Minimal mutable {@link Map.Entry} with a fixed key, rendered as {@code key:value} by
 * {@link #toString()}. Equality and hashing follow the {@link Map.Entry} contract, so instances
 * compare equal to any other entry implementation holding the same key and value.
 *
 * @param <K> the key type
 * @param <V> the value type
 */
public class SimpleEntry<K, V> implements Map.Entry<K, V> {
    private final K key;
    private V value;

    /**
     * @param key the entry key; may be {@code null}
     * @param value the initial value; may be {@code null}
     */
    public SimpleEntry(K key, V value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public K getKey() {
        return key;
    }

    @Override
    public V getValue() {
        return value;
    }

    /**
     * Replaces the value.
     *
     * @return the value held before the call
     */
    @Override
    public V setValue(V value) {
        V old = this.value;
        this.value = value;
        return old;
    }

    /** Two entries are equal when their keys and their values are equal, as {@link Map.Entry} specifies. */
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof Map.Entry)) {
            return false;
        }
        final Map.Entry<?, ?> that = (Map.Entry<?, ?>) other;
        return sameOrBothNull(key, that.getKey()) && sameOrBothNull(value, that.getValue());
    }

    /** Hash defined by {@link Map.Entry}: key hash XOR value hash, with {@code null} hashing to 0. */
    @Override
    public int hashCode() {
        final int keyHash = key == null ? 0 : key.hashCode();
        final int valueHash = value == null ? 0 : value.hashCode();
        return keyHash ^ valueHash;
    }

    @Override
    public String toString() {
        return String.format("%s:%s", this.key, this.value);
    }

    private static boolean sameOrBothNull(Object left, Object right) {
        return left == null ? right == null : left.equals(right);
    }
}

View File

@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.graph.GremlinClientService
org.apache.nifi.graph.TinkerpopClientService

View File

@ -1,54 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>GremlinClientService</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h2>Description:</h2>
<p>
This client service configures a connection to a Gremlin Server and allows Gremlin queries to be executed against
the Gremlin Server. For more information on Gremlin and Gremlin Server, see the <a href="http://tinkerpop.apache.org/">Apache Tinkerpop</a> project.
</p>
<h2>Warning for New Users</h2>
<p>
A common issue when creating Gremlin scripts for first time users is to accidentally return an unserializable object. Gremlin
is a Groovy DSL and so it behaves like compiled Groovy including returning the last statement in the script. This is an example
of a Gremlin script that could cause unexpected failures:
</p>
<pre>
g.V().hasLabel("person").has("name", "John Smith").valueMap()
</pre>
<p>
The <em>valueMap()</em> step is not directly serializable and will fail. To fix that you have two potential options:
</p>
<pre>
//Return a Map
g.V().hasLabel("person").has("name", "John Smith").valueMap().next()
</pre>
<p>
Alternative:
</p>
<pre>
g.V().hasLabel("person").has("name", "John Smith").valueMap()
true //Return boolean literal
</pre>
</body>
</html>

View File

@ -0,0 +1,93 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>TinkerpopClientService</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h2>Description:</h2>
<p>
This client service configures a connection to a Gremlin Server and allows Gremlin queries to be executed against
the Gremlin Server. For more information on Gremlin and Gremlin Server, see the <a href="http://tinkerpop.apache.org/">Apache Tinkerpop</a> project.
</p>
<p>
This client service supports two different modes of operation: Script Submission and Bytecode Submission, described below.
</p>
<h2>Script Submission</h2>
<p>
Script submission is the default way to interact with the gremlin server. This takes the input script and uses <a href="https://tinkerpop.apache.org/docs/current/reference/#gremlin-go-scripts">Script Submission</a>
to interact with the gremlin server. Because the script is shipped to the gremlin server as a string, only simple queries are recommended (count, path, etc)
as there are no complex serializers available in this operation. This also means that NiFi will not be opinionated about what is returned: whatever the response from
the Tinkerpop server is, it will be deserialized assuming common Java types. In the case of a Map return, the values
will be returned as a record in the FlowFile response, in all other cases, the return of the query will be coerced into a
Map with key "result" and value being the result of your script submission for that specific response.
</p>
<h3>Serialization Issues in Script Submission</h3>
<p>
A common issue when creating Gremlin scripts for first time users is to accidentally return an unserializable object. Gremlin
is a Groovy DSL and so it behaves like compiled Groovy including returning the last statement in the script. This is an example
of a Gremlin script that could cause unexpected failures:
</p>
<pre>
g.V().hasLabel("person").has("name", "John Smith").valueMap()
</pre>
<p>
The <em>valueMap()</em> step is not directly serializable and will fail. To fix that you have two potential options:
</p>
<pre>
//Return a Map
g.V().hasLabel("person").has("name", "John Smith").valueMap().next()
</pre>
<p>
Alternative:
</p>
<pre>
g.V().hasLabel("person").has("name", "John Smith").valueMap()
true //Return boolean literal
</pre>
<h2>Bytecode Submission</h2>
<p>
Bytecode submission is the more flexible of the two submission methods and will be much more performant in a production
system. When combined with the Yaml connection settings and a custom jar, very complex graph queries can be run directly
within the NiFi JVM, leveraging custom serializers to decrease serialization overhead.
</p>
<p>
Instead of submitting a script to the gremlin server, requiring string serialization on both sides of the string result
set, the groovy script is compiled within the NiFi JVM. This compiled script has the bindings of g (the GraphTraversalSource)
and log (the NiFi logger) injected into the compiled code. Utilizing g, your result set is contained within NiFi and serialization
should take care of the overhead of your responses drastically decreasing the likelihood of serialization errors.
</p>
<p>
As the result returned cannot be known by NiFi to be a specific type, your groovy script <b>must</b> return a Map&lt;String, Object&gt;,
otherwise the response will be ignored. Here is an example:
</p>
<pre>
Object results = g.V().hasLabel("person").has("name", "John Smith").valueMap().collect()
[result: results]
</pre>
<p>
This will break up your response objects into an array within your result key, allowing further processing within nifi
if necessary.
</p>
</body>
</html>

View File

@ -33,21 +33,25 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
/*
* As of JanusGraph 0.3.X these tests can be a little inconsistent for a few runs at first.
*/
public class GremlinClientServiceIT {
public class GremlinClientServiceControllerSettingsIT {
private TestRunner runner;
private TestableGremlinClientService clientService;
@BeforeEach
public void setup() throws Exception {
clientService = new TestableGremlinClientService();
runner = TestRunners.newTestRunner(NoOpProcessor.class);
runner.addControllerService("gremlinService", clientService);
runner.setProperty(clientService, AbstractTinkerpopClientService.CONTACT_POINTS, "localhost");
runner.setProperty(clientService, TinkerpopClientService.CONTACT_POINTS, "localhost");
runner.setProperty(clientService, TinkerpopClientService.PORT, "8182");
runner.enableControllerService(clientService);
runner.assertValid();
String teardown = IOUtils.toString(getClass().getResourceAsStream("/teardown.gremlin"), "UTF-8");
clientService.getCluster().connect().submit(teardown);
String setup = IOUtils.toString(getClass().getResourceAsStream("/setup.gremlin"), "UTF-8");
clientService.getClient().submit(setup);
clientService.getCluster().connect().submit(setup);
assertEquals("gremlin://localhost:8182/gremlin", clientService.getTransitUrl());
}
@ -55,7 +59,7 @@ public class GremlinClientServiceIT {
@AfterEach
public void tearDown() throws Exception {
String teardown = IOUtils.toString(getClass().getResourceAsStream("/teardown.gremlin"), "UTF-8");
clientService.getClient().submit(teardown);
clientService.getCluster().connect().submit(teardown);
}
@Test
@ -74,4 +78,4 @@ public class GremlinClientServiceIT {
Map<String, String> result = clientService.executeQuery(gremlin, new HashMap<>(), (record, isMore) -> integer.incrementAndGet());
assertEquals(1, integer.get());
}
}
}

View File

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.graph;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertEquals;
/*
 * Note: this integration test does not work out of the box
 * because nifi needs to understand/load specific serializers
 * to use when connecting via yaml. If using the yaml in the
 * resources folder, please update customJarFile in setup() with the path to a jar
 * that contains org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV1
 * (gremlin-util generally has this available).
 */
@Testcontainers
public class GremlinClientServiceYamlSettingsAndBytecodeIT {
    private TestRunner runner;
    private TestableGremlinClientService clientService;

    /**
     * Enables the client service with YAML connection settings and bytecode submission,
     * then resets the graph by running the teardown script followed by the setup script.
     */
    @BeforeEach
    public void setup() throws Exception {
        String remoteYamlFile = "src/test/resources/gremlin.yml";
        String customJarFile = "src/test/resources/gremlin-util-3.7.0.jar";
        clientService = new TestableGremlinClientService();
        runner = TestRunners.newTestRunner(NoOpProcessor.class);
        runner.addControllerService("gremlinService", clientService);
        runner.setProperty(clientService, TinkerpopClientService.CONNECTION_SETTINGS, "yaml-settings");
        runner.setProperty(clientService, TinkerpopClientService.SUBMISSION_TYPE, "bytecode-submission");
        runner.setProperty(clientService, TinkerpopClientService.REMOTE_OBJECTS_FILE, remoteYamlFile);
        runner.setProperty(clientService, TinkerpopClientService.EXTRA_RESOURCE, customJarFile);

        /*Note: This does not seem to have much of an effect. As long as EXTRA_RESOURCE has the
        classes of interest, both the gremlin driver and ExecuteGraphQueryRecord will properly execute.
        However, something like this will be needed if we ever move away from groovy.
        */
        // runner.setProperty(clientService, TinkerpopClientService.EXTENSION_CLASSES, "org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV1");
        runner.enableControllerService(clientService);
        runner.assertValid();

        // Start from a clean graph even if a previous run did not tear down.
        submitScript("/teardown.gremlin");
        submitScript("/setup.gremlin");
    }

    /** Removes the test data again by running the teardown script. */
    @AfterEach
    public void tearDown() throws Exception {
        submitScript("/teardown.gremlin");
    }

    /**
     * The whole value-map list arrives under the "result" key; every element of that
     * list is counted and the total is checked against the expected number of dogs.
     */
    @Test
    public void testValueMap() {
        String gremlin = "[result: g.V().hasLabel('dog').valueMap().collect()]";
        AtomicInteger integer = new AtomicInteger();
        clientService.executeQuery(gremlin, new HashMap<>(), (record, isMore) -> {
            Assertions.assertTrue(record.containsKey("result"));
            Assertions.assertTrue(record.get("result") instanceof List);
            ((List<?>) record.get("result")).forEach(it -> integer.incrementAndGet());
        });
        // Same 'dog' traversal as testCount, which expects two vertices;
        // NOTE(review): confirm against setup.gremlin if the fixture changes.
        assertEquals(2, integer.get());
    }

    /** A scalar count is returned under the "result" key. */
    @Test
    public void testCount() {
        String gremlin = "[result: g.V().hasLabel('dog').count().next()]";
        AtomicLong dogCount = new AtomicLong();
        clientService.executeQuery(gremlin, new HashMap<>(), (record, isMore) -> {
            Assertions.assertTrue(record.containsKey("result"));
            dogCount.set((Long) record.get("result"));
        });
        assertEquals(2, dogCount.get());
    }

    /** Each key of the script's returned map is expected to reach the callback as its own record. */
    @Test
    public void testSubGraph() {
        String gremlin = "[dogInE: g.V().hasLabel('dog').inE().count().next(), dogOutE: g.V().hasLabel('dog').outE().count().next(), " +
                "dogProps: g.V().hasLabel('dog').valueMap().collect()]";
        List<Map<String, Object>> recordSet = new ArrayList<>();
        clientService.executeQuery(gremlin, new HashMap<>(), (record, isMore) -> recordSet.add(record));

        Assertions.assertEquals(3, recordSet.size());
        List<Map<String, Object>> dogInE = recordSet.stream().filter(it -> it.containsKey("dogInE")).toList();
        List<Map<String, Object>> dogOutE = recordSet.stream().filter(it -> it.containsKey("dogOutE")).toList();
        List<Map<String, Object>> dogProps = recordSet.stream().filter(it -> it.containsKey("dogProps")).toList();
        Assertions.assertFalse(dogInE.isEmpty());
        Assertions.assertFalse(dogOutE.isEmpty());
        Assertions.assertFalse(dogProps.isEmpty());
        Assertions.assertEquals(2, (Long) dogOutE.get(0).get("dogOutE"));
        Assertions.assertEquals(4, (Long) dogInE.get(0).get("dogInE"));
        Assertions.assertTrue(dogProps.get(0).get("dogProps") instanceof List);
        Map<?, ?> dogPropsMap = (Map<?, ?>) ((List<?>) dogProps.get(0).get("dogProps")).get(0);
        Assertions.assertTrue(dogPropsMap.containsKey("name"));
        Assertions.assertTrue(dogPropsMap.containsKey("age"));
    }

    /** Reads a Gremlin script from the test classpath and submits it through a new client on the service's cluster. */
    private void submitScript(String resourcePath) throws Exception {
        String script = IOUtils.toString(getClass().getResourceAsStream(resourcePath), "UTF-8");
        clientService.getCluster().connect().submit(script);
    }
}

View File

@ -17,10 +17,10 @@
package org.apache.nifi.graph;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
public class TestableGremlinClientService extends GremlinClientService {
public Client getClient() {
return client;
public class TestableGremlinClientService extends TinkerpopClientService {
public Cluster getCluster() {
return cluster;
}
}

View File

@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
hosts: [localhost]
port: 8182
serializer: { className: org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV1,
config: { serializeResultToString: false }}