From 73ee1a9a1e355d31f348a82acdfff4a387ef485b Mon Sep 17 00:00:00 2001 From: Lehel Date: Wed, 23 Feb 2022 16:07:00 +0100 Subject: [PATCH] NIFI-9720 Added QuerySalesforceObject Processor This closes #5802 Signed-off-by: David Handermann --- nifi-assembly/pom.xml | 6 + .../nifi-salesforce-nar/pom.xml | 46 ++ .../nifi-salesforce-processors/pom.xml | 122 +++++ .../salesforce/QuerySalesforceObject.java | 486 ++++++++++++++++++ .../util/SalesforceRestService.java | 91 ++++ .../SalesforceToRecordSchemaConverter.java | 120 +++++ .../org.apache.nifi.processor.Processor | 15 + .../salesforce/QuerySalesforceObjectIT.java | 79 +++ .../util/SalesforceConfigAware.java | 57 ++ .../util/SalesforceRestServiceIT.java | 80 +++ ...SalesforceToRecordSchemaConverterTest.java | 169 ++++++ .../converter/complex_sf_schema.json | 16 + .../resources/converter/simple_sf_schema.json | 76 +++ .../converter/unknown_type_sf_schema.json | 11 + .../nifi-salesforce-bundle/pom.xml | 32 ++ nifi-nar-bundles/pom.xml | 1 + 16 files changed, 1407 insertions(+) create mode 100644 nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/pom.xml create mode 100644 nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java create mode 100644 nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java create mode 100644 nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java create mode 100644 nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor create mode 100644 nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java create mode 100644 nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceConfigAware.java create mode 100644 nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceRestServiceIT.java create mode 100644 nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java create mode 100644 nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/complex_sf_schema.json create mode 100644 nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/simple_sf_schema.json create mode 100644 nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/unknown_type_sf_schema.json create mode 100644 nifi-nar-bundles/nifi-salesforce-bundle/pom.xml diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index becd82c40e..0501212288 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -839,6 +839,12 @@ language governing permissions and limitations under the License. --> 1.17.0-SNAPSHOT nar + + org.apache.nifi + nifi-salesforce-nar + 1.17.0-SNAPSHOT + nar + diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-nar/pom.xml b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-nar/pom.xml new file mode 100644 index 0000000000..68baa4d07d --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-nar/pom.xml @@ -0,0 +1,46 @@ + + + + + nifi-salesforce-bundle + org.apache.nifi + 1.17.0-SNAPSHOT + + 4.0.0 + + nifi-salesforce-nar + + nar + + true + true + + + + org.apache.nifi + nifi-salesforce-processors + 1.17.0-SNAPSHOT + + + org.apache.nifi + nifi-record-serialization-services-nar + 1.17.0-SNAPSHOT + nar + + + diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/pom.xml b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/pom.xml new file mode 100644 index 0000000000..07071778ee --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/pom.xml @@ -0,0 +1,122 @@ + + + + + nifi-salesforce-bundle + org.apache.nifi + 1.17.0-SNAPSHOT + + 4.0.0 + + nifi-salesforce-processors + + + + org.apache.nifi + nifi-api + + + com.squareup.okhttp3 + okhttp + + + org.apache.nifi + nifi-oauth2-provider-api + 1.17.0-SNAPSHOT + compile + + + org.apache.nifi + nifi-record + compile + + + org.apache.camel + camel-salesforce + 3.14.1 + + + * + * + + + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + ${jackson.version} + + + com.fasterxml.jackson.module + jackson-module-jsonSchema + ${jackson.version} + + + + org.apache.nifi + nifi-record-serialization-service-api + 1.17.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-record-serialization-services + 1.17.0-SNAPSHOT + provided + + + com.squareup.okhttp3 + mockwebserver + test + + + + org.apache.nifi + nifi-mock + 1.17.0-SNAPSHOT + test + + + org.apache.nifi + nifi-oauth2-provider-service + 1.17.0-SNAPSHOT + test + + + org.apache.nifi + nifi-mock-record-utils + 1.17.0-SNAPSHOT + test + + + + + + + org.apache.rat + apache-rat-plugin + + + + src/test/resources/**/* + + + + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java new file mode 100644 index 0000000000..89d675e1fc --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.salesforce; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.json.JsonTreeRowRecordReader; +import org.apache.nifi.json.StartingFieldStrategy; +import org.apache.nifi.oauth2.OAuth2AccessTokenProvider; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.salesforce.util.SalesforceRestService; +import org.apache.nifi.processors.salesforce.util.SalesforceToRecordSchemaConverter; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@PrimaryNodeOnly +@TriggerSerially +@TriggerWhenEmpty +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"salesforce", "sobject", "soql", "query"}) +@CapabilityDescription("Retrieves records from a Salesforce sObject. Users can add arbitrary filter conditions by setting the 'Custom WHERE Condition' property." + + " Supports incremental retrieval: users can define a field in the 'Age Field' property that will be used to determine when the record was created." + + " When this property is set the processor will retrieve new records. It's also possible to define an initial cutoff value for the age, fitering out all older records" + + " even for the first run. This processor is intended to be run on the Primary Node only." + + " FlowFile attribute 'record.count' indicates how many records were retrieved and written to the output.") +@Stateful(scopes = Scope.CLUSTER, description = "When 'Age Field' is set, after performing a query the time of execution is stored. Subsequent queries will be augmented" + + " with an additional condition so that only records that are newer than the stored execution time (adjusted with the optional value of 'Age Delay') will be retrieved." + + " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected," + + " the new node can pick up where the previous node left off, without duplicating the data.") +@WritesAttributes({ + @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."), + @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile.") +}) +public class QuerySalesforceObject extends AbstractProcessor { + + static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder() + .name("salesforce-url") + .displayName("URL") + .description("The URL for the Salesforce REST API including the domain without additional path information, such as https://MyDomainName.my.salesforce.com") + .required(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor API_VERSION = new PropertyDescriptor.Builder() + .name("salesforce-api-version") + .displayName("API Version") + .description("The version number of the Salesforce REST API appended to the URL after the services/data path. See Salesforce documentation for supported versions") + .required(true) + .addValidator(StandardValidators.NUMBER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .defaultValue("54.0") + .build(); + + static final PropertyDescriptor SOBJECT_NAME = new PropertyDescriptor.Builder() + .name("sobject-name") + .displayName("sObject Name") + .description("The Salesforce sObject to be queried") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor FIELD_NAMES = new PropertyDescriptor.Builder() + .name("field-names") + .displayName("Field Names") + .description("Comma-separated list of field names requested from the sObject to be queried") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder() + .name("read-timeout") + .displayName("Read Timeout") + .description("Maximum time allowed for reading a response from the Salesforce REST API") + .required(true) + .defaultValue("15 s") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor TOKEN_PROVIDER = new PropertyDescriptor.Builder() + .name("oauth2-access-token-provider") + .displayName("OAuth2 Access Token Provider") + .description("Service providing OAuth2 Access Tokens for authenticating using the HTTP Authorization Header") + .identifiesControllerService(OAuth2AccessTokenProvider.class) + .required(true) + .build(); + + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("record-writer") + .displayName("Record Writer") + .description("Service used for writing records returned from the Salesforce REST API") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(true) + .build(); + + static final PropertyDescriptor CREATE_ZERO_RECORD_FILES = new PropertyDescriptor.Builder() + .name("create-zero-record-files") + .displayName("Create Zero Record FlowFiles") + .description("Specifies whether or not to create a FlowFile when the Salesforce REST API does not return any records") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + + static final PropertyDescriptor AGE_FIELD = new PropertyDescriptor.Builder() + .name("age-field") + .displayName("Age Field") + .description("The name of a TIMESTAMP field that will be used to limit all and filter already retrieved records." + + " Only records that are older than the previous run time of this processor will be retrieved." + ) + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor AGE_DELAY = new PropertyDescriptor.Builder() + .name("age-delay") + .displayName("Age Delay") + .description("When 'Age Field' is set the age-based filter will be adjusted by this amount." + + " Only records that are older than the previous run time of this processor, by at least this amount, will be retrieved." + ) + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .dependsOn(AGE_FIELD) + .build(); + + static final PropertyDescriptor INITIAL_AGE_FILTER = new PropertyDescriptor.Builder() + .name("initial-age-filter") + .displayName("Initial Age Filter") + .description("When 'Age Field' is set the value of this property will serve as a filter when this processor runs the first time." + + " Only records that are older than this value be retrieved." + ) + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .dependsOn(AGE_FIELD) + .build(); + + static final PropertyDescriptor CUSTOM_WHERE_CONDITION = new PropertyDescriptor.Builder() + .name("custom-where-condition") + .displayName("Custom WHERE Condition") + .description("A custom expression to be added in the WHERE clause of the query") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("For FlowFiles created as a result of a successful query.") + .build(); + + private static final String LAST_AGE_FILTER = "last_age_filter"; + private static final String STARTING_FIELD_NAME = "records"; + private static final String DATE_FORMAT = "yyyy-MM-dd"; + private static final String TIME_FORMAT = "HH:mm:ss.SSSX"; + private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZZZZ"; + + private volatile SalesforceToRecordSchemaConverter salesForceToRecordSchemaConverter; + private volatile SalesforceRestService salesforceRestService; + + @OnScheduled + public void onScheduled(final ProcessContext context) { + salesForceToRecordSchemaConverter = new SalesforceToRecordSchemaConverter( + DATE_FORMAT, + DATE_TIME_FORMAT, + TIME_FORMAT + ); + + String salesforceVersion = context.getProperty(API_VERSION).getValue(); + String baseUrl = context.getProperty(API_URL).getValue(); + OAuth2AccessTokenProvider accessTokenProvider = context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class); + + salesforceRestService = new SalesforceRestService( + salesforceVersion, + baseUrl, + () -> accessTokenProvider.getAccessDetails().getAccessToken(), + context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue() + ); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return Collections.unmodifiableList(Arrays.asList( + API_URL, + API_VERSION, + SOBJECT_NAME, + FIELD_NAMES, + READ_TIMEOUT, + TOKEN_PROVIDER, + RECORD_WRITER, + CREATE_ZERO_RECORD_FILES, + AGE_FIELD, + INITIAL_AGE_FILTER, + AGE_DELAY, + CUSTOM_WHERE_CONDITION + )); + } + + @Override + public Set getRelationships() { + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + return relationships; + } + + @Override + protected Collection customValidate(ValidationContext validationContext) { + final List results = new ArrayList<>(super.customValidate(validationContext)); + if (validationContext.getProperty(INITIAL_AGE_FILTER).isSet() && !validationContext.getProperty(AGE_FIELD).isSet()) { + results.add( + new ValidationResult.Builder() + .subject(INITIAL_AGE_FILTER.getDisplayName()) + .valid(false) + .explanation("it requires " + AGE_FIELD.getDisplayName() + " also to be set.") + .build() + ); + } + return results; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + String sObject = context.getProperty(SOBJECT_NAME).getValue(); + String fields = context.getProperty(FIELD_NAMES).getValue(); + String customWhereClause = context.getProperty(CUSTOM_WHERE_CONDITION).getValue(); + RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + boolean createZeroRecordFlowFiles = context.getProperty(CREATE_ZERO_RECORD_FILES).asBoolean(); + + String ageField = context.getProperty(AGE_FIELD).getValue(); + String initialAgeFilter = context.getProperty(INITIAL_AGE_FILTER).getValue(); + Long ageDelayMs = context.getProperty(AGE_DELAY).asTimePeriod(TimeUnit.MILLISECONDS); + + String ageFilterLower; + StateMap state; + try { + state = context.getStateManager().getState(Scope.CLUSTER); + ageFilterLower = state.get(LAST_AGE_FILTER); + } catch (IOException e) { + throw new ProcessException("Last Age Filter state retrieval failed", e); + } + + String ageFilterUpper; + if (ageField == null) { + ageFilterUpper = null; + } else { + OffsetDateTime ageFilterUpperTime; + if (ageDelayMs == null) { + ageFilterUpperTime = OffsetDateTime.now(); + } else { + ageFilterUpperTime = OffsetDateTime.now().minus(ageDelayMs, ChronoUnit.MILLIS); + } + ageFilterUpper = ageFilterUpperTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME); + } + + ConvertedSalesforceSchema convertedSalesforceSchema = getConvertedSalesforceSchema(sObject, fields); + + String querySObject = buildQuery( + sObject, + fields, + customWhereClause, + ageField, + initialAgeFilter, + ageFilterLower, + ageFilterUpper + ); + + FlowFile flowFile = session.create(); + + Map originalAttributes = flowFile.getAttributes(); + Map attributes = new HashMap<>(); + + AtomicInteger recordCountHolder = new AtomicInteger(); + + flowFile = session.write(flowFile, out -> { + try ( + InputStream querySObjectResultInputStream = salesforceRestService.query(querySObject); + JsonTreeRowRecordReader jsonReader = new JsonTreeRowRecordReader( + querySObjectResultInputStream, + getLogger(), + convertedSalesforceSchema.querySObjectResultSchema, + DATE_FORMAT, + TIME_FORMAT, + DATE_TIME_FORMAT, + StartingFieldStrategy.NESTED_FIELD, + STARTING_FIELD_NAME + ); + + RecordSetWriter writer = writerFactory.createWriter( + getLogger(), + writerFactory.getSchema( + originalAttributes, + convertedSalesforceSchema.recordSchema + ), + out, + originalAttributes + ) + ) { + writer.beginRecordSet(); + + Record querySObjectRecord; + while ((querySObjectRecord = jsonReader.nextRecord()) != null) { + writer.write(querySObjectRecord); + } + + WriteResult writeResult = writer.finishRecordSet(); + + attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); + attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); + attributes.putAll(writeResult.getAttributes()); + + recordCountHolder.set(writeResult.getRecordCount()); + + if (ageFilterUpper != null) { + Map newState = new HashMap<>(state.toMap()); + newState.put(LAST_AGE_FILTER, ageFilterUpper); + updateState(context, newState); + } + } catch (SchemaNotFoundException e) { + throw new ProcessException("Couldn't create record writer", e); + } catch (MalformedRecordException e) { + throw new ProcessException("Couldn't read records from input", e); + } + }); + + int recordCount = recordCountHolder.get(); + + if (!createZeroRecordFlowFiles && recordCount == 0) { + session.remove(flowFile); + } else { + flowFile = session.putAllAttributes(flowFile, attributes); + session.transfer(flowFile, REL_SUCCESS); + + session.adjustCounter("Records Processed", recordCount, false); + getLogger().info("Successfully written {} records for {}", recordCount, flowFile); + } + } + + private ConvertedSalesforceSchema getConvertedSalesforceSchema(String sObject, String fields) { + try (InputStream describeSObjectResult = salesforceRestService.describeSObject(sObject)) { + return convertSchema(describeSObjectResult, fields); + } catch (IOException e) { + throw new UncheckedIOException("Salesforce input stream close failed", e); + } + } + + private void updateState(ProcessContext context, Map newState) { + try { + context.getStateManager().setState(newState, Scope.CLUSTER); + } catch (IOException e) { + throw new ProcessException("Last Age Filter state update failed", e); + } + } + + protected ConvertedSalesforceSchema convertSchema(InputStream describeSObjectResult, String fields) { + try { + RecordSchema recordSchema = salesForceToRecordSchemaConverter.convertSchema(describeSObjectResult, fields); + + RecordSchema querySObjectResultSchema = new SimpleRecordSchema(Collections.singletonList( + new RecordField(STARTING_FIELD_NAME, RecordFieldType.ARRAY.getArrayDataType( + RecordFieldType.RECORD.getRecordDataType( + recordSchema + ) + )) + )); + + return new ConvertedSalesforceSchema(querySObjectResultSchema, recordSchema); + } catch (IOException e) { + throw new ProcessException("SObject to Record schema conversion failed", e); + } + } + + protected String buildQuery( + String sObject, + String fields, + String customWhereClause, + String ageField, + String initialAgeFilter, + String ageFilterLower, + String ageFilterUpper + ) { + StringBuilder queryBuilder = new StringBuilder("SELECT ") + .append(fields) + .append(" FROM ") + .append(sObject); + + List whereItems = new ArrayList<>(); + if (customWhereClause != null) { + whereItems.add("( " + customWhereClause + " )"); + } + + if (ageField != null) { + if (ageFilterLower != null) { + whereItems.add(ageField + " >= " + ageFilterLower); + } else if (initialAgeFilter != null) { + whereItems.add(ageField + " >= " + initialAgeFilter); + } + + whereItems.add(ageField + " < " + ageFilterUpper); + } + + if (!whereItems.isEmpty()) { + String finalWhereClause = String.join(" AND ", whereItems); + queryBuilder.append(" WHERE ").append(finalWhereClause); + } + + return queryBuilder.toString(); + } + + static class ConvertedSalesforceSchema { + RecordSchema querySObjectResultSchema; + RecordSchema recordSchema; + + public ConvertedSalesforceSchema(RecordSchema querySObjectResultSchema, RecordSchema recordSchema) { + this.querySObjectResultSchema = querySObjectResultSchema; + this.recordSchema = recordSchema; + } + } +} diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java new file mode 100644 index 0000000000..bc3f746158 --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.salesforce.util; + +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.apache.nifi.processor.exception.ProcessException; + +import java.io.InputStream; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +public class SalesforceRestService { + private final String version; + private final String baseUrl; + private final Supplier accessTokenProvider; + private final OkHttpClient httpClient; + + public SalesforceRestService(String version, String baseUrl, Supplier accessTokenProvider, int responseTimeoutMillis) { + this.version = version; + this.baseUrl = baseUrl; + this.accessTokenProvider = accessTokenProvider; + httpClient = new OkHttpClient.Builder() + .readTimeout(responseTimeoutMillis, TimeUnit.MILLISECONDS) + .build(); + } + + public InputStream describeSObject(String sObject) { + String url = baseUrl + "/services/data/v" + version + "/sobjects/" + sObject + "/describe?maxRecords=1"; + + Request request = new Request.Builder() + .addHeader("Authorization", "Bearer " + accessTokenProvider.get()) + .url(url) + .get() + .build(); + + return request(request); + } + + public InputStream query(String query) { + String url = baseUrl + "/services/data/v" + version + "/query"; + + HttpUrl httpUrl = HttpUrl.get(url).newBuilder() + .addQueryParameter("q", query) + .build(); + + Request request = new Request.Builder() + .addHeader("Authorization", "Bearer " + accessTokenProvider.get()) + .url(httpUrl) + .get() + .build(); + + return request(request); + } + + private InputStream request(Request request) { + Response response = null; + try { + response = httpClient.newCall(request).execute(); + if (response.code() != 200) { + throw new ProcessException("Invalid response" + + " Code: " + response.code() + + " Message: " + response.message() + + " Body: " + (response.body() == null ? null : response.body().string()) + ); + } + return response.body().byteStream(); + } catch (Exception e) { + if (response != null) { + response.close(); + } + throw new ProcessException(String.format("Salesforce HTTP request failed [%s]", request.url()), e); + } + } +} diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java new file mode 100644 index 0000000000..f4396415ea --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.salesforce.util; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.component.salesforce.api.dto.SObjectDescription; +import org.apache.camel.component.salesforce.api.dto.SObjectField; +import org.apache.camel.component.salesforce.api.utils.JsonUtils; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class SalesforceToRecordSchemaConverter { + + private final String dateFormat; + private final String dateTimeFormat; + private final String timeFormat; + private final ObjectMapper objectMapper; + + public SalesforceToRecordSchemaConverter(String dateFormat, String dateTimeFormat, String timeFormat) { + this.dateFormat = dateFormat; + this.dateTimeFormat = dateTimeFormat; + this.timeFormat = timeFormat; + objectMapper = JsonUtils.createObjectMapper(); + } + + public RecordSchema convertSchema(final InputStream describeSOjbectResultJsonString, final String fieldNamesOfInterest) throws IOException { + + final SObjectDescription sObjectDescription = objectMapper.readValue(describeSOjbectResultJsonString, SObjectDescription.class); + final List listOfFieldNamesOfInterest = Arrays.asList(fieldNamesOfInterest.toLowerCase().split("\\s*,\\s*")); + final List fields = sObjectDescription.getFields() + .stream() + .filter(sObjectField -> listOfFieldNamesOfInterest.contains(sObjectField.getName().toLowerCase())) + .collect(Collectors.toList()); + + final List recordFields = new ArrayList<>(); + + for (SObjectField field : fields) { + final String soapType = field.getSoapType(); + + switch (soapType.substring(soapType.indexOf(':') + 1)) { + case "ID": + case "string": + case "json": + case "base64Binary": + case "anyType": + recordFields.add(new RecordField(field.getName(), RecordFieldType.STRING.getDataType(), field.getDefaultValue(), field.isNillable())); + break; + case "int": + recordFields.add(new RecordField(field.getName(), RecordFieldType.INT.getDataType(), field.getDefaultValue(), field.isNillable())); + break; + case "long": + recordFields.add(new RecordField(field.getName(), RecordFieldType.LONG.getDataType(), field.getDefaultValue(), field.isNillable())); + break; + case "double": + recordFields.add(new RecordField(field.getName(), RecordFieldType.DOUBLE.getDataType(), field.getDefaultValue(), field.isNillable())); + break; + case "boolean": + recordFields.add(new RecordField(field.getName(), RecordFieldType.BOOLEAN.getDataType(), field.getDefaultValue(), field.isNillable())); + break; + case "date": + recordFields.add(new RecordField(field.getName(), RecordFieldType.DATE.getDataType(dateFormat), field.getDefaultValue(), field.isNillable())); + break; + case "dateTime": + recordFields.add(new RecordField(field.getName(), RecordFieldType.TIMESTAMP.getDataType(dateTimeFormat), field.getDefaultValue(), field.isNillable())); + break; + case "time": + recordFields.add(new RecordField(field.getName(), RecordFieldType.TIME.getDataType(timeFormat), field.getDefaultValue(), field.isNillable())); + break; + case "address": + final RecordSchema addressSchema = new SimpleRecordSchema(Arrays.asList( + new RecordField("city", RecordFieldType.STRING.getDataType(), true), + new RecordField("country", RecordFieldType.STRING.getDataType(), true), + new RecordField("countryCode", RecordFieldType.STRING.getDataType(), true), + new RecordField("postalCode", RecordFieldType.STRING.getDataType(), true), + new RecordField("state", RecordFieldType.STRING.getDataType(), true), + new RecordField("stateCode", RecordFieldType.STRING.getDataType(), true), + new RecordField("street", RecordFieldType.STRING.getDataType(), true), + new RecordField("geocodeAccuracy", RecordFieldType.STRING.getDataType(), true) + )); + recordFields.add(new RecordField(field.getName(), RecordFieldType.RECORD.getRecordDataType(addressSchema), field.getDefaultValue(), field.isNillable())); + break; + case "location": + final RecordSchema locationSchema = new SimpleRecordSchema(Arrays.asList( + new RecordField("latitude", RecordFieldType.STRING.getDataType(), true), + new RecordField("longitude", RecordFieldType.STRING.getDataType(), true) + )); + recordFields.add(new RecordField(field.getName(), RecordFieldType.RECORD.getRecordDataType(locationSchema), field.getDefaultValue(), field.isNillable())); + break; + default: + throw new IllegalArgumentException(String.format("Could not create determine schema for '%s'. Could not convert field '%s' of soap type '%s'.", + sObjectDescription.getName(), field.getName(), soapType)); + } + } + + return new SimpleRecordSchema(recordFields); + } +} diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000000..503138fb69 --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.salesforce.QuerySalesforceObject diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java new file mode 100644 index 0000000000..547b99d54c --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.salesforce; + +import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +/** + * Set constants in {@link SalesforceConfigAware} + */ +class QuerySalesforceObjectIT implements SalesforceConfigAware { + private TestRunner runner; + + @BeforeEach + void setUp() throws Exception { + Processor querySObject = new QuerySalesforceObject(); + + runner = TestRunners.newTestRunner(querySObject); + + StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = initOAuth2AccessTokenProvider(runner); + runner.setProperty(QuerySalesforceObject.TOKEN_PROVIDER, oauth2AccessTokenProvider.getIdentifier()); + } + + @AfterEach + void tearDown() { + runner.shutdown(); + } + + @Test + void retrievesAndWritesRecords() throws Exception { + String sObjectName = "Account"; + String fieldNames = "Id,name,CreatedDate"; + + RecordSetWriterFactory writer = new MockRecordWriter(); + runner.addControllerService("writer", writer); + runner.enableControllerService(writer); + + runner.setProperty(QuerySalesforceObject.SOBJECT_NAME, sObjectName); + runner.setProperty(QuerySalesforceObject.FIELD_NAMES, fieldNames); + runner.setProperty(QuerySalesforceObject.API_VERSION, VERSION); + runner.setProperty(QuerySalesforceObject.API_URL, BASE_URL); + runner.setProperty(QuerySalesforceObject.RECORD_WRITER, writer.getIdentifier()); + runner.setProperty(QuerySalesforceObject.AGE_FIELD, "CreatedDate"); + runner.setProperty(QuerySalesforceObject.INITIAL_AGE_FILTER, "2022-01-06T08:43:24.000+0000"); + + runner.run(); + + List results = runner.getFlowFilesForRelationship(QuerySalesforceObject.REL_SUCCESS); + + assertNotNull(results.get(0).getContent()); + } +} diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceConfigAware.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceConfigAware.java new file mode 100644 index 0000000000..c5b73e5897 --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceConfigAware.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.salesforce.util; + +import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; + +/** + * Set the following constants:

