NIFI-7293 Add in-memory janusgraph implementation of GraphClientService to help with live testing.

Added new in memory janus graph client for testing.
Added integration test to ExecuteGraphQuery.

NIFI-7293 Added missing getter.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4168
This commit is contained in:
Mike Thomsen 2020-03-28 07:07:42 -04:00 committed by Matthew Burgess
parent daddf400a2
commit 4a2a91135e
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
6 changed files with 302 additions and 0 deletions

View File

@ -72,5 +72,11 @@
<version>1.12.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-graph-test-clients</artifactId>
<version>1.12.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.graph;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.graph.InMemoryJanusGraphClientService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
public class ExecuteGraphQueryIT {
TestRunner runner;
public static final String QUERY = "0.upto(9) {\n" +
"g.addV(\"test\").property(\"uuid\", UUID.randomUUID().toString()).next()\n" +
"}\n" +
"g.V().hasLabel(\"test\").count().next()";
@Before
public void setUp() throws Exception {
InMemoryJanusGraphClientService service = new InMemoryJanusGraphClientService();
runner = TestRunners.newTestRunner(ExecuteGraphQuery.class);
runner.addControllerService("clientService", service);
runner.enableControllerService(service);
runner.setProperty(AbstractGraphExecutor.CLIENT_SERVICE, "clientService");
runner.setProperty(AbstractGraphExecutor.QUERY, QUERY);
}
@Test
public void test() throws Exception {
runner.run();
runner.assertTransferCount(ExecuteGraphQuery.REL_FAILURE, 0);
runner.assertTransferCount(ExecuteGraphQuery.REL_ORIGINAL, 0);
runner.assertTransferCount(ExecuteGraphQuery.REL_SUCCESS, 1);
List<MockFlowFile> flowFileList = runner.getFlowFilesForRelationship(ExecuteGraphQuery.REL_SUCCESS);
MockFlowFile ff = flowFileList.get(0);
byte[] raw = runner.getContentAsByteArray(ff);
String str = new String(raw);
List<Map<String, Object>> result = new ObjectMapper().readValue(str, List.class);
assertEquals(1, result.size());
assertEquals(10, result.get(0).get("result"));
}
}

View File

@ -0,0 +1,67 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-graph-bundle</artifactId>
<version>1.12.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-graph-test-clients</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.12.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-graph-client-service-api</artifactId>
<version>1.12.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.janusgraph</groupId>
<artifactId>janusgraph-core</artifactId>
<version>0.4.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy</artifactId>
<version>2.5.4</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.graph;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.janusgraph.core.JanusGraphFactory;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import java.util.HashMap;
import java.util.Map;
/**
* This is an in memory implementation of the GraphClientService using JanusGraph. It should in no way be confused for a
* production-ready capability. It is intended to provide a fairly realistic environment for testing Gremlin script
* submission.
*/
public class InMemoryJanusGraphClientService extends AbstractControllerService implements GraphClientService {
private Graph graph;
@OnEnabled
public void onEnabled(ConfigurationContext context) {
graph = JanusGraphFactory.build().set("storage.backend", "inmemory").open();
}
/**
* Execute the query.
*
* This instantiate a new script engine every time to ensure a pristine environment for testing.
*
* @param query A gremlin query (Groovy syntax)
* @param parameters A map of parameters to be injected into the script. This can be structured the way you would
* expect a REST API call to Gremlin Server.
* @param handler The callback for parsing the rsponse.
* @return Empty map. This API feature is only filled with values typically when dealing with Cypher clients, Neo4J
* in particular.
*/
@Override
public Map<String, String> executeQuery(String query, Map<String, Object> parameters, GraphQueryResultCallback handler) {
ScriptEngine engine = new ScriptEngineManager().getEngineByName("groovy");
parameters.entrySet().forEach(entry -> engine.put(entry.getKey(), entry.getValue()));
engine.put("graph", graph);
engine.put("g", graph.traversal());
try {
Object response = engine.eval(query);
if (response instanceof Map) {
Map resp = (Map) response;
Map<String, Object> result = new HashMap<>();
result.put("result", resp.entrySet().iterator().next());
handler.process(result,false);
} else {
Map<String, Object> result = new HashMap<>();
result.put("result", response);
handler.process(result, false);
}
return new HashMap<>();
} catch (Exception ex) {
throw new ProcessException(ex);
}
}
@Override
public String getTransitUrl() {
return "janusgraph:memory://localhost";
}
/**
* Getter for accessing the generated JanusGraph object once the client service is activated in a test.
* The purpose of this is to allow testers to get access to the graph so they can do things like run traversals
* on it.
*
* @return Tinkerpop Graph object representing the in memory graph database.
*/
public Graph getGraph() {
return graph;
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.graph
import org.apache.nifi.processor.AbstractProcessor
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.util.TestRunners
import org.junit.Test
class InMemoryJanusGraphClientServiceTest {
@Test
void test() {
def runner = TestRunners.newTestRunner(new AbstractProcessor() {
@Override
void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
})
def service = new InMemoryJanusGraphClientService()
runner.addControllerService("service", service)
runner.enableControllerService(service)
def create = """
1.upto(10) {
g.addV("test").property("uuid", UUID.randomUUID().toString()).next()
}
"""
service.executeQuery(create, [:], { record, more ->
assert !more
} as GraphQueryResultCallback)
def query = """
g.V().hasLabel("test").count().next()
"""
def executed = false
service.executeQuery(query, [:], { record, more ->
assert record["result"] == 10
executed = true
} as GraphQueryResultCallback)
assert executed
}
}

View File

@ -26,6 +26,7 @@
<packaging>pom</packaging>
<modules>
<module>nifi-graph-test-clients</module>
<module>nifi-graph-client-service-api</module>
<module>nifi-graph-client-service-api-nar</module>
<module>nifi-graph-processors</module>