NIFI-9720 Added QuerySalesforceObject Processor

This closes #5802

Signed-off-by: David Handermann <exceptionfactory@apache.org>
Lehel 2022-02-23 16:07:00 +01:00 committed by exceptionfactory
parent 285a8cbc3c
commit 73ee1a9a1e
16 changed files with 1407 additions and 0 deletions


@@ -839,6 +839,12 @@ language governing permissions and limitations under the License. -->
<version>1.17.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-salesforce-nar</artifactId>
<version>1.17.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<!-- dependencies for jaxb/activation/annotation for running NiFi on Java 11 -->
<!-- TODO: remove these once minimum Java version is 11 -->
<dependency>


@@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-salesforce-bundle</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.17.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-salesforce-nar</artifactId>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-salesforce-processors</artifactId>
<version>1.17.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-services-nar</artifactId>
<version>1.17.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
</dependencies>
</project>


@@ -0,0 +1,122 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-salesforce-bundle</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.17.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-salesforce-processors</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-oauth2-provider-api</artifactId>
<version>1.17.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-salesforce</artifactId>
<version>3.14.1</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-jsonSchema</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Provided from parent nar -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
<version>1.17.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-services</artifactId>
<version>1.17.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<scope>test</scope>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.17.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-oauth2-provider-service</artifactId>
<version>1.17.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
<version>1.17.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<!-- test data -->
<exclude>src/test/resources/**/*</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>


@@ -0,0 +1,486 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.salesforce;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.json.JsonTreeRowRecordReader;
import org.apache.nifi.json.StartingFieldStrategy;
import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
import org.apache.nifi.processors.salesforce.util.SalesforceToRecordSchemaConverter;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@PrimaryNodeOnly
@TriggerSerially
@TriggerWhenEmpty
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"salesforce", "sobject", "soql", "query"})
@CapabilityDescription("Retrieves records from a Salesforce sObject. Users can add arbitrary filter conditions by setting the 'Custom WHERE Condition' property."
+ " Supports incremental retrieval: users can define a field in the 'Age Field' property that will be used to determine when the record was created."
+ " When this property is set the processor will retrieve new records. It's also possible to define an initial cutoff value for the age, fitering out all older records"
+ " even for the first run. This processor is intended to be run on the Primary Node only."
+ " FlowFile attribute 'record.count' indicates how many records were retrieved and written to the output.")
@Stateful(scopes = Scope.CLUSTER, description = "When 'Age Field' is set, after performing a query the time of execution is stored. Subsequent queries will be augmented"
+ " with an additional condition so that only records that are newer than the stored execution time (adjusted with the optional value of 'Age Delay') will be retrieved."
+ " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected,"
+ " the new node can pick up where the previous node left off, without duplicating the data.")
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."),
@WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile.")
})
public class QuerySalesforceObject extends AbstractProcessor {
static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
.name("salesforce-url")
.displayName("URL")
.description("The URL for the Salesforce REST API including the domain without additional path information, such as https://MyDomainName.my.salesforce.com")
.required(true)
.addValidator(StandardValidators.URL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor API_VERSION = new PropertyDescriptor.Builder()
.name("salesforce-api-version")
.displayName("API Version")
.description("The version number of the Salesforce REST API appended to the URL after the services/data path. See Salesforce documentation for supported versions")
.required(true)
.addValidator(StandardValidators.NUMBER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("54.0")
.build();
static final PropertyDescriptor SOBJECT_NAME = new PropertyDescriptor.Builder()
.name("sobject-name")
.displayName("sObject Name")
.description("The Salesforce sObject to be queried")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
static final PropertyDescriptor FIELD_NAMES = new PropertyDescriptor.Builder()
.name("field-names")
.displayName("Field Names")
.description("Comma-separated list of field names requested from the sObject to be queried")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
.name("read-timeout")
.displayName("Read Timeout")
.description("Maximum time allowed for reading a response from the Salesforce REST API")
.required(true)
.defaultValue("15 s")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor TOKEN_PROVIDER = new PropertyDescriptor.Builder()
.name("oauth2-access-token-provider")
.displayName("OAuth2 Access Token Provider")
.description("Service providing OAuth2 Access Tokens for authenticating using the HTTP Authorization Header")
.identifiesControllerService(OAuth2AccessTokenProvider.class)
.required(true)
.build();
static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.name("record-writer")
.displayName("Record Writer")
.description("Service used for writing records returned from the Salesforce REST API")
.identifiesControllerService(RecordSetWriterFactory.class)
.required(true)
.build();
static final PropertyDescriptor CREATE_ZERO_RECORD_FILES = new PropertyDescriptor.Builder()
.name("create-zero-record-files")
.displayName("Create Zero Record FlowFiles")
.description("Specifies whether or not to create a FlowFile when the Salesforce REST API does not return any records")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
static final PropertyDescriptor AGE_FIELD = new PropertyDescriptor.Builder()
.name("age-field")
.displayName("Age Field")
.description("The name of a TIMESTAMP field that will be used to limit all and filter already retrieved records."
+ " Only records that are older than the previous run time of this processor will be retrieved."
)
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
static final PropertyDescriptor AGE_DELAY = new PropertyDescriptor.Builder()
.name("age-delay")
.displayName("Age Delay")
.description("When 'Age Field' is set the age-based filter will be adjusted by this amount."
+ " Only records that are older than the previous run time of this processor, by at least this amount, will be retrieved."
)
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.dependsOn(AGE_FIELD)
.build();
static final PropertyDescriptor INITIAL_AGE_FILTER = new PropertyDescriptor.Builder()
.name("initial-age-filter")
.displayName("Initial Age Filter")
.description("When 'Age Field' is set the value of this property will serve as a filter when this processor runs the first time."
+ " Only records that are older than this value be retrieved."
)
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.dependsOn(AGE_FIELD)
.build();
static final PropertyDescriptor CUSTOM_WHERE_CONDITION = new PropertyDescriptor.Builder()
.name("custom-where-condition")
.displayName("Custom WHERE Condition")
.description("A custom expression to be added in the WHERE clause of the query")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("For FlowFiles created as a result of a successful query.")
.build();
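// Cluster-scoped state key under which the upper bound of the last age window is stored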
private static final String LAST_AGE_FILTER = "last_age_filter";
private static final String STARTING_FIELD_NAME = "records";
private static final String DATE_FORMAT = "yyyy-MM-dd";
private static final String TIME_FORMAT = "HH:mm:ss.SSSX";
private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZZZZ";
private volatile SalesforceToRecordSchemaConverter salesForceToRecordSchemaConverter;
private volatile SalesforceRestService salesforceRestService;
@OnScheduled
public void onScheduled(final ProcessContext context) {
salesForceToRecordSchemaConverter = new SalesforceToRecordSchemaConverter(
DATE_FORMAT,
DATE_TIME_FORMAT,
TIME_FORMAT
);
String salesforceVersion = context.getProperty(API_VERSION).getValue();
String baseUrl = context.getProperty(API_URL).getValue();
OAuth2AccessTokenProvider accessTokenProvider = context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
salesforceRestService = new SalesforceRestService(
salesforceVersion,
baseUrl,
() -> accessTokenProvider.getAccessDetails().getAccessToken(),
context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue()
);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.unmodifiableList(Arrays.asList(
API_URL,
API_VERSION,
SOBJECT_NAME,
FIELD_NAMES,
READ_TIMEOUT,
TOKEN_PROVIDER,
RECORD_WRITER,
CREATE_ZERO_RECORD_FILES,
AGE_FIELD,
INITIAL_AGE_FILTER,
AGE_DELAY,
CUSTOM_WHERE_CONDITION
));
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
return relationships;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
if (validationContext.getProperty(INITIAL_AGE_FILTER).isSet() && !validationContext.getProperty(AGE_FIELD).isSet()) {
results.add(
new ValidationResult.Builder()
.subject(INITIAL_AGE_FILTER.getDisplayName())
.valid(false)
.explanation("it requires " + AGE_FIELD.getDisplayName() + " also to be set.")
.build()
);
}
return results;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
String sObject = context.getProperty(SOBJECT_NAME).getValue();
String fields = context.getProperty(FIELD_NAMES).getValue();
String customWhereClause = context.getProperty(CUSTOM_WHERE_CONDITION).getValue();
RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
boolean createZeroRecordFlowFiles = context.getProperty(CREATE_ZERO_RECORD_FILES).asBoolean();
String ageField = context.getProperty(AGE_FIELD).getValue();
String initialAgeFilter = context.getProperty(INITIAL_AGE_FILTER).getValue();
Long ageDelayMs = context.getProperty(AGE_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
String ageFilterLower;
StateMap state;
try {
state = context.getStateManager().getState(Scope.CLUSTER);
ageFilterLower = state.get(LAST_AGE_FILTER);
} catch (IOException e) {
throw new ProcessException("Last Age Filter state retrieval failed", e);
}
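// Upper bound of the age window: the current time, optionally shifted back by 'Age Delay'; records newer than this bound are left for a later run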
String ageFilterUpper;
if (ageField == null) {
ageFilterUpper = null;
} else {
OffsetDateTime ageFilterUpperTime;
if (ageDelayMs == null) {
ageFilterUpperTime = OffsetDateTime.now();
} else {
ageFilterUpperTime = OffsetDateTime.now().minus(ageDelayMs, ChronoUnit.MILLIS);
}
ageFilterUpper = ageFilterUpperTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
}
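// Build the record schema for the requested fields from the sObject describe result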
ConvertedSalesforceSchema convertedSalesforceSchema = getConvertedSalesforceSchema(sObject, fields);
String querySObject = buildQuery(
sObject,
fields,
customWhereClause,
ageField,
initialAgeFilter,
ageFilterLower,
ageFilterUpper
);
FlowFile flowFile = session.create();
Map<String, String> originalAttributes = flowFile.getAttributes();
Map<String, String> attributes = new HashMap<>();
AtomicInteger recordCountHolder = new AtomicInteger();
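// The query response is a JSON object whose nested 'records' array holds the result rows; the JSON reader starts at that field and each row is written with the configured Record Writer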
flowFile = session.write(flowFile, out -> {
try (
InputStream querySObjectResultInputStream = salesforceRestService.query(querySObject);
JsonTreeRowRecordReader jsonReader = new JsonTreeRowRecordReader(
querySObjectResultInputStream,
getLogger(),
convertedSalesforceSchema.querySObjectResultSchema,
DATE_FORMAT,
TIME_FORMAT,
DATE_TIME_FORMAT,
StartingFieldStrategy.NESTED_FIELD,
STARTING_FIELD_NAME
);
RecordSetWriter writer = writerFactory.createWriter(
getLogger(),
writerFactory.getSchema(
originalAttributes,
convertedSalesforceSchema.recordSchema
),
out,
originalAttributes
)
) {
writer.beginRecordSet();
Record querySObjectRecord;
while ((querySObjectRecord = jsonReader.nextRecord()) != null) {
writer.write(querySObjectRecord);
}
WriteResult writeResult = writer.finishRecordSet();
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.putAll(writeResult.getAttributes());
recordCountHolder.set(writeResult.getRecordCount());
if (ageFilterUpper != null) {
Map<String, String> newState = new HashMap<>(state.toMap());
newState.put(LAST_AGE_FILTER, ageFilterUpper);
updateState(context, newState);
}
} catch (SchemaNotFoundException e) {
throw new ProcessException("Couldn't create record writer", e);
} catch (MalformedRecordException e) {
throw new ProcessException("Couldn't read records from input", e);
}
});
int recordCount = recordCountHolder.get();
if (!createZeroRecordFlowFiles && recordCount == 0) {
session.remove(flowFile);
} else {
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
session.adjustCounter("Records Processed", recordCount, false);
getLogger().info("Successfully written {} records for {}", recordCount, flowFile);
}
}
private ConvertedSalesforceSchema getConvertedSalesforceSchema(String sObject, String fields) {
try (InputStream describeSObjectResult = salesforceRestService.describeSObject(sObject)) {
return convertSchema(describeSObjectResult, fields);
} catch (IOException e) {
throw new UncheckedIOException("Salesforce input stream close failed", e);
}
}
private void updateState(ProcessContext context, Map<String, String> newState) {
try {
context.getStateManager().setState(newState, Scope.CLUSTER);
} catch (IOException e) {
throw new ProcessException("Last Age Filter state update failed", e);
}
}
protected ConvertedSalesforceSchema convertSchema(InputStream describeSObjectResult, String fields) {
try {
RecordSchema recordSchema = salesForceToRecordSchemaConverter.convertSchema(describeSObjectResult, fields);
RecordSchema querySObjectResultSchema = new SimpleRecordSchema(Collections.singletonList(
new RecordField(STARTING_FIELD_NAME, RecordFieldType.ARRAY.getArrayDataType(
RecordFieldType.RECORD.getRecordDataType(
recordSchema
)
))
));
return new ConvertedSalesforceSchema(querySObjectResultSchema, recordSchema);
} catch (IOException e) {
throw new ProcessException("SObject to Record schema conversion failed", e);
}
}
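/**
 * Builds the SOQL statement, combining the optional custom WHERE condition with the age window, for example
 * (field names and values are illustrative only):
 * SELECT Id,Name FROM Account WHERE ( Name LIKE 'A%' ) AND CreatedDate >= 2022-01-01T00:00:00Z AND CreatedDate < 2022-02-01T00:00:00Z
 */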
protected String buildQuery(
String sObject,
String fields,
String customWhereClause,
String ageField,
String initialAgeFilter,
String ageFilterLower,
String ageFilterUpper
) {
StringBuilder queryBuilder = new StringBuilder("SELECT ")
.append(fields)
.append(" FROM ")
.append(sObject);
List<String> whereItems = new ArrayList<>();
if (customWhereClause != null) {
whereItems.add("( " + customWhereClause + " )");
}
if (ageField != null) {
if (ageFilterLower != null) {
whereItems.add(ageField + " >= " + ageFilterLower);
} else if (initialAgeFilter != null) {
whereItems.add(ageField + " >= " + initialAgeFilter);
}
whereItems.add(ageField + " < " + ageFilterUpper);
}
if (!whereItems.isEmpty()) {
String finalWhereClause = String.join(" AND ", whereItems);
queryBuilder.append(" WHERE ").append(finalWhereClause);
}
return queryBuilder.toString();
}
static class ConvertedSalesforceSchema {
RecordSchema querySObjectResultSchema;
RecordSchema recordSchema;
public ConvertedSalesforceSchema(RecordSchema querySObjectResultSchema, RecordSchema recordSchema) {
this.querySObjectResultSchema = querySObjectResultSchema;
this.recordSchema = recordSchema;
}
}
}


@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.salesforce.util;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.nifi.processor.exception.ProcessException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public class SalesforceRestService {
private final String version;
private final String baseUrl;
private final Supplier<String> accessTokenProvider;
private final OkHttpClient httpClient;
public SalesforceRestService(String version, String baseUrl, Supplier<String> accessTokenProvider, int responseTimeoutMillis) {
this.version = version;
this.baseUrl = baseUrl;
this.accessTokenProvider = accessTokenProvider;
httpClient = new OkHttpClient.Builder()
.readTimeout(responseTimeoutMillis, TimeUnit.MILLISECONDS)
.build();
}
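// Fetches the sObject metadata (field names and SOAP types) from the REST 'describe' endpoint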
public InputStream describeSObject(String sObject) {
String url = baseUrl + "/services/data/v" + version + "/sobjects/" + sObject + "/describe?maxRecords=1";
Request request = new Request.Builder()
.addHeader("Authorization", "Bearer " + accessTokenProvider.get())
.url(url)
.get()
.build();
return request(request);
}
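// Runs a SOQL query; the statement is sent URL-encoded in the 'q' query parameter of the 'query' endpoint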
public InputStream query(String query) {
String url = baseUrl + "/services/data/v" + version + "/query";
HttpUrl httpUrl = HttpUrl.get(url).newBuilder()
.addQueryParameter("q", query)
.build();
Request request = new Request.Builder()
.addHeader("Authorization", "Bearer " + accessTokenProvider.get())
.url(httpUrl)
.get()
.build();
return request(request);
}
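// Executes the request and returns the response body stream on HTTP 200; callers must close the stream to release the underlying connection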
private InputStream request(Request request) {
Response response = null;
try {
response = httpClient.newCall(request).execute();
if (response.code() != 200) {
throw new ProcessException("Invalid response" +
" Code: " + response.code() +
" Message: " + response.message() +
" Body: " + (response.body() == null ? null : response.body().string())
);
}
return response.body().byteStream();
} catch (Exception e) {
if (response != null) {
response.close();
}
throw new ProcessException(String.format("Salesforce HTTP request failed [%s]", request.url()), e);
}
}
}


@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.salesforce.util;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
import org.apache.camel.component.salesforce.api.dto.SObjectField;
import org.apache.camel.component.salesforce.api.utils.JsonUtils;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class SalesforceToRecordSchemaConverter {
private final String dateFormat;
private final String dateTimeFormat;
private final String timeFormat;
private final ObjectMapper objectMapper;
public SalesforceToRecordSchemaConverter(String dateFormat, String dateTimeFormat, String timeFormat) {
this.dateFormat = dateFormat;
this.dateTimeFormat = dateTimeFormat;
this.timeFormat = timeFormat;
objectMapper = JsonUtils.createObjectMapper();
}
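/**
 * Reads a Salesforce sObject describe result and builds a record schema containing only the requested
 * fields (matched case-insensitively), mapping each SOAP type to the corresponding record field type.
 */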
public RecordSchema convertSchema(final InputStream describeSObjectResultJsonString, final String fieldNamesOfInterest) throws IOException {
final SObjectDescription sObjectDescription = objectMapper.readValue(describeSObjectResultJsonString, SObjectDescription.class);
final List<String> listOfFieldNamesOfInterest = Arrays.asList(fieldNamesOfInterest.toLowerCase().split("\\s*,\\s*"));
final List<SObjectField> fields = sObjectDescription.getFields()
.stream()
.filter(sObjectField -> listOfFieldNamesOfInterest.contains(sObjectField.getName().toLowerCase()))
.collect(Collectors.toList());
final List<RecordField> recordFields = new ArrayList<>();
for (SObjectField field : fields) {
final String soapType = field.getSoapType();
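// SOAP types carry a namespace prefix (e.g. "xsd:dateTime", "tns:location"); strip it before mapping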
switch (soapType.substring(soapType.indexOf(':') + 1)) {
case "ID":
case "string":
case "json":
case "base64Binary":
case "anyType":
recordFields.add(new RecordField(field.getName(), RecordFieldType.STRING.getDataType(), field.getDefaultValue(), field.isNillable()));
break;
case "int":
recordFields.add(new RecordField(field.getName(), RecordFieldType.INT.getDataType(), field.getDefaultValue(), field.isNillable()));
break;
case "long":
recordFields.add(new RecordField(field.getName(), RecordFieldType.LONG.getDataType(), field.getDefaultValue(), field.isNillable()));
break;
case "double":
recordFields.add(new RecordField(field.getName(), RecordFieldType.DOUBLE.getDataType(), field.getDefaultValue(), field.isNillable()));
break;
case "boolean":
recordFields.add(new RecordField(field.getName(), RecordFieldType.BOOLEAN.getDataType(), field.getDefaultValue(), field.isNillable()));
break;
case "date":
recordFields.add(new RecordField(field.getName(), RecordFieldType.DATE.getDataType(dateFormat), field.getDefaultValue(), field.isNillable()));
break;
case "dateTime":
recordFields.add(new RecordField(field.getName(), RecordFieldType.TIMESTAMP.getDataType(dateTimeFormat), field.getDefaultValue(), field.isNillable()));
break;
case "time":
recordFields.add(new RecordField(field.getName(), RecordFieldType.TIME.getDataType(timeFormat), field.getDefaultValue(), field.isNillable()));
break;
case "address":
final RecordSchema addressSchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("city", RecordFieldType.STRING.getDataType(), true),
new RecordField("country", RecordFieldType.STRING.getDataType(), true),
new RecordField("countryCode", RecordFieldType.STRING.getDataType(), true),
new RecordField("postalCode", RecordFieldType.STRING.getDataType(), true),
new RecordField("state", RecordFieldType.STRING.getDataType(), true),
new RecordField("stateCode", RecordFieldType.STRING.getDataType(), true),
new RecordField("street", RecordFieldType.STRING.getDataType(), true),
new RecordField("geocodeAccuracy", RecordFieldType.STRING.getDataType(), true)
));
recordFields.add(new RecordField(field.getName(), RecordFieldType.RECORD.getRecordDataType(addressSchema), field.getDefaultValue(), field.isNillable()));
break;
case "location":
final RecordSchema locationSchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("latitude", RecordFieldType.STRING.getDataType(), true),
new RecordField("longitude", RecordFieldType.STRING.getDataType(), true)
));
recordFields.add(new RecordField(field.getName(), RecordFieldType.RECORD.getRecordDataType(locationSchema), field.getDefaultValue(), field.isNillable()));
break;
default:
throw new IllegalArgumentException(String.format("Could not determine schema for '%s'. Could not convert field '%s' of soap type '%s'.",
sObjectDescription.getName(), field.getName(), soapType));
}
}
return new SimpleRecordSchema(recordFields);
}
}


@@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.salesforce.QuerySalesforceObject


@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.salesforce;
import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertNotNull;
/**
* Set constants in {@link SalesforceConfigAware}
*/
class QuerySalesforceObjectIT implements SalesforceConfigAware {
private TestRunner runner;
@BeforeEach
void setUp() throws Exception {
Processor querySObject = new QuerySalesforceObject();
runner = TestRunners.newTestRunner(querySObject);
StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = initOAuth2AccessTokenProvider(runner);
runner.setProperty(QuerySalesforceObject.TOKEN_PROVIDER, oauth2AccessTokenProvider.getIdentifier());
}
@AfterEach
void tearDown() {
runner.shutdown();
}
@Test
void retrievesAndWritesRecords() throws Exception {
String sObjectName = "Account";
String fieldNames = "Id,name,CreatedDate";
RecordSetWriterFactory writer = new MockRecordWriter();
runner.addControllerService("writer", writer);
runner.enableControllerService(writer);
runner.setProperty(QuerySalesforceObject.SOBJECT_NAME, sObjectName);
runner.setProperty(QuerySalesforceObject.FIELD_NAMES, fieldNames);
runner.setProperty(QuerySalesforceObject.API_VERSION, VERSION);
runner.setProperty(QuerySalesforceObject.API_URL, BASE_URL);
runner.setProperty(QuerySalesforceObject.RECORD_WRITER, writer.getIdentifier());
runner.setProperty(QuerySalesforceObject.AGE_FIELD, "CreatedDate");
runner.setProperty(QuerySalesforceObject.INITIAL_AGE_FILTER, "2022-01-06T08:43:24.000+0000");
runner.run();
List<MockFlowFile> results = runner.getFlowFilesForRelationship(QuerySalesforceObject.REL_SUCCESS);
assertNotNull(results.get(0).getContent());
}
}


@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.salesforce.util;
import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
/**
* Set the following constants:<p>
* VERSION<p>
* BASE_URL<p>
* USERNAME<p>
* PASSWORD<p>
* CLIENT_ID<p>
* CLIENT_SECRET<p>
*/
public interface SalesforceConfigAware {
String VERSION = "54.0";
String BASE_URL = "https://MyDomainName.my.salesforce.com";
String AUTHORIZATION_SERVER_URL = BASE_URL + "/services/oauth2/token";
String USERNAME = "???";
String PASSWORD = "???";
String CLIENT_ID = "???";
String CLIENT_SECRET = "???";
default StandardOauth2AccessTokenProvider initOAuth2AccessTokenProvider(TestRunner runner) throws InitializationException {
StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = new StandardOauth2AccessTokenProvider();
runner.addControllerService("oauth2AccessTokenProvider", oauth2AccessTokenProvider);
runner.setProperty(oauth2AccessTokenProvider, StandardOauth2AccessTokenProvider.AUTHORIZATION_SERVER_URL, AUTHORIZATION_SERVER_URL);
runner.setProperty(oauth2AccessTokenProvider, StandardOauth2AccessTokenProvider.USERNAME, USERNAME);
runner.setProperty(oauth2AccessTokenProvider, StandardOauth2AccessTokenProvider.PASSWORD, PASSWORD);
runner.setProperty(oauth2AccessTokenProvider, StandardOauth2AccessTokenProvider.CLIENT_ID, CLIENT_ID);
runner.setProperty(oauth2AccessTokenProvider, StandardOauth2AccessTokenProvider.CLIENT_SECRET, CLIENT_SECRET);
runner.enableControllerService(oauth2AccessTokenProvider);
return oauth2AccessTokenProvider;
}
}


@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.salesforce.util;
import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.InputStream;
import static org.junit.jupiter.api.Assertions.assertNotNull;
/**
* Set constants in {@link SalesforceConfigAware}
*/
class SalesforceRestServiceIT implements SalesforceConfigAware {
private TestRunner runner;
private SalesforceRestService testSubject;
@BeforeEach
void setUp() throws Exception {
runner = TestRunners.newTestRunner(new AbstractSessionFactoryProcessor() {
@Override
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
}
});
StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = initOAuth2AccessTokenProvider(runner);
testSubject = new SalesforceRestService(
VERSION,
BASE_URL,
() -> oauth2AccessTokenProvider.getAccessDetails().getAccessToken(),
5_000
);
}
@AfterEach
void tearDown() {
runner.shutdown();
}
@Test
void describeSObjectSucceeds() throws IOException {
try (InputStream describeSObjectResultJson = testSubject.describeSObject("Account")) {
assertNotNull(describeSObjectResultJson);
}
}
@Test
void querySucceeds() throws IOException {
String query = "SELECT id,BillingAddress FROM Account";
try (InputStream querySObjectRecordsResultJson = testSubject.query(query)) {
assertNotNull(querySObjectRecordsResultJson);
}
}
}


@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.salesforce.util;
import com.fasterxml.jackson.databind.exc.MismatchedInputException;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.jupiter.api.Test;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
class SalesforceToRecordSchemaConverterTest {
private static final String TEST_PATH = "src/test/resources/converter/";
private static final String DATE_FORMAT = "yyyy-mm-dd";
private static final String TIME_STAMP_FORMAT = "yyyy-mm-dd / hh:mm:ss";
private static final String TIME_FORMAT = "hh:mm:ss";
private static final SalesforceToRecordSchemaConverter converter = new SalesforceToRecordSchemaConverter(DATE_FORMAT, TIME_STAMP_FORMAT, TIME_FORMAT);
@Test
void testConvertSchema() throws IOException {
final String salesforceSchemaFileName = "simple_sf_schema.json";
final String fieldNames = "ExampleInt,ExampleLong,ExampleDouble,ExampleBoolean," +
"ExampleID,ExampleString,ExampleJson,ExampleBase64Binary,ExampleAnyType," +
"ExampleDate,ExampleDateTime,ExampleTime";
final RecordSchema expected = new SimpleRecordSchema(Arrays.asList(
// primitives
new RecordField("ExampleInt", RecordFieldType.INT.getDataType()),
new RecordField("ExampleLong", RecordFieldType.LONG.getDataType()),
new RecordField("ExampleDouble", RecordFieldType.DOUBLE.getDataType()),
new RecordField("ExampleBoolean", RecordFieldType.BOOLEAN.getDataType()),
// string types
new RecordField("ExampleID", RecordFieldType.STRING.getDataType()),
new RecordField("ExampleString", RecordFieldType.STRING.getDataType()),
new RecordField("ExampleJson", RecordFieldType.STRING.getDataType()),
new RecordField("ExampleBase64Binary", RecordFieldType.STRING.getDataType()),
new RecordField("ExampleAnyType", RecordFieldType.STRING.getDataType()),
// date types
new RecordField("ExampleDate", RecordFieldType.DATE.getDataType(DATE_FORMAT)),
new RecordField("ExampleDateTime", RecordFieldType.TIMESTAMP.getDataType(TIME_STAMP_FORMAT)),
new RecordField("ExampleTime", RecordFieldType.TIME.getDataType(TIME_FORMAT))
));
try (final InputStream sfSchema = readFile(TEST_PATH + salesforceSchemaFileName)) {
final RecordSchema actual = converter.convertSchema(sfSchema, fieldNames);
assertEquals(expected, actual);
}
}
@Test
void testConvertComplexTypes() throws IOException {
final String salesforceSchemaFileName = "complex_sf_schema.json";
final String fieldNames = "ExampleAddress,ExampleLocation";
final RecordSchema addressSchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("city", RecordFieldType.STRING.getDataType(), true),
new RecordField("country", RecordFieldType.STRING.getDataType(), true),
new RecordField("countryCode", RecordFieldType.STRING.getDataType(), true),
new RecordField("postalCode", RecordFieldType.STRING.getDataType(), true),
new RecordField("state", RecordFieldType.STRING.getDataType(), true),
new RecordField("stateCode", RecordFieldType.STRING.getDataType(), true),
new RecordField("street", RecordFieldType.STRING.getDataType(), true),
new RecordField("geocodeAccuracy", RecordFieldType.STRING.getDataType(), true)
));
final RecordSchema locationSchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("latitude", RecordFieldType.STRING.getDataType(), true),
new RecordField("longitude", RecordFieldType.STRING.getDataType(), true)
));
RecordSchema expected = new SimpleRecordSchema(Arrays.asList(
new RecordField("ExampleAddress", RecordFieldType.RECORD.getRecordDataType(addressSchema)),
new RecordField("ExampleLocation", RecordFieldType.RECORD.getRecordDataType(locationSchema))
));
try (final InputStream sfSchema = readFile(TEST_PATH + salesforceSchemaFileName)) {
final RecordSchema actual = converter.convertSchema(sfSchema, fieldNames);
assertEquals(expected, actual);
}
}
@Test
void testSelectFields() throws IOException {
final String salesforceSchemaFileName = "simple_sf_schema.json";
final String fieldNames = "ExampleInt,ExampleTime";
final RecordSchema expected = new SimpleRecordSchema(Arrays.asList(
new RecordField("ExampleInt", RecordFieldType.INT.getDataType()),
new RecordField("ExampleTime", RecordFieldType.TIME.getDataType(TIME_FORMAT))
));
try (final InputStream sfSchema = readFile(TEST_PATH + salesforceSchemaFileName)) {
final RecordSchema actual = converter.convertSchema(sfSchema, fieldNames);
assertEquals(expected, actual);
}
}
@Test
void testSelectEmptyFields() throws IOException {
final String salesforceSchemaFileName = "simple_sf_schema.json";
final String fieldNames = "";
final RecordSchema expected = new SimpleRecordSchema(Collections.emptyList());
try (final InputStream sfSchema = readFile(TEST_PATH + salesforceSchemaFileName)) {
final RecordSchema actual = converter.convertSchema(sfSchema, fieldNames);
assertEquals(expected, actual);
}
}
@Test
void testConvertEmptySchema() throws IOException {
try (final InputStream sfSchema = IOUtils.toInputStream("", Charset.defaultCharset())) {
assertThrows(MismatchedInputException.class, () -> converter.convertSchema(sfSchema, "ExampleField"));
}
}
@Test
void testConvertNullSchema() {
final InputStream sfSchema = null;
assertThrows(IllegalArgumentException.class, () -> converter.convertSchema(sfSchema, "ExampleField"));
}
@Test
void testConvertUnknownDataType() throws IOException {
try (final InputStream sfSchema = readFile(TEST_PATH + "unknown_type_sf_schema.json")) {
final String fieldNames = "FieldWithUnknownType";
final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> converter.convertSchema(sfSchema, fieldNames));
final String errorMessage = "Could not determine schema for 'SObjectWithUnknownFieldType'. Could not convert field 'FieldWithUnknownType' of soap type 'xsd:unknown'.";
assertEquals(errorMessage, exception.getMessage());
}
}
private InputStream readFile(final String path) throws IOException {
return new FileInputStream(path);
}
}


@@ -0,0 +1,16 @@
{
"fields": [
{
"defaultValue": null,
"name": "ExampleAddress",
"nillable": true,
"soapType": "urn:address"
},
{
"defaultValue": null,
"name": "ExampleLocation",
"nillable": true,
"soapType": "tns:location"
}
]
}


@@ -0,0 +1,76 @@
{
"fields": [
{
"defaultValue": null,
"name": "ExampleInt",
"nillable": true,
"soapType": "xns:int"
},
{
"defaultValue": null,
"name": "ExampleLong",
"nillable": true,
"soapType": "xns:long"
},
{
"defaultValue": null,
"name": "ExampleDouble",
"nillable": true,
"soapType": "xns:double"
},
{
"defaultValue": null,
"name": "ExampleBoolean",
"nillable": true,
"soapType": "xns:boolean"
},
{
"defaultValue": null,
"name": "ExampleID",
"nillable": true,
"soapType": "xns:ID"
},
{
"defaultValue": null,
"name": "ExampleString",
"nillable": true,
"soapType": "xns:string"
},
{
"defaultValue": null,
"name": "ExampleJson",
"nillable": true,
"soapType": "xns:json"
},
{
"defaultValue": null,
"name": "ExampleBase64Binary",
"nillable": true,
"soapType": "xns:base64Binary"
},
{
"defaultValue": null,
"name": "ExampleAnyType",
"nillable": true,
"soapType": "xns:anyType"
},
{
"defaultValue": null,
"name": "ExampleDate",
"nillable": true,
"soapType": "xsd:date"
},
{
"defaultValue": null,
"name": "ExampleDateTime",
"nillable": true,
"soapType": "xsd:dateTime"
},
{
"defaultValue": null,
"name": "ExampleTime",
"nillable": true,
"soapType": "xsd:time"
}
]
}


@@ -0,0 +1,11 @@
{
"name": "SObjectWithUnknownFieldType",
"fields": [
{
"defaultValue": null,
"name": "FieldWithUnknownType",
"nillable": true,
"soapType": "xsd:unknown"
}
]
}


@@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-nar-bundles</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.17.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-salesforce-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-salesforce-nar</module>
<module>nifi-salesforce-processors</module>
</modules>
</project>


@@ -110,6 +110,7 @@
<module>nifi-stateless-processor-bundle</module>
<module>nifi-geohash-bundle</module>
<module>nifi-snowflake-bundle</module>
<module>nifi-salesforce-bundle</module>
</modules>
<build>