NIFI-10370 Create PutSnowflake processor using Snowpipe ingest

This closes #6584.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Robert Kalmar 2022-08-25 09:16:31 +02:00 committed by Peter Turcsanyi
parent 2698282615
commit 89426d167b
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
39 changed files with 2729 additions and 39 deletions

View File

@ -680,10 +680,10 @@ The following binary components are provided under the Apache Software License v
Apache CXF
Copyright 2008-2017 The Apache Software Foundation
(ASLv2) snowflake-jdbc
(ASLv2) Snowflake Ingest SDK
The following NOTICE information applies:
snowflake-jdbc
Copyright (c) 2013-2018 Snowflake Computing, Inc.
Snowflake Ingest SDK
Copyright (c) 2013-2016 Snowflake Computing, Inc.
(ASLv2) Apache Lucene
The following NOTICE information applies:

View File

@ -1471,12 +1471,24 @@ language governing permissions and limitations under the License. -->
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-snowflake-services-api-nar</artifactId>
<version>1.19.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-snowflake-services-nar</artifactId>
<version>1.19.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-snowflake-processors-nar</artifactId>
<version>1.19.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
</dependencies>
</profile>
<profile>

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.dbcp;
import java.util.HashMap;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
@ -448,6 +449,8 @@ public abstract class AbstractDBCPConnectionPool extends AbstractControllerServi
dataSource.addConnectionProperty(descriptor.getName(), propertyValue.evaluateAttributeExpressions().getValue());
}
});
getConnectionProperties(context).forEach(dataSource::addConnectionProperty);
}
private KerberosUser getKerberosUser(final ConfigurationContext context) {
@ -496,6 +499,15 @@ public abstract class AbstractDBCPConnectionPool extends AbstractControllerServi
}
}
/**
 * Override in subclasses to provide additional connection properties that should be set on the
 * underlying data source (e.g. driver-specific options). The default implementation contributes
 * no properties.
 *
 * @param context the service configuration context the properties may be derived from
 * @return Key-value pairs that will be added as connection properties
 */
protected Map<String, String> getConnectionProperties(final ConfigurationContext context) {
return new HashMap<>();
}
/**
 * Converts a time-period property to milliseconds, treating the literal value "-1" as
 * "infinite" and passing it through unchanged.
 *
 * @param prop the property to convert
 * @return the period in milliseconds, -1 for infinite, or null when the property has no value
 */
protected Long extractMillisWithInfinite(PropertyValue prop) {
    if ("-1".equals(prop.getValue())) {
        return -1L;
    }
    // Deliberately not a ternary: mixing the int literal -1 with the (possibly null) Long
    // returned by asTimePeriod() would auto-unbox the Long and throw a NullPointerException
    // whenever the property is unset.
    return prop.asTimePeriod(TimeUnit.MILLISECONDS);
}

View File

@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-snowflake-bundle</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.19.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-snowflake-processors-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-snowflake-processors</artifactId>
<version>1.19.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-snowflake-services-api-nar</artifactId>
<version>1.19.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,204 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -0,0 +1,5 @@
nifi-snowflake-processors-nar
Copyright 2015-2022 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).

View File

@ -0,0 +1,121 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-snowflake-bundle</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.19.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-snowflake-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.19.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-snowflake-services</artifactId>
<version>1.19.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-snowflake-services-api</artifactId>
<version>1.19.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-user-service-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-key-service-api</artifactId>
<version>1.19.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-key-service</artifactId>
<version>1.19.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
<version>1.19.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-base</artifactId>
<version>1.19.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
<version>1.19.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.snowflake;
import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import net.snowflake.ingest.SimpleIngestManager;
import net.snowflake.ingest.connection.HistoryResponse;
import net.snowflake.ingest.connection.HistoryResponse.FileEntry;
import net.snowflake.ingest.connection.IngestResponseException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.configuration.DefaultSettings;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
@InputRequirement(Requirement.INPUT_REQUIRED)
@DefaultSettings(penaltyDuration = "5 sec")
@ReadsAttributes({
        @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description = "Staged file path")
})
@Tags({"snowflake", "snowpipe", "ingest", "history"})
@CapabilityDescription("Waits until a file in a Snowflake stage is ingested. The stage must be created in the Snowflake account beforehand."
        + " This processor is usually connected to an upstream StartSnowflakeIngest processor to make sure that the file is ingested.")
@SeeAlso({StartSnowflakeIngest.class, PutSnowflakeInternalStage.class})
public class GetSnowflakeIngestStatus extends AbstractProcessor {

    public static final PropertyDescriptor INGEST_MANAGER_PROVIDER = new PropertyDescriptor.Builder()
            .name("ingest-manager-provider")
            .displayName("Ingest Manager Provider")
            .description("Specifies the Controller Service to use for ingesting Snowflake staged files.")
            .identifiesControllerService(SnowflakeIngestManagerProviderService.class)
            .required(true)
            .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("For FlowFiles of successful ingestion")
            .build();

    static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("For FlowFiles of failed ingestion")
            .build();

    static final Relationship REL_RETRY = new Relationship.Builder()
            .name("retry")
            .description("For FlowFiles whose file is still not ingested. These FlowFiles should be routed back to this processor to try again later")
            .build();

    static final List<PropertyDescriptor> PROPERTIES = Collections.singletonList(
            INGEST_MANAGER_PROVIDER
    );

    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            REL_SUCCESS,
            REL_RETRY,
            REL_FAILURE
    )));

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        // The staged file path is written by an upstream PutSnowflakeInternalStage processor.
        final String stagedFilePath = flowFile.getAttribute(ATTRIBUTE_STAGED_FILE_PATH);
        if (stagedFilePath == null) {
            getLogger().error("Missing required attribute [\"" + ATTRIBUTE_STAGED_FILE_PATH + "\"] for FlowFile");
            session.transfer(session.penalize(flowFile), REL_FAILURE);
            return;
        }

        final SnowflakeIngestManagerProviderService ingestManagerProviderService = context.getProperty(INGEST_MANAGER_PROVIDER)
                .asControllerService(SnowflakeIngestManagerProviderService.class);

        final HistoryResponse historyResponse;
        try {
            final SimpleIngestManager ingestManager = ingestManagerProviderService.getIngestManager();
            historyResponse = ingestManager.getHistory(null, null, null);
        } catch (URISyntaxException | IOException e) {
            // Transport-level problems are retryable at the framework level; rethrow as ProcessException.
            throw new ProcessException("Failed to get Snowflake ingest history for staged file [" + stagedFilePath + "]", e);
        } catch (IngestResponseException e) {
            getLogger().error("Failed to get Snowflake ingest history for staged file [" + stagedFilePath + "]", e);
            session.transfer(session.penalize(flowFile), REL_FAILURE);
            return;
        }

        // Look for the first completed history entry that matches this FlowFile's staged file.
        FileEntry completedEntry = null;
        if (historyResponse.files != null) {
            for (final FileEntry candidate : historyResponse.files) {
                if (candidate.getPath().equals(stagedFilePath) && candidate.isComplete()) {
                    completedEntry = candidate;
                    break;
                }
            }
        }

        if (completedEntry == null) {
            // Not yet reported as complete; penalize and let the flow route the FlowFile back for another check.
            session.transfer(session.penalize(flowFile), REL_RETRY);
            return;
        }

        if (completedEntry.getErrorsSeen() > 0) {
            getLogger().error("Failed to ingest file [" + stagedFilePath + "] in Snowflake stage via pipe [" + ingestManagerProviderService.getPipeName() + "]."
                    + " Error: " + completedEntry.getFirstError());
            session.transfer(session.penalize(flowFile), REL_FAILURE);
            return;
        }

        session.transfer(flowFile, REL_SUCCESS);
    }
}

View File