+ * VERSION

+ * BASE_URL

+ * USERNAME

+ * PASSWORD

+ * CLIENT_ID

+ * CLIENT_SECRET

+ */ +public interface SalesforceConfigAware { + String VERSION = "54.0"; + String BASE_URL = "https://MyDomainName.my.salesforce.com"; + + String AUTHORIZATION_SERVER_URL = BASE_URL + "/services/oauth2/token"; + String USERNAME = "???"; + String PASSWORD = "???"; + String CLIENT_ID = "???"; + String CLIENT_SECRET = "???"; + + default StandardOauth2AccessTokenProvider initOAuth2AccessTokenProvider(TestRunner runner) throws InitializationException { + StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = new StandardOauth2AccessTokenProvider(); + + runner.addControllerService("oauth2AccessTokenProvider", oauth2AccessTokenProvider); + + runner.setProperty(oauth2AccessTokenProvider, StandardOauth2AccessTokenProvider.AUTHORIZATION_SERVER_URL, AUTHORIZATION_SERVER_URL); + runner.setProperty(oauth2AccessTokenProvider, StandardOauth2AccessTokenProvider.USERNAME, USERNAME); + runner.setProperty(oauth2AccessTokenProvider, StandardOauth2AccessTokenProvider.PASSWORD, PASSWORD); + runner.setProperty(oauth2AccessTokenProvider, StandardOauth2AccessTokenProvider.CLIENT_ID, CLIENT_ID); + runner.setProperty(oauth2AccessTokenProvider, StandardOauth2AccessTokenProvider.CLIENT_SECRET, CLIENT_SECRET); + + runner.enableControllerService(oauth2AccessTokenProvider); + + return oauth2AccessTokenProvider; + } +} diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceRestServiceIT.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceRestServiceIT.java new file mode 100644 index 0000000000..a82b9ac863 --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceRestServiceIT.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.salesforce.util; + +import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.InputStream; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +/** + * Set constants in {@link SalesforceConfigAware} + */ +class SalesforceRestServiceIT implements SalesforceConfigAware { + private TestRunner runner; + private SalesforceRestService testSubject; + + @BeforeEach + void setUp() throws Exception { + runner = TestRunners.newTestRunner(new AbstractSessionFactoryProcessor() { + @Override + public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { + } + }); + + StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = initOAuth2AccessTokenProvider(runner); + + testSubject = new SalesforceRestService( + VERSION, + BASE_URL, + () -> oauth2AccessTokenProvider.getAccessDetails().getAccessToken(), + 5_000 + ); + } + + @AfterEach + void tearDown() { + runner.shutdown(); + } + + @Test + void describeSObjectSucceeds() throws IOException { + try (InputStream describeSObjectResultJson = testSubject.describeSObject("Account")) { + assertNotNull(describeSObjectResultJson); + } + } + + @Test + void querySucceeds() throws IOException { + String query = "SELECT id,BillingAddress FROM Account"; + + try (InputStream querySObjectRecordsResultJson = testSubject.query(query)) { + assertNotNull(querySObjectRecordsResultJson); + } + } +} diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java new file mode 100644 index 0000000000..f65506a0aa --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.salesforce.util; + +import com.fasterxml.jackson.databind.exc.MismatchedInputException; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.junit.jupiter.api.Test; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class SalesforceToRecordSchemaConverterTest { + + private static final String TEST_PATH = "src/test/resources/converter/"; + private static final String DATE_FORMAT = "yyyy-mm-dd"; + private static final String TIME_STAMP_FORMAT = "yyyy-mm-dd / hh:mm:ss"; + private static final String TIME_FORMAT = "hh:mm:ss"; + + private static final SalesforceToRecordSchemaConverter converter = new SalesforceToRecordSchemaConverter(DATE_FORMAT, TIME_STAMP_FORMAT, TIME_FORMAT); + + @Test + void testConvertSchema() throws IOException { + final String salesforceSchemaFileName = "simple_sf_schema.json"; + final String fieldNames = "ExampleInt,ExampleLong,ExampleDouble,ExampleBoolean," + + "ExampleID,ExampleString,ExampleJson,ExampleBase64Binary,ExampleAnyType," + + "ExampleDate,ExampleDateTime,ExampleTime"; + + final RecordSchema expected = new SimpleRecordSchema(Arrays.asList( + // primitives + new RecordField("ExampleInt", RecordFieldType.INT.getDataType()), + new RecordField("ExampleLong", RecordFieldType.LONG.getDataType()), + new RecordField("ExampleDouble", RecordFieldType.DOUBLE.getDataType()), + new RecordField("ExampleBoolean", RecordFieldType.BOOLEAN.getDataType()), + // string types + new RecordField("ExampleID", RecordFieldType.STRING.getDataType()), + new RecordField("ExampleString", RecordFieldType.STRING.getDataType()), + new RecordField("ExampleJson", RecordFieldType.STRING.getDataType()), + new RecordField("ExampleBase64Binary", RecordFieldType.STRING.getDataType()), + new RecordField("ExampleAnyType", RecordFieldType.STRING.getDataType()), + // date types + new RecordField("ExampleDate", RecordFieldType.DATE.getDataType(DATE_FORMAT)), + new RecordField("ExampleDateTime", RecordFieldType.TIMESTAMP.getDataType(TIME_STAMP_FORMAT)), + new RecordField("ExampleTime", RecordFieldType.TIME.getDataType(TIME_FORMAT)) + )); + + try (final InputStream sfSchema = readFile(TEST_PATH + salesforceSchemaFileName)) { + final RecordSchema actual = converter.convertSchema(sfSchema, fieldNames); + + assertEquals(expected, actual); + } + } + + @Test + void testConvertComplexTypes() throws IOException { + final String salesforceSchemaFileName = "complex_sf_schema.json"; + final String fieldNames = "ExampleAddress,ExampleLocation"; + + final RecordSchema addressSchema = new SimpleRecordSchema(Arrays.asList( + new RecordField("city", RecordFieldType.STRING.getDataType(), true), + new RecordField("country", RecordFieldType.STRING.getDataType(), true), + new RecordField("countryCode", RecordFieldType.STRING.getDataType(), true), + new RecordField("postalCode", RecordFieldType.STRING.getDataType(), true), + new RecordField("state", RecordFieldType.STRING.getDataType(), true), + new RecordField("stateCode", RecordFieldType.STRING.getDataType(), true), + new RecordField("street", RecordFieldType.STRING.getDataType(), true), + new RecordField("geocodeAccuracy", RecordFieldType.STRING.getDataType(), true) + )); + + final RecordSchema locationSchema = new SimpleRecordSchema(Arrays.asList( + new RecordField("latitude", RecordFieldType.STRING.getDataType(), true), + new RecordField("longitude", RecordFieldType.STRING.getDataType(), true) + )); + + RecordSchema expected = new SimpleRecordSchema(Arrays.asList( + new RecordField("ExampleAddress", RecordFieldType.RECORD.getRecordDataType(addressSchema)), + new RecordField("ExampleLocation", RecordFieldType.RECORD.getRecordDataType(locationSchema)) + )); + + try (final InputStream sfSchema = readFile(TEST_PATH + salesforceSchemaFileName)) { + final RecordSchema actual = converter.convertSchema(sfSchema, fieldNames); + + assertEquals(expected, actual); + } + } + + @Test + void testSelectFields() throws IOException { + final String salesforceSchemaFileName = "simple_sf_schema.json"; + final String fieldNames = "ExampleInt,ExampleTime"; + + final RecordSchema expected = new SimpleRecordSchema(Arrays.asList( + new RecordField("ExampleInt", RecordFieldType.INT.getDataType()), + new RecordField("ExampleTime", RecordFieldType.TIME.getDataType(TIME_FORMAT)) + )); + + try (final InputStream sfSchema = readFile(TEST_PATH + salesforceSchemaFileName)) { + final RecordSchema actual = converter.convertSchema(sfSchema, fieldNames); + + assertEquals(expected, actual); + } + } + + @Test + void testSelectEmptyFields() throws IOException { + final String salesforceSchemaFileName = "simple_sf_schema.json"; + final String fieldNames = ""; + + final RecordSchema expected = new SimpleRecordSchema(Collections.emptyList()); + + try (final InputStream sfSchema = readFile(TEST_PATH + salesforceSchemaFileName)) { + final RecordSchema actual = converter.convertSchema(sfSchema, fieldNames); + + assertEquals(expected, actual); + } + } + + @Test + void testConvertEmptySchema() throws IOException { + try (final InputStream sfSchema = IOUtils.toInputStream("", Charset.defaultCharset())) { + assertThrows(MismatchedInputException.class, () -> converter.convertSchema(sfSchema, "ExampleField")); + } + } + + @Test + void testConvertNullSchema() { + final InputStream sfSchema = null; + assertThrows(IllegalArgumentException.class, () -> converter.convertSchema(sfSchema, "ExampleField")); + } + + @Test + void testConvertUnknownDataType() throws IOException { + try (final InputStream sfSchema = readFile(TEST_PATH + "unknown_type_sf_schema.json")) { + final String fieldNames = "FieldWithUnknownType"; + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> converter.convertSchema(sfSchema, fieldNames)); + final String errorMessage = "Could not create determine schema for 'SObjectWithUnknownFieldType'. Could not convert field 'FieldWithUnknownType' of soap type 'xsd:unknown'."; + assertEquals(errorMessage, exception.getMessage()); + } + } + + private InputStream readFile(final String path) throws IOException { + return new FileInputStream(path); + } + +} diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/complex_sf_schema.json b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/complex_sf_schema.json new file mode 100644 index 0000000000..cc62d1b51f --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/complex_sf_schema.json @@ -0,0 +1,16 @@ +{ + "fields": [ + { + "defaultValue": null, + "name": "ExampleAddress", + "nillable": true, + "soapType": "urn:address" + }, + { + "defaultValue": null, + "name": "ExampleLocation", + "nillable": true, + "soapType": "tns:location" + } + ] +} diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/simple_sf_schema.json b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/simple_sf_schema.json new file mode 100644 index 0000000000..82073792c6 --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/simple_sf_schema.json @@ -0,0 +1,76 @@ +{ + "fields": [ + { + "defaultValue": null, + "name": "ExampleInt", + "nillable": true, + "soapType": "xns:int" + }, + { + "defaultValue": null, + "name": "ExampleLong", + "nillable": true, + "soapType": "xns:long" + }, + { + "defaultValue": null, + "name": "ExampleDouble", + "nillable": true, + "soapType": "xns:double" + }, + { + "defaultValue": null, + "name": "ExampleBoolean", + "nillable": true, + "soapType": "xns:boolean" + }, + { + "defaultValue": null, + "name": "ExampleID", + "nillable": true, + "soapType": "xns:ID" + }, + { + "defaultValue": null, + "name": "ExampleString", + "nillable": true, + "soapType": "xns:string" + }, + { + "defaultValue": null, + "name": "ExampleJson", + "nillable": true, + "soapType": "xns:json" + }, + { + "defaultValue": null, + "name": "ExampleBase64Binary", + "nillable": true, + "soapType": "xns:base64Binary" + }, + { + "defaultValue": null, + "name": "ExampleAnyType", + "nillable": true, + "soapType": "xns:anyType" + }, + { + "defaultValue": null, + "name": "ExampleDate", + "nillable": true, + "soapType": "xsd:date" + }, + { + "defaultValue": null, + "name": "ExampleDateTime", + "nillable": true, + "soapType": "xsd:dateTime" + }, + { + "defaultValue": null, + "name": "ExampleTime", + "nillable": true, + "soapType": "xsd:time" + } + ] +} diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/unknown_type_sf_schema.json b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/unknown_type_sf_schema.json new file mode 100644 index 0000000000..4416a35f46 --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/unknown_type_sf_schema.json @@ -0,0 +1,11 @@ +{ + "name": "SObjectWithUnknownFieldType", + "fields": [ + { + "defaultValue": null, + "name": "FieldWithUnknownType", + "nillable": true, + "soapType": "xsd:unknown" + } + ] +} diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/pom.xml b/nifi-nar-bundles/nifi-salesforce-bundle/pom.xml new file mode 100644 index 0000000000..f07b3c1a6f --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/pom.xml @@ -0,0 +1,32 @@ + + + + + nifi-nar-bundles + org.apache.nifi + 1.17.0-SNAPSHOT + + 4.0.0 + + nifi-salesforce-bundle + pom + + nifi-salesforce-nar + nifi-salesforce-processors + + diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index fa49f9c1dc..e92f54ff3e 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -110,6 +110,7 @@ nifi-stateless-processor-bundle nifi-geohash-bundle nifi-snowflake-bundle + nifi-salesforce-bundle