mirror of https://github.com/apache/nifi.git
NIFI-7906: parameterized graph query
NIFI-7906: addressing PR concerns NIFI-7906: code styling fixes NIFI-7906: adding in license information to new files + enables processor in META-INF NIFI-7906: exclude test files from RAT NIFI-7906: PR refactor to streamline graph response NIFI-7906: removing ERRORS output Unused after refactor Did a few cleanups for the contributor. This closes #4638 Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
parent
64e3599f05
commit
c29cced269
|
@ -78,5 +78,60 @@
|
|||
<version>1.13.0-SNAPSHOT</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record-path</artifactId>
|
||||
<version>1.13.0-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record-serialization-service-api</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-json-utils</artifactId>
|
||||
<version>1.13.0-SNAPSHOT</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record-serialization-service-api</artifactId>
|
||||
<version>1.13.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record-serialization-services</artifactId>
|
||||
<version>1.13.0-SNAPSHOT</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mock-record-utils</artifactId>
|
||||
<version>1.13.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-schema-registry-service-api</artifactId>
|
||||
<version>1.13.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.rat</groupId>
|
||||
<artifactId>apache-rat-plugin</artifactId>
|
||||
<configuration>
|
||||
<excludes combine.children="append">
|
||||
<exclude>src/test/resources/testAttributes.json</exclude>
|
||||
<exclude>src/test/resources/testComplexFlowFile.json</exclude>
|
||||
<exclude>src/test/resources/testFlowFileContent.json</exclude>
|
||||
<exclude>src/test/resources/testFlowFileList.json</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
|
|
|
@ -0,0 +1,270 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.graph;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.Validator;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.graph.GraphClientService;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.record.path.FieldValue;
|
||||
import org.apache.nifi.record.path.RecordPath;
|
||||
import org.apache.nifi.record.path.RecordPathResult;
|
||||
import org.apache.nifi.record.path.util.RecordPathCache;
|
||||
import org.apache.nifi.serialization.RecordReader;
|
||||
import org.apache.nifi.serialization.RecordReaderFactory;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import java.util.Set;
|
||||
import java.util.HashSet;
|
||||
import java.util.Arrays;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Tags({"graph, gremlin"})
|
||||
@CapabilityDescription("This uses a flowfile as input to perform graph mutations.")
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute = ExecuteGraphQueryRecord.GRAPH_OPERATION_TIME, description = "The amount of time it took to execute all of the graph operations."),
|
||||
@WritesAttribute(attribute = ExecuteGraphQueryRecord.RECORD_COUNT, description = "The amount of record processed")
|
||||
})
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
||||
@DynamicProperty(name = "A dynamic property to be used as a parameter in the graph script",
|
||||
value = "The variable name to be set", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
|
||||
description = "Uses a record path to set a variable as a parameter in the graph script")
|
||||
public class ExecuteGraphQueryRecord extends AbstractGraphExecutor {
|
||||
|
||||
public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("client-service")
|
||||
.displayName("Client Service")
|
||||
.description("The graph client service for connecting to a graph database.")
|
||||
.identifiesControllerService(GraphClientService.class)
|
||||
.addValidator(Validator.VALID)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor READER_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("reader-service")
|
||||
.displayName("Record Reader")
|
||||
.description("The record reader to use with this processor.")
|
||||
.identifiesControllerService(RecordReaderFactory.class)
|
||||
.required(true)
|
||||
.addValidator(Validator.VALID)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor WRITER_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("writer-service")
|
||||
.displayName("Failed Record Writer")
|
||||
.description("The record writer to use for writing failed records.")
|
||||
.identifiesControllerService(RecordSetWriterFactory.class)
|
||||
.required(true)
|
||||
.addValidator(Validator.VALID)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor SUBMISSION_SCRIPT = new PropertyDescriptor.Builder()
|
||||
.name("record-script")
|
||||
.displayName("Graph Record Script")
|
||||
.description("Script to perform the business logic on graph, using flow file attributes and custom properties " +
|
||||
"as variable-value pairs in its logic.")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
|
||||
.build();
|
||||
|
||||
@Override
|
||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||
return new PropertyDescriptor.Builder()
|
||||
.name(propertyDescriptorName)
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.dynamic(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
||||
}
|
||||
|
||||
public static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
|
||||
CLIENT_SERVICE, READER_SERVICE, WRITER_SERVICE, SUBMISSION_SCRIPT
|
||||
));
|
||||
|
||||
public static final Relationship SUCCESS = new Relationship.Builder().name("original")
|
||||
.description("Original flow files that successfully interacted with " +
|
||||
"graph server.")
|
||||
.build();
|
||||
public static final Relationship FAILURE = new Relationship.Builder().name("failure")
|
||||
.description("Flow files that fail to interact with graph server.")
|
||||
.build();
|
||||
public static final Relationship GRAPH = new Relationship.Builder().name("response")
|
||||
.description("The response object from the graph server.")
|
||||
.autoTerminateDefault(true)
|
||||
.build();
|
||||
|
||||
public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
|
||||
SUCCESS, FAILURE, GRAPH
|
||||
)));
|
||||
|
||||
public static final String RECORD_COUNT = "records.count";
|
||||
public static final String GRAPH_OPERATION_TIME = "graph.operations.took";
|
||||
private volatile RecordPathCache recordPathCache;
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return RELATIONSHIPS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return DESCRIPTORS;
|
||||
}
|
||||
|
||||
private GraphClientService clientService;
|
||||
private RecordReaderFactory recordReaderFactory;
|
||||
private RecordSetWriterFactory recordSetWriterFactory;
|
||||
private ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(ProcessContext context) {
|
||||
clientService = context.getProperty(CLIENT_SERVICE).asControllerService(GraphClientService.class);
|
||||
recordReaderFactory = context.getProperty(READER_SERVICE).asControllerService(RecordReaderFactory.class);
|
||||
recordSetWriterFactory = context.getProperty(WRITER_SERVICE).asControllerService(RecordSetWriterFactory.class);
|
||||
recordPathCache = new RecordPathCache(100);
|
||||
}
|
||||
|
||||
private List<Object> getRecordValue(Record record, RecordPath recordPath){
|
||||
final RecordPathResult result = recordPath.evaluate(record);
|
||||
return result.getSelectedFields()
|
||||
.filter(fv -> fv.getValue() != null)
|
||||
.map(FieldValue::getValue)
|
||||
.collect( Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
FlowFile input = session.get();
|
||||
if ( input == null ) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<FlowFile> graphList = new ArrayList<>();
|
||||
|
||||
String recordScript = context.getProperty(SUBMISSION_SCRIPT)
|
||||
.evaluateAttributeExpressions(input)
|
||||
.getValue();
|
||||
|
||||
Map<String, RecordPath> dynamic = new HashMap<>();
|
||||
|
||||
FlowFile finalInput = input;
|
||||
context.getProperties()
|
||||
.keySet().stream()
|
||||
.filter(PropertyDescriptor::isDynamic)
|
||||
.forEach(it ->
|
||||
dynamic.put(it.getName(), recordPathCache.getCompiled(
|
||||
context
|
||||
.getProperty(it.getName())
|
||||
.evaluateAttributeExpressions(finalInput)
|
||||
.getValue()))
|
||||
);
|
||||
|
||||
|
||||
boolean failed = false;
|
||||
long delta = 0;
|
||||
try (InputStream is = session.read(input);
|
||||
RecordReader reader = recordReaderFactory.createRecordReader(input, is, getLogger());
|
||||
) {
|
||||
Record record;
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
while ((record = reader.nextRecord()) != null) {
|
||||
FlowFile graph = session.create(input);
|
||||
|
||||
List<Map<String,Object>> graphResponses = new ArrayList<>();
|
||||
|
||||
Map<String, Object> dynamicPropertyMap = new HashMap<>();
|
||||
for (String entry : dynamic.keySet()) {
|
||||
if(!dynamicPropertyMap.containsKey(entry)) {
|
||||
dynamicPropertyMap.put(entry, getRecordValue(record, dynamic.get(entry)));
|
||||
}
|
||||
}
|
||||
|
||||
dynamicPropertyMap.putAll(input.getAttributes());
|
||||
graphResponses.addAll(executeQuery(recordScript, dynamicPropertyMap));
|
||||
|
||||
OutputStream graphOutputStream = session.write(graph);
|
||||
String graphOutput = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(graphResponses);
|
||||
graphOutputStream.write(graphOutput.getBytes(StandardCharsets.UTF_8));
|
||||
graphList.add(graph);
|
||||
graphOutputStream.close();
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
delta = (end - start) / 1000;
|
||||
if (getLogger().isDebugEnabled()){
|
||||
getLogger().debug(String.format("Took %s seconds.", delta));
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
getLogger().error("", ex);
|
||||
failed = true;
|
||||
} finally {
|
||||
if (failed) {
|
||||
graphList.forEach(session::remove);
|
||||
session.transfer(input, FAILURE);
|
||||
} else {
|
||||
input = session.putAttribute(input, GRAPH_OPERATION_TIME, String.valueOf(delta));
|
||||
session.getProvenanceReporter().send(input, clientService.getTransitUrl(), delta*1000);
|
||||
session.transfer(input, SUCCESS);
|
||||
graphList.forEach(it -> {
|
||||
session.transfer(it, GRAPH);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private List<Map<String, Object>> executeQuery(String recordScript, Map<String, Object> parameters) {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
List<Map<String, Object>> graphResponses = new ArrayList<>();
|
||||
clientService.executeQuery(recordScript, parameters, (map, b) -> {
|
||||
if (getLogger().isDebugEnabled()){
|
||||
try {
|
||||
getLogger().debug(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(map));
|
||||
} catch (JsonProcessingException ex) {
|
||||
getLogger().error("Error converted map to JSON ", ex);
|
||||
}
|
||||
}
|
||||
graphResponses.add(map);
|
||||
});
|
||||
return graphResponses;
|
||||
}
|
||||
}
|
|
@ -13,3 +13,4 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
org.apache.nifi.processors.graph.ExecuteGraphQuery
|
||||
org.apache.nifi.processors.graph.ExecuteGraphQueryRecord
|
|
@ -0,0 +1,176 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.graph;
|
||||
|
||||
import groovy.json.JsonOutput;
|
||||
import org.apache.nifi.processors.graph.util.InMemoryGraphClient;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.apache.nifi.json.JsonTreeReader;
|
||||
import org.apache.nifi.serialization.record.MockRecordWriter;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
|
||||
public class ExecuteGraphQueryRecordTest {
|
||||
private TestRunner runner;
|
||||
private JsonTreeReader reader;
|
||||
private InMemoryGraphClient graphClient;
|
||||
Map<String, String> enqueProperties = new HashMap<>();
|
||||
|
||||
@Before
|
||||
public void setup() throws InitializationException {
|
||||
MockRecordWriter writer = new MockRecordWriter();
|
||||
reader = new JsonTreeReader();
|
||||
runner = TestRunners.newTestRunner(ExecuteGraphQueryRecord.class);
|
||||
runner.addControllerService("reader", reader);
|
||||
runner.addControllerService("writer", writer);
|
||||
runner.setProperty(ExecuteGraphQueryRecord.READER_SERVICE, "reader");
|
||||
runner.setProperty(ExecuteGraphQueryRecord.WRITER_SERVICE, "writer");
|
||||
|
||||
runner.enableControllerService(writer);
|
||||
runner.enableControllerService(reader);
|
||||
|
||||
graphClient = new InMemoryGraphClient();
|
||||
|
||||
|
||||
runner.addControllerService("graphClient", graphClient);
|
||||
|
||||
runner.setProperty(ExecuteGraphQueryRecord.CLIENT_SERVICE, "graphClient");
|
||||
runner.enableControllerService(graphClient);
|
||||
runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, "[ 'testProperty': 'testResponse' ]");
|
||||
runner.assertValid();
|
||||
enqueProperties.put("graph.name", "graph");
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlowFileContent() throws IOException {
|
||||
List<Map> test = new ArrayList<>();
|
||||
Map<String, Object> tempMap = new HashMap<>();
|
||||
tempMap.put("M", 1);
|
||||
test.add(tempMap);
|
||||
|
||||
byte[] json = JsonOutput.toJson(test).getBytes();
|
||||
String submissionScript;
|
||||
submissionScript = "[ 'M': M[0] ]";
|
||||
|
||||
runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, submissionScript);
|
||||
runner.setProperty("M", "/M");
|
||||
runner.enqueue(json, enqueProperties);
|
||||
|
||||
runner.run();
|
||||
runner.assertTransferCount(ExecuteGraphQueryRecord.GRAPH, 1);
|
||||
runner.assertTransferCount(ExecuteGraphQueryRecord.SUCCESS, 1);
|
||||
runner.assertTransferCount(ExecuteGraphQueryRecord.FAILURE, 0);
|
||||
MockFlowFile relGraph = runner.getFlowFilesForRelationship(ExecuteGraphQueryRecord.GRAPH).get(0);
|
||||
relGraph.assertContentEquals(ExecuteGraphQueryRecordTest.class.getResourceAsStream("/testFlowFileContent.json"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlowFileList() throws IOException {
|
||||
List<Map> test = new ArrayList<>();
|
||||
Map<String, Object> tempMap = new HashMap<>();
|
||||
tempMap.put("M", new ArrayList<Integer>(){
|
||||
{
|
||||
add(1);
|
||||
add(2);
|
||||
add(3);
|
||||
}
|
||||
});
|
||||
test.add(tempMap);
|
||||
|
||||
byte[] json = JsonOutput.toJson(test).getBytes();
|
||||
String submissionScript = "[ " +
|
||||
"'M': M[0] " +
|
||||
"]";
|
||||
|
||||
runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, submissionScript);
|
||||
runner.setProperty("M", "/M");
|
||||
runner.enqueue(json, enqueProperties);
|
||||
|
||||
runner.run();
|
||||
runner.assertTransferCount(ExecuteGraphQueryRecord.GRAPH, 1);
|
||||
runner.assertTransferCount(ExecuteGraphQueryRecord.SUCCESS, 1);
|
||||
runner.assertTransferCount(ExecuteGraphQueryRecord.FAILURE, 0);
|
||||
MockFlowFile relGraph = runner.getFlowFilesForRelationship(ExecuteGraphQueryRecord.GRAPH).get(0);
|
||||
relGraph.assertContentEquals(ExecuteGraphQueryRecordTest.class.getResourceAsStream("/testFlowFileList.json"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testComplexFlowFile() throws IOException {
|
||||
List<Map> test = new ArrayList<>();
|
||||
Map<String, Object> tempMap = new HashMap<>();
|
||||
tempMap.put("tMap", "123");
|
||||
tempMap.put("L", new ArrayList<Integer>(){
|
||||
{
|
||||
add(1);
|
||||
add(2);
|
||||
add(3);
|
||||
}
|
||||
});
|
||||
test.add(tempMap);
|
||||
|
||||
byte[] json = JsonOutput.toJson(test).getBytes();
|
||||
String submissionScript = "Map<String, Object> vertexHashes = new HashMap()\n" +
|
||||
"vertexHashes.put('1234', tMap[0])\n" +
|
||||
"[ 'L': L[0], 'result': vertexHashes ]";
|
||||
runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, submissionScript);
|
||||
runner.setProperty("tMap", "/tMap");
|
||||
runner.setProperty("L", "/L");
|
||||
runner.enqueue(json, enqueProperties);
|
||||
|
||||
runner.run();
|
||||
runner.assertTransferCount(ExecuteGraphQueryRecord.GRAPH, 1);
|
||||
runner.assertTransferCount(ExecuteGraphQueryRecord.SUCCESS, 1);
|
||||
runner.assertTransferCount(ExecuteGraphQueryRecord.FAILURE, 0);
|
||||
MockFlowFile relGraph = runner.getFlowFilesForRelationship(ExecuteGraphQueryRecord.GRAPH).get(0);
|
||||
relGraph.assertContentEquals(ExecuteGraphQueryRecordTest.class.getResourceAsStream("/testComplexFlowFile.json"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAttributes() throws IOException {
|
||||
List<Map<String, Object>> test = new ArrayList<>();
|
||||
Map<String, Object> tempMap = new HashMap<>();
|
||||
tempMap.put("tMap", "123");
|
||||
test.add(tempMap);
|
||||
|
||||
byte[] json = JsonOutput.toJson(test).getBytes();
|
||||
String submissionScript = "[ " +
|
||||
"'testProperty': testProperty " +
|
||||
"] ";
|
||||
runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, submissionScript);
|
||||
Map<String, String> enqueProperties = new HashMap<>();
|
||||
enqueProperties.put("testProperty", "test");
|
||||
runner.enqueue(json, enqueProperties);
|
||||
|
||||
runner.run();
|
||||
runner.assertTransferCount(ExecuteGraphQueryRecord.GRAPH, 1);
|
||||
runner.assertTransferCount(ExecuteGraphQueryRecord.SUCCESS, 1);
|
||||
runner.assertTransferCount(ExecuteGraphQueryRecord.FAILURE, 0);
|
||||
MockFlowFile relGraph = runner.getFlowFilesForRelationship(ExecuteGraphQueryRecord.GRAPH).get(0);
|
||||
relGraph.assertContentEquals(ExecuteGraphQueryRecordTest.class.getResourceAsStream("/testAttributes.json"));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,113 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.graph.util;
|
||||
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.graph.GraphClientService;
|
||||
import org.apache.nifi.graph.GraphQueryResultCallback;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.tinkerpop.gremlin.structure.Graph;
|
||||
import org.janusgraph.core.JanusGraph;
|
||||
import org.janusgraph.core.JanusGraphFactory;
|
||||
|
||||
import javax.script.ScriptEngine;
|
||||
import javax.script.ScriptEngineManager;
|
||||
import javax.script.ScriptException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.AbstractMap.SimpleEntry;
|
||||
|
||||
public class InMemoryGraphClient extends AbstractControllerService implements GraphClientService {
|
||||
private Graph graph;
|
||||
|
||||
@OnEnabled
|
||||
void onEnabled(ConfigurationContext context) {
|
||||
graph = buildGraph();
|
||||
}
|
||||
|
||||
private static JanusGraph buildGraph() {
|
||||
return JanusGraphFactory.build().set("storage.backend", "inmemory").open();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> executeQuery(String query, Map<String, Object> parameters, GraphQueryResultCallback graphQueryResultCallback) {
|
||||
ScriptEngine engine = new ScriptEngineManager().getEngineByName("groovy");
|
||||
parameters.entrySet().stream().forEach( it -> {
|
||||
engine.put(it.getKey(), it.getValue());
|
||||
});
|
||||
if (graph == null) {
|
||||
graph = buildGraph();
|
||||
}
|
||||
engine.put("graph", graph);
|
||||
engine.put("g", graph.traversal());
|
||||
|
||||
Object response;
|
||||
try {
|
||||
response = engine.eval(query);
|
||||
} catch (ScriptException ex) {
|
||||
throw new ProcessException(ex);
|
||||
}
|
||||
|
||||
if (response instanceof Map) {
|
||||
//The below logic helps with the handling of complex Map<String, Object> relationships
|
||||
Map resultMap = (Map) response;
|
||||
if (!resultMap.isEmpty()) {
|
||||
// Convertex a resultMap to an entrySet iterator
|
||||
Iterator outerResultSet = resultMap.entrySet().iterator();
|
||||
// this loops over the outermost map
|
||||
while(outerResultSet.hasNext()) {
|
||||
Map.Entry<String, Object> innerResultSet = (Map.Entry<String, Object>) outerResultSet.next();
|
||||
// this is for edge case handling where innerResultSet is also a Map
|
||||
if (innerResultSet.getValue() instanceof Map) {
|
||||
Iterator resultSet = ((Map) innerResultSet.getValue()).entrySet().iterator();
|
||||
// looping over each result in the inner map
|
||||
while (resultSet.hasNext()) {
|
||||
Map.Entry<String, Object> tempResult = (Map.Entry<String, Object>) resultSet.next();
|
||||
Map<String, Object> tempRetObject = new HashMap<>();
|
||||
tempRetObject.put(tempResult.getKey(), tempResult.getValue());
|
||||
SimpleEntry returnObject = new SimpleEntry<String, Object>(tempResult.getKey(), tempRetObject);
|
||||
Map<String, Object> resultReturnMap = new HashMap<>();
|
||||
resultReturnMap.put(innerResultSet.getKey(), returnObject);
|
||||
if (getLogger().isDebugEnabled()) {
|
||||
getLogger().debug(resultReturnMap.toString());
|
||||
}
|
||||
// return the object to the graphQueryResultCallback object
|
||||
graphQueryResultCallback.process(resultReturnMap, resultSet.hasNext());
|
||||
}
|
||||
} else {
|
||||
// for non-maps, return objects need to be a map<string, object> this simply converts the object
|
||||
// to a map to be return to the graphQueryResultCallback object
|
||||
Map<String, Object> resultReturnMap = new HashMap<>();
|
||||
resultReturnMap.put(innerResultSet.getKey(), innerResultSet.getValue());
|
||||
graphQueryResultCallback.process(resultReturnMap, false);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
return new HashMap<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTransitUrl() {
|
||||
return "memory://localhost/graph";
|
||||
}
|
||||
}
|
|
@ -0,0 +1,3 @@
|
|||
[ {
|
||||
"testProperty" : "test"
|
||||
} ]
|
|
@ -0,0 +1,9 @@
|
|||
[ {
|
||||
"L" : [ 1, 2, 3 ]
|
||||
}, {
|
||||
"result" : {
|
||||
"1234" : {
|
||||
"1234" : "123"
|
||||
}
|
||||
}
|
||||
} ]
|
|
@ -0,0 +1,3 @@
|
|||
[ {
|
||||
"M" : 1
|
||||
} ]
|
|
@ -0,0 +1,3 @@
|
|||
[ {
|
||||
"M" : [ 1, 2, 3 ]
|
||||
} ]
|
Loading…
Reference in New Issue