@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.snowflake;
import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH;
import java.io.IOException;
import java.io.InputStream;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.snowflake.util.SnowflakeInternalStageType;
import org.apache.nifi.processors.snowflake.util.SnowflakeInternalStageTypeParameters;
import org.apache.nifi.processors.snowflake.util.SnowflakeProperties;
@InputRequirement(Requirement.INPUT_REQUIRED)
@WritesAttributes({
@WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
description = "Staged file path")
})
@Tags({"snowflake", "jdbc", "database", "connection", "snowpipe"})
@CapabilityDescription("Puts files into a Snowflake internal stage. The internal stage must be created in the Snowflake account beforehand."
+ " This processor can be connected to a StartSnowflakeIngest processor to ingest the file in the internal stage")
@SeeAlso({StartSnowflakeIngest.class, GetSnowflakeIngestStatus.class})
public class PutSnowflakeInternalStage extends AbstractProcessor {
public static final PropertyDescriptor SNOWFLAKE_CONNECTION_PROVIDER = new PropertyDescriptor.Builder()
.name("snowflake-connection-provider")
.displayName("Snowflake Connection Provider")
.description("Specifies the Controller Service to use for creating SQL connections to Snowflake.")
.identifiesControllerService(SnowflakeConnectionProviderService.class)
.required(true)
.build();
public static final PropertyDescriptor INTERNAL_STAGE_TYPE = new PropertyDescriptor.Builder()
.name("internal-stage-type")
.displayName("Internal Stage Type")
.description("The type of internal stage to use")
.allowableValues(SnowflakeInternalStageType.class)
.required(true)
.build();
public static final PropertyDescriptor DATABASE = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(SnowflakeProperties.DATABASE)
.dependsOn(INTERNAL_STAGE_TYPE, SnowflakeInternalStageType.NAMED, SnowflakeInternalStageType.TABLE)
.build();
public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(SnowflakeProperties.SCHEMA)
.dependsOn(INTERNAL_STAGE_TYPE, SnowflakeInternalStageType.NAMED, SnowflakeInternalStageType.TABLE)
.build();
public static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder()
.name("table")
.displayName("Table")
.description("The name of the table in the Snowflake account.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.dependsOn(INTERNAL_STAGE_TYPE, SnowflakeInternalStageType.TABLE)
.build();
public static final PropertyDescriptor INTERNAL_STAGE = new PropertyDescriptor.Builder()
.name("internal-stage")
.displayName("Stage")
.description("The name of the internal stage in the Snowflake account to put files into.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.dependsOn(INTERNAL_STAGE_TYPE, SnowflakeInternalStageType.NAMED)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("For FlowFiles of successful PUT operation")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("For FlowFiles of failed PUT operation")
.build();
static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
SNOWFLAKE_CONNECTION_PROVIDER,
INTERNAL_STAGE_TYPE,
DATABASE,
SCHEMA,
TABLE,
INTERNAL_STAGE
));
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS,
REL_FAILURE
)));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final SnowflakeInternalStageType internalStageType = SnowflakeInternalStageType.forName(context.getProperty(INTERNAL_STAGE_TYPE)
.getValue());
final SnowflakeInternalStageTypeParameters parameters = getSnowflakeInternalStageTypeParameters(context, flowFile);
final String internalStageName = internalStageType.getStage(parameters);
final SnowflakeConnectionProviderService connectionProviderService =
context.getProperty(SNOWFLAKE_CONNECTION_PROVIDER)
.asControllerService(SnowflakeConnectionProviderService.class);
final String stagedFileName = UUID.randomUUID().toString();
try (final InputStream inputStream = session.read(flowFile);
final SnowflakeConnectionWrapper snowflakeConnection = connectionProviderService.getSnowflakeConnection()) {
snowflakeConnection.unwrap()
.uploadStream(internalStageName, "", inputStream, stagedFileName, false);
} catch (SQLException e) {
getLogger().error("Failed to upload FlowFile content to internal Snowflake stage [" + internalStageName + "]. Staged file path [" + stagedFileName + "]", e);
session.transfer(session.penalize(flowFile), REL_FAILURE);
return;
} catch (IOException e) {
throw new ProcessException("Failed to read FlowFile content", e);
}
flowFile = session.putAttribute(flowFile, ATTRIBUTE_STAGED_FILE_PATH, stagedFileName);
session.transfer(flowFile, REL_SUCCESS);
}
/**
 * Collects the stage-naming parameters from the processor properties.
 * DATABASE and SCHEMA are evaluated without FlowFile attributes, while TABLE and
 * INTERNAL_STAGE are evaluated against the given FlowFile.
 */
private SnowflakeInternalStageTypeParameters getSnowflakeInternalStageTypeParameters(ProcessContext context,
        FlowFile flowFile) {
    return new SnowflakeInternalStageTypeParameters(
            context.getProperty(DATABASE).evaluateAttributeExpressions().getValue(),
            context.getProperty(SCHEMA).evaluateAttributeExpressions().getValue(),
            context.getProperty(TABLE).evaluateAttributeExpressions(flowFile).getValue(),
            context.getProperty(INTERNAL_STAGE).evaluateAttributeExpressions(flowFile).getValue());
}
}

View File

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.snowflake;
import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import net.snowflake.ingest.SimpleIngestManager;
import net.snowflake.ingest.connection.IngestResponseException;
import net.snowflake.ingest.utils.StagedFileWrapper;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
@InputRequirement(Requirement.INPUT_REQUIRED)
@ReadsAttributes({
@ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description = "Staged file path")
})
@Tags({"snowflake", "snowpipe", "ingest"})
@CapabilityDescription("Ingests files from a Snowflake internal or external stage into a Snowflake table."
+ " The stage must be created in the Snowflake account beforehand."
+ " The result of the ingestion is not available immediately, so this processor can be connected to an"
+ " GetSnowflakeIngestStatus processor to wait for the results")
@SeeAlso({PutSnowflakeInternalStage.class, GetSnowflakeIngestStatus.class})
public class StartSnowflakeIngest extends AbstractProcessor {
static final PropertyDescriptor INGEST_MANAGER_PROVIDER = new PropertyDescriptor.Builder()
.name("ingest-manager-provider")
.displayName("Ingest Manager Provider")
.description("Specifies the Controller Service to use for ingesting Snowflake staged files.")
.identifiesControllerService(SnowflakeIngestManagerProviderService.class)
.required(true)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("For FlowFiles of successful ingest request")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("For FlowFiles of failed ingest request")
.build();
static final List<PropertyDescriptor> PROPERTIES = Collections.singletonList(
INGEST_MANAGER_PROVIDER
);
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS,
REL_FAILURE
)));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final String stagedFilePath = flowFile.getAttribute(ATTRIBUTE_STAGED_FILE_PATH);
if (stagedFilePath == null) {
getLogger().error("Missing required attribute [\"" + ATTRIBUTE_STAGED_FILE_PATH + "\"] for FlowFile");
session.transfer(session.penalize(flowFile), REL_FAILURE);
return;
}
final SnowflakeIngestManagerProviderService ingestManagerProviderService =
context.getProperty(INGEST_MANAGER_PROVIDER)
.asControllerService(SnowflakeIngestManagerProviderService.class);
final SimpleIngestManager snowflakeIngestManager = ingestManagerProviderService.getIngestManager();
final StagedFileWrapper stagedFile = new StagedFileWrapper(stagedFilePath);
try {
snowflakeIngestManager.ingestFile(stagedFile, null);
} catch (URISyntaxException | IOException e) {
throw new ProcessException(String.format("Failed to ingest Snowflake file [%s]", stagedFilePath), e);
} catch (IngestResponseException e) {
getLogger().error("Failed to ingest Snowflake file [" + stagedFilePath + "]", e);
session.transfer(session.penalize(flowFile), REL_FAILURE);
return;
}
session.transfer(flowFile, REL_SUCCESS);
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.snowflake.util;
/**
 * FlowFile attribute names shared by the Snowflake processors.
 */
public final class SnowflakeAttributes {

    /** Path of the file within the Snowflake stage. */
    public static final String ATTRIBUTE_STAGED_FILE_PATH = "snowflake.staged.file.path";

    private SnowflakeAttributes() {
        // utility class, not instantiable
    }
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.snowflake.util;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.nifi.components.DescribedValue;
/**
 * The type of a Snowflake internal stage, together with the logic for rendering the
 * stage reference string (the "@..." form) used when uploading to that stage.
 */
public enum SnowflakeInternalStageType implements DescribedValue {
    USER("user", "User", "Use the user's internal stage") {
        @Override
        public String getStage(final SnowflakeInternalStageTypeParameters parameters) {
            // User stages are always referenced as "@~"; no parameters are needed.
            return "@~";
        }
    },
    TABLE("table", "Table", "Use a table's internal stage") {
        @Override
        public String getStage(final SnowflakeInternalStageTypeParameters parameters) {
            // Table stages are referenced as @[database.][schema.]%table; table is mandatory.
            return qualifiedPrefix(parameters)
                    .append("%").append(Objects.requireNonNull(parameters.getTable()))
                    .toString();
        }
    },
    NAMED("named", "Named", "Use a named internal stage. This stage must be created beforehand in Snowflake") {
        @Override
        public String getStage(final SnowflakeInternalStageTypeParameters parameters) {
            // Named stages are referenced as @[database.][schema.]stageName; the name is mandatory.
            return qualifiedPrefix(parameters)
                    .append(Objects.requireNonNull(parameters.getStageName()))
                    .toString();
        }
    };

