From 89426d167b34f49bfce28abf38ccb990f8bb7393 Mon Sep 17 00:00:00 2001 From: Robert Kalmar Date: Thu, 25 Aug 2022 09:16:31 +0200 Subject: [PATCH] NIFI-10370 Create PutSnowflake processor using Snowpipe ingest This closes #6584. Signed-off-by: Peter Turcsanyi --- nifi-assembly/NOTICE | 6 +- nifi-assembly/pom.xml | 12 + .../nifi/dbcp/AbstractDBCPConnectionPool.java | 12 + .../nifi-snowflake-processors-nar/pom.xml | 45 ++++ .../src/main/resources/META-INF/LICENSE | 204 ++++++++++++++++ .../src/main/resources/META-INF/NOTICE | 5 + .../nifi-snowflake-processors/pom.xml | 121 ++++++++++ .../snowflake/GetSnowflakeIngestStatus.java | 152 ++++++++++++ .../snowflake/PutSnowflakeInternalStage.java | 182 ++++++++++++++ .../snowflake/StartSnowflakeIngest.java | 126 ++++++++++ .../snowflake/util/SnowflakeAttributes.java | 26 ++ .../util/SnowflakeInternalStageType.java | 92 +++++++ .../SnowflakeInternalStageTypeParameters.java | 49 ++++ .../org.apache.nifi.processor.Processor | 20 ++ .../additionalDetails.html | 48 ++++ .../additionalDetails.html | 43 ++++ .../additionalDetails.html | 52 ++++ .../snowflake/SnowflakeConfigAware.java | 128 ++++++++++ .../processors/snowflake/SnowflakePipeIT.java | 129 ++++++++++ .../nifi-snowflake-services-api-nar/pom.xml | 45 ++++ .../src/main/resources/META-INF/LICENSE | 202 ++++++++++++++++ .../src/main/resources/META-INF/NOTICE | 14 ++ .../nifi-snowflake-services-api/pom.xml | 46 ++++ .../SnowflakeConnectionProviderService.java | 25 ++ .../snowflake/SnowflakeConnectionWrapper.java | 40 ++++ ...SnowflakeIngestManagerProviderService.java | 27 +++ .../snowflake/util/SnowflakeProperties.java | 87 +++++++ .../nifi-snowflake-services-nar/pom.xml | 2 +- .../src/main/resources/META-INF/NOTICE | 13 +- .../nifi-snowflake-services/pom.xml | 39 ++- .../SnowflakeComputingConnectionPool.java | 194 ++++++++++++--- ...SnowflakeIngestManagerProviderService.java | 224 ++++++++++++++++++ .../service/util/AccountIdentifierFormat.java | 109 +++++++++ .../AccountIdentifierFormatParameters.java | 37 +++ .../service/util/ConnectionUrlFormat.java | 99 ++++++++ .../util/ConnectionUrlFormatParameters.java | 37 +++ .../util/SnowflakeCommonParameters.java | 60 +++++ ...g.apache.nifi.controller.ControllerService | 1 + .../nifi-snowflake-bundle/pom.xml | 15 ++ 39 files changed, 2729 insertions(+), 39 deletions(-) create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors-nar/src/main/resources/META-INF/LICENSE create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/pom.xml create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/GetSnowflakeIngestStatus.java create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/StartSnowflakeIngest.java create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeAttributes.java create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeInternalStageType.java create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeInternalStageTypeParameters.java create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.GetSnowflakeIngestStatus/additionalDetails.html create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.PutSnowflakeInternalStage/additionalDetails.html create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.StartSnowflakeIngest/additionalDetails.html create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/test/java/org/apache/nifi/processors/snowflake/SnowflakeConfigAware.java create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/test/java/org/apache/nifi/processors/snowflake/SnowflakePipeIT.java create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api-nar/src/main/resources/META-INF/LICENSE create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/pom.xml create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/SnowflakeConnectionProviderService.java create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/SnowflakeConnectionWrapper.java create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/SnowflakeIngestManagerProviderService.java create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeProperties.java create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/AccountIdentifierFormat.java create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/AccountIdentifierFormatParameters.java create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/ConnectionUrlFormat.java create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/ConnectionUrlFormatParameters.java create mode 100644 nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/SnowflakeCommonParameters.java diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE index 714847e698..451b81fb85 100644 --- a/nifi-assembly/NOTICE +++ b/nifi-assembly/NOTICE @@ -680,10 +680,10 @@ The following binary components are provided under the Apache Software License v Apache CXF Copyright 2008-2017 The Apache Software Foundation - (ASLv2) snowflake-jdbc + (ASLv2) Snowflake Ingest SDK The following NOTICE information applies: - snowflake-jdbc - Copyright (c) 2013-2018 Snowflake Computing, Inc. + Snowflake Ingest SDK + Copyright (c) 2013-2016 Snowflake Computing, Inc. (ASLv2) Apache Lucene The following NOTICE information applies: diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 5e559a5935..120ca2c92e 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -1471,12 +1471,24 @@ language governing permissions and limitations under the License. --> false + + org.apache.nifi + nifi-snowflake-services-api-nar + 1.19.0-SNAPSHOT + nar + org.apache.nifi nifi-snowflake-services-nar 1.19.0-SNAPSHOT nar + + org.apache.nifi + nifi-snowflake-processors-nar + 1.19.0-SNAPSHOT + nar + diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/AbstractDBCPConnectionPool.java b/nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/AbstractDBCPConnectionPool.java index 345b42657b..c71ac4766b 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/AbstractDBCPConnectionPool.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/AbstractDBCPConnectionPool.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.dbcp; +import java.util.HashMap; import org.apache.commons.dbcp2.BasicDataSource; import org.apache.commons.lang3.StringUtils; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; @@ -448,6 +449,8 @@ public abstract class AbstractDBCPConnectionPool extends AbstractControllerServi dataSource.addConnectionProperty(descriptor.getName(), propertyValue.evaluateAttributeExpressions().getValue()); } }); + + getConnectionProperties(context).forEach(dataSource::addConnectionProperty); } private KerberosUser getKerberosUser(final ConfigurationContext context) { @@ -496,6 +499,15 @@ public abstract class AbstractDBCPConnectionPool extends AbstractControllerServi } } + /** + * Override in subclasses to provide connection properties to the data source + * + * @return Key-value pairs that will be added as connection properties + */ + protected Map getConnectionProperties(final ConfigurationContext context) { + return new HashMap<>(); + } + protected Long extractMillisWithInfinite(PropertyValue prop) { return "-1".equals(prop.getValue()) ? -1 : prop.asTimePeriod(TimeUnit.MILLISECONDS); } diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors-nar/pom.xml b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors-nar/pom.xml new file mode 100644 index 0000000000..57f02d1305 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors-nar/pom.xml @@ -0,0 +1,45 @@ + + + + + + nifi-snowflake-bundle + org.apache.nifi + 1.19.0-SNAPSHOT + + 4.0.0 + + nifi-snowflake-processors-nar + nar + + + + org.apache.nifi + nifi-snowflake-processors + 1.19.0-SNAPSHOT + + + org.apache.nifi + nifi-snowflake-services-api-nar + 1.19.0-SNAPSHOT + nar + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..de4b130f35 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,204 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..f9a71caca9 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,5 @@ +nifi-snowflake-processors-nar +Copyright 2015-2020 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/pom.xml b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/pom.xml new file mode 100644 index 0000000000..1e44d26b91 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/pom.xml @@ -0,0 +1,121 @@ + + + + + + nifi-snowflake-bundle + org.apache.nifi + 1.19.0-SNAPSHOT + + 4.0.0 + + nifi-snowflake-processors + jar + + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-utils + 1.19.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-snowflake-services + 1.19.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-snowflake-services-api + 1.19.0-SNAPSHOT + provided + + + net.snowflake + snowflake-ingest-sdk + provided + + + commons-io + commons-io + provided + + + com.squareup.okhttp3 + mockwebserver + test + + + org.apache.nifi + nifi-mock + test + + + org.apache.nifi + nifi-kerberos-credentials-service-api + test + + + org.apache.nifi + nifi-kerberos-user-service-api + test + + + org.slf4j + jcl-over-slf4j + test + + + org.apache.nifi + nifi-key-service-api + 1.19.0-SNAPSHOT + test + + + org.apache.nifi + nifi-key-service + 1.19.0-SNAPSHOT + test + + + org.apache.nifi + nifi-dbcp-service-api + 1.19.0-SNAPSHOT + test + + + org.apache.nifi + nifi-dbcp-base + 1.19.0-SNAPSHOT + test + + + org.apache.nifi + nifi-proxy-configuration-api + 1.19.0-SNAPSHOT + test + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/GetSnowflakeIngestStatus.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/GetSnowflakeIngestStatus.java new file mode 100644 index 0000000000..c1d1f28181 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/GetSnowflakeIngestStatus.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.snowflake; + +import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import net.snowflake.ingest.SimpleIngestManager; +import net.snowflake.ingest.connection.HistoryResponse; +import net.snowflake.ingest.connection.HistoryResponse.FileEntry; +import net.snowflake.ingest.connection.IngestResponseException; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; + +@InputRequirement(Requirement.INPUT_REQUIRED) +@DefaultSettings(penaltyDuration = "5 sec") +@ReadsAttributes({ + @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description = "Staged file path") +}) +@Tags({"snowflake", "snowpipe", "ingest", "history"}) +@CapabilityDescription("Waits until a file in a Snowflake stage is ingested. The stage must be created in the Snowflake account beforehand." + + " This processor is usually connected to an upstream StartSnowflakeIngest processor to make sure that the file is ingested.") +@SeeAlso({StartSnowflakeIngest.class, PutSnowflakeInternalStage.class}) +public class GetSnowflakeIngestStatus extends AbstractProcessor { + + public static final PropertyDescriptor INGEST_MANAGER_PROVIDER = new PropertyDescriptor.Builder() + .name("ingest-manager-provider") + .displayName("Ingest Manager Provider") + .description("Specifies the Controller Service to use for ingesting Snowflake staged files.") + .identifiesControllerService(SnowflakeIngestManagerProviderService.class) + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("For FlowFiles of successful ingestion") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("For FlowFiles of failed ingestion") + .build(); + + static final Relationship REL_RETRY = new Relationship.Builder() + .name("retry") + .description("For FlowFiles whose file is still not ingested. These FlowFiles should be routed back to this processor to try again later") + .build(); + + static final List PROPERTIES = Collections.singletonList( + INGEST_MANAGER_PROVIDER + ); + + private static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + REL_SUCCESS, + REL_RETRY, + REL_FAILURE + ))); + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final String stagedFilePath = flowFile.getAttribute(ATTRIBUTE_STAGED_FILE_PATH); + if (stagedFilePath == null) { + getLogger().error("Missing required attribute [\"" + ATTRIBUTE_STAGED_FILE_PATH + "\"] for FlowFile"); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + + final SnowflakeIngestManagerProviderService ingestManagerProviderService = + context.getProperty(INGEST_MANAGER_PROVIDER) + .asControllerService(SnowflakeIngestManagerProviderService.class); + final HistoryResponse historyResponse; + try { + final SimpleIngestManager snowflakeIngestManager = ingestManagerProviderService.getIngestManager(); + historyResponse = snowflakeIngestManager.getHistory(null, null, null); + } catch (URISyntaxException | IOException e) { + throw new ProcessException("Failed to get Snowflake ingest history for staged file [" + stagedFilePath + "]", e); + } catch (IngestResponseException e) { + getLogger().error("Failed to get Snowflake ingest history for staged file [" + stagedFilePath + "]", e); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + + final Optional fileEntry = Optional.ofNullable(historyResponse.files) + .flatMap(files -> files.stream() + .filter(entry -> entry.getPath().equals(stagedFilePath) && entry.isComplete()) + .findFirst()); + + if (!fileEntry.isPresent()) { + session.transfer(session.penalize(flowFile), REL_RETRY); + return; + } + + if (fileEntry.get().getErrorsSeen() > 0) { + getLogger().error("Failed to ingest file [" + stagedFilePath + "] in Snowflake stage via pipe [" + ingestManagerProviderService.getPipeName() + "]." + + " Error: " + fileEntry.get().getFirstError()); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + session.transfer(flowFile, REL_SUCCESS); + + } +} diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java new file mode 100644 index 0000000000..e2edf12adf --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.snowflake; + +import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH; + +import java.io.IOException; +import java.io.InputStream; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.snowflake.util.SnowflakeInternalStageType; +import org.apache.nifi.processors.snowflake.util.SnowflakeInternalStageTypeParameters; +import org.apache.nifi.processors.snowflake.util.SnowflakeProperties; + +@InputRequirement(Requirement.INPUT_REQUIRED) +@WritesAttributes({ + @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, + description = "Staged file path") +}) +@Tags({"snowflake", "jdbc", "database", "connection", "snowpipe"}) +@CapabilityDescription("Puts files into a Snowflake internal stage. The internal stage must be created in the Snowflake account beforehand." + + " This processor can be connected to a StartSnowflakeIngest processor to ingest the file in the internal stage") +@SeeAlso({StartSnowflakeIngest.class, GetSnowflakeIngestStatus.class}) +public class PutSnowflakeInternalStage extends AbstractProcessor { + + public static final PropertyDescriptor SNOWFLAKE_CONNECTION_PROVIDER = new PropertyDescriptor.Builder() + .name("snowflake-connection-provider") + .displayName("Snowflake Connection Provider") + .description("Specifies the Controller Service to use for creating SQL connections to Snowflake.") + .identifiesControllerService(SnowflakeConnectionProviderService.class) + .required(true) + .build(); + + public static final PropertyDescriptor INTERNAL_STAGE_TYPE = new PropertyDescriptor.Builder() + .name("internal-stage-type") + .displayName("Internal Stage Type") + .description("The type of internal stage to use") + .allowableValues(SnowflakeInternalStageType.class) + .required(true) + .build(); + + public static final PropertyDescriptor DATABASE = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(SnowflakeProperties.DATABASE) + .dependsOn(INTERNAL_STAGE_TYPE, SnowflakeInternalStageType.NAMED, SnowflakeInternalStageType.TABLE) + .build(); + + public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(SnowflakeProperties.SCHEMA) + .dependsOn(INTERNAL_STAGE_TYPE, SnowflakeInternalStageType.NAMED, SnowflakeInternalStageType.TABLE) + .build(); + + public static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder() + .name("table") + .displayName("Table") + .description("The name of the table in the Snowflake account.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .dependsOn(INTERNAL_STAGE_TYPE, SnowflakeInternalStageType.TABLE) + .build(); + + public static final PropertyDescriptor INTERNAL_STAGE = new PropertyDescriptor.Builder() + .name("internal-stage") + .displayName("Stage") + .description("The name of the internal stage in the Snowflake account to put files into.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .dependsOn(INTERNAL_STAGE_TYPE, SnowflakeInternalStageType.NAMED) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("For FlowFiles of successful PUT operation") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("For FlowFiles of failed PUT operation") + .build(); + + static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( + SNOWFLAKE_CONNECTION_PROVIDER, + INTERNAL_STAGE_TYPE, + DATABASE, + SCHEMA, + TABLE, + INTERNAL_STAGE + )); + + private static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + REL_SUCCESS, + REL_FAILURE + ))); + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final SnowflakeInternalStageType internalStageType = SnowflakeInternalStageType.forName(context.getProperty(INTERNAL_STAGE_TYPE) + .getValue()); + final SnowflakeInternalStageTypeParameters parameters = getSnowflakeInternalStageTypeParameters(context, flowFile); + final String internalStageName = internalStageType.getStage(parameters); + final SnowflakeConnectionProviderService connectionProviderService = + context.getProperty(SNOWFLAKE_CONNECTION_PROVIDER) + .asControllerService(SnowflakeConnectionProviderService.class); + + final String stagedFileName = UUID.randomUUID().toString(); + try (final InputStream inputStream = session.read(flowFile); + final SnowflakeConnectionWrapper snowflakeConnection = connectionProviderService.getSnowflakeConnection()) { + snowflakeConnection.unwrap() + .uploadStream(internalStageName, "", inputStream, stagedFileName, false); + } catch (SQLException e) { + getLogger().error("Failed to upload FlowFile content to internal Snowflake stage [" + internalStageName + "]. Staged file path [" + stagedFileName + "]", e); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } catch (IOException e) { + throw new ProcessException("Failed to read FlowFile content", e); + } + + flowFile = session.putAttribute(flowFile, ATTRIBUTE_STAGED_FILE_PATH, stagedFileName); + session.transfer(flowFile, REL_SUCCESS); + } + + private SnowflakeInternalStageTypeParameters getSnowflakeInternalStageTypeParameters(ProcessContext context, + FlowFile flowFile) { + final String database = context.getProperty(DATABASE).evaluateAttributeExpressions().getValue(); + final String schema = context.getProperty(SCHEMA).evaluateAttributeExpressions().getValue(); + final String table = context.getProperty(TABLE).evaluateAttributeExpressions(flowFile).getValue(); + final String stageName = context.getProperty(INTERNAL_STAGE).evaluateAttributeExpressions(flowFile).getValue(); + return new SnowflakeInternalStageTypeParameters(database, schema, table, stageName); + } +} diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/StartSnowflakeIngest.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/StartSnowflakeIngest.java new file mode 100644 index 0000000000..0980f2b627 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/StartSnowflakeIngest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.snowflake; + +import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import net.snowflake.ingest.SimpleIngestManager; +import net.snowflake.ingest.connection.IngestResponseException; +import net.snowflake.ingest.utils.StagedFileWrapper; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; + +@InputRequirement(Requirement.INPUT_REQUIRED) +@ReadsAttributes({ + @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description = "Staged file path") +}) +@Tags({"snowflake", "snowpipe", "ingest"}) +@CapabilityDescription("Ingests files from a Snowflake internal or external stage into a Snowflake table." + + " The stage must be created in the Snowflake account beforehand." + + " The result of the ingestion is not available immediately, so this processor can be connected to an" + + " GetSnowflakeIngestStatus processor to wait for the results") +@SeeAlso({PutSnowflakeInternalStage.class, GetSnowflakeIngestStatus.class}) +public class StartSnowflakeIngest extends AbstractProcessor { + + static final PropertyDescriptor INGEST_MANAGER_PROVIDER = new PropertyDescriptor.Builder() + .name("ingest-manager-provider") + .displayName("Ingest Manager Provider") + .description("Specifies the Controller Service to use for ingesting Snowflake staged files.") + .identifiesControllerService(SnowflakeIngestManagerProviderService.class) + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("For FlowFiles of successful ingest request") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("For FlowFiles of failed ingest request") + .build(); + + static final List PROPERTIES = Collections.singletonList( + INGEST_MANAGER_PROVIDER + ); + + private static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + REL_SUCCESS, + REL_FAILURE + ))); + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final String stagedFilePath = flowFile.getAttribute(ATTRIBUTE_STAGED_FILE_PATH); + if (stagedFilePath == null) { + getLogger().error("Missing required attribute [\"" + ATTRIBUTE_STAGED_FILE_PATH + "\"] for FlowFile"); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + + final SnowflakeIngestManagerProviderService ingestManagerProviderService = + context.getProperty(INGEST_MANAGER_PROVIDER) + .asControllerService(SnowflakeIngestManagerProviderService.class); + final SimpleIngestManager snowflakeIngestManager = ingestManagerProviderService.getIngestManager(); + final StagedFileWrapper stagedFile = new StagedFileWrapper(stagedFilePath); + try { + snowflakeIngestManager.ingestFile(stagedFile, null); + } catch (URISyntaxException | IOException e) { + throw new ProcessException(String.format("Failed to ingest Snowflake file [%s]", stagedFilePath), e); + } catch (IngestResponseException e) { + getLogger().error("Failed to ingest Snowflake file [" + stagedFilePath + "]", e); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + session.transfer(flowFile, REL_SUCCESS); + } +} diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeAttributes.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeAttributes.java new file mode 100644 index 0000000000..98fe924055 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeAttributes.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.snowflake.util; + +public final class SnowflakeAttributes { + + private SnowflakeAttributes() { + } + + public static final String ATTRIBUTE_STAGED_FILE_PATH = "snowflake.staged.file.path"; +} diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeInternalStageType.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeInternalStageType.java new file mode 100644 index 0000000000..db4d857e15 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeInternalStageType.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.snowflake.util; + +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Stream; +import org.apache.nifi.components.DescribedValue; + +public enum SnowflakeInternalStageType implements DescribedValue { + USER("user", "User", "Use the user's internal stage") { + @Override + public String getStage(final SnowflakeInternalStageTypeParameters parameters) { + return "@~"; + } + }, + TABLE("table", "Table", "Use a table's internal stage") { + @Override + public String getStage(final SnowflakeInternalStageTypeParameters parameters) { + final StringBuilder stringBuilder = new StringBuilder("@"); + Optional.ofNullable(parameters.getDatabase()) + .ifPresent(database -> stringBuilder.append(database).append(".")); + Optional.ofNullable(parameters.getSchema()) + .ifPresent(schema -> stringBuilder.append(schema).append(".")); + + stringBuilder.append("%").append(Objects.requireNonNull(parameters.getTable())); + return stringBuilder.toString(); + } + }, + NAMED("named", "Named", "Use a named internal stage. This stage must be created beforehand in Snowflake") { + @Override + public String getStage(final SnowflakeInternalStageTypeParameters parameters) { + final StringBuilder stringBuilder = new StringBuilder("@"); + Optional.ofNullable(parameters.getDatabase()) + .ifPresent(database -> stringBuilder.append(database).append(".")); + Optional.ofNullable(parameters.getSchema()) + .ifPresent(schema -> stringBuilder.append(schema).append(".")); + stringBuilder.append(Objects.requireNonNull(parameters.getStageName())); + return stringBuilder.toString(); + } + }; + + private final String value; + private final String displayName; + private final String description; + + SnowflakeInternalStageType(final String value, final String displayName, final String description) { + this.value = value; + this.displayName = displayName; + this.description = description; + } + + @Override + public String getValue() { + return value; + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } + + public abstract String getStage(final SnowflakeInternalStageTypeParameters parameters); + + public static SnowflakeInternalStageType forName(String stageType) { + return Stream.of(values()) + .filter(internalStageType -> internalStageType.getValue().equalsIgnoreCase(stageType)) + .findFirst() + .orElseThrow( + () -> new IllegalArgumentException("Invalid SnowflakeInternalStageType: " + stageType)); + } +} diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeInternalStageTypeParameters.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeInternalStageTypeParameters.java new file mode 100644 index 0000000000..6d26151b33 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeInternalStageTypeParameters.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.snowflake.util; + +public class SnowflakeInternalStageTypeParameters { + + private final String database; + private final String schema; + private final String table; + private final String stageName; + + public SnowflakeInternalStageTypeParameters(final String database, final String schema, final String table, final String stageName) { + this.database = database; + this.schema = schema; + this.table = table; + this.stageName = stageName; + } + + public String getDatabase() { + return database; + } + + public String getSchema() { + return schema; + } + + public String getTable() { + return table; + } + + public String getStageName() { + return stageName; + } +} diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000000..2712db84e4 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.nifi.processors.snowflake.GetSnowflakeIngestStatus +org.apache.nifi.processors.snowflake.StartSnowflakeIngest +org.apache.nifi.processors.snowflake.PutSnowflakeInternalStage diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.GetSnowflakeIngestStatus/additionalDetails.html b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.GetSnowflakeIngestStatus/additionalDetails.html new file mode 100644 index 0000000000..86f3f6069b --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.GetSnowflakeIngestStatus/additionalDetails.html @@ -0,0 +1,48 @@ + + + + + + + GetSnowflakeIngestStatus + + + + + +

