diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE index 96b6927000..6c47d36ab6 100644 --- a/nifi-assembly/NOTICE +++ b/nifi-assembly/NOTICE @@ -412,6 +412,78 @@ The following binary components are provided under the Apache Software License v in some artifacts (usually source distributions); but is always available from the source code management (SCM) system project uses. + (ASLv2) Bean Validation API + The following NOTICE information applies: + Bean Validation API + Copyright (c) Emmanuel Bernard + + (ASLv2) Jackson Annotations + The following NOTICE information applies: + Jackson Annotations + Copyright (c) 2019 Tatu Saloranta + + (ASLv2) Jackson Core + The following NOTICE information applies: + Jackson Core + Copyright (c) Tatu Saloranta + + (ASLv2) Jackson Databind + The following NOTICE information applies: + Jackson Databind + Copyright (c) 2007- Tatu Saloranta, tatu.saloranta@iki.fi + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has + been in development since 2007. + + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. + + (ASLv2) Jackson Datatype: JSR310 + The following NOTICE information applies: + Jackson Datatype: JSR310 + Copyright (c) Tatu Saloranta + + (ASLv2) Jackson Module JsonSchema + The following NOTICE information applies: + Jackson Module JsonSchema + Copyright (c) Tatu Saloranta + + (ASLv2) JSON Small and Fast Parser + The following NOTICE information applies: + JSON Small and Fast Parser + Copyright (c) 2010 Uriel Chemouni + + (ASLv2) Camel :: Salesforce + The following NOTICE information applies: + Camel :: Salesforce + Copyright (c) 2007-2022 The Apache Software Foundation + + (ASLv2) IntelliJ IDEA Annotations (org.jetbrains:annotations:13.0 - http://www.jetbrains.org) + The following NOTICE information applies: + IntelliJ IDEA Annotations + Copyright 2000-2022 JetBrains s.r.o. and Kotlin Programming Language contributors + + (ASLv2) Kotlin Stdlib (org.jetbrains.kotlin:kotlin-stdlib:1.7.20 - https://kotlinlang.org/) + The following NOTICE information applies: + Kotlin Stdlib + Copyright 2010-2022 JetBrains s.r.o. and Kotlin Programming Language contributors + + (ASLv2) Kotlin Stdlib Common (org.jetbrains.kotlin:kotlin-stdlib-common:1.7.20 - https://kotlinlang.org/) + The following NOTICE information applies: + Kotlin Stdlib Common + Copyright 2010-2022 JetBrains s.r.o. and Kotlin Programming Language contributors + + (ASLv2) Kotlin Stdlib Jdk7 (org.jetbrains.kotlin:kotlin-stdlib-jdk7:1.7.20 - https://kotlinlang.org/) + The following NOTICE information applies: + Kotlin Stdlib Jdk7 + Copyright 2010-2022 JetBrains s.r.o. and Kotlin Programming Language contributors + + (ASLv2) Kotlin Stdlib Jdk8 (org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.7.20 - https://kotlinlang.org/) + The following NOTICE information applies: + Kotlin Stdlib Jdk8 + Copyright 2010-2022 JetBrains s.r.o. and Kotlin Programming Language contributors + (ASLv2) Apache Thrift The following NOTICE information applies: Apache Thrift @@ -2137,6 +2209,11 @@ The following binary components are provided under the Apache Software License v ASM Based Accessors Helper Used By JSON Smart 1.2 Copyright 2017, Uriel Chemouni + (ASLv2) ASM Based Accessors Helper Used By JSON Smart (net.minidev:accessors-smart:jar:2.4.8 - http://www.minidev.net/) + The following NOTICE information applies: + ASM Based Accessors Helper Used By JSON Smart 2.4.8 + Copyright 2017, Uriel Chemouni + (ASLv2) Adapter: RxJava (com.squareup.retrofit2:adapter-rxjava:jar:2.6.2) The following NOTICE information applies: Adapter: RxJava 2.6.2 @@ -2172,6 +2249,11 @@ The following binary components are provided under the Apache Software License v OkHttp 3.12.6 Copyright 2019 Square, Inc. + (ASLv2) OkHttp (com.squareup.okhttp3:okhttp:jar:4.10.0) + The following NOTICE information applies: + OkHttp 4.10.0 + Copyright (c) 2019 Square, Inc. + (ASLv2) OkHttp URLConnection (com.squareup.okhttp3:okhttp-urlconnection:jar:3.12.2) The following NOTICE information applies: OkHttp 3.12.2 diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java index 1e653c196c..4ad3b817b6 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java @@ -68,7 +68,7 @@ public abstract class AbstractRecordSetWriter implements RecordSetWriter { return recordCount; } - protected final boolean isActiveRecordSet() { + public final boolean isActiveRecordSet() { return activeRecordSet; } @@ -88,6 +88,7 @@ public abstract class AbstractRecordSetWriter implements RecordSetWriter { throw new IllegalStateException("Cannot finish RecordSet because no RecordSet has begun"); } + activeRecordSet = false; final Map attributes = onFinishRecordSet(); return WriteResult.of(recordCount, attributes == null ? Collections.emptyMap() : attributes); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/OutputGrouping.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/OutputGrouping.java similarity index 100% rename from nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/OutputGrouping.java rename to nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/OutputGrouping.java diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/WriteJsonResult.java similarity index 99% rename from nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java rename to nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/WriteJsonResult.java index d96860f91a..b696344d94 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/WriteJsonResult.java @@ -21,8 +21,8 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.util.MinimalPrettyPrinter; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.NullSuppression; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.record.NullSuppression; import org.apache.nifi.schema.access.SchemaAccessWriter; import org.apache.nifi.serialization.AbstractRecordSetWriter; import org.apache.nifi.serialization.RecordSetWriter; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/record/NullSuppression.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/NullSuppression.java similarity index 96% rename from nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/record/NullSuppression.java rename to nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/NullSuppression.java index fed54a173e..44249abf5d 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/record/NullSuppression.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/NullSuppression.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.nifi.record; +package org.apache.nifi; public enum NullSuppression { ALWAYS_SUPPRESS, diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..a75a700107 --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,239 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for the these +subcomponents is subject to the terms and conditions of the following +licenses. + + This product bundles 'asm' which is available under a 3-Clause BSD style license. + For details see http://asm.ow2.org/asmdex-license.html + + Copyright (c) 2012 France Télécom + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + 1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + 3. Neither the name of the copyright holders nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..9a2c9db91f --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,125 @@ +nifi-salesforce-nar +Copyright 2014-2022 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +=========================================== +Apache Software License v2 +=========================================== + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Jackson Annotations + The following NOTICE information applies: + Jackson Annotations + Copyright (c) 2019 Tatu Saloranta + + (ASLv2) Jackson Core + The following NOTICE information applies: + Jackson Core + Copyright (c) Tatu Saloranta + + (ASLv2) Jackson Databind + The following NOTICE information applies: + Jackson Databind + Copyright (c) 2007- Tatu Saloranta, tatu.saloranta@iki.fi + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has + been in development since 2007. + + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. + + (ASLv2) Jackson Datatype: JSR310 + The following NOTICE information applies: + Jackson Datatype: JSR310 + Copyright (c) Tatu Saloranta + + (ASLv2) Jackson Module JsonSchema + The following NOTICE information applies: + Jackson Module JsonSchema + Copyright (c) Tatu Saloranta + + (ASLv2) JsonPath + The following NOTICE information applies: + Copyright (c) 2011 JsonPath authors + + (ASLv2) OkHttp (com.squareup.okhttp3:okhttp:jar:4.10.0) + The following NOTICE information applies: + OkHttp 4.10.0 + Copyright (c) 2019 Square, Inc. + + (ASLv2) Okio + The following NOTICE information applies: + Okio + Copyright (c) 2013 Square, Inc. + + (ASLv2) Bean Validation API + The following NOTICE information applies: + Bean Validation API + Copyright (c) Emmanuel Bernard + + (ASLv2) ASM Based Accessors Helper Used By JSON Smart (net.minidev:accessors-smart:jar:2.4.8 - http://www.minidev.net/) + The following NOTICE information applies: + ASM Based Accessors Helper Used By JSON Smart 2.4.8 + Copyright 2017, Uriel Chemouni + + (ASLv2) JSON Small and Fast Parser + The following NOTICE information applies: + JSON Small and Fast Parser + Copyright (c) 2010 Uriel Chemouni + + (ASLv2) Camel :: Salesforce + The following NOTICE information applies: + Camel :: Salesforce + Copyright (c) 2007-2022 The Apache Software Foundation + + (ASLv2) Apache Commons CSV + The following NOTICE information applies: + Apache Commons CSV + Copyright 2005-2016 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2015 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Apache Commons Text + The following NOTICE information applies: + Apache Commons Text + Copyright 2001-2018 The Apache Software Foundation + + (ASLv2) IntelliJ IDEA Annotations (org.jetbrains:annotations:13.0 - http://www.jetbrains.org) + The following NOTICE information applies: + IntelliJ IDEA Annotations + Copyright 2000-2022 JetBrains s.r.o. and Kotlin Programming Language contributors + + (ASLv2) Kotlin Stdlib (org.jetbrains.kotlin:kotlin-stdlib:1.7.20 - https://kotlinlang.org/) + The following NOTICE information applies: + Kotlin Stdlib + Copyright 2010-2022 JetBrains s.r.o. and Kotlin Programming Language contributors + + (ASLv2) Kotlin Stdlib Common (org.jetbrains.kotlin:kotlin-stdlib-common:1.7.20 - https://kotlinlang.org/) + The following NOTICE information applies: + Kotlin Stdlib Common + Copyright 2010-2022 JetBrains s.r.o. and Kotlin Programming Language contributors + + (ASLv2) Kotlin Stdlib Jdk7 (org.jetbrains.kotlin:kotlin-stdlib-jdk7:1.7.20 - https://kotlinlang.org/) + The following NOTICE information applies: + Kotlin Stdlib Jdk7 + Copyright 2010-2022 JetBrains s.r.o. and Kotlin Programming Language contributors + + (ASLv2) Kotlin Stdlib Jdk8 (org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.7.20 - https://kotlinlang.org/) + The following NOTICE information applies: + Kotlin Stdlib Jdk8 + Copyright 2010-2022 JetBrains s.r.o. and Kotlin Programming Language contributors + + diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java new file mode 100644 index 0000000000..00d0c2b783 --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.salesforce; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.nifi.NullSuppression; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.json.OutputGrouping; +import org.apache.nifi.json.WriteJsonResult; +import org.apache.nifi.oauth2.OAuth2AccessTokenProvider; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.salesforce.util.RecordExtender; +import org.apache.nifi.processors.salesforce.util.SalesforceRestService; +import org.apache.nifi.schema.access.NopSchemaAccessWriter; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_URL; +import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_VERSION; +import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.READ_TIMEOUT; +import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.TOKEN_PROVIDER; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"salesforce", "sobject", "put"}) +@CapabilityDescription("Creates new records for the specified Salesforce sObject. The type of the Salesforce object must be set in the input flowfile's" + + " 'objectType' attribute. This processor cannot update existing records.") +@ReadsAttribute(attribute = "objectType", description = "The Salesforce object type to upload records to. E.g. Account, Contact, Campaign.") +public class PutSalesforceObject extends AbstractProcessor { + + private static final int MAX_RECORD_COUNT = 200; + + protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description( + "Specifies the Controller Service to use for parsing incoming data and determining the data's schema") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("For FlowFiles created as a result of a successful execution.") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("For FlowFiles created as a result of an execution error.") + .build(); + + private static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( + API_URL, + API_VERSION, + READ_TIMEOUT, + TOKEN_PROVIDER, + RECORD_READER_FACTORY + )); + + private static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + REL_SUCCESS, + REL_FAILURE + ))); + + private volatile SalesforceRestService salesforceRestService; + private volatile int maxRecordCount; + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + maxRecordCount = getMaxRecordCount(); + + String salesforceVersion = context.getProperty(API_VERSION).getValue(); + String baseUrl = context.getProperty(API_URL).getValue(); + OAuth2AccessTokenProvider accessTokenProvider = + context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class); + + salesforceRestService = new SalesforceRestService( + salesforceVersion, + baseUrl, + () -> accessTokenProvider.getAccessDetails().getAccessToken(), + context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS) + .intValue() + ); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + String objectType = flowFile.getAttribute("objectType"); + if (objectType == null) { + throw new ProcessException("Salesforce object type not found among the incoming flowfile attributes"); + } + + RecordReaderFactory readerFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class); + + RecordExtender extender; + + try (InputStream in = session.read(flowFile); + RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger()); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + WriteJsonResult writer = getWriter(extender = new RecordExtender(reader.getSchema()), out)) { + + int count = 0; + Record record; + + while ((record = reader.nextRecord()) != null) { + count++; + if (!writer.isActiveRecordSet()) { + writer.beginRecordSet(); + } + + MapRecord extendedRecord = extender.getExtendedRecord(objectType, count, record); + writer.write(extendedRecord); + + if (count == maxRecordCount) { + count = 0; + processRecords(objectType, out, writer, extender); + out.reset(); + } + } + + if (writer.isActiveRecordSet()) { + processRecords(objectType, out, writer, extender); + } + session.transfer(flowFile, REL_SUCCESS); + + } catch (MalformedRecordException e) { + getLogger().error("Couldn't read records from input", e); + session.transfer(flowFile, REL_FAILURE); + } catch (SchemaNotFoundException e) { + getLogger().error("Couldn't create record writer", e); + session.transfer(flowFile, REL_FAILURE); + } catch (Exception e) { + getLogger().error("Failed to put records to Salesforce.", e); + session.transfer(flowFile, REL_FAILURE); + } + } + + private void processRecords(String objectType, ByteArrayOutputStream out, WriteJsonResult writer, RecordExtender extender) throws IOException { + writer.finishRecordSet(); + writer.flush(); + ObjectNode wrappedJson = extender.getWrappedRecordsJson(out); + salesforceRestService.postRecord(objectType, wrappedJson.toPrettyString()); + } + + private WriteJsonResult getWriter(RecordExtender extender, ByteArrayOutputStream out) throws IOException { + final RecordSchema extendedSchema = extender.getExtendedSchema(); + return new WriteJsonResult(getLogger(), extendedSchema, new NopSchemaAccessWriter(), out, + true, NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, null, null, null); + } + + int getMaxRecordCount() { + return MAX_RECORD_COUNT; + } +} diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java index cffbcb9692..8235375d69 100644 --- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java @@ -80,6 +80,11 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiPredicate; +import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_URL; +import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_VERSION; +import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.READ_TIMEOUT; +import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.TOKEN_PROVIDER; + @PrimaryNodeOnly @TriggerSerially @TriggerWhenEmpty @@ -101,25 +106,6 @@ import java.util.function.BiPredicate; @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min") public class QuerySalesforceObject extends AbstractProcessor { - static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder() - .name("salesforce-url") - .displayName("URL") - .description("The URL for the Salesforce REST API including the domain without additional path information, such as https://MyDomainName.my.salesforce.com") - .required(true) - .addValidator(StandardValidators.URL_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .build(); - - static final PropertyDescriptor API_VERSION = new PropertyDescriptor.Builder() - .name("salesforce-api-version") - .displayName("API Version") - .description("The version number of the Salesforce REST API appended to the URL after the services/data path. See Salesforce documentation for supported versions") - .required(true) - .addValidator(StandardValidators.NUMBER_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .defaultValue("54.0") - .build(); - static final PropertyDescriptor SOBJECT_NAME = new PropertyDescriptor.Builder() .name("sobject-name") .displayName("sObject Name") @@ -138,24 +124,6 @@ public class QuerySalesforceObject extends AbstractProcessor { .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .build(); - static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder() - .name("read-timeout") - .displayName("Read Timeout") - .description("Maximum time allowed for reading a response from the Salesforce REST API") - .required(true) - .defaultValue("15 s") - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .build(); - - static final PropertyDescriptor TOKEN_PROVIDER = new PropertyDescriptor.Builder() - .name("oauth2-access-token-provider") - .displayName("OAuth2 Access Token Provider") - .description("Service providing OAuth2 Access Tokens for authenticating using the HTTP Authorization Header") - .identifiesControllerService(OAuth2AccessTokenProvider.class) - .required(true) - .build(); - static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() .name("record-writer") .displayName("Record Writer") diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/CommonSalesforceProperties.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/CommonSalesforceProperties.java new file mode 100644 index 0000000000..39188396d9 --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/CommonSalesforceProperties.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.salesforce.util; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.oauth2.OAuth2AccessTokenProvider; +import org.apache.nifi.processor.util.StandardValidators; + +public final class CommonSalesforceProperties { + + public static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder() + .name("salesforce-url") + .displayName("URL") + .description("The URL for the Salesforce REST API including the domain without additional path information, such as https://MyDomainName.my.salesforce.com") + .required(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor API_VERSION = new PropertyDescriptor.Builder() + .name("salesforce-api-version") + .displayName("API Version") + .description( + "The version number of the Salesforce REST API appended to the URL after the services/data path. See Salesforce documentation for supported versions") + .required(true) + .addValidator(StandardValidators.NUMBER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .defaultValue("54.0") + .build(); + + public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder() + .name("read-timeout") + .displayName("Read Timeout") + .description("Maximum time allowed for reading a response from the Salesforce REST API") + .required(true) + .defaultValue("15 s") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor TOKEN_PROVIDER = new PropertyDescriptor.Builder() + .name("oauth2-access-token-provider") + .displayName("OAuth2 Access Token Provider") + .description("Service providing OAuth2 Access Tokens for authenticating using the HTTP Authorization Header") + .identifiesControllerService(OAuth2AccessTokenProvider.class) + .required(true) + .build(); + + private CommonSalesforceProperties() { + } +} diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/RecordExtender.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/RecordExtender.java new file mode 100644 index 0000000000..9e8b03e9d5 --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/RecordExtender.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.salesforce.util; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class RecordExtender { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + static final SimpleRecordSchema ATTRIBUTES_RECORD_SCHEMA = new SimpleRecordSchema(Arrays.asList( + new RecordField("type", RecordFieldType.STRING.getDataType()), + new RecordField("referenceId", RecordFieldType.STRING.getDataType()) + )); + + private final RecordSchema extendedSchema; + + public RecordExtender(final RecordSchema originalSchema) { + List recordFields = new ArrayList<>(originalSchema.getFields()); + recordFields.add(new RecordField("attributes", RecordFieldType.RECORD.getRecordDataType( + ATTRIBUTES_RECORD_SCHEMA + ))); + + extendedSchema = new SimpleRecordSchema(recordFields); + } + + public ObjectNode getWrappedRecordsJson(ByteArrayOutputStream out) throws IOException { + ObjectNode root = MAPPER.createObjectNode(); + JsonNode jsonNode = MAPPER.readTree(out.toByteArray()); + root.set("records", jsonNode); + return root; + } + + public MapRecord getExtendedRecord(String objectType, int count, Record record) { + + Set rawFieldNames = record.getRawFieldNames(); + Map objectMap = rawFieldNames.stream() + .collect(Collectors.toMap(Function.identity(), record::getValue)); + + Map attributesMap = new HashMap<>(); + attributesMap.put("type", objectType); + attributesMap.put("referenceId", count); + + MapRecord attributesRecord = new MapRecord(ATTRIBUTES_RECORD_SCHEMA, attributesMap); + + objectMap.put("attributes", attributesRecord); + + return new MapRecord(extendedSchema, objectMap); + } + + public RecordSchema getExtendedSchema() { + return extendedSchema; + } +} diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java index 8d6a8fddae..cc3f4b2bf8 100644 --- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java @@ -17,8 +17,10 @@ package org.apache.nifi.processors.salesforce.util; import okhttp3.HttpUrl; +import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.Request; +import okhttp3.RequestBody; import okhttp3.Response; import org.apache.nifi.processor.exception.ProcessException; @@ -84,11 +86,28 @@ public class SalesforceRestService { return request(request); } + public InputStream postRecord(String sObjectApiName, String body) { + String url = baseUrl + "/services/data/v" + version + "/composite/tree/" + sObjectApiName; + + HttpUrl httpUrl = HttpUrl.get(url).newBuilder() + .build(); + + final RequestBody requestBody = RequestBody.create(body, MediaType.parse("application/json")); + + Request request = new Request.Builder() + .addHeader("Authorization", "Bearer " + accessTokenProvider.get()) + .url(httpUrl) + .post(requestBody) + .build(); + + return request(request); + } + private InputStream request(Request request) { Response response = null; try { response = httpClient.newCall(request).execute(); - if (response.code() != 200) { + if (response.code() < 200 || response.code() > 201) { throw new ProcessException("Invalid response" + " Code: " + response.code() + " Message: " + response.message() + diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 503138fb69..937c84d8a5 100644 --- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -13,3 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. org.apache.nifi.processors.salesforce.QuerySalesforceObject +org.apache.nifi.processors.salesforce.PutSalesforceObject diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/docs/org.apache.nifi.processors.salesforce.PutSalesforceObject/additionalDetails.html b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/docs/org.apache.nifi.processors.salesforce.PutSalesforceObject/additionalDetails.html new file mode 100644 index 0000000000..daaef23ad3 --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/docs/org.apache.nifi.processors.salesforce.PutSalesforceObject/additionalDetails.html @@ -0,0 +1,71 @@ + + + + + + + PutSalesforceObject + + + + + + +

QuerySalesforceObject

+ +

Description

+ +

+ Objects in Salesforce are database tables, their rows are known as records, and their columns are called fields. The + PutSalesforceObject creates a new a Salesforce record in a Salesforce object. + The Salesforce object must be set as the "objectType" attribute of an incoming flowfile. Check Salesforce + documentation for object types and metadata. + The processor utilizes NiFi record-based processing to allow arbitrary input format. +

+ +

Example

+ +

+ If the "objectType" is set to "Account", the following JSON input will create two records in the Account object with + the names "SampleAccount1" and "SampleAccount2". +

+        [
+          {
+            "name":"SampleAccount1",
+            "phone":"1111111111",
+            "website":"www.salesforce1.com",
+            "numberOfEmployees":"100",
+            "industry":"Banking"
+          },
+          {
+            "name":"SampleAccount2",
+            "phone":"22222222",
+            "website":"www.salesforce2.com",
+            "numberOfEmployees":"200",
+            "industry":"Banking"
+          }
+        ]
+    
+

+ + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java new file mode 100644 index 0000000000..390fa8ac0f --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.salesforce; + +import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties; +import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class PutSalesforceObjectIT implements SalesforceConfigAware { + + private TestRunner runner; + + @BeforeEach + void setUp() throws Exception { + Processor putSalesforceObject = new PutSalesforceObject() { + @Override + int getMaxRecordCount() { + return 2; + } + }; + + runner = TestRunners.newTestRunner(putSalesforceObject); + + StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = initOAuth2AccessTokenProvider(runner); + runner.setProperty(CommonSalesforceProperties.TOKEN_PROVIDER, oauth2AccessTokenProvider.getIdentifier()); + } + + @Test + void testPutSalesforceObject() throws Exception { + + MockRecordParser reader = new MockRecordParser(); + reader.addSchemaField("name", RecordFieldType.STRING); + reader.addSchemaField("phone", RecordFieldType.STRING); + reader.addSchemaField("website", RecordFieldType.STRING); + reader.addSchemaField("numberOfEmployees", RecordFieldType.STRING); + reader.addSchemaField("industry", RecordFieldType.STRING); + + reader.addRecord("SampleAccount1", "111111", "www.salesforce1.com", "100", "Banking"); + reader.addRecord("SampleAccount2", "222222", "www.salesforce2.com", "200", "Banking"); + reader.addRecord("SampleAccount3", "333333", "www.salesforce3.com", "300", "Banking"); + reader.addRecord("SampleAccount4", "444444", "www.salesforce4.com", "400", "Banking"); + reader.addRecord("SampleAccount5", "555555", "www.salesforce5.com", "500", "Banking"); + + runner.enqueue("", Collections.singletonMap("objectType", "Account")); + + runner.addControllerService("reader", reader); + runner.enableControllerService(reader); + + runner.setProperty(CommonSalesforceProperties.API_VERSION, VERSION); + runner.setProperty(CommonSalesforceProperties.API_URL, BASE_URL); + runner.setProperty(PutSalesforceObject.RECORD_READER_FACTORY, reader.getIdentifier()); + + runner.run(); + + List results = runner.getFlowFilesForRelationship(PutSalesforceObject.REL_SUCCESS); + + assertEquals(1, results.size()); + } +} diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java index 547b99d54c..f3f5bd50eb 100644 --- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.salesforce; import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider; import org.apache.nifi.processor.Processor; +import org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties; import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware; import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.MockRecordWriter; @@ -45,7 +46,7 @@ class QuerySalesforceObjectIT implements SalesforceConfigAware { runner = TestRunners.newTestRunner(querySObject); StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = initOAuth2AccessTokenProvider(runner); - runner.setProperty(QuerySalesforceObject.TOKEN_PROVIDER, oauth2AccessTokenProvider.getIdentifier()); + runner.setProperty(CommonSalesforceProperties.TOKEN_PROVIDER, oauth2AccessTokenProvider.getIdentifier()); } @AfterEach @@ -64,8 +65,8 @@ class QuerySalesforceObjectIT implements SalesforceConfigAware { runner.setProperty(QuerySalesforceObject.SOBJECT_NAME, sObjectName); runner.setProperty(QuerySalesforceObject.FIELD_NAMES, fieldNames); - runner.setProperty(QuerySalesforceObject.API_VERSION, VERSION); - runner.setProperty(QuerySalesforceObject.API_URL, BASE_URL); + runner.setProperty(CommonSalesforceProperties.API_VERSION, VERSION); + runner.setProperty(CommonSalesforceProperties.API_URL, BASE_URL); runner.setProperty(QuerySalesforceObject.RECORD_WRITER, writer.getIdentifier()); runner.setProperty(QuerySalesforceObject.AGE_FIELD, "CreatedDate"); runner.setProperty(QuerySalesforceObject.INITIAL_AGE_FILTER, "2022-01-06T08:43:24.000+0000"); diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java index 805b89496b..c0bc3a80c1 100644 --- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java @@ -165,5 +165,4 @@ class SalesforceToRecordSchemaConverterTest { private InputStream readFile(final String path) throws IOException { return new FileInputStream(path); } - } diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/TestRecordExtender.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/TestRecordExtender.java new file mode 100644 index 0000000000..a55f10b8c1 --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/TestRecordExtender.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.salesforce.util; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; + +import static org.apache.nifi.processors.salesforce.util.RecordExtender.ATTRIBUTES_RECORD_SCHEMA; +import static org.junit.jupiter.api.Assertions.assertEquals; + +class TestRecordExtender { + + private static ObjectMapper OBJECT_MAPPER; + private static RecordSchema ORIGINAL_SCHEMA; + private static RecordSchema EXPECTED_EXTENDED_SCHEMA; + + @BeforeAll + public static void setup() { + OBJECT_MAPPER = new ObjectMapper(); + ORIGINAL_SCHEMA = new SimpleRecordSchema(Arrays.asList( + new RecordField("testRecordField1", RecordFieldType.STRING.getDataType()), + new RecordField("testRecordField2", RecordFieldType.STRING.getDataType()) + )); + EXPECTED_EXTENDED_SCHEMA = new SimpleRecordSchema(Arrays.asList( + new RecordField("testRecordField1", RecordFieldType.STRING.getDataType()), + new RecordField("testRecordField2", RecordFieldType.STRING.getDataType()), + new RecordField("attributes", RecordFieldType.RECORD.getRecordDataType(new SimpleRecordSchema(Arrays.asList( + new RecordField("type", RecordFieldType.STRING.getDataType()), + new RecordField("referenceId", RecordFieldType.STRING.getDataType() + ))))) + )); + } + + private RecordExtender testSubject; + + @BeforeEach + public void init() { + testSubject = new RecordExtender(ORIGINAL_SCHEMA); + } + + @Test + void testGetWrappedRecordJson() throws IOException { + ObjectNode testNode = OBJECT_MAPPER.createObjectNode(); + testNode.put("testField1", "testValue1"); + testNode.put("testField2", "testValue2"); + + ObjectNode expectedWrappedNode = OBJECT_MAPPER.createObjectNode(); + expectedWrappedNode.set("records", testNode); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + out.write(testNode.toString().getBytes()); + + ObjectNode actualWrappedJson = testSubject.getWrappedRecordsJson(out); + + assertEquals(expectedWrappedNode, actualWrappedJson); + } + + @Test + void testGetExtendedSchema() { + final RecordSchema actualExtendedSchema = testSubject.getExtendedSchema(); + + assertEquals(EXPECTED_EXTENDED_SCHEMA, actualExtendedSchema); + } + + @Test + void testGetExtendedRecord() { + int referenceId = 0; + String objectType = "Account"; + + MapRecord testRecord = new MapRecord(ORIGINAL_SCHEMA, new HashMap() {{ + put("testRecordField1", "testRecordValue1"); + put("testRecordField2", "testRecordValue2"); + }}); + + + MapRecord expectedRecord = new MapRecord(EXPECTED_EXTENDED_SCHEMA, new HashMap() {{ + put("attributes", + new MapRecord(ATTRIBUTES_RECORD_SCHEMA, new HashMap() {{ + put("type", objectType); + put("referenceId", referenceId); + }}) + ); + put("testRecordField1", "testRecordValue1"); + put("testRecordField2", "testRecordValue2"); + }}); + + MapRecord actualRecord = testSubject.getExtendedRecord(objectType, referenceId, testRecord); + + assertEquals(expectedRecord, actualRecord); + } + +} diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/json/put_records.json b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/json/put_records.json new file mode 100644 index 0000000000..1c58798f89 --- /dev/null +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/json/put_records.json @@ -0,0 +1,37 @@ +[ + { + "name": "SampleAccount1", + "phone": "1111111111", + "website": "www.salesforce1.com", + "numberOfEmployees": "100", + "industry": "Banking" + }, + { + "name": "SampleAccount2", + "phone": "22222222", + "website": "www.salesforce2.com", + "numberOfEmployees": "200", + "industry": "Banking" + }, + { + "name": "SampleAccount3", + "phone": "1111111111", + "website": "www.salesforce1.com", + "numberOfEmployees": "100", + "industry": "Banking" + }, + { + "name": "SampleAccount4", + "phone": "22222222", + "website": "www.salesforce2.com", + "numberOfEmployees": "200", + "industry": "Banking" + }, + { + "name": "SampleAccount5", + "phone": "22222222", + "website": "www.salesforce2.com", + "numberOfEmployees": "200", + "industry": "Banking" + } +] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java index 153b1308bd..1f7e16c890 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java @@ -29,7 +29,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.record.NullSuppression; +import org.apache.nifi.NullSuppression; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.DateTimeTextRecordSetWriter; import org.apache.nifi.serialization.RecordSetWriter; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java index 8e272d07dc..ca509fd2d8 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java @@ -18,7 +18,7 @@ package org.apache.nifi.xml; import javanet.staxutils.IndentingXMLStreamWriter; -import org.apache.nifi.record.NullSuppression; +import org.apache.nifi.NullSuppression; import org.apache.nifi.schema.access.SchemaAccessWriter; import org.apache.nifi.serialization.AbstractRecordSetWriter; import org.apache.nifi.serialization.RecordSetWriter; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java index bef9dec47b..1645b02219 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java @@ -17,7 +17,7 @@ package org.apache.nifi.xml; -import org.apache.nifi.record.NullSuppression; +import org.apache.nifi.NullSuppression; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java index 1cb0133b7d..9169af8c6c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java @@ -19,7 +19,7 @@ package org.apache.nifi.json; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.record.NullSuppression; +import org.apache.nifi.NullSuppression; import org.apache.nifi.schema.access.SchemaNameAsAttribute; import org.apache.nifi.schema.inference.TimeValueInference; import org.apache.nifi.serialization.SimpleRecordSchema; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestWriteXMLResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestWriteXMLResult.java index 91304d958b..e70547752f 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestWriteXMLResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestWriteXMLResult.java @@ -46,9 +46,9 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import static org.apache.nifi.record.NullSuppression.ALWAYS_SUPPRESS; -import static org.apache.nifi.record.NullSuppression.NEVER_SUPPRESS; -import static org.apache.nifi.record.NullSuppression.SUPPRESS_MISSING; +import static org.apache.nifi.NullSuppression.ALWAYS_SUPPRESS; +import static org.apache.nifi.NullSuppression.NEVER_SUPPRESS; +import static org.apache.nifi.NullSuppression.SUPPRESS_MISSING; import static org.apache.nifi.xml.ArrayWrapping.NO_WRAPPING; import static org.apache.nifi.xml.ArrayWrapping.USE_PROPERTY_AS_WRAPPER; import static org.apache.nifi.xml.ArrayWrapping.USE_PROPERTY_FOR_ELEMENTS;