    private final String value;
    private final String displayName;
    private final String description;

    SnowflakeInternalStageType(final String value, final String displayName, final String description) {
        this.value = value;
        this.displayName = displayName;
        this.description = description;
    }

    @Override
    public String getValue() {
        return value;
    }

    @Override
    public String getDisplayName() {
        return displayName;
    }

    @Override
    public String getDescription() {
        return description;
    }

    /**
     * Renders the stage reference string for this stage type from the given parameters.
     */
    public abstract String getStage(final SnowflakeInternalStageTypeParameters parameters);

    /**
     * Builds the "@" + optional "database." + optional "schema." prefix shared by the
     * TABLE and NAMED stage references (extracted to avoid duplicating the logic).
     */
    private static StringBuilder qualifiedPrefix(final SnowflakeInternalStageTypeParameters parameters) {
        final StringBuilder stringBuilder = new StringBuilder("@");
        Optional.ofNullable(parameters.getDatabase())
                .ifPresent(database -> stringBuilder.append(database).append("."));
        Optional.ofNullable(parameters.getSchema())
                .ifPresent(schema -> stringBuilder.append(schema).append("."));
        return stringBuilder;
    }

    /**
     * Looks up a stage type by its value, ignoring case.
     *
     * @throws IllegalArgumentException if no stage type matches the given value
     */
    public static SnowflakeInternalStageType forName(String stageType) {
        return Stream.of(values())
                .filter(internalStageType -> internalStageType.getValue().equalsIgnoreCase(stageType))
                .findFirst()
                .orElseThrow(
                        () -> new IllegalArgumentException("Invalid SnowflakeInternalStageType: " + stageType));
    }
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.snowflake.util;
/**
 * Immutable parameter holder used by {@code SnowflakeInternalStageType} to render a
 * stage reference. Any field may be {@code null}; the stage type decides which are required.
 */
public class SnowflakeInternalStageTypeParameters {

    private final String database;
    private final String schema;
    private final String table;
    private final String stageName;

    /**
     * @param database  database name, or {@code null}
     * @param schema    schema name, or {@code null}
     * @param table     table name, or {@code null}
     * @param stageName named stage name, or {@code null}
     */
    public SnowflakeInternalStageTypeParameters(final String database,
            final String schema,
            final String table,
            final String stageName) {
        this.database = database;
        this.schema = schema;
        this.table = table;
        this.stageName = stageName;
    }

    public String getDatabase() {
        return database;
    }

    public String getSchema() {
        return schema;
    }

    public String getTable() {
        return table;
    }

    public String getStageName() {
        return stageName;
    }
}

View File

@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.apache.nifi.processors.snowflake.GetSnowflakeIngestStatus
org.apache.nifi.processors.snowflake.StartSnowflakeIngest
org.apache.nifi.processors.snowflake.PutSnowflakeInternalStage

View File

@ -0,0 +1,48 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8"/>
<title>GetSnowflakeIngestStatus</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
<style>
h2 {margin-top: 4em}
h3 {margin-top: 3em}
td {text-align: left}
</style>
</head>
<body>
<h1>GetSnowflakeIngestStatus</h1>
<h3>Description</h3>
<p>
The GetSnowflakeIngestStatus processor can be used to get the status of a staged file ingested by a Snowflake pipe.
To wait until a staged file is fully ingested (copied into the table), you should connect this processor's "retry" relationship to itself.
The processor requires an upstream connection that provides the path of the staged file to be checked through the "snowflake.staged.file.path" attribute.
See StartSnowflakeIngest processor for details about how to properly set up a flow to ingest staged files.
<b>
NOTE: Snowflake pipes cache the paths of ingested files and never ingest the same file multiple times.
This can cause the processor to enter an "infinite loop" with a FlowFile that has the same "snowflake.staged.file.path" attribute as a staged file that has been previously ingested by the pipe.
It is recommended that the retry mechanism be configured to avoid these scenarios.
</b>
</p>
</body>
</html>

View File

@ -0,0 +1,43 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8"/>
<title>PutSnowflakeInternalStage</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
<style>
h2 {margin-top: 4em}
h3 {margin-top: 3em}
td {text-align: left}
</style>
</head>
<body>
<h1>PutSnowflakeInternalStage</h1>
<h3>Description</h3>
<p>
The PutSnowflakeInternalStage processor can upload a file to a Snowflake internal stage. Please note that named stages need to be created in your Snowflake account manually.
See the documentation on how to set up an internal stage <a href="https://docs.snowflake.com/en/user-guide/data-load-local-file-system-create-stage.html">here</a>.
The processor requires an upstream connection, and the content of each incoming FlowFile is uploaded to the stage. A unique file name is generated and used as the staged file's name.
While the processor may be used separately, it's recommended to connect it to a StartSnowflakeIngest processor so that the uploaded file can be piped into your Snowflake table.
</p>
</body>
</html>

View File

@ -0,0 +1,52 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8"/>
<title>StartSnowflakeIngest</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
<style>
h2 {margin-top: 4em}
h3 {margin-top: 3em}
td {text-align: left}
</style>
</head>
<body>
<h1>StartSnowflakeIngest</h1>
<h3>Description</h3>
<p>
The StartSnowflakeIngest processor triggers a Snowflake pipe ingestion for a staged file. Please note that the pipe has to be created in your Snowflake account manually.
The processor requires an upstream connection that provides the path of the file to be ingested in the stage through the "snowflake.staged.file.path" attribute.
This attribute is automatically filled in by the PutSnowflakeInternalStage processor when using an internal stage.
In case a pipe copies data from an external stage, the attribute shall be manually provided (e.g. with an UpdateAttribute processor).
<b>
NOTE: Since Snowflake pipes ingest files asynchronously, this processor transfers FlowFiles to the "success" relationship when they're marked for ingestion.
In order to wait for the actual result of the ingestion, the processor may be connected to a downstream GetSnowflakeIngestStatus processor.
</b>
</p>
<h4>Example flow for internal stage</h4>
<p>GetFile -> PutSnowflakeInternalStage -> StartSnowflakeIngest -> GetSnowflakeIngestStatus</p>
<h4>Example flow for external stage</h4>
<p>ListS3 -> UpdateAttribute (add the "snowflake.staged.file.path" attribute) -> StartSnowflakeIngest -> GetSnowflakeIngestStatus</p>
</body>
</html>

View File

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.snowflake;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.nifi.key.service.StandardPrivateKeyService;
import org.apache.nifi.key.service.api.PrivateKeyService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat;
import org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool;
import org.apache.nifi.snowflake.service.StandardSnowflakeIngestManagerProviderService;
import org.apache.nifi.processors.snowflake.util.SnowflakeProperties;
import org.apache.nifi.util.TestRunner;
/**
 * Shared configuration for the Snowflake integration tests. The {@code "???"} placeholders
 * must be replaced with real account, credential and stage values before running the ITs.
 * Note: interface fields are implicitly {@code public static final}.
 */
public interface SnowflakeConfigAware {

    // Local file to upload and the path it will have within the stage — fill in before running.
    Path filePath = Paths.get("???");
    String stagedFilePath = "???";

    // Snowflake account coordinates and credentials.
    String organizationName = "???";
    String accountName = "???";
    String userName = "???";
    String password = "???";

    // Target database objects used by the tests.
    String database = "???";
    String schema = "PUBLIC";
    String pipeName = "???";
    String internalStageName = "???";

    // Key material for the ingest manager provider service (key-pair authentication).
    String privateKeyFile = "???";
    String privateKeyPassphrase = "???";

    /**
     * Creates, configures and enables a {@link PrivateKeyService} backed by the
     * configured key file and passphrase.
     */
    default PrivateKeyService createPrivateKeyService(TestRunner runner) throws InitializationException {
        final StandardPrivateKeyService privateKeyService = new StandardPrivateKeyService();
        runner.addControllerService("privateKeyService", privateKeyService);
        runner.setProperty(privateKeyService,
                StandardPrivateKeyService.KEY_FILE,
                privateKeyFile);
        runner.setProperty(privateKeyService,
                StandardPrivateKeyService.KEY_PASSWORD,
                privateKeyPassphrase);
        runner.enableControllerService(privateKeyService);
        return privateKeyService;
    }

