diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/pom.xml b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/pom.xml
index 9b68f459a9..1a9b6f6002 100644
--- a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/pom.xml
@@ -78,5 +78,60 @@
1.13.0-SNAPSHOTtest
+
+ org.apache.nifi
+ nifi-record-path
+ 1.13.0-SNAPSHOT
+ compile
+
+
+ org.apache.nifi
+ nifi-record-serialization-service-api
+ provided
+
+
+ org.apache.nifi
+ nifi-json-utils
+ 1.13.0-SNAPSHOT
+ test
+
+
+ org.apache.nifi
+ nifi-record-serialization-service-api
+ 1.13.0-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-record-serialization-services
+ 1.13.0-SNAPSHOT
+ test
+
+
+ org.apache.nifi
+ nifi-mock-record-utils
+ 1.13.0-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-schema-registry-service-api
+ 1.13.0-SNAPSHOT
+
+
+
+
+
+ org.apache.rat
+ apache-rat-plugin
+
+
+ src/test/resources/testAttributes.json
+ src/test/resources/testComplexFlowFile.json
+ src/test/resources/testFlowFileContent.json
+ src/test/resources/testFlowFileList.json
+
+
+
+
+
diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java
new file mode 100644
index 0000000000..e750b6cf48
--- /dev/null
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.graph;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.graph.GraphClientService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+@Tags({"graph, gremlin"})
+@CapabilityDescription("This uses a flowfile as input to perform graph mutations.")
+@WritesAttributes({
+ @WritesAttribute(attribute = ExecuteGraphQueryRecord.GRAPH_OPERATION_TIME, description = "The amount of time it took to execute all of the graph operations."),
+ @WritesAttribute(attribute = ExecuteGraphQueryRecord.RECORD_COUNT, description = "The amount of record processed")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@DynamicProperty(name = "A dynamic property to be used as a parameter in the graph script",
+ value = "The variable name to be set", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+ description = "Uses a record path to set a variable as a parameter in the graph script")
+public class ExecuteGraphQueryRecord extends AbstractGraphExecutor {
+
+ public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
+ .name("client-service")
+ .displayName("Client Service")
+ .description("The graph client service for connecting to a graph database.")
+ .identifiesControllerService(GraphClientService.class)
+ .addValidator(Validator.VALID)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor READER_SERVICE = new PropertyDescriptor.Builder()
+ .name("reader-service")
+ .displayName("Record Reader")
+ .description("The record reader to use with this processor.")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .addValidator(Validator.VALID)
+ .build();
+
+ public static final PropertyDescriptor WRITER_SERVICE = new PropertyDescriptor.Builder()
+ .name("writer-service")
+ .displayName("Failed Record Writer")
+ .description("The record writer to use for writing failed records.")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .required(true)
+ .addValidator(Validator.VALID)
+ .build();
+
+ public static final PropertyDescriptor SUBMISSION_SCRIPT = new PropertyDescriptor.Builder()
+ .name("record-script")
+ .displayName("Graph Record Script")
+ .description("Script to perform the business logic on graph, using flow file attributes and custom properties " +
+ "as variable-value pairs in its logic.")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .build();
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .required(false)
+ .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .dynamic(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+ }
+
+ public static final List DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+ CLIENT_SERVICE, READER_SERVICE, WRITER_SERVICE, SUBMISSION_SCRIPT
+ ));
+
+ public static final Relationship SUCCESS = new Relationship.Builder().name("original")
+ .description("Original flow files that successfully interacted with " +
+ "graph server.")
+ .build();
+ public static final Relationship FAILURE = new Relationship.Builder().name("failure")
+ .description("Flow files that fail to interact with graph server.")
+ .build();
+ public static final Relationship GRAPH = new Relationship.Builder().name("response")
+ .description("The response object from the graph server.")
+ .autoTerminateDefault(true)
+ .build();
+
+ public static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+ SUCCESS, FAILURE, GRAPH
+ )));
+
+ public static final String RECORD_COUNT = "records.count";
+ public static final String GRAPH_OPERATION_TIME = "graph.operations.took";
+ private volatile RecordPathCache recordPathCache;
+
+ @Override
+ public Set getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public List getSupportedPropertyDescriptors() {
+ return DESCRIPTORS;
+ }
+
+ private GraphClientService clientService;
+ private RecordReaderFactory recordReaderFactory;
+ private RecordSetWriterFactory recordSetWriterFactory;
+ private ObjectMapper mapper = new ObjectMapper();
+
+ @OnScheduled
+ public void onScheduled(ProcessContext context) {
+ clientService = context.getProperty(CLIENT_SERVICE).asControllerService(GraphClientService.class);
+ recordReaderFactory = context.getProperty(READER_SERVICE).asControllerService(RecordReaderFactory.class);
+ recordSetWriterFactory = context.getProperty(WRITER_SERVICE).asControllerService(RecordSetWriterFactory.class);
+ recordPathCache = new RecordPathCache(100);
+ }
+
+ private List