NIFI-7906: Implemented RecordSetWriter support for ExecuteGraphQueryRecord

This closes #4704

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
Matthew Burgess 2020-12-04 12:23:07 -05:00 committed by Mike Thomsen
parent 19bf89f7ed
commit de88ce7a61
3 changed files with 123 additions and 73 deletions
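Each dynamic property on ExecuteGraphQueryRecord is evaluated as a RecordPath against every incoming record, and the selected values become parameters of the submission script (the test below uses a property M = /M feeding the script [ 'M': M[0] ]). A minimal sketch of that lookup with the RecordPath API the processor imports; the class and method names here are illustrative and the single-value-versus-list convention is an assumption, not the processor's exact code:

    import org.apache.nifi.record.path.FieldValue;
    import org.apache.nifi.record.path.RecordPath;
    import org.apache.nifi.record.path.RecordPathResult;
    import org.apache.nifi.serialization.record.Record;

    import java.util.List;
    import java.util.stream.Collectors;

    class RecordPathParameterSketch {
        // Illustrative only: resolve a RecordPath such as "/M" against one record and
        // return the single selected value, or a list when the path matches several fields.
        static Object resolve(final Record record, final String path) {
            final RecordPath recordPath = RecordPath.compile(path);
            final RecordPathResult result = recordPath.evaluate(record);
            final List<Object> values = result.getSelectedFields()
                    .map(FieldValue::getValue)
                    .collect(Collectors.toList());
            return values.size() == 1 ? values.get(0) : values;
        }
    }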

ExecuteGraphQueryRecord.java

@ -20,20 +20,20 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.graph.GraphClientService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
@ -41,28 +41,31 @@ import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.HashSet;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.stream.Collectors;
@Tags({"graph, gremlin"})
@CapabilityDescription("This uses a flowfile as input to perform graph mutations.")
@Tags({"graph", "gremlin", "cypher"})
@CapabilityDescription("This uses FlowFile records as input to perform graph mutations. Each record is associated with an individual query/mutation, and a FlowFile will "
+ "be output for each successful operation. Failed records will be sent as a single FlowFile to the failure relationship.")
@WritesAttributes({
@WritesAttribute(attribute = ExecuteGraphQueryRecord.GRAPH_OPERATION_TIME, description = "The amount of time it took to execute all of the graph operations."),
@WritesAttribute(attribute = ExecuteGraphQueryRecord.RECORD_COUNT, description = "The amount of record processed")
@WritesAttribute(attribute = ExecuteGraphQueryRecord.RECORD_COUNT, description = "The number of records unsuccessfully processed (written on FlowFiles routed to the "
+ "'failure' relationship.")
})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@DynamicProperty(name = "A dynamic property to be used as a parameter in the graph script",
@ -154,7 +157,7 @@ public class ExecuteGraphQueryRecord extends AbstractGraphExecutor {
private GraphClientService clientService;
private RecordReaderFactory recordReaderFactory;
private RecordSetWriterFactory recordSetWriterFactory;
private ObjectMapper mapper = new ObjectMapper();
private final ObjectMapper mapper = new ObjectMapper();
@OnScheduled
public void onScheduled(ProcessContext context) {
@ -179,8 +182,6 @@ public class ExecuteGraphQueryRecord extends AbstractGraphExecutor {
return;
}
List<FlowFile> graphList = new ArrayList<>();
String recordScript = context.getProperty(SUBMISSION_SCRIPT)
.evaluateAttributeExpressions(input)
.getValue();
@ -199,56 +200,70 @@ public class ExecuteGraphQueryRecord extends AbstractGraphExecutor {
.getValue()))
);
boolean failed = false;
long delta = 0;
long delta;
FlowFile failedRecords = session.create(input);
WriteResult failedWriteResult = null;
try (InputStream is = session.read(input);
RecordReader reader = recordReaderFactory.createRecordReader(input, is, getLogger());
OutputStream os = session.write(failedRecords);
RecordSetWriter failedWriter = recordSetWriterFactory.createWriter(getLogger(), reader.getSchema(), os, input.getAttributes())
) {
Record record;
long start = System.currentTimeMillis();
failedWriter.beginRecordSet();
while ((record = reader.nextRecord()) != null) {
FlowFile graph = session.create(input);
List<Map<String,Object>> graphResponses = new ArrayList<>();
Map<String, Object> dynamicPropertyMap = new HashMap<>();
for (String entry : dynamic.keySet()) {
if(!dynamicPropertyMap.containsKey(entry)) {
try {
Map<String, Object> dynamicPropertyMap = new HashMap<>();
for (String entry : dynamic.keySet()) {
if (!dynamicPropertyMap.containsKey(entry)) {
dynamicPropertyMap.put(entry, getRecordValue(record, dynamic.get(entry)));
}
}
dynamicPropertyMap.putAll(input.getAttributes());
List<Map<String, Object>> graphResponses = new ArrayList<>(executeQuery(recordScript, dynamicPropertyMap));
OutputStream graphOutputStream = session.write(graph);
String graphOutput = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(graphResponses);
graphOutputStream.write(graphOutput.getBytes(StandardCharsets.UTF_8));
graphOutputStream.close();
session.transfer(graph, GRAPH);
} catch (Exception e) {
// write failed records to a flowfile destined for the failure relationship
failedWriter.write(record);
session.remove(graph);
}
dynamicPropertyMap.putAll(input.getAttributes());
graphResponses.addAll(executeQuery(recordScript, dynamicPropertyMap));
OutputStream graphOutputStream = session.write(graph);
String graphOutput = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(graphResponses);
graphOutputStream.write(graphOutput.getBytes(StandardCharsets.UTF_8));
graphList.add(graph);
graphOutputStream.close();
}
long end = System.currentTimeMillis();
delta = (end - start) / 1000;
if (getLogger().isDebugEnabled()){
getLogger().debug(String.format("Took %s seconds.", delta));
}
failedWriteResult = failedWriter.finishRecordSet();
failedWriter.flush();
} catch (Exception ex) {
getLogger().error("", ex);
failed = true;
} finally {
if (failed) {
graphList.forEach(session::remove);
session.transfer(input, FAILURE);
} else {
input = session.putAttribute(input, GRAPH_OPERATION_TIME, String.valueOf(delta));
session.getProvenanceReporter().send(input, clientService.getTransitUrl(), delta*1000);
session.transfer(input, SUCCESS);
graphList.forEach(it -> {
session.transfer(it, GRAPH);
});
}
getLogger().error("Error reading records, routing input FlowFile to failure", ex);
session.remove(failedRecords);
session.transfer(input, FAILURE);
return;
}
// Generate provenance and send input flowfile to success
session.getProvenanceReporter().send(input, clientService.getTransitUrl(), delta*1000);
if (failedWriteResult.getRecordCount() < 1) {
// No failed records, remove the failure flowfile and send the input flowfile to success
session.remove(failedRecords);
input = session.putAttribute(input, GRAPH_OPERATION_TIME, String.valueOf(delta));
session.transfer(input, SUCCESS);
} else {
failedRecords = session.putAttribute(failedRecords, RECORD_COUNT, String.valueOf(failedWriteResult.getRecordCount()));
session.transfer(failedRecords, FAILURE);
// There were failures, don't send the input flowfile to SUCCESS
session.remove(input);
}
}
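Taken together, the new flow of onTrigger is: one failure FlowFile is created up front, a RecordSetWriter is opened on it, each record whose query throws is written into that record set, and WriteResult.getRecordCount() decides whether the FlowFile is transferred to 'failure' or simply removed. A condensed sketch of that pattern, assuming it sits inside onTrigger with session, input, recordReaderFactory and recordSetWriterFactory in scope; doQuery(record) is a hypothetical stand-in for the per-record graph mutation, and the per-record GRAPH output is omitted:

    FlowFile failedRecords = session.create(input);
    WriteResult failedWriteResult = null;
    try (InputStream is = session.read(input);
         RecordReader reader = recordReaderFactory.createRecordReader(input, is, getLogger());
         OutputStream os = session.write(failedRecords);
         RecordSetWriter failedWriter = recordSetWriterFactory.createWriter(
                 getLogger(), reader.getSchema(), os, input.getAttributes())) {

        failedWriter.beginRecordSet();
        Record record;
        while ((record = reader.nextRecord()) != null) {
            try {
                doQuery(record);                 // hypothetical per-record query/mutation
            } catch (Exception e) {
                failedWriter.write(record);      // only records that failed go into this FlowFile
            }
        }
        failedWriteResult = failedWriter.finishRecordSet();
    } catch (Exception ex) {
        // Reader/writer problems fail the whole input FlowFile, not individual records.
        getLogger().error("Error reading records, routing input FlowFile to failure", ex);
        session.remove(failedRecords);
        session.transfer(input, FAILURE);
        return;
    }

    if (failedWriteResult.getRecordCount() < 1) {
        session.remove(failedRecords);           // nothing failed: drop the empty failure FlowFile
        session.transfer(input, SUCCESS);
    } else {
        failedRecords = session.putAttribute(failedRecords, RECORD_COUNT,
                String.valueOf(failedWriteResult.getRecordCount()));
        session.transfer(failedRecords, FAILURE);
        session.remove(input);                   // failures present: the input is not sent to success
    }

Because beginRecordSet and finishRecordSet are always called, an empty record set is written even when every record succeeds, which is why the empty failure FlowFile has to be removed explicitly.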

ExecuteGraphQueryRecordTest.java

@ -41,14 +41,12 @@ import static org.junit.Assert.assertTrue;
public class ExecuteGraphQueryRecordTest {
private TestRunner runner;
private JsonTreeReader reader;
private InMemoryGraphClient graphClient;
Map<String, String> enqueProperties = new HashMap<>();
@Before
public void setup() throws InitializationException {
MockRecordWriter writer = new MockRecordWriter();
reader = new JsonTreeReader();
JsonTreeReader reader = new JsonTreeReader();
runner = TestRunners.newTestRunner(ExecuteGraphQueryRecord.class);
runner.addControllerService("reader", reader);
runner.addControllerService("writer", writer);
@ -57,23 +55,12 @@ public class ExecuteGraphQueryRecordTest {
runner.enableControllerService(writer);
runner.enableControllerService(reader);
graphClient = new InMemoryGraphClient();
runner.addControllerService("graphClient", graphClient);
runner.setProperty(ExecuteGraphQueryRecord.CLIENT_SERVICE, "graphClient");
runner.enableControllerService(graphClient);
runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, "[ 'testProperty': 'testResponse' ]");
runner.assertValid();
enqueProperties.put("graph.name", "graph");
}
@Test
public void testFlowFileContent() throws IOException {
List<Map> test = new ArrayList<>();
public void testFlowFileContent() throws Exception {
setupGraphClient(false);
List<Map<String,Object>> test = new ArrayList<>();
Map<String, Object> tempMap = new HashMap<>();
tempMap.put("M", 1);
test.add(tempMap);
@ -96,8 +83,9 @@ public class ExecuteGraphQueryRecordTest {
}
@Test
public void testFlowFileList() throws IOException {
List<Map> test = new ArrayList<>();
public void testFlowFileList() throws Exception {
setupGraphClient(false);
List<Map<String,Object>> test = new ArrayList<>();
Map<String, Object> tempMap = new HashMap<>();
tempMap.put("M", new ArrayList<Integer>(){
{
@ -127,8 +115,9 @@ public class ExecuteGraphQueryRecordTest {
}
@Test
public void testComplexFlowFile() throws IOException {
List<Map> test = new ArrayList<>();
public void testComplexFlowFile() throws Exception {
setupGraphClient(false);
List<Map<String,Object>> test = new ArrayList<>();
Map<String, Object> tempMap = new HashMap<>();
tempMap.put("tMap", "123");
tempMap.put("L", new ArrayList<Integer>(){
@ -159,7 +148,8 @@ public class ExecuteGraphQueryRecordTest {
}
@Test
public void testAttributes() throws IOException {
public void testAttributes() throws Exception {
setupGraphClient(false);
List<Map<String, Object>> test = new ArrayList<>();
Map<String, Object> tempMap = new HashMap<>();
tempMap.put("tMap", "123");
@ -193,4 +183,37 @@ public class ExecuteGraphQueryRecordTest {
return expected.equals(content);
}
@Test
public void testExceptionOnQuery() throws Exception {
setupGraphClient(true);
List<Map<String,Object>> test = new ArrayList<>();
Map<String, Object> tempMap = new HashMap<>();
tempMap.put("M", 1);
test.add(tempMap);
byte[] json = JsonOutput.toJson(test).getBytes();
String submissionScript;
submissionScript = "[ 'M': M[0] ]";
runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, submissionScript);
runner.setProperty("M", "/M");
runner.enqueue(json, enqueProperties);
runner.run();
runner.assertTransferCount(ExecuteGraphQueryRecord.GRAPH, 0);
runner.assertTransferCount(ExecuteGraphQueryRecord.SUCCESS, 0);
runner.assertTransferCount(ExecuteGraphQueryRecord.FAILURE, 1);
}
private void setupGraphClient(boolean failOnQuery) throws InitializationException {
InMemoryGraphClient graphClient = new InMemoryGraphClient(failOnQuery);
runner.addControllerService("graphClient", graphClient);
runner.setProperty(ExecuteGraphQueryRecord.CLIENT_SERVICE, "graphClient");
runner.enableControllerService(graphClient);
runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, "[ 'testProperty': 'testResponse' ]");
runner.assertValid();
enqueProperties.put("graph.name", "graph");
}
}
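testExceptionOnQuery only checks transfer counts; the failure FlowFile could also be asserted on its record-count attribute. A possible extra assertion for that test (illustrative, not part of this commit: it assumes the RECORD_COUNT constant is visible to the test, that exactly one record failed, and that org.apache.nifi.util.MockFlowFile is imported):

    MockFlowFile failure = runner.getFlowFilesForRelationship(ExecuteGraphQueryRecord.FAILURE).get(0);
    // The processor stamps the number of failed records onto the FlowFile routed to 'failure'.
    failure.assertAttributeEquals(ExecuteGraphQueryRecord.RECORD_COUNT, "1");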

InMemoryGraphClient.java

@ -29,13 +29,22 @@ import org.janusgraph.core.JanusGraphFactory;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.AbstractMap.SimpleEntry;
public class InMemoryGraphClient extends AbstractControllerService implements GraphClientService {
private Graph graph;
private boolean generateExceptionOnQuery = false;
public InMemoryGraphClient() {
this(false);
}
public InMemoryGraphClient(final boolean generateExceptionOnQuery) {
this.generateExceptionOnQuery = generateExceptionOnQuery;
}
@OnEnabled
void onEnabled(ConfigurationContext context) {
@ -48,6 +57,9 @@ public class InMemoryGraphClient extends AbstractControllerService implements Gr
@Override
public Map<String, String> executeQuery(String query, Map<String, Object> parameters, GraphQueryResultCallback graphQueryResultCallback) {
if(generateExceptionOnQuery) {
throw new ProcessException("Generated test exception");
}
ScriptEngine engine = new ScriptEngineManager().getEngineByName("groovy");
parameters.entrySet().stream().forEach( it -> {
engine.put(it.getKey(), it.getValue());