    /**
     * Creates, configures and enables a {@link SnowflakeConnectionProviderService} using
     * account-name URL format with user/password authentication.
     */
    default SnowflakeConnectionProviderService createConnectionProviderService(TestRunner runner)
            throws InitializationException {
        final SnowflakeConnectionProviderService connectionProviderService = new SnowflakeComputingConnectionPool();
        runner.addControllerService("connectionProviderService", connectionProviderService);
        runner.setProperty(connectionProviderService,
                SnowflakeComputingConnectionPool.CONNECTION_URL_FORMAT,
                ConnectionUrlFormat.ACCOUNT_NAME.getValue());
        runner.setProperty(connectionProviderService,
                SnowflakeComputingConnectionPool.SNOWFLAKE_ORGANIZATION_NAME,
                organizationName);
        runner.setProperty(connectionProviderService,
                SnowflakeComputingConnectionPool.SNOWFLAKE_ACCOUNT_NAME,
                accountName);
        runner.setProperty(connectionProviderService,
                SnowflakeComputingConnectionPool.SNOWFLAKE_USER,
                userName);
        runner.setProperty(connectionProviderService,
                SnowflakeComputingConnectionPool.SNOWFLAKE_PASSWORD,
                password);
        runner.setProperty(connectionProviderService,
                SnowflakeProperties.DATABASE,
                database);
        runner.setProperty(connectionProviderService,
                SnowflakeProperties.SCHEMA,
                schema);
        runner.enableControllerService(connectionProviderService);
        return connectionProviderService;
    }

    /**
     * Creates, configures and enables a {@link SnowflakeIngestManagerProviderService} for the
     * configured pipe, wired to a freshly created {@link PrivateKeyService}.
     */
    default SnowflakeIngestManagerProviderService createIngestManagerProviderService(TestRunner runner)
            throws InitializationException {
        final SnowflakeIngestManagerProviderService ingestManagerProviderService =
                new StandardSnowflakeIngestManagerProviderService();
        final PrivateKeyService privateKeyService = createPrivateKeyService(runner);
        runner.addControllerService("ingestManagerProviderService", ingestManagerProviderService);
        runner.setProperty(ingestManagerProviderService,
                StandardSnowflakeIngestManagerProviderService.ORGANIZATION_NAME,
                organizationName);
        runner.setProperty(ingestManagerProviderService,
                StandardSnowflakeIngestManagerProviderService.ACCOUNT_NAME,
                accountName);
        runner.setProperty(ingestManagerProviderService,
                StandardSnowflakeIngestManagerProviderService.USER_NAME,
                userName);
        runner.setProperty(ingestManagerProviderService,
                StandardSnowflakeIngestManagerProviderService.DATABASE,
                database);
        runner.setProperty(ingestManagerProviderService,
                StandardSnowflakeIngestManagerProviderService.SCHEMA,
                schema);
        runner.setProperty(ingestManagerProviderService,
                StandardSnowflakeIngestManagerProviderService.PIPE,
                pipeName);
        runner.setProperty(ingestManagerProviderService,
                StandardSnowflakeIngestManagerProviderService.PRIVATE_KEY_SERVICE,
                privateKeyService.getIdentifier());
        runner.enableControllerService(ingestManagerProviderService);
        return ingestManagerProviderService;
    }
}

View File

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.snowflake;
import java.security.Security;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import net.snowflake.ingest.utils.StagedFileWrapper;
import org.apache.commons.io.FileUtils;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processors.snowflake.util.SnowflakeAttributes;
import org.apache.nifi.processors.snowflake.util.SnowflakeInternalStageType;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
/**
 * Integration tests for the Snowflake stage/pipe processors. Requires the account,
 * credential and stage placeholders in SnowflakeConfigAware to be filled in beforehand.
 */
class SnowflakePipeIT implements SnowflakeConfigAware {

    @BeforeAll
    static void setUpOnce() {
        // NOTE(review): BouncyCastle is presumably needed for private key handling — confirm.
        Security.addProvider(new BouncyCastleProvider());
    }

    @Test
    void shouldPutIntoInternalStage() throws Exception {
        final PutSnowflakeInternalStage processor = new PutSnowflakeInternalStage();
        final TestRunner runner = TestRunners.newTestRunner(processor);
        final SnowflakeConnectionProviderService connectionProviderService = createConnectionProviderService(runner);
        runner.setProperty(PutSnowflakeInternalStage.SNOWFLAKE_CONNECTION_PROVIDER, connectionProviderService.getIdentifier());
        runner.setProperty(PutSnowflakeInternalStage.INTERNAL_STAGE_TYPE, SnowflakeInternalStageType.NAMED.getValue());
        runner.setProperty(PutSnowflakeInternalStage.INTERNAL_STAGE, internalStageName);

        final String uuid = UUID.randomUUID().toString();
        final String fileName = filePath.getFileName().toString();
        final Map<String, String> attributes = new HashMap<>();
        attributes.put(CoreAttributes.FILENAME.key(), fileName);
        attributes.put(CoreAttributes.PATH.key(), uuid + "/");
        runner.enqueue(filePath, attributes);
        runner.run();

        // Fixed: assert against the relationship of the processor under test
        // (was StartSnowflakeIngest.REL_SUCCESS, which only matched because both are named "success").
        runner.assertAllFlowFilesTransferred(PutSnowflakeInternalStage.REL_SUCCESS);
    }

    @Test
    void shouldStartFileIngestion() throws Exception {
        final StartSnowflakeIngest processor = new StartSnowflakeIngest();
        final TestRunner runner = TestRunners.newTestRunner(processor);
        final SnowflakeConnectionProviderService connectionProviderService = createConnectionProviderService(runner);
        final SnowflakeIngestManagerProviderService ingestManagerProviderService = createIngestManagerProviderService(runner);
        final String uuid = UUID.randomUUID().toString();
        final String fileName = filePath.getFileName().toString();
        runner.setProperty(StartSnowflakeIngest.INGEST_MANAGER_PROVIDER, ingestManagerProviderService.getIdentifier());

        // Upload the file to the stage first so the processor has something to ingest.
        try (final SnowflakeConnectionWrapper snowflakeConnection = connectionProviderService.getSnowflakeConnection()) {
            snowflakeConnection.unwrap().uploadStream(internalStageName,
                    uuid,
                    FileUtils.openInputStream(filePath.toFile()),
                    fileName,
                    false);
        }

        final Map<String, String> attributes = Collections.singletonMap(SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH, uuid + "/" + stagedFilePath);
        runner.enqueue("", attributes);
        runner.run();

        runner.assertAllFlowFilesTransferred(StartSnowflakeIngest.REL_SUCCESS);
    }

    @Test
    void shouldAwaitSnowflakePipeIngestion() throws Exception {
        final GetSnowflakeIngestStatus processor = new GetSnowflakeIngestStatus();
        final TestRunner runner = TestRunners.newTestRunner(processor);
        final SnowflakeConnectionProviderService connectionProviderService = createConnectionProviderService(runner);
        final SnowflakeIngestManagerProviderService ingestManagerProviderService = createIngestManagerProviderService(runner);
        final String uuid = UUID.randomUUID().toString();
        final String fileName = filePath.getFileName().toString();
        runner.setProperty(GetSnowflakeIngestStatus.INGEST_MANAGER_PROVIDER, ingestManagerProviderService.getIdentifier());

        // Upload the file and trigger the pipe; ingestion is asynchronous, so the first
        // status check is expected to route to retry.
        try (final SnowflakeConnectionWrapper snowflakeConnection = connectionProviderService.getSnowflakeConnection()) {
            snowflakeConnection.unwrap().uploadStream(internalStageName,
                    uuid,
                    FileUtils.openInputStream(filePath.toFile()),
                    fileName,
                    false);
        }
        final String stagedFilePathAttribute = uuid + "/" + stagedFilePath;
        final StagedFileWrapper stagedFile = new StagedFileWrapper(stagedFilePathAttribute);
        ingestManagerProviderService.getIngestManager().ingestFile(stagedFile, null);

        final Map<String, String> attributes = Collections.singletonMap(SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH, stagedFilePathAttribute);
        runner.enqueue("", attributes);
        runner.run();

        runner.assertAllFlowFilesTransferred(GetSnowflakeIngestStatus.REL_RETRY);
    }
}

View File

@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-snowflake-bundle</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.19.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-snowflake-services-api-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-snowflake-services-api</artifactId>
<version>1.19.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>1.19.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -0,0 +1,14 @@
nifi-snowflake-services-api-nar
Copyright 2014-2022 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
**************************
Apache Software License v2
**************************
(ASLv2) Snowflake Ingest SDK
The following NOTICE information applies:
Snowflake Ingest SDK
Copyright (c) 2013-2016 Snowflake Computing, Inc.

View File

@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-snowflake-bundle</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.19.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-snowflake-services-api</artifactId>
<dependencies>
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.19.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.snowflake;
import org.apache.nifi.controller.ControllerService;
/**
 * Controller Service that provides JDBC connections to a Snowflake database,
 * handed out wrapped in a {@link SnowflakeConnectionWrapper}.
 */
public interface SnowflakeConnectionProviderService extends ControllerService {
/**
 * Obtains a connection to Snowflake.
 *
 * @return a {@link SnowflakeConnectionWrapper} around a live JDBC connection;
 *         the caller is responsible for closing it
 */
SnowflakeConnectionWrapper getSnowflakeConnection();
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.snowflake;
import java.sql.Connection;
import java.sql.SQLException;
import net.snowflake.client.jdbc.SnowflakeConnection;
/**
 * {@link AutoCloseable} wrapper around a JDBC {@link Connection} to Snowflake that
 * exposes the driver-specific {@link SnowflakeConnection} via {@link #unwrap()}.
 */
public class SnowflakeConnectionWrapper implements AutoCloseable {

    // private instead of package-private: no other class should touch the raw connection
    private final Connection connection;

    public SnowflakeConnectionWrapper(Connection connection) {
        this.connection = connection;
    }

    /**
     * Closes the wrapped JDBC connection.
     *
     * @throws SQLException if closing the underlying connection fails
     */
    @Override
    public void close() throws SQLException {
        connection.close();
    }

    /**
     * Unwraps the Snowflake driver's native connection interface from the JDBC connection.
     *
     * @return the underlying {@link SnowflakeConnection}
     * @throws SQLException if the wrapped connection cannot be unwrapped to that type
     */
    public SnowflakeConnection unwrap() throws SQLException {
        return connection.unwrap(SnowflakeConnection.class);
    }
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.snowflake;
import net.snowflake.ingest.SimpleIngestManager;
import org.apache.nifi.controller.ControllerService;
/**
 * Controller Service that provides a configured Snowpipe {@link SimpleIngestManager}
 * for ingesting staged files into a Snowflake pipe.
 */
public interface SnowflakeIngestManagerProviderService extends ControllerService {
/** @return the name of the Snowflake pipe the ingest manager is configured for */
String getPipeName();
/** @return the {@link SimpleIngestManager} used to submit ingest requests and query their status */
SimpleIngestManager getIngestManager();
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.snowflake.util;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
/**
 * Common {@link PropertyDescriptor} definitions shared by the Snowflake processors
 * and controller services (connection coordinates and default database/schema).
 * All properties support expression language from the variable registry.
 */
public final class SnowflakeProperties {
/** Utility holder of constants; not instantiable. */
private SnowflakeProperties() {
}
/** Snowflake account locator (legacy-style account identifier) used to build the connection. */
public static final PropertyDescriptor ACCOUNT_LOCATOR = new PropertyDescriptor.Builder()
.name("account-locator")
.displayName("Account Locator")
.description("Snowflake account locator to use for connection.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(true)
.build();
/** Cloud region of the Snowflake account; required alongside the account locator. */
public static final PropertyDescriptor CLOUD_REGION = new PropertyDescriptor.Builder()
.name("cloud-region")
.displayName("Cloud Region")
.description("Snowflake cloud region to use for connection.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(true)
.build();
/** Cloud provider type of the Snowflake account; optional (not every account URL includes it). */
public static final PropertyDescriptor CLOUD_TYPE = new PropertyDescriptor.Builder()
.name("cloud-type")
.displayName("Cloud Type")
.description("Snowflake cloud type to use for connection.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
/** Organization name part of an organization/account-name style Snowflake identifier. */
public static final PropertyDescriptor ORGANIZATION_NAME = new PropertyDescriptor.Builder()
.name("organization-name")
.displayName("Organization Name")
.description("Snowflake organization name to use for connection.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(true)
.build();
/** Account name part of an organization/account-name style Snowflake identifier. */
public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
.name("account-name")
.displayName("Account Name")
.description("Snowflake account name to use for connection.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(true)
.build();
/** Default database for the session, equivalent to 'db=DATABASE_NAME' in the connection string. */
public static final PropertyDescriptor DATABASE = new PropertyDescriptor.Builder()
.name("database")
.displayName("Database")
.description("The database to use by default. The same as passing 'db=DATABASE_NAME' to the connection string.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
/** Default schema for the session, equivalent to 'schema=SCHEMA' in the connection string. */
public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
.name("schema")
.displayName("Schema")
.description("The schema to use by default. The same as passing 'schema=SCHEMA' to the connection string.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
}

View File

@ -33,7 +33,7 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<artifactId>nifi-snowflake-services-api-nar</artifactId>
<version>1.19.0-SNAPSHOT</version>
<type>nar</type>
</dependency>

View File

@ -25,7 +25,14 @@ The following binary components are provided under the Apache Software License v
Apache Commons Pool
Copyright 2002-2019 The Apache Software Foundation
(ASLv2) snowflake-jdbc
************
Bouncy Castle (MIT style)
************
The following binary components are provided under the Bouncy Castle (an MIT style) license.
Bouncy Castle PKIX, CMS, EAC, TSP, PKCS, OCSP, CMP, and CRMF APIs
Bouncy Castle Provider
Bouncy Castle ASN.1 Extension and Utility APIs
The following NOTICE information applies:
snowflake-jdbc
Copyright (c) 2013-2018 Snowflake Computing, Inc.
Copyright (c) 2000 - 2021 The Legion of the Bouncy Castle Inc. (https://www.bouncycastle.org)

View File

@ -34,11 +34,46 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-base</artifactId>
<version>1.19.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-key-service-api</artifactId>
<version>1.19.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-snowflake-services-api</artifactId>
<version>1.19.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
<version>3.13.24</version>
<artifactId>snowflake-ingest-sdk</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.19.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>1.19.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
<version>1.19.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>

View File

@ -16,6 +16,15 @@
*/
package org.apache.nifi.snowflake.service;
import java.sql.Driver;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.snowflake.client.core.SFSessionProperty;
import net.snowflake.client.jdbc.SnowflakeDriver;
import org.apache.nifi.annotation.behavior.DynamicProperties;
import org.apache.nifi.annotation.behavior.DynamicProperty;
@ -27,20 +36,20 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.dbcp.AbstractDBCPConnectionPool;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException;
import java.sql.Driver;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.snowflake.SnowflakeConnectionProviderService;
import org.apache.nifi.processors.snowflake.SnowflakeConnectionWrapper;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.snowflake.service.util.ConnectionUrlFormatParameters;
import org.apache.nifi.processors.snowflake.util.SnowflakeProperties;
import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat;
/**
* Implementation of Database Connection Pooling Service for Snowflake.
* Apache DBCP is used for connection pooling functionality.
* Implementation of Database Connection Pooling Service for Snowflake. Apache DBCP is used for connection pooling
* functionality.
*/
@Tags({"snowflake", "dbcp", "jdbc", "database", "connection", "pooling", "store"})
@CapabilityDescription("Provides Snowflake Connection Pooling Service. Connections can be asked from pool and returned after usage.")
@ -55,34 +64,88 @@ import java.util.List;
description = "Snowflake JDBC driver property name prefixed with 'SENSITIVE.' handled as a sensitive property.")
})
@RequiresInstanceClassLoading
public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool implements DBCPService {
public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool implements SnowflakeConnectionProviderService {
public static final PropertyDescriptor CONNECTION_URL_FORMAT = new PropertyDescriptor.Builder()
.name("connection-url-format")
.displayName("Connection URL Format")
.description("The format of the connection URL.")
.allowableValues(ConnectionUrlFormat.class)
.required(true)
.defaultValue(ConnectionUrlFormat.FULL_URL.getValue())
.build();
public static final PropertyDescriptor SNOWFLAKE_URL = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AbstractDBCPConnectionPool.DATABASE_URL)
.displayName("Snowflake URL")
.description("Example connection string: jdbc:snowflake://[account].[region].snowflakecomputing.com/?[connection_params]" +
" The connection parameters can include db=DATABASE_NAME to avoid using qualified table names such as DATABASE_NAME.PUBLIC.TABLE_NAME")
.build();
.fromPropertyDescriptor(AbstractDBCPConnectionPool.DATABASE_URL)
.displayName("Snowflake URL")
.description("Example connection string: jdbc:snowflake://[account].[region]" + ConnectionUrlFormat.SNOWFLAKE_HOST_SUFFIX + "/?[connection_params]" +
" The connection parameters can include db=DATABASE_NAME to avoid using qualified table names such as DATABASE_NAME.PUBLIC.TABLE_NAME")
.required(true)
.dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.FULL_URL)
.build();
public static final PropertyDescriptor SNOWFLAKE_ACCOUNT_LOCATOR = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(SnowflakeProperties.ACCOUNT_LOCATOR)
.dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_LOCATOR)
.build();
public static final PropertyDescriptor SNOWFLAKE_CLOUD_REGION = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(SnowflakeProperties.CLOUD_REGION)
.dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_LOCATOR)
.build();
public static final PropertyDescriptor SNOWFLAKE_CLOUD_TYPE = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(SnowflakeProperties.CLOUD_TYPE)
.dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_LOCATOR)
.build();
public static final PropertyDescriptor SNOWFLAKE_ORGANIZATION_NAME = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(SnowflakeProperties.ORGANIZATION_NAME)
.dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_NAME)
.build();
public static final PropertyDescriptor SNOWFLAKE_ACCOUNT_NAME = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(SnowflakeProperties.ACCOUNT_NAME)
.dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_NAME)
.build();
public static final PropertyDescriptor SNOWFLAKE_USER = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_USER)
.displayName("Snowflake User")
.description("The Snowflake user name")
.build();
.fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_USER)
.displayName("Username")
.description("The Snowflake user name.")
.build();
public static final PropertyDescriptor SNOWFLAKE_PASSWORD = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_PASSWORD)
.displayName("Snowflake Password")
.description("The password for the Snowflake user")
.build();
.fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_PASSWORD)
.displayName("Password")
.description("The password for the Snowflake user.")
.build();
public static final PropertyDescriptor SNOWFLAKE_WAREHOUSE = new PropertyDescriptor.Builder()
.name("warehouse")
.displayName("Warehouse")
.description("The warehouse to use by default. The same as passing 'warehouse=WAREHOUSE' to the connection string.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
private static final List<PropertyDescriptor> PROPERTIES;
static {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(CONNECTION_URL_FORMAT);
props.add(SNOWFLAKE_URL);
props.add(SNOWFLAKE_ACCOUNT_LOCATOR);
props.add(SNOWFLAKE_CLOUD_REGION);
props.add(SNOWFLAKE_CLOUD_TYPE);
props.add(SNOWFLAKE_ORGANIZATION_NAME);
props.add(SNOWFLAKE_ACCOUNT_NAME);
props.add(SNOWFLAKE_USER);
props.add(SNOWFLAKE_PASSWORD);
props.add(SnowflakeProperties.DATABASE);
props.add(SnowflakeProperties.SCHEMA);
props.add(SNOWFLAKE_WAREHOUSE);
props.add(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE);
props.add(VALIDATION_QUERY);
props.add(MAX_WAIT_TIME);
props.add(MAX_TOTAL_CONNECTIONS);
@ -108,13 +171,11 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool
@Override
protected String getUrl(final ConfigurationContext context) {
String snowflakeUrl = context.getProperty(SNOWFLAKE_URL).evaluateAttributeExpressions().getValue();
final ConnectionUrlFormat connectionUrlFormat = ConnectionUrlFormat.forName(context.getProperty(CONNECTION_URL_FORMAT)
.getValue());
final ConnectionUrlFormatParameters parameters = getConnectionUrlFormatParameters(context);
if (!snowflakeUrl.startsWith("jdbc:snowflake")) {
snowflakeUrl = "jdbc:snowflake://" + snowflakeUrl;
}
return snowflakeUrl;
return connectionUrlFormat.buildConnectionUrl(parameters);
}
@Override
@ -126,4 +187,77 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool
throw new ProcessException("Snowflake driver unavailable or incompatible connection URL", e);
}
}
/**
 * Builds the driver connection properties from the configured default database,
 * schema and warehouse, plus optional proxy settings taken from a
 * {@link ProxyConfigurationService} if one is configured.
 */
@Override
protected Map<String, String> getConnectionProperties(final ConfigurationContext context) {
final String database = context.getProperty(SnowflakeProperties.DATABASE).evaluateAttributeExpressions().getValue();
final String schema = context.getProperty(SnowflakeProperties.SCHEMA).evaluateAttributeExpressions().getValue();
final String warehouse = context.getProperty(SNOWFLAKE_WAREHOUSE).evaluateAttributeExpressions().getValue();
final Map<String, String> connectionProperties = new HashMap<>();
// Only forward properties that were actually set; all three are optional
if (database != null) {
connectionProperties.put("db", database);
}
if (schema != null) {
connectionProperties.put("schema", schema);
}
if (warehouse != null) {
connectionProperties.put("warehouse", warehouse);
}
final ProxyConfigurationService proxyConfigurationService = context
.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE)
.asControllerService(ProxyConfigurationService.class);
if (proxyConfigurationService != null) {
// Proxy settings are passed to the Snowflake JDBC driver via its SFSessionProperty keys;
// each field is optional and only set when present in the proxy configuration
final ProxyConfiguration proxyConfiguration = proxyConfigurationService.getConfiguration();
connectionProperties.put(SFSessionProperty.USE_PROXY.getPropertyKey(), "true");
if (proxyConfiguration.getProxyServerHost() != null) {
connectionProperties.put(SFSessionProperty.PROXY_HOST.getPropertyKey(), proxyConfiguration.getProxyServerHost());
}
if (proxyConfiguration.getProxyServerPort() != null) {
connectionProperties.put(SFSessionProperty.PROXY_PORT.getPropertyKey(), proxyConfiguration.getProxyServerPort().toString());
}
if (proxyConfiguration.getProxyUserName() != null) {
connectionProperties.put(SFSessionProperty.PROXY_USER.getPropertyKey(), proxyConfiguration.getProxyUserName());
}
if (proxyConfiguration.getProxyUserPassword() != null) {
connectionProperties.put(SFSessionProperty.PROXY_PASSWORD.getPropertyKey(), proxyConfiguration.getProxyUserPassword());
}
if (proxyConfiguration.getProxyType() != null) {
// driver expects the protocol name in lower case, e.g. "http"
connectionProperties.put(SFSessionProperty.PROXY_PROTOCOL.getPropertyKey(), proxyConfiguration.getProxyType().name().toLowerCase());
}
}
return connectionProperties;
}
/**
 * Obtains a connection from the underlying DBCP pool and wraps it in a
 * {@link SnowflakeConnectionWrapper} for callers of the Snowflake service API.
 */