GetSnowflakeIngestStatus

+ +

Description

+

+ The GetSnowflakeIngestStatus processor can be used to get the status of a staged file ingested by a Snowflake pipe. + To wait until a staged file is fully ingested (copied into the table) you should connect this processor's "retry" relationship to itself. + The processor requires an upstream connection that provides the path of the staged file to be checked through the "snowflake.staged.file.path" attribute. + See StartSnowflakeIngest processor for details about how to properly set up a flow to ingest staged files. + + NOTE: Snowflake pipes cache the paths of ingested files and never ingest the same file multiple times. + This can cause the processor to enter an "infinite loop" with a FlowFile that has the same "snowflake.staged.file.path" attribute as a staged file that has been previously ingested by the pipe. + It is recommended that the retry mechanism be configured to avoid these scenarios. + +

+ + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.PutSnowflakeInternalStage/additionalDetails.html b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.PutSnowflakeInternalStage/additionalDetails.html new file mode 100644 index 0000000000..de3ae542ba --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.PutSnowflakeInternalStage/additionalDetails.html @@ -0,0 +1,43 @@ + + + + + + + PutSnowflakeInternalStage + + + + + +

PutSnowflakeInternalStage

+ +

Description

+

+ The PutSnowflakeInternalStage processor can upload a file to a Snowflake internal stage. Please note, that named stages needs to be created in your Snowflake account manually. + See the documentation on how to set up an internal stage here. + The processor requires an upstream connection and the incoming FlowFiles' content will be uploaded to the stage. A unique file name is generated for the file's staged file name. + While the processor may be used separately, it's recommended to connect it to a StartSnowflakeIngest processor so that the uploaded file can be piped into your Snowflake table. +

+ + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.StartSnowflakeIngest/additionalDetails.html b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.StartSnowflakeIngest/additionalDetails.html new file mode 100644 index 0000000000..42a30c3d18 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.StartSnowflakeIngest/additionalDetails.html @@ -0,0 +1,52 @@ + + + + + + + StartSnowflakeIngest + + + + + +

StartSnowflakeIngest

+ +

Description

+

+ The StartSnowflakeIngest processor triggers a Snowflake pipe ingestion for a staged file. Please note, that the pipe has to be created in your Snowflake account manually. + The processor requires an upstream connection that provides the path of the file to be ingested in the stage through the "snowflake.staged.file.path" attribute. + This attribute is automatically filled in by the PutSnowflakeInternalStage processor when using an internal stage. + In case a pipe copies data from an external stage, the attribute shall be manually provided (e.g. with an UpdateAttribute processor). + + NOTE: Since Snowflake pipes ingest files asynchronously, this processor transfers FlowFiles to the "success" relationship when they're marked for ingestion. + In order to wait for the actual result of the ingestion, the processor may be connected to a downstream GetSnowflakeIngestStatus processor. + +

+

Example flow for internal stage

+

GetFile -> PutSnowflakeInternalStage -> StartSnowflakeIngest -> GetSnowflakeIngestStatus

