NIFI-10966: Add option to QuerySalesforceObject to run custom query

This closes #6794.

Signed-off-by: Tamas Palfy <>
This commit is contained in:
Lehel Boér 2022-12-08 23:49:36 +01:00 committed by Tamas Palfy
parent 7295e3dc21
commit a61f019cbf
3 changed files with 197 additions and 37 deletions

View File

@ -18,18 +18,23 @@ package org.apache.nifi.processors.salesforce;
import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
import org.apache.camel.component.salesforce.api.dto.SObjectField;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@ -47,6 +52,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
import org.apache.nifi.processors.salesforce.util.SalesforceToRecordSchemaConverter;
@ -89,16 +95,17 @@ import static org.apache.nifi.processors.salesforce.util.CommonSalesforcePropert
import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.READ_TIMEOUT;
import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.TOKEN_PROVIDER;
@Tags({"salesforce", "sobject", "soql", "query"})
@CapabilityDescription("Retrieves records from a Salesforce sObject. Users can add arbitrary filter conditions by setting the 'Custom WHERE Condition' property."
+ " The processor can also run a custom query, although record processing is not supported in that case."
+ " Supports incremental retrieval: users can define a field in the 'Age Field' property that will be used to determine when the record was created."
+ " When this property is set the processor will retrieve new records. It's also possible to define an initial cutoff value for the age, filtering out all older records"
+ " even for the first run. This processor is intended to be run on the Primary Node only."
+ " FlowFile attribute 'record.count' indicates how many records were retrieved and written to the output.")
+ " even for the first run. In case of 'Property Based Query' this processor should run on the Primary Node only."
+ " FlowFile attribute 'record.count' indicates how many records were retrieved and written to the output."
+ " By using 'Custom Query', the processor can accept an optional input flowfile and reference the flowfile attributes in the query."
+ " However, incremental loading and record-based processing are not supported in this scenario.")
@Stateful(scopes = Scope.CLUSTER, description = "When 'Age Field' is set, after performing a query the time of execution is stored. Subsequent queries will be augmented"
+ " with an additional condition so that only records that are newer than the stored execution time (adjusted with the optional value of 'Age Delay') will be retrieved."
+ " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected,"
@ -110,6 +117,28 @@ import static org.apache.nifi.processors.salesforce.util.CommonSalesforcePropert
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
public class QuerySalesforceObject extends AbstractProcessor {
static final AllowableValue PROPERTY_BASED_QUERY = new AllowableValue("property-based-query", "Property Based Query", "Provide query by properties.");
static final AllowableValue CUSTOM_QUERY = new AllowableValue("custom-query", "Custom Query", "Provide custom SOQL query.");
static final PropertyDescriptor QUERY_TYPE = new PropertyDescriptor.Builder()
.displayName("Query Type")
.description("Choose to provide the query by parameters or a full custom query.")
static final PropertyDescriptor CUSTOM_SOQL_QUERY = new PropertyDescriptor.Builder()
.displayName("Custom SOQL Query")
.description("Specify the SOQL query to run.")
static final PropertyDescriptor SOBJECT_NAME = new PropertyDescriptor.Builder()
.displayName("sObject Name")
@ -117,6 +146,7 @@ public class QuerySalesforceObject extends AbstractProcessor {
static final PropertyDescriptor FIELD_NAMES = new PropertyDescriptor.Builder()
@ -126,6 +156,7 @@ public class QuerySalesforceObject extends AbstractProcessor {
static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
@ -133,7 +164,8 @@ public class QuerySalesforceObject extends AbstractProcessor {
.displayName("Record Writer")
.description("Service used for writing records returned from the Salesforce REST API")
static final PropertyDescriptor CREATE_ZERO_RECORD_FILES = new PropertyDescriptor.Builder()
@ -144,6 +176,7 @@ public class QuerySalesforceObject extends AbstractProcessor {
.allowableValues("true", "false")
static final PropertyDescriptor AGE_FIELD = new PropertyDescriptor.Builder()
@ -155,6 +188,7 @@ public class QuerySalesforceObject extends AbstractProcessor {
static final PropertyDescriptor AGE_DELAY = new PropertyDescriptor.Builder()
@ -166,6 +200,7 @@ public class QuerySalesforceObject extends AbstractProcessor {
static final PropertyDescriptor INITIAL_AGE_FILTER = new PropertyDescriptor.Builder()
@ -176,6 +211,7 @@ public class QuerySalesforceObject extends AbstractProcessor {
static final PropertyDescriptor CUSTOM_WHERE_CONDITION = new PropertyDescriptor.Builder()
@ -185,6 +221,7 @@ public class QuerySalesforceObject extends AbstractProcessor {
static final Relationship REL_SUCCESS = new Relationship.Builder()
@ -192,19 +229,35 @@ public class QuerySalesforceObject extends AbstractProcessor {
.description("For FlowFiles created as a result of a successful query.")
static final Relationship REL_ORIGINAL = new Relationship.Builder()
.description("The input flowfile gets sent to this relationship when the query succeeds.")
static final Relationship REL_FAILURE = new Relationship.Builder()
.description("The input flowfile gets sent to this relationship when the query fails.")
private static final String LAST_AGE_FILTER = "last_age_filter";
private static final String STARTING_FIELD_NAME = "records";
private static final String DATE_FORMAT = "yyyy-MM-dd";
private static final String TIME_FORMAT = "HH:mm:ss.SSSX";
private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZZZZ";
private static final String NEXT_RECORDS_URL = "nextRecordsUrl";
private static final String TOTAL_SIZE = "totalSize";
private static final String RECORDS = "records";
private static final BiPredicate<String, String> CAPTURE_PREDICATE = (fieldName, fieldValue) -> NEXT_RECORDS_URL.equals(fieldName);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
private static final String TOTAL_RECORD_COUNT = "total.record.count";
private volatile SalesforceToRecordSchemaConverter salesForceToRecordSchemaConverter;
private volatile SalesforceRestService salesforceRestService;
public void onScheduled(final ProcessContext context) {
public void onScheduled(ProcessContext context) {
salesForceToRecordSchemaConverter = new SalesforceToRecordSchemaConverter(
@ -223,34 +276,40 @@ public class QuerySalesforceObject extends AbstractProcessor {
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.unmodifiableList(Arrays.asList(
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
return relationships;
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
if (validationContext.getProperty(INITIAL_AGE_FILTER).isSet() && !validationContext.getProperty(AGE_FIELD).isSet()) {
new ValidationResult.Builder()
@ -264,7 +323,24 @@ public class QuerySalesforceObject extends AbstractProcessor {
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
boolean isCustomQuery = CUSTOM_QUERY.getValue().equals(context.getProperty(QUERY_TYPE).getValue());
if (isCustomQuery) {
FlowFile flowFile = session.get();
if (flowFile == null && context.hasIncomingConnection()) {
processCustomQuery(context, session, flowFile);
processQuery(context, session);
private void processQuery(ProcessContext context, ProcessSession session) {
AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
String sObject = context.getProperty(SOBJECT_NAME).getValue();
String fields = context.getProperty(FIELD_NAMES).getValue();
String customWhereClause = context.getProperty(CUSTOM_WHERE_CONDITION).getValue();
@ -316,10 +392,7 @@ public class QuerySalesforceObject extends AbstractProcessor {
AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
do {
FlowFile flowFile = session.create();
Map<String, String> originalAttributes = flowFile.getAttributes();
Map<String, String> attributes = new HashMap<>();
@ -328,7 +401,7 @@ public class QuerySalesforceObject extends AbstractProcessor {
flowFile = session.write(flowFile, out -> {
try (
InputStream querySObjectResultInputStream = getResultInputStream(nextRecordsUrl, querySObject);
InputStream querySObjectResultInputStream = getResultInputStream(nextRecordsUrl.get(), querySObject);
JsonTreeRowRecordReader jsonReader = new JsonTreeRowRecordReader(
@ -398,11 +471,78 @@ public class QuerySalesforceObject extends AbstractProcessor {
} while (nextRecordsUrl.get() != null);
private InputStream getResultInputStream(AtomicReference<String> nextRecordsUrl, String querySObject) {
if (nextRecordsUrl.get() == null) {
private void processCustomQuery(ProcessContext context, ProcessSession session, FlowFile originalFlowFile) {
String customQuery = context.getProperty(CUSTOM_SOQL_QUERY).evaluateAttributeExpressions(originalFlowFile).getValue();
AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
AtomicReference<String> totalSize = new AtomicReference<>();
boolean isOriginalTransferred = false;
List<FlowFile> outgoingFlowFiles = new ArrayList<>();
do {
FlowFile outgoingFlowFile;
try (InputStream response = getResultInputStream(nextRecordsUrl.get(), customQuery)) {
if (originalFlowFile != null) {
outgoingFlowFile = session.create(originalFlowFile);
} else {
outgoingFlowFile = session.create();
outgoingFlowFile = session.write(outgoingFlowFile, parseHttpResponse(response, nextRecordsUrl, totalSize));
int recordCount = nextRecordsUrl.get() != null ? 2000 : Integer.parseInt(totalSize.get()) % 2000;
Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
attributes.put(TOTAL_RECORD_COUNT, String.valueOf(recordCount));
session.adjustCounter("Salesforce records processed", recordCount, false);
session.putAllAttributes(outgoingFlowFile, attributes);
} catch (IOException e) {
throw new ProcessException("Couldn't get Salesforce records", e);
} catch (Exception e) {
if (originalFlowFile != null) {
session.transfer(originalFlowFile, REL_FAILURE);
isOriginalTransferred = true;
getLogger().error("Couldn't get Salesforce records", e);
} while (nextRecordsUrl.get() != null);
if (!outgoingFlowFiles.isEmpty()) {
session.transfer(outgoingFlowFiles, REL_SUCCESS);
if (originalFlowFile != null && !isOriginalTransferred) {
session.transfer(originalFlowFile, REL_ORIGINAL);
private OutputStreamCallback parseHttpResponse(InputStream in, AtomicReference<String> nextRecordsUrl, AtomicReference<String> totalSize) {
return out -> {
try (JsonParser jsonParser = JSON_FACTORY.createParser(in);
JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
while (jsonParser.nextToken() != null) {
if (nextTokenIs(jsonParser, TOTAL_SIZE)) {
} else if (nextTokenIs(jsonParser, NEXT_RECORDS_URL)) {
} else if (nextTokenIs(jsonParser, RECORDS)) {
private boolean nextTokenIs(JsonParser jsonParser, String value) throws IOException {
return jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
.equals(value) && jsonParser.nextToken() != null;
private InputStream getResultInputStream(String nextRecordsUrl, String querySObject) {
if (nextRecordsUrl == null) {
return salesforceRestService.query(querySObject);
return salesforceRestService.getNextRecords(nextRecordsUrl.get());
return salesforceRestService.getNextRecords(nextRecordsUrl);
private SalesforceSchemaHolder getConvertedSalesforceSchema(String sObject, String fields) {

View File

@ -34,8 +34,11 @@ td {text-align: left}
Objects in Salesforce are database tables, their rows are known as records, and their columns are called fields. The QuerySalesforceObject processor queries Salesforce objects and retrieves their records.
The processor constructs the query using SOQL (Salesforce Object Query Language) and retrieves the result record dataset using the Salesforce REST API.
The processor utilizes streams and NiFi record-based processing to be able to handle a large number of records and to allow arbitrary output format.
The processor constructs the query from processor properties or executes a custom SOQL (Salesforce Object Query Language) query and retrieves the result record dataset using the Salesforce REST API.
The 'Query Type' processor property allows the query to be built in two ways. The 'Property Based Query' option allows to define a 'SELECT &lt;fields&gt; from &lt;Salesforce object&gt;' type query,
with the fields defined in the 'Field Names' property and the Salesforce object defined in the 'sObject Name' property, whereas the 'Custom Query' option allows you to supply an arbitrary SOQL query.
By using 'Custom Query', the processor can accept an optional input flowfile and reference the flowfile attributes in the query. However, incremental loading and record-based processing are only supported
in 'Property Based Queries'.
<h3>OAuth2 Access Token Provider Service</h3>

View File

@ -63,6 +63,7 @@ class QuerySalesforceObjectIT implements SalesforceConfigAware {
runner.addControllerService("writer", writer);
runner.setProperty(QuerySalesforceObject.QUERY_TYPE, QuerySalesforceObject.PROPERTY_BASED_QUERY);
runner.setProperty(QuerySalesforceObject.SOBJECT_NAME, sObjectName);
runner.setProperty(QuerySalesforceObject.FIELD_NAMES, fieldNames);
runner.setProperty(CommonSalesforceProperties.API_VERSION, VERSION);
@ -77,4 +78,20 @@ class QuerySalesforceObjectIT implements SalesforceConfigAware {
void runCustomQuery() {
String customQuery = "SELECT Id, Name, AccountId, Account.ShippingAddress FROM Contact";
runner.setProperty(QuerySalesforceObject.QUERY_TYPE, QuerySalesforceObject.CUSTOM_QUERY);
runner.setProperty(QuerySalesforceObject.CUSTOM_SOQL_QUERY, customQuery);
runner.setProperty(CommonSalesforceProperties.API_VERSION, VERSION);
runner.setProperty(CommonSalesforceProperties.API_URL, BASE_URL);;
List<MockFlowFile> results = runner.getFlowFilesForRelationship(QuerySalesforceObject.REL_SUCCESS);