@Override
public SnowflakeConnectionWrapper getSnowflakeConnection() {
return new SnowflakeConnectionWrapper(getConnection());
}
/**
 * Reads the URL-related service properties (with Expression Language evaluated)
 * and bundles them into a single parameter object for the connection URL builder.
 *
 * @param context the service configuration context
 * @return the parameters used to build the Snowflake connection URL; individual
 *         values may be {@code null} when the corresponding property is unset
 */
private ConnectionUrlFormatParameters getConnectionUrlFormatParameters(final ConfigurationContext context) {
    return new ConnectionUrlFormatParameters(
            context.getProperty(SNOWFLAKE_URL).evaluateAttributeExpressions().getValue(),
            context.getProperty(SNOWFLAKE_ORGANIZATION_NAME).evaluateAttributeExpressions().getValue(),
            context.getProperty(SNOWFLAKE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue(),
            context.getProperty(SNOWFLAKE_ACCOUNT_LOCATOR).evaluateAttributeExpressions().getValue(),
            context.getProperty(SNOWFLAKE_CLOUD_REGION).evaluateAttributeExpressions().getValue(),
            context.getProperty(SNOWFLAKE_CLOUD_TYPE).evaluateAttributeExpressions().getValue());
}
}

View File

@ -0,0 +1,224 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.snowflake.service;
import java.security.NoSuchAlgorithmException;
import java.security.PrivateKey;
import java.security.spec.InvalidKeySpecException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import net.snowflake.ingest.SimpleIngestManager;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.key.service.api.PrivateKeyService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.snowflake.SnowflakeIngestManagerProviderService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.snowflake.service.util.AccountIdentifierFormat;
import org.apache.nifi.snowflake.service.util.AccountIdentifierFormatParameters;
import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat;
import org.apache.nifi.processors.snowflake.util.SnowflakeProperties;
@Tags({"snowflake", "jdbc", "database", "connection"})
@CapabilityDescription("Provides a Snowflake Ingest Manager for Snowflake pipe processors")
/**
 * Controller service that creates and caches a Snowflake {@link SimpleIngestManager}
 * for Snowpipe ingest processors. The account may be identified by a full host URL,
 * an organization/account name pair, or an account locator with cloud region; the
 * service authenticates with a key pair whose private key is supplied by a
 * {@link PrivateKeyService} (the matching public key must already be registered on
 * the Snowflake user).
 */