+ +

Example flow for external stage

+

ListS3 -> UpdateAttribute (add the "snowflake.staged.file.path" attribute) -> StartSnowflakeIngest -> GetSnowflakeIngestStatus

+ + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/test/java/org/apache/nifi/processors/snowflake/SnowflakeConfigAware.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/test/java/org/apache/nifi/processors/snowflake/SnowflakeConfigAware.java new file mode 100644 index 0000000000..ff7c03a684 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/test/java/org/apache/nifi/processors/snowflake/SnowflakeConfigAware.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.snowflake; + +import java.nio.file.Path; +import java.nio.file.Paths; +import org.apache.nifi.key.service.StandardPrivateKeyService; +import org.apache.nifi.key.service.api.PrivateKeyService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat; +import org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool; +import org.apache.nifi.snowflake.service.StandardSnowflakeIngestManagerProviderService; +import org.apache.nifi.processors.snowflake.util.SnowflakeProperties; +import org.apache.nifi.util.TestRunner; + +public interface SnowflakeConfigAware { + + Path filePath = Paths.get("???"); + String stagedFilePath = "???"; + + String organizationName = "???"; + String accountName = "???"; + String userName = "???"; + String password = "???"; + String database = "???"; + String schema = "PUBLIC"; + String pipeName = "???"; + String internalStageName = "???"; + String privateKeyFile = "???"; + String privateKeyPassphrase = "???"; + + default PrivateKeyService createPrivateKeyService(TestRunner runner) throws InitializationException { + final StandardPrivateKeyService privateKeyService = new StandardPrivateKeyService(); + + runner.addControllerService("privateKeyService", privateKeyService); + + runner.setProperty(privateKeyService, + StandardPrivateKeyService.KEY_FILE, + privateKeyFile); + runner.setProperty(privateKeyService, + StandardPrivateKeyService.KEY_PASSWORD, + privateKeyPassphrase); + + runner.enableControllerService(privateKeyService); + return privateKeyService; + } + + default SnowflakeConnectionProviderService createConnectionProviderService(TestRunner runner) + throws InitializationException { + final SnowflakeConnectionProviderService connectionProviderService = new SnowflakeComputingConnectionPool(); + + runner.addControllerService("connectionProviderService", connectionProviderService); + + runner.setProperty(connectionProviderService, + SnowflakeComputingConnectionPool.CONNECTION_URL_FORMAT, + ConnectionUrlFormat.ACCOUNT_NAME.getValue()); + runner.setProperty(connectionProviderService, + SnowflakeComputingConnectionPool.SNOWFLAKE_ORGANIZATION_NAME, + organizationName); + runner.setProperty(connectionProviderService, + SnowflakeComputingConnectionPool.SNOWFLAKE_ACCOUNT_NAME, + accountName); + runner.setProperty(connectionProviderService, + SnowflakeComputingConnectionPool.SNOWFLAKE_USER, + userName); + runner.setProperty(connectionProviderService, + SnowflakeComputingConnectionPool.SNOWFLAKE_PASSWORD, + password); + runner.setProperty(connectionProviderService, + SnowflakeProperties.DATABASE, + database); + runner.setProperty(connectionProviderService, + SnowflakeProperties.SCHEMA, + schema); + + runner.enableControllerService(connectionProviderService); + return connectionProviderService; + } + + default SnowflakeIngestManagerProviderService createIngestManagerProviderService(TestRunner runner) + throws InitializationException { + final SnowflakeIngestManagerProviderService ingestManagerProviderService = + new StandardSnowflakeIngestManagerProviderService(); + final PrivateKeyService privateKeyService = createPrivateKeyService(runner); + + runner.addControllerService("ingestManagerProviderService", ingestManagerProviderService); + + runner.setProperty(ingestManagerProviderService, + StandardSnowflakeIngestManagerProviderService.ORGANIZATION_NAME, + organizationName); + runner.setProperty(ingestManagerProviderService, + StandardSnowflakeIngestManagerProviderService.ACCOUNT_NAME, + accountName); + runner.setProperty(ingestManagerProviderService, + StandardSnowflakeIngestManagerProviderService.USER_NAME, + userName); + runner.setProperty(ingestManagerProviderService, + StandardSnowflakeIngestManagerProviderService.DATABASE, + database); + runner.setProperty(ingestManagerProviderService, + StandardSnowflakeIngestManagerProviderService.SCHEMA, + schema); + runner.setProperty(ingestManagerProviderService, + StandardSnowflakeIngestManagerProviderService.PIPE, + pipeName); + runner.setProperty(ingestManagerProviderService, + StandardSnowflakeIngestManagerProviderService.PRIVATE_KEY_SERVICE, + privateKeyService.getIdentifier()); + + runner.enableControllerService(ingestManagerProviderService); + return ingestManagerProviderService; + } +} diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/test/java/org/apache/nifi/processors/snowflake/SnowflakePipeIT.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/test/java/org/apache/nifi/processors/snowflake/SnowflakePipeIT.java new file mode 100644 index 0000000000..d895e73f72 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/test/java/org/apache/nifi/processors/snowflake/SnowflakePipeIT.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.snowflake; + +import java.security.Security; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import net.snowflake.ingest.utils.StagedFileWrapper; +import org.apache.commons.io.FileUtils; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processors.snowflake.util.SnowflakeAttributes; +import org.apache.nifi.processors.snowflake.util.SnowflakeInternalStageType; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class SnowflakePipeIT implements SnowflakeConfigAware { + + @BeforeAll + static void setUpOnce() { + Security.addProvider(new BouncyCastleProvider()); + } + + @Test + void shouldPutIntoInternalStage() throws Exception { + final PutSnowflakeInternalStage processor = new PutSnowflakeInternalStage(); + + final TestRunner runner = TestRunners.newTestRunner(processor); + final SnowflakeConnectionProviderService connectionProviderService = createConnectionProviderService(runner); + + runner.setProperty(PutSnowflakeInternalStage.SNOWFLAKE_CONNECTION_PROVIDER, connectionProviderService.getIdentifier()); + runner.setProperty(PutSnowflakeInternalStage.INTERNAL_STAGE_TYPE, SnowflakeInternalStageType.NAMED.getValue()); + runner.setProperty(PutSnowflakeInternalStage.INTERNAL_STAGE, internalStageName); + + final String uuid = UUID.randomUUID().toString(); + final String fileName = filePath.getFileName().toString(); + + final Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), fileName); + attributes.put(CoreAttributes.PATH.key(), uuid + "/"); + runner.enqueue(filePath, attributes); + + runner.run(); + + runner.assertAllFlowFilesTransferred(StartSnowflakeIngest.REL_SUCCESS); + } + + @Test + void shouldStartFileIngestion() throws Exception { + final StartSnowflakeIngest processor = new StartSnowflakeIngest(); + + final TestRunner runner = TestRunners.newTestRunner(processor); + final SnowflakeConnectionProviderService connectionProviderService = createConnectionProviderService(runner); + final SnowflakeIngestManagerProviderService ingestManagerProviderService = createIngestManagerProviderService(runner); + + final String uuid = UUID.randomUUID().toString(); + final String fileName = filePath.getFileName().toString(); + + runner.setProperty(StartSnowflakeIngest.INGEST_MANAGER_PROVIDER, ingestManagerProviderService.getIdentifier()); + + try (final SnowflakeConnectionWrapper snowflakeConnection = connectionProviderService.getSnowflakeConnection()) { + snowflakeConnection.unwrap().uploadStream(internalStageName, + uuid, + FileUtils.openInputStream(filePath.toFile()), + fileName, + false); + } + + final Map attributes = Collections.singletonMap(SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH, uuid + "/" + stagedFilePath); + runner.enqueue("", attributes); + + runner.run(); + + runner.assertAllFlowFilesTransferred(StartSnowflakeIngest.REL_SUCCESS); + } + + @Test + void shouldAwaitSnowflakePipeIngestion() throws Exception { + final GetSnowflakeIngestStatus processor = new GetSnowflakeIngestStatus(); + + final TestRunner runner = TestRunners.newTestRunner(processor); + final SnowflakeConnectionProviderService connectionProviderService = createConnectionProviderService(runner); + final SnowflakeIngestManagerProviderService ingestManagerProviderService = createIngestManagerProviderService(runner); + + final String uuid = UUID.randomUUID().toString(); + final String fileName = filePath.getFileName().toString(); + + runner.setProperty(GetSnowflakeIngestStatus.INGEST_MANAGER_PROVIDER, ingestManagerProviderService.getIdentifier()); + + try (final SnowflakeConnectionWrapper snowflakeConnection = connectionProviderService.getSnowflakeConnection()) { + snowflakeConnection.unwrap().uploadStream(internalStageName, + uuid, + FileUtils.openInputStream(filePath.toFile()), + fileName, + false); + } + + final String stagedFilePathAttribute = uuid + "/" + stagedFilePath; + + final StagedFileWrapper stagedFile = new StagedFileWrapper(stagedFilePathAttribute); + ingestManagerProviderService.getIngestManager().ingestFile(stagedFile, null); + + final Map attributes = Collections.singletonMap(SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH, stagedFilePathAttribute); + runner.enqueue("", attributes); + + runner.run(); + + runner.assertAllFlowFilesTransferred(GetSnowflakeIngestStatus.REL_RETRY); + } +} diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api-nar/pom.xml new file mode 100644 index 0000000000..17a357a737 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api-nar/pom.xml @@ -0,0 +1,45 @@ + + + + + + nifi-snowflake-bundle + org.apache.nifi + 1.19.0-SNAPSHOT + + 4.0.0 + + nifi-snowflake-services-api-nar + nar + + + + org.apache.nifi + nifi-snowflake-services-api + 1.19.0-SNAPSHOT + + + org.apache.nifi + nifi-standard-services-api-nar + 1.19.0-SNAPSHOT + nar + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..d645695673 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..615b6f0e49 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,14 @@ +nifi-snowflake-services-api-nar +Copyright 2014-2022 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +************************** +Apache Software License v2 +************************** + + (ASLv2) Snowflake Ingest SDK + The following NOTICE information applies: + Snowflake Ingest SDK + Copyright (c) 2013-2016 Snowflake Computing, Inc. diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/pom.xml b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/pom.xml new file mode 100644 index 0000000000..4bf45b3720 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/pom.xml @@ -0,0 +1,46 @@ + + + + + + nifi-snowflake-bundle + org.apache.nifi + 1.19.0-SNAPSHOT + + 4.0.0 + + nifi-snowflake-services-api + + + net.snowflake + snowflake-ingest-sdk + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-utils + 1.19.0-SNAPSHOT + provided + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/SnowflakeConnectionProviderService.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/SnowflakeConnectionProviderService.java new file mode 100644 index 0000000000..2ae4417d2f --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/SnowflakeConnectionProviderService.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.snowflake; + +import org.apache.nifi.controller.ControllerService; + +public interface SnowflakeConnectionProviderService extends ControllerService { + + SnowflakeConnectionWrapper getSnowflakeConnection(); +} diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/SnowflakeConnectionWrapper.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/SnowflakeConnectionWrapper.java new file mode 100644 index 0000000000..2c66a81a70 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/SnowflakeConnectionWrapper.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.snowflake; + +import java.sql.Connection; +import java.sql.SQLException; +import net.snowflake.client.jdbc.SnowflakeConnection; + +public class SnowflakeConnectionWrapper implements AutoCloseable { + + final Connection connection; + + public SnowflakeConnectionWrapper(Connection connection) { + this.connection = connection; + } + + @Override + public void close() throws SQLException { + connection.close(); + } + + public SnowflakeConnection unwrap() throws SQLException { + return connection.unwrap(SnowflakeConnection.class); + } +} diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/SnowflakeIngestManagerProviderService.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/SnowflakeIngestManagerProviderService.java new file mode 100644 index 0000000000..7cb90a31f5 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/SnowflakeIngestManagerProviderService.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.snowflake; + +import net.snowflake.ingest.SimpleIngestManager; +import org.apache.nifi.controller.ControllerService; + +public interface SnowflakeIngestManagerProviderService extends ControllerService { + + String getPipeName(); + SimpleIngestManager getIngestManager(); +} diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeProperties.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeProperties.java new file mode 100644 index 0000000000..1c732e2d50 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeProperties.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.snowflake.util; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; + +public final class SnowflakeProperties { + private SnowflakeProperties() { + } + + public static final PropertyDescriptor ACCOUNT_LOCATOR = new PropertyDescriptor.Builder() + .name("account-locator") + .displayName("Account Locator") + .description("Snowflake account locator to use for connection.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(true) + .build(); + + public static final PropertyDescriptor CLOUD_REGION = new PropertyDescriptor.Builder() + .name("cloud-region") + .displayName("Cloud Region") + .description("Snowflake cloud region to use for connection.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(true) + .build(); + + public static final PropertyDescriptor CLOUD_TYPE = new PropertyDescriptor.Builder() + .name("cloud-type") + .displayName("Cloud Type") + .description("Snowflake cloud type to use for connection.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor ORGANIZATION_NAME = new PropertyDescriptor.Builder() + .name("organization-name") + .displayName("Organization Name") + .description("Snowflake organization name to use for connection.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(true) + .build(); + + public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder() + .name("account-name") + .displayName("Account Name") + .description("Snowflake account name to use for connection.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(true) + .build(); + + public static final PropertyDescriptor DATABASE = new PropertyDescriptor.Builder() + .name("database") + .displayName("Database") + .description("The database to use by default. The same as passing 'db=DATABASE_NAME' to the connection string.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder() + .name("schema") + .displayName("Schema") + .description("The schema to use by default. The same as passing 'schema=SCHEMA' to the connection string.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); +} diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-nar/pom.xml b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-nar/pom.xml index 4e0da23e93..78ca845948 100644 --- a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-nar/pom.xml +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-nar/pom.xml @@ -33,7 +33,7 @@ org.apache.nifi - nifi-standard-services-api-nar + nifi-snowflake-services-api-nar 1.19.0-SNAPSHOT nar diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-nar/src/main/resources/META-INF/NOTICE index 17172bae30..2284f86bbb 100644 --- a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-nar/src/main/resources/META-INF/NOTICE +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-nar/src/main/resources/META-INF/NOTICE @@ -25,7 +25,14 @@ The following binary components are provided under the Apache Software License v Apache Commons Pool Copyright 2002-2019 The Apache Software Foundation - (ASLv2) snowflake-jdbc +************ +Bouncy Castle (MIT style) +************ + +The following binary components are provided under the Bouncy Castle (an MIT style) license. + + Bouncy Castle PKIX, CMS, EAC, TSP, PKCS, OCSP, CMP, and CRMF APIs + Bouncy Castle Provider + Bouncy Castle ASN.1 Extension and Utility APIs The following NOTICE information applies: - snowflake-jdbc - Copyright (c) 2013-2018 Snowflake Computing, Inc. + Copyright (c) 2000 - 2021 The Legion of the Bouncy Castle Inc. (https://www.bouncycastle.org) diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/pom.xml b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/pom.xml index 780ee260bb..1ae42a3078 100644 --- a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/pom.xml +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/pom.xml @@ -34,11 +34,46 @@ org.apache.nifi nifi-dbcp-base 1.19.0-SNAPSHOT + + + org.apache.nifi + nifi-utils + + + + + org.apache.nifi + nifi-key-service-api + 1.19.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-snowflake-services-api + 1.19.0-SNAPSHOT + provided net.snowflake - snowflake-jdbc - 3.13.24 + snowflake-ingest-sdk + provided + + + org.apache.nifi + nifi-utils + 1.19.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-api + 1.19.0-SNAPSHOT + + + org.apache.nifi + nifi-proxy-configuration-api + 1.19.0-SNAPSHOT + provided org.apache.nifi diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java index ba346ed184..a3f3c0c55f 100644 --- a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java @@ -16,6 +16,15 @@ */ package org.apache.nifi.snowflake.service; +import java.sql.Driver; +import java.sql.DriverManager; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import net.snowflake.client.core.SFSessionProperty; import net.snowflake.client.jdbc.SnowflakeDriver; import org.apache.nifi.annotation.behavior.DynamicProperties; import org.apache.nifi.annotation.behavior.DynamicProperty; @@ -27,20 +36,20 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.dbcp.AbstractDBCPConnectionPool; -import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.exception.ProcessException; - -import java.sql.Driver; -import java.sql.DriverManager; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.snowflake.SnowflakeConnectionProviderService; +import org.apache.nifi.processors.snowflake.SnowflakeConnectionWrapper; +import org.apache.nifi.proxy.ProxyConfiguration; +import org.apache.nifi.proxy.ProxyConfigurationService; +import org.apache.nifi.snowflake.service.util.ConnectionUrlFormatParameters; +import org.apache.nifi.processors.snowflake.util.SnowflakeProperties; +import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat; /** - * Implementation of Database Connection Pooling Service for Snowflake. - * Apache DBCP is used for connection pooling functionality. + * Implementation of Database Connection Pooling Service for Snowflake. Apache DBCP is used for connection pooling + * functionality. */ @Tags({"snowflake", "dbcp", "jdbc", "database", "connection", "pooling", "store"}) @CapabilityDescription("Provides Snowflake Connection Pooling Service. Connections can be asked from pool and returned after usage.") @@ -55,34 +64,88 @@ import java.util.List; description = "Snowflake JDBC driver property name prefixed with 'SENSITIVE.' handled as a sensitive property.") }) @RequiresInstanceClassLoading -public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool implements DBCPService { +public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool implements SnowflakeConnectionProviderService { + + public static final PropertyDescriptor CONNECTION_URL_FORMAT = new PropertyDescriptor.Builder() + .name("connection-url-format") + .displayName("Connection URL Format") + .description("The format of the connection URL.") + .allowableValues(ConnectionUrlFormat.class) + .required(true) + .defaultValue(ConnectionUrlFormat.FULL_URL.getValue()) + .build(); public static final PropertyDescriptor SNOWFLAKE_URL = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(AbstractDBCPConnectionPool.DATABASE_URL) - .displayName("Snowflake URL") - .description("Example connection string: jdbc:snowflake://[account].[region].snowflakecomputing.com/?[connection_params]" + - " The connection parameters can include db=DATABASE_NAME to avoid using qualified table names such as DATABASE_NAME.PUBLIC.TABLE_NAME") - .build(); + .fromPropertyDescriptor(AbstractDBCPConnectionPool.DATABASE_URL) + .displayName("Snowflake URL") + .description("Example connection string: jdbc:snowflake://[account].[region]" + ConnectionUrlFormat.SNOWFLAKE_HOST_SUFFIX + "/?[connection_params]" + + " The connection parameters can include db=DATABASE_NAME to avoid using qualified table names such as DATABASE_NAME.PUBLIC.TABLE_NAME") + .required(true) + .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.FULL_URL) + .build(); + + public static final PropertyDescriptor SNOWFLAKE_ACCOUNT_LOCATOR = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(SnowflakeProperties.ACCOUNT_LOCATOR) + .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_LOCATOR) + .build(); + + public static final PropertyDescriptor SNOWFLAKE_CLOUD_REGION = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(SnowflakeProperties.CLOUD_REGION) + .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_LOCATOR) + .build(); + + public static final PropertyDescriptor SNOWFLAKE_CLOUD_TYPE = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(SnowflakeProperties.CLOUD_TYPE) + .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_LOCATOR) + .build(); + + public static final PropertyDescriptor SNOWFLAKE_ORGANIZATION_NAME = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(SnowflakeProperties.ORGANIZATION_NAME) + .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_NAME) + .build(); + + public static final PropertyDescriptor SNOWFLAKE_ACCOUNT_NAME = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(SnowflakeProperties.ACCOUNT_NAME) + .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_NAME) + .build(); public static final PropertyDescriptor SNOWFLAKE_USER = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_USER) - .displayName("Snowflake User") - .description("The Snowflake user name") - .build(); + .fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_USER) + .displayName("Username") + .description("The Snowflake user name.") + .build(); public static final PropertyDescriptor SNOWFLAKE_PASSWORD = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_PASSWORD) - .displayName("Snowflake Password") - .description("The password for the Snowflake user") - .build(); + .fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_PASSWORD) + .displayName("Password") + .description("The password for the Snowflake user.") + .build(); + + public static final PropertyDescriptor SNOWFLAKE_WAREHOUSE = new PropertyDescriptor.Builder() + .name("warehouse") + .displayName("Warehouse") + .description("The warehouse to use by default. The same as passing 'warehouse=WAREHOUSE' to the connection string.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); private static final List PROPERTIES; static { final List props = new ArrayList<>(); + props.add(CONNECTION_URL_FORMAT); props.add(SNOWFLAKE_URL); + props.add(SNOWFLAKE_ACCOUNT_LOCATOR); + props.add(SNOWFLAKE_CLOUD_REGION); + props.add(SNOWFLAKE_CLOUD_TYPE); + props.add(SNOWFLAKE_ORGANIZATION_NAME); + props.add(SNOWFLAKE_ACCOUNT_NAME); props.add(SNOWFLAKE_USER); props.add(SNOWFLAKE_PASSWORD); + props.add(SnowflakeProperties.DATABASE); + props.add(SnowflakeProperties.SCHEMA); + props.add(SNOWFLAKE_WAREHOUSE); + props.add(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE); props.add(VALIDATION_QUERY); props.add(MAX_WAIT_TIME); props.add(MAX_TOTAL_CONNECTIONS); @@ -108,13 +171,11 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool @Override protected String getUrl(final ConfigurationContext context) { - String snowflakeUrl = context.getProperty(SNOWFLAKE_URL).evaluateAttributeExpressions().getValue(); + final ConnectionUrlFormat connectionUrlFormat = ConnectionUrlFormat.forName(context.getProperty(CONNECTION_URL_FORMAT) + .getValue()); + final ConnectionUrlFormatParameters parameters = getConnectionUrlFormatParameters(context); - if (!snowflakeUrl.startsWith("jdbc:snowflake")) { - snowflakeUrl = "jdbc:snowflake://" + snowflakeUrl; - } - - return snowflakeUrl; + return connectionUrlFormat.buildConnectionUrl(parameters); } @Override @@ -126,4 +187,77 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool throw new ProcessException("Snowflake driver unavailable or incompatible connection URL", e); } } + + @Override + protected Map getConnectionProperties(final ConfigurationContext context) { + final String database = context.getProperty(SnowflakeProperties.DATABASE).evaluateAttributeExpressions().getValue(); + final String schema = context.getProperty(SnowflakeProperties.SCHEMA).evaluateAttributeExpressions().getValue(); + final String warehouse = context.getProperty(SNOWFLAKE_WAREHOUSE).evaluateAttributeExpressions().getValue(); + + final Map connectionProperties = new HashMap<>(); + if (database != null) { + connectionProperties.put("db", database); + } + if (schema != null) { + connectionProperties.put("schema", schema); + } + if (warehouse != null) { + connectionProperties.put("warehouse", warehouse); + } + + final ProxyConfigurationService proxyConfigurationService = context + .getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE) + .asControllerService(ProxyConfigurationService.class); + if (proxyConfigurationService != null) { + final ProxyConfiguration proxyConfiguration = proxyConfigurationService.getConfiguration(); + connectionProperties.put(SFSessionProperty.USE_PROXY.getPropertyKey(), "true"); + if (proxyConfiguration.getProxyServerHost() != null) { + connectionProperties.put(SFSessionProperty.PROXY_HOST.getPropertyKey(), proxyConfiguration.getProxyServerHost()); + } + if (proxyConfiguration.getProxyServerPort() != null) { + connectionProperties.put(SFSessionProperty.PROXY_PORT.getPropertyKey(), proxyConfiguration.getProxyServerPort().toString()); + } + if (proxyConfiguration.getProxyUserName() != null) { + connectionProperties.put(SFSessionProperty.PROXY_USER.getPropertyKey(), proxyConfiguration.getProxyUserName()); + } + if (proxyConfiguration.getProxyUserPassword() != null) { + connectionProperties.put(SFSessionProperty.PROXY_PASSWORD.getPropertyKey(), proxyConfiguration.getProxyUserPassword()); + } + if (proxyConfiguration.getProxyType() != null) { + connectionProperties.put(SFSessionProperty.PROXY_PROTOCOL.getPropertyKey(), proxyConfiguration.getProxyType().name().toLowerCase()); + } + } + return connectionProperties; + } + + @Override + public SnowflakeConnectionWrapper getSnowflakeConnection() { + return new SnowflakeConnectionWrapper(getConnection()); + } + + private ConnectionUrlFormatParameters getConnectionUrlFormatParameters(ConfigurationContext context) { + final String snowflakeUrl = context.getProperty(SNOWFLAKE_URL).evaluateAttributeExpressions().getValue(); + final String organizationName = context.getProperty(SNOWFLAKE_ORGANIZATION_NAME) + .evaluateAttributeExpressions() + .getValue(); + final String accountName = context.getProperty(SNOWFLAKE_ACCOUNT_NAME) + .evaluateAttributeExpressions() + .getValue(); + final String accountLocator = context.getProperty(SNOWFLAKE_ACCOUNT_LOCATOR) + .evaluateAttributeExpressions() + .getValue(); + final String cloudRegion = context.getProperty(SNOWFLAKE_CLOUD_REGION) + .evaluateAttributeExpressions() + .getValue(); + final String cloudType = context.getProperty(SNOWFLAKE_CLOUD_TYPE) + .evaluateAttributeExpressions() + .getValue(); + return new ConnectionUrlFormatParameters( + snowflakeUrl, + organizationName, + accountName, + accountLocator, + cloudRegion, + cloudType); + } } diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java new file mode 100644 index 0000000000..3b2f075669 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.snowflake.service; + +import java.security.NoSuchAlgorithmException; +import java.security.PrivateKey; +import java.security.spec.InvalidKeySpecException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import net.snowflake.ingest.SimpleIngestManager; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.key.service.api.PrivateKeyService; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.snowflake.SnowflakeIngestManagerProviderService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.snowflake.service.util.AccountIdentifierFormat; +import org.apache.nifi.snowflake.service.util.AccountIdentifierFormatParameters; +import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat; +import org.apache.nifi.processors.snowflake.util.SnowflakeProperties; + +@Tags({"snowflake", "jdbc", "database", "connection"}) +@CapabilityDescription("Provides a Snowflake Ingest Manager for Snowflake pipe processors") +public class StandardSnowflakeIngestManagerProviderService extends AbstractControllerService + implements SnowflakeIngestManagerProviderService { + + public static final PropertyDescriptor ACCOUNT_IDENTIFIER_FORMAT = new PropertyDescriptor.Builder() + .name("account-identifier-format") + .displayName("Account Identifier Format") + .description("The format of the account identifier.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .required(true) + .allowableValues(AccountIdentifierFormat.class) + .defaultValue(AccountIdentifierFormat.ACCOUNT_NAME.getValue()) + .build(); + + public static final PropertyDescriptor HOST_URL = new PropertyDescriptor.Builder() + .name("host-url") + .displayName("Snowflake URL") + .description("Example host url: [account-locator].[cloud-region].[cloud]" + ConnectionUrlFormat.SNOWFLAKE_HOST_SUFFIX) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(true) + .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.FULL_URL) + .build(); + + public static final PropertyDescriptor ACCOUNT_LOCATOR = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(SnowflakeProperties.ACCOUNT_LOCATOR) + .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR) + .build(); + + public static final PropertyDescriptor CLOUD_REGION = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(SnowflakeProperties.CLOUD_REGION) + .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR) + .build(); + + public static final PropertyDescriptor CLOUD_TYPE = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(SnowflakeProperties.CLOUD_TYPE) + .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR) + .build(); + + public static final PropertyDescriptor ORGANIZATION_NAME = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(SnowflakeProperties.ORGANIZATION_NAME) + .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_NAME) + .build(); + + public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(SnowflakeProperties.ACCOUNT_NAME) + .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_NAME) + .build(); + + public static final PropertyDescriptor USER_NAME = new PropertyDescriptor.Builder() + .name("user-name") + .displayName("User Name") + .description("The Snowflake user name.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(true) + .build(); + + public static final PropertyDescriptor PRIVATE_KEY_SERVICE = new PropertyDescriptor.Builder() + .name("private-key-service") + .displayName("Private Key Service") + .description("Specifies the Controller Service that will provide the private key. The public key needs to be added to the user account in the Snowflake account beforehand.") + .identifiesControllerService(PrivateKeyService.class) + .required(true) + .build(); + + public static final PropertyDescriptor DATABASE = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(SnowflakeProperties.DATABASE) + .required(true) + .build(); + + public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(SnowflakeProperties.SCHEMA) + .required(true) + .build(); + + public static final PropertyDescriptor PIPE = new PropertyDescriptor.Builder() + .name("pipe") + .displayName("Pipe") + .description("The Snowflake pipe to ingest from.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(true) + .build(); + + static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( + ACCOUNT_IDENTIFIER_FORMAT, + HOST_URL, + ACCOUNT_LOCATOR, + CLOUD_REGION, + CLOUD_TYPE, + ORGANIZATION_NAME, + ACCOUNT_NAME, + USER_NAME, + PRIVATE_KEY_SERVICE, + DATABASE, + SCHEMA, + PIPE + )); + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + private volatile String fullyQualifiedPipeName; + private volatile SimpleIngestManager ingestManager; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException { + final String user = context.getProperty(USER_NAME).evaluateAttributeExpressions().getValue(); + final String database = context.getProperty(DATABASE) + .evaluateAttributeExpressions() + .getValue(); + final String schema = context.getProperty(SCHEMA) + .evaluateAttributeExpressions() + .getValue(); + final String pipe = context.getProperty(PIPE).evaluateAttributeExpressions().getValue(); + fullyQualifiedPipeName = database + "." + schema + "." + pipe; + final PrivateKeyService privateKeyService = context.getProperty(PRIVATE_KEY_SERVICE) + .asControllerService(PrivateKeyService.class); + final PrivateKey privateKey = privateKeyService.getPrivateKey(); + + final AccountIdentifierFormat accountIdentifierFormat = AccountIdentifierFormat.forName(context.getProperty(ACCOUNT_IDENTIFIER_FORMAT) + .getValue()); + final AccountIdentifierFormatParameters parameters = getAccountIdentifierFormatParameters(context); + final String account = accountIdentifierFormat.getAccount(parameters); + final String host = accountIdentifierFormat.getHostname(parameters); + try { + ingestManager = new SimpleIngestManager(account, user, fullyQualifiedPipeName, privateKey, "https", host, 443); + } catch (NoSuchAlgorithmException | InvalidKeySpecException e) { + throw new InitializationException("Failed create Snowflake ingest manager", e); + } + } + + @OnDisabled + public void onDisabled() { + if (ingestManager != null) { + ingestManager.close(); + ingestManager = null; + } + } + + @Override + public String getPipeName() { + return fullyQualifiedPipeName; + } + + @Override + public SimpleIngestManager getIngestManager() { + return ingestManager; + } + + private AccountIdentifierFormatParameters getAccountIdentifierFormatParameters(ConfigurationContext context) { + final String hostUrl = context.getProperty(HOST_URL) + .evaluateAttributeExpressions() + .getValue(); + final String organizationName = context.getProperty(ORGANIZATION_NAME) + .evaluateAttributeExpressions() + .getValue(); + final String accountName = context.getProperty(ACCOUNT_NAME) + .evaluateAttributeExpressions() + .getValue(); + final String accountLocator = context.getProperty(ACCOUNT_LOCATOR) + .evaluateAttributeExpressions() + .getValue(); + final String cloudRegion = context.getProperty(CLOUD_REGION) + .evaluateAttributeExpressions() + .getValue(); + final String cloudType = context.getProperty(CLOUD_TYPE) + .evaluateAttributeExpressions() + .getValue(); + return new AccountIdentifierFormatParameters(hostUrl, + organizationName, + accountName, + accountLocator, + cloudRegion, + cloudType); + } +} diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/AccountIdentifierFormat.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/AccountIdentifierFormat.java new file mode 100644 index 0000000000..4b52dd69db --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/AccountIdentifierFormat.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.snowflake.service.util; + +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Stream; +import org.apache.nifi.components.DescribedValue; + +public enum AccountIdentifierFormat implements DescribedValue { + FULL_URL("full-url", "Full URL", "Provide an account identifier in a single property") { + @Override + public String getAccount(final AccountIdentifierFormatParameters parameters) { + final String[] hostParts = getHostname(parameters).split("\\."); + if (hostParts.length == 0) { + throw new IllegalArgumentException("Invalid Snowflake host url"); + } + return hostParts[0]; + } + + @Override + public String getHostname(final AccountIdentifierFormatParameters parameters) { + return Objects.requireNonNull(parameters.getHostUrl()); + } + }, + ACCOUNT_NAME("account-name", "Account Name", "Provide a Snowflake Account Name") { + @Override + public String getAccount(final AccountIdentifierFormatParameters parameters) { + final String organizationName = Objects.requireNonNull(parameters.getOrganizationName()); + final String accountName = Objects.requireNonNull(parameters.getAccountName()); + return organizationName + "-" + accountName; + } + + @Override + public String getHostname(final AccountIdentifierFormatParameters parameters) { + return getAccount(parameters) + ConnectionUrlFormat.SNOWFLAKE_HOST_SUFFIX; + } + }, + ACCOUNT_LOCATOR("account-locator", "Account Locator", "Provide a Snowflake Account Locator") { + @Override + public String getAccount(final AccountIdentifierFormatParameters parameters) { + return Objects.requireNonNull(parameters.getAccountLocator()); + } + + @Override + public String getHostname(final AccountIdentifierFormatParameters parameters) { + final String accountLocator = Objects.requireNonNull(parameters.getAccountLocator()); + final String cloudRegion = Objects.requireNonNull(parameters.getCloudRegion()); + final String optCloudType = parameters.getCloudType(); + final StringBuilder hostBuilder = new StringBuilder(); + hostBuilder.append(accountLocator) + .append(".").append(cloudRegion); + Optional.ofNullable(optCloudType) + .ifPresent(cloudType -> hostBuilder.append(".").append(cloudType)); + hostBuilder.append(ConnectionUrlFormat.SNOWFLAKE_HOST_SUFFIX); + return hostBuilder.toString(); + } + }; + + private final String value; + private final String displayName; + private final String description; + + AccountIdentifierFormat(final String value, final String displayName, final String description) { + this.value = value; + this.displayName = displayName; + this.description = description; + } + + @Override + public String getValue() { + return value; + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } + + public abstract String getAccount(final AccountIdentifierFormatParameters parameters); + public abstract String getHostname(final AccountIdentifierFormatParameters parameters); + + public static AccountIdentifierFormat forName(String provideMethod) { + return Stream.of(values()).filter(provider -> provider.getValue().equalsIgnoreCase(provideMethod)) + .findFirst() + .orElseThrow( + () -> new IllegalArgumentException("Invalid AccountIdentifierFormat: " + provideMethod)); + } +} diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/AccountIdentifierFormatParameters.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/AccountIdentifierFormatParameters.java new file mode 100644 index 0000000000..21408c6ca5 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/AccountIdentifierFormatParameters.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.snowflake.service.util; + +public final class AccountIdentifierFormatParameters extends SnowflakeCommonParameters { + + private final String hostUrl; + + public AccountIdentifierFormatParameters(final String hostUrl, + final String organizationName, + final String accountName, + final String accountLocator, + final String cloudRegion, + final String cloudType) { + super(organizationName, accountName, accountLocator, cloudRegion, cloudType); + this.hostUrl = hostUrl; + } + + public String getHostUrl() { + return hostUrl; + } +} diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/ConnectionUrlFormat.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/ConnectionUrlFormat.java new file mode 100644 index 0000000000..04ea51eb44 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/ConnectionUrlFormat.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.snowflake.service.util; + +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Stream; +import org.apache.nifi.components.DescribedValue; + +public enum ConnectionUrlFormat implements DescribedValue { + FULL_URL("full-url", "Full URL", "Provide connection URL in a single property") { + @Override + public String buildConnectionUrl(final ConnectionUrlFormatParameters parameters) { + String snowflakeUrl = parameters.getSnowflakeUrl(); + if (!snowflakeUrl.startsWith(SNOWFLAKE_SCHEME)) { + snowflakeUrl = SNOWFLAKE_URI_PREFIX + snowflakeUrl; + } + + return snowflakeUrl; + } + }, + ACCOUNT_NAME("account-name", "Account Name", "Provide a Snowflake Account Name") { + @Override + public String buildConnectionUrl(final ConnectionUrlFormatParameters parameters) { + final String organizationName = Objects.requireNonNull(parameters.getOrganizationName()); + final String accountName = Objects.requireNonNull(parameters.getAccountName()); + return SNOWFLAKE_URI_PREFIX + organizationName + "-" + accountName + SNOWFLAKE_HOST_SUFFIX; + } + }, + ACCOUNT_LOCATOR("account-locator", "Account Locator", "Provide a Snowflake Account Locator") { + @Override + public String buildConnectionUrl(final ConnectionUrlFormatParameters parameters) { + final String accountLocator = Objects.requireNonNull(parameters.getAccountLocator()); + final String cloudRegion = Objects.requireNonNull(parameters.getCloudRegion()); + final String optCloudType = parameters.getCloudType(); + + final StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append(SNOWFLAKE_URI_PREFIX).append(accountLocator) + .append(".").append(cloudRegion); + Optional.ofNullable(optCloudType) + .ifPresent(cloudType -> stringBuilder.append(".").append(cloudType)); + stringBuilder.append(SNOWFLAKE_HOST_SUFFIX); + return stringBuilder.toString(); + } + }; + + public static final String SNOWFLAKE_HOST_SUFFIX = ".snowflakecomputing.com"; + public static final String SNOWFLAKE_SCHEME = "jdbc:snowflake"; + public static final String SNOWFLAKE_URI_PREFIX = SNOWFLAKE_SCHEME + "://"; + + private final String value; + private final String displayName; + private final String description; + + ConnectionUrlFormat(final String value, final String displayName, final String description) { + this.value = value; + this.displayName = displayName; + this.description = description; + } + + @Override + public String getValue() { + return value; + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } + + public abstract String buildConnectionUrl(final ConnectionUrlFormatParameters parameters); + + public static ConnectionUrlFormat forName(String provideMethod) { + return Stream.of(values()).filter(provider -> provider.getValue().equalsIgnoreCase(provideMethod)) + .findFirst() + .orElseThrow( + () -> new IllegalArgumentException("Invalid ConnectionUrlFormat: " + provideMethod)); + } +} diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/ConnectionUrlFormatParameters.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/ConnectionUrlFormatParameters.java new file mode 100644 index 0000000000..2cd966633b --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/ConnectionUrlFormatParameters.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.snowflake.service.util; + +public final class ConnectionUrlFormatParameters extends SnowflakeCommonParameters { + + private final String snowflakeUrl; + + public ConnectionUrlFormatParameters(final String snowflakeUrl, + final String organizationName, + final String accountName, + final String accountLocator, + final String cloudRegion, + final String cloudType) { + super(organizationName, accountName, accountLocator, cloudRegion, cloudType); + this.snowflakeUrl = snowflakeUrl; + } + + public String getSnowflakeUrl() { + return snowflakeUrl; + } +} diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/SnowflakeCommonParameters.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/SnowflakeCommonParameters.java new file mode 100644 index 0000000000..999d67e5d0 --- /dev/null +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/SnowflakeCommonParameters.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.snowflake.service.util; + +public class SnowflakeCommonParameters { + + protected final String organizationName; + protected final String accountName; + protected final String accountLocator; + protected final String cloudRegion; + protected final String cloudType; + + public SnowflakeCommonParameters( + final String organizationName, + final String accountName, + final String accountLocator, + final String cloudRegion, + final String cloudType) { + this.organizationName = organizationName; + this.accountName = accountName; + this.accountLocator = accountLocator; + this.cloudRegion = cloudRegion; + this.cloudType = cloudType; + } + + public String getOrganizationName() { + return organizationName; + } + + public String getAccountName() { + return accountName; + } + + public String getAccountLocator() { + return accountLocator; + } + + public String getCloudRegion() { + return cloudRegion; + } + + public String getCloudType() { + return cloudType; + } +} diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 0e0a16be3f..9ad56927a4 100644 --- a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -13,3 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool +org.apache.nifi.snowflake.service.StandardSnowflakeIngestManagerProviderService diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/pom.xml b/nifi-nar-bundles/nifi-snowflake-bundle/pom.xml index 8445d14126..99dedf871f 100644 --- a/nifi-nar-bundles/nifi-snowflake-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-snowflake-bundle/pom.xml @@ -24,5 +24,20 @@ nifi-snowflake-services nifi-snowflake-services-nar + nifi-snowflake-processors + nifi-snowflake-processors-nar + nifi-snowflake-services-api + nifi-snowflake-services-api-nar + + + + + + net.snowflake + snowflake-ingest-sdk + 1.0.2-beta.6 + + +