@Tags({"snowflake", "jdbc", "database", "connection"})
@CapabilityDescription("Provides a Snowflake Ingest Manager for Snowflake pipe processors")
public class StandardSnowflakeIngestManagerProviderService extends AbstractControllerService
        implements SnowflakeIngestManagerProviderService {

    public static final PropertyDescriptor ACCOUNT_IDENTIFIER_FORMAT = new PropertyDescriptor.Builder()
            .name("account-identifier-format")
            .displayName("Account Identifier Format")
            .description("The format of the account identifier.")
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .required(true)
            .allowableValues(AccountIdentifierFormat.class)
            .defaultValue(AccountIdentifierFormat.ACCOUNT_NAME.getValue())
            .build();

    public static final PropertyDescriptor HOST_URL = new PropertyDescriptor.Builder()
            .name("host-url")
            .displayName("Snowflake URL")
            .description("Example host url: [account-locator].[cloud-region].[cloud]" + ConnectionUrlFormat.SNOWFLAKE_HOST_SUFFIX)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .required(true)
            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.FULL_URL)
            .build();

    public static final PropertyDescriptor ACCOUNT_LOCATOR = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(SnowflakeProperties.ACCOUNT_LOCATOR)
            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR)
            .build();

    public static final PropertyDescriptor CLOUD_REGION = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(SnowflakeProperties.CLOUD_REGION)
            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR)
            .build();

    public static final PropertyDescriptor CLOUD_TYPE = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(SnowflakeProperties.CLOUD_TYPE)
            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR)
            .build();

    public static final PropertyDescriptor ORGANIZATION_NAME = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(SnowflakeProperties.ORGANIZATION_NAME)
            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_NAME)
            .build();

    public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(SnowflakeProperties.ACCOUNT_NAME)
            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_NAME)
            .build();

    public static final PropertyDescriptor USER_NAME = new PropertyDescriptor.Builder()
            .name("user-name")
            .displayName("User Name")
            .description("The Snowflake user name.")
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .required(true)
            .build();

    public static final PropertyDescriptor PRIVATE_KEY_SERVICE = new PropertyDescriptor.Builder()
            .name("private-key-service")
            .displayName("Private Key Service")
            .description("Specifies the Controller Service that will provide the private key. The public key needs to be added to the user account in the Snowflake account beforehand.")
            .identifiesControllerService(PrivateKeyService.class)
            .required(true)
            .build();

    public static final PropertyDescriptor DATABASE = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(SnowflakeProperties.DATABASE)
            .required(true)
            .build();

    public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(SnowflakeProperties.SCHEMA)
            .required(true)
            .build();

    public static final PropertyDescriptor PIPE = new PropertyDescriptor.Builder()
            .name("pipe")
            .displayName("Pipe")
            .description("The Snowflake pipe to ingest from.")
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .required(true)
            .build();

    static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
            ACCOUNT_IDENTIFIER_FORMAT,
            HOST_URL,
            ACCOUNT_LOCATOR,
            CLOUD_REGION,
            CLOUD_TYPE,
            ORGANIZATION_NAME,
            ACCOUNT_NAME,
            USER_NAME,
            PRIVATE_KEY_SERVICE,
            DATABASE,
            SCHEMA,
            PIPE
    ));

    // Set in onEnabled(); volatile because processors may read them from other threads.
    private volatile String fullyQualifiedPipeName;
    private volatile SimpleIngestManager ingestManager;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    /**
     * Resolves the fully qualified pipe name ({@code database.schema.pipe}) and creates
     * the ingest manager for the configured account and key pair.
     *
     * @param context the service configuration context
     * @throws InitializationException if the ingest manager cannot be created from the
     *         configured private key (invalid key material or unavailable algorithm)
     */
    @OnEnabled
    public void onEnabled(final ConfigurationContext context) throws InitializationException {
        final String user = context.getProperty(USER_NAME).evaluateAttributeExpressions().getValue();
        final String database = context.getProperty(DATABASE)
                .evaluateAttributeExpressions()
                .getValue();
        final String schema = context.getProperty(SCHEMA)
                .evaluateAttributeExpressions()
                .getValue();
        final String pipe = context.getProperty(PIPE).evaluateAttributeExpressions().getValue();
        fullyQualifiedPipeName = database + "." + schema + "." + pipe;
        final PrivateKeyService privateKeyService = context.getProperty(PRIVATE_KEY_SERVICE)
                .asControllerService(PrivateKeyService.class);
        final PrivateKey privateKey = privateKeyService.getPrivateKey();

        final AccountIdentifierFormat accountIdentifierFormat = AccountIdentifierFormat.forName(context.getProperty(ACCOUNT_IDENTIFIER_FORMAT)
                .getValue());
        final AccountIdentifierFormatParameters parameters = getAccountIdentifierFormatParameters(context);
        final String account = accountIdentifierFormat.getAccount(parameters);
        final String host = accountIdentifierFormat.getHostname(parameters);
        try {
            ingestManager = new SimpleIngestManager(account, user, fullyQualifiedPipeName, privateKey, "https", host, 443);
        } catch (NoSuchAlgorithmException | InvalidKeySpecException e) {
            throw new InitializationException("Failed to create Snowflake ingest manager", e);
        }
    }

    /**
     * Closes the ingest manager (if any) and clears the cached references.
     */
    @OnDisabled
    public void onDisabled() {
        if (ingestManager != null) {
            ingestManager.close();
            ingestManager = null;
        }
    }

    @Override
    public String getPipeName() {
        return fullyQualifiedPipeName;
    }

    @Override
    public SimpleIngestManager getIngestManager() {
        return ingestManager;
    }

    /**
     * Reads the account-identifier-related service properties (with Expression Language
     * evaluated) and bundles them into a single parameter object.
     *
     * @param context the service configuration context
     * @return the parameters used to derive the account identifier and host name
     */
    private AccountIdentifierFormatParameters getAccountIdentifierFormatParameters(final ConfigurationContext context) {
        final String hostUrl = context.getProperty(HOST_URL)
                .evaluateAttributeExpressions()
                .getValue();
        final String organizationName = context.getProperty(ORGANIZATION_NAME)
                .evaluateAttributeExpressions()
                .getValue();
        final String accountName = context.getProperty(ACCOUNT_NAME)
                .evaluateAttributeExpressions()
                .getValue();
        final String accountLocator = context.getProperty(ACCOUNT_LOCATOR)
                .evaluateAttributeExpressions()
                .getValue();
        final String cloudRegion = context.getProperty(CLOUD_REGION)
                .evaluateAttributeExpressions()
                .getValue();
        final String cloudType = context.getProperty(CLOUD_TYPE)
                .evaluateAttributeExpressions()
                .getValue();
        return new AccountIdentifierFormatParameters(hostUrl,
                organizationName,
                accountName,
                accountLocator,
                cloudRegion,
                cloudType);
    }
}

View File

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.snowflake.service.util;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.nifi.components.DescribedValue;
/**
 * Strategies for identifying a Snowflake account: a full host URL, an organization/account
 * name pair, or an account locator with cloud region (and optional cloud type). Each
 * strategy derives both the account identifier and the host name used by the ingest SDK.
 */
public enum AccountIdentifierFormat implements DescribedValue {
    FULL_URL("full-url", "Full URL", "Provide an account identifier in a single property") {
        @Override
        public String getAccount(final AccountIdentifierFormatParameters parameters) {
            // The account identifier is the first dot-separated segment of the host name.
            final String[] segments = getHostname(parameters).split("\\.");
            if (segments.length == 0) {
                throw new IllegalArgumentException("Invalid Snowflake host url");
            }
            return segments[0];
        }

        @Override
        public String getHostname(final AccountIdentifierFormatParameters parameters) {
            return Objects.requireNonNull(parameters.getHostUrl());
        }
    },
    ACCOUNT_NAME("account-name", "Account Name", "Provide a Snowflake Account Name") {
        @Override
        public String getAccount(final AccountIdentifierFormatParameters parameters) {
            final String organization = Objects.requireNonNull(parameters.getOrganizationName());
            final String account = Objects.requireNonNull(parameters.getAccountName());
            return organization + "-" + account;
        }

        @Override
        public String getHostname(final AccountIdentifierFormatParameters parameters) {
            return getAccount(parameters) + ConnectionUrlFormat.SNOWFLAKE_HOST_SUFFIX;
        }
    },
    ACCOUNT_LOCATOR("account-locator", "Account Locator", "Provide a Snowflake Account Locator") {
        @Override
        public String getAccount(final AccountIdentifierFormatParameters parameters) {
            return Objects.requireNonNull(parameters.getAccountLocator());
        }

        @Override
        public String getHostname(final AccountIdentifierFormatParameters parameters) {
            final String locator = Objects.requireNonNull(parameters.getAccountLocator());
            final String region = Objects.requireNonNull(parameters.getCloudRegion());
            final String cloudType = parameters.getCloudType();
            // Host shape: [locator].[region][.cloud-type].snowflakecomputing.com — cloud type is optional.
            String hostname = locator + "." + region;
            if (cloudType != null) {
                hostname = hostname + "." + cloudType;
            }
            return hostname + ConnectionUrlFormat.SNOWFLAKE_HOST_SUFFIX;
        }
    };

    private final String value;
    private final String displayName;
    private final String description;

    AccountIdentifierFormat(final String value, final String displayName, final String description) {
        this.value = value;
        this.displayName = displayName;
        this.description = description;
    }

    @Override
    public String getValue() {
        return value;
    }

    @Override
    public String getDisplayName() {
        return displayName;
    }

    @Override
    public String getDescription() {
        return description;
    }

    /**
     * @param parameters the configured account parameters
     * @return the account identifier derived from the given parameters
     */
    public abstract String getAccount(AccountIdentifierFormatParameters parameters);

    /**
     * @param parameters the configured account parameters
     * @return the Snowflake host name derived from the given parameters
     */
    public abstract String getHostname(AccountIdentifierFormatParameters parameters);

    /**
     * Resolves an enum constant from its property value, ignoring case.
     *
     * @param name the property value to match against {@link #getValue()}
     * @return the matching constant
     * @throws IllegalArgumentException if no constant matches
     */
    public static AccountIdentifierFormat forName(final String name) {
        for (final AccountIdentifierFormat format : values()) {
            if (format.getValue().equalsIgnoreCase(name)) {
                return format;
            }
        }
        throw new IllegalArgumentException("Invalid AccountIdentifierFormat: " + name);
    }
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.snowflake.service.util;
/**
 * Parameter holder for {@link AccountIdentifierFormat} strategies: the common Snowflake
 * account parameters plus the optional full host URL.
 */
public final class AccountIdentifierFormatParameters extends SnowflakeCommonParameters {

    private final String hostUrl;

    public AccountIdentifierFormatParameters(final String hostUrl, final String organizationName,
            final String accountName, final String accountLocator,
            final String cloudRegion, final String cloudType) {
        super(organizationName, accountName, accountLocator, cloudRegion, cloudType);
        this.hostUrl = hostUrl;
    }

    /**
     * @return the full Snowflake host URL, or {@code null} when not configured
     */
    public String getHostUrl() {
        return hostUrl;
    }
}

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.snowflake.service.util;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.nifi.components.DescribedValue;
/**
 * Strategies for building the Snowflake JDBC connection URL: a full URL given in a single
 * property, an organization/account name pair, or an account locator with cloud region
 * (and optional cloud type).
 */
public enum ConnectionUrlFormat implements DescribedValue {
    FULL_URL("full-url", "Full URL", "Provide connection URL in a single property") {
        @Override
        public String buildConnectionUrl(final ConnectionUrlFormatParameters parameters) {
            // Prepend the JDBC scheme only when the user supplied a bare host URL.
            final String url = parameters.getSnowflakeUrl();
            return url.startsWith(SNOWFLAKE_SCHEME) ? url : SNOWFLAKE_URI_PREFIX + url;
        }
    },
    ACCOUNT_NAME("account-name", "Account Name", "Provide a Snowflake Account Name") {
        @Override
        public String buildConnectionUrl(final ConnectionUrlFormatParameters parameters) {
            final String organization = Objects.requireNonNull(parameters.getOrganizationName());
            final String account = Objects.requireNonNull(parameters.getAccountName());
            return SNOWFLAKE_URI_PREFIX + organization + "-" + account + SNOWFLAKE_HOST_SUFFIX;
        }
    },
    ACCOUNT_LOCATOR("account-locator", "Account Locator", "Provide a Snowflake Account Locator") {
        @Override
        public String buildConnectionUrl(final ConnectionUrlFormatParameters parameters) {
            final String locator = Objects.requireNonNull(parameters.getAccountLocator());
            final String region = Objects.requireNonNull(parameters.getCloudRegion());
            final String cloudType = parameters.getCloudType();
            // URL shape: jdbc:snowflake://[locator].[region][.cloud-type].snowflakecomputing.com
            String url = SNOWFLAKE_URI_PREFIX + locator + "." + region;
            if (cloudType != null) {
                url = url + "." + cloudType;
            }
            return url + SNOWFLAKE_HOST_SUFFIX;
        }
    };

    public static final String SNOWFLAKE_HOST_SUFFIX = ".snowflakecomputing.com";
    public static final String SNOWFLAKE_SCHEME = "jdbc:snowflake";
    public static final String SNOWFLAKE_URI_PREFIX = SNOWFLAKE_SCHEME + "://";

    private final String value;
    private final String displayName;
    private final String description;

    ConnectionUrlFormat(final String value, final String displayName, final String description) {
        this.value = value;
        this.displayName = displayName;
        this.description = description;
    }

    @Override
    public String getValue() {
        return value;
    }

    @Override
    public String getDisplayName() {
        return displayName;
    }

    @Override
    public String getDescription() {
        return description;
    }

    /**
     * @param parameters the configured connection parameters
     * @return the JDBC connection URL built from the given parameters
     */
    public abstract String buildConnectionUrl(ConnectionUrlFormatParameters parameters);

    /**
     * Resolves an enum constant from its property value, ignoring case.
     *
     * @param name the property value to match against {@link #getValue()}
     * @return the matching constant
     * @throws IllegalArgumentException if no constant matches
     */
    public static ConnectionUrlFormat forName(final String name) {
        for (final ConnectionUrlFormat format : values()) {
            if (format.getValue().equalsIgnoreCase(name)) {
                return format;
            }
        }
        throw new IllegalArgumentException("Invalid ConnectionUrlFormat: " + name);
    }
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.snowflake.service.util;
/**
 * Parameter holder for {@link ConnectionUrlFormat} strategies: the common Snowflake
 * account parameters plus the optional full connection URL.
 */
public final class ConnectionUrlFormatParameters extends SnowflakeCommonParameters {

    private final String snowflakeUrl;

    public ConnectionUrlFormatParameters(final String snowflakeUrl, final String organizationName,
            final String accountName, final String accountLocator,
            final String cloudRegion, final String cloudType) {
        super(organizationName, accountName, accountLocator, cloudRegion, cloudType);
        this.snowflakeUrl = snowflakeUrl;
    }

    /**
     * @return the full Snowflake connection URL, or {@code null} when not configured
     */
    public String getSnowflakeUrl() {
        return snowflakeUrl;
    }
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.snowflake.service.util;
/**
 * Immutable holder for the Snowflake account properties shared by the connection pool
 * and ingest manager services: organization name, account name, account locator, cloud
 * region and cloud type. Any value may be {@code null} when not configured.
 */
public class SnowflakeCommonParameters {

    protected final String organizationName;
    protected final String accountName;
    protected final String accountLocator;
    protected final String cloudRegion;
    protected final String cloudType;

    public SnowflakeCommonParameters(final String organizationName, final String accountName,
            final String accountLocator, final String cloudRegion, final String cloudType) {
        this.organizationName = organizationName;
        this.accountName = accountName;
        this.accountLocator = accountLocator;
        this.cloudRegion = cloudRegion;
        this.cloudType = cloudType;
    }

    /** @return the Snowflake organization name, possibly {@code null} */
    public String getOrganizationName() {
        return organizationName;
    }

    /** @return the Snowflake account name, possibly {@code null} */
    public String getAccountName() {
        return accountName;
    }

    /** @return the Snowflake account locator, possibly {@code null} */
    public String getAccountLocator() {
        return accountLocator;
    }

    /** @return the cloud region, possibly {@code null} */
    public String getCloudRegion() {
        return cloudRegion;
    }

    /** @return the cloud type, possibly {@code null} */
    public String getCloudType() {
        return cloudType;
    }
}

View File

@ -13,3 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool
org.apache.nifi.snowflake.service.StandardSnowflakeIngestManagerProviderService

View File

@ -24,5 +24,20 @@
<modules>
<module>nifi-snowflake-services</module>
<module>nifi-snowflake-services-nar</module>
<module>nifi-snowflake-processors</module>
<module>nifi-snowflake-processors-nar</module>
<module>nifi-snowflake-services-api</module>
<module>nifi-snowflake-services-api-nar</module>
</modules>
<dependencyManagement>
<dependencies>
<!-- snowflake-ingest-sdk contains snowflake-jdbc -->
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
<version>1.0.2-beta.6</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>