NIFI-12546 Updated nifi-snowflake-bundle using current API methods

This closes #8187

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
EndzeitBegins 2023-12-23 21:48:49 +01:00 committed by exceptionfactory
parent 499b63e544
commit ccf3f35076
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
12 changed files with 115 additions and 224 deletions

View File

@ -17,16 +17,6 @@
package org.apache.nifi.processors.snowflake; package org.apache.nifi.processors.snowflake;
import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import net.snowflake.ingest.SimpleIngestManager; import net.snowflake.ingest.SimpleIngestManager;
import net.snowflake.ingest.connection.HistoryResponse; import net.snowflake.ingest.connection.HistoryResponse;
import net.snowflake.ingest.connection.HistoryResponse.FileEntry; import net.snowflake.ingest.connection.HistoryResponse.FileEntry;
@ -47,6 +37,14 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH;
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@DefaultSettings(penaltyDuration = "5 sec") @DefaultSettings(penaltyDuration = "5 sec")
@ReadsAttributes({ @ReadsAttributes({
@ -81,15 +79,9 @@ public class GetSnowflakeIngestStatus extends AbstractProcessor {
.description("For FlowFiles whose file is still not ingested. These FlowFiles should be routed back to this processor to try again later") .description("For FlowFiles whose file is still not ingested. These FlowFiles should be routed back to this processor to try again later")
.build(); .build();
static final List<PropertyDescriptor> PROPERTIES = Collections.singletonList( static final List<PropertyDescriptor> PROPERTIES = List.of(INGEST_MANAGER_PROVIDER);
INGEST_MANAGER_PROVIDER
);
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_RETRY, REL_FAILURE);
REL_SUCCESS,
REL_RETRY,
REL_FAILURE
)));
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -135,7 +127,7 @@ public class GetSnowflakeIngestStatus extends AbstractProcessor {
.filter(entry -> entry.getPath().equals(stagedFilePath) && entry.isComplete()) .filter(entry -> entry.getPath().equals(stagedFilePath) && entry.isComplete())
.findFirst()); .findFirst());
if (!fileEntry.isPresent()) { if (fileEntry.isEmpty()) {
session.transfer(session.penalize(flowFile), REL_RETRY); session.transfer(session.penalize(flowFile), REL_RETRY);
return; return;
} }
@ -147,6 +139,5 @@ public class GetSnowflakeIngestStatus extends AbstractProcessor {
return; return;
} }
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
} }
} }

View File

@ -17,17 +17,6 @@
package org.apache.nifi.processors.snowflake; package org.apache.nifi.processors.snowflake;
import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH;
import java.io.IOException;
import java.io.InputStream;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -48,6 +37,15 @@ import org.apache.nifi.processors.snowflake.util.SnowflakeInternalStageType;
import org.apache.nifi.processors.snowflake.util.SnowflakeInternalStageTypeParameters; import org.apache.nifi.processors.snowflake.util.SnowflakeInternalStageTypeParameters;
import org.apache.nifi.processors.snowflake.util.SnowflakeProperties; import org.apache.nifi.processors.snowflake.util.SnowflakeProperties;
import java.io.IOException;
import java.io.InputStream;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH;
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
@ -115,19 +113,16 @@ public class PutSnowflakeInternalStage extends AbstractProcessor {
.description("For FlowFiles of failed PUT operation") .description("For FlowFiles of failed PUT operation")
.build(); .build();
static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( static final List<PropertyDescriptor> PROPERTIES = List.of(
SNOWFLAKE_CONNECTION_PROVIDER, SNOWFLAKE_CONNECTION_PROVIDER,
INTERNAL_STAGE_TYPE, INTERNAL_STAGE_TYPE,
DATABASE, DATABASE,
SCHEMA, SCHEMA,
TABLE, TABLE,
INTERNAL_STAGE INTERNAL_STAGE
)); );
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);
REL_SUCCESS,
REL_FAILURE
)));
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -146,8 +141,7 @@ public class PutSnowflakeInternalStage extends AbstractProcessor {
return; return;
} }
final SnowflakeInternalStageType internalStageType = SnowflakeInternalStageType.forName(context.getProperty(INTERNAL_STAGE_TYPE) final SnowflakeInternalStageType internalStageType = context.getProperty(INTERNAL_STAGE_TYPE).asDescribedValue(SnowflakeInternalStageType.class);
.getValue());
final SnowflakeInternalStageTypeParameters parameters = getSnowflakeInternalStageTypeParameters(context, flowFile); final SnowflakeInternalStageTypeParameters parameters = getSnowflakeInternalStageTypeParameters(context, flowFile);
final String internalStageName = internalStageType.getStage(parameters); final String internalStageName = internalStageType.getStage(parameters);
final SnowflakeConnectionProviderService connectionProviderService = final SnowflakeConnectionProviderService connectionProviderService =

View File

@ -17,15 +17,6 @@
package org.apache.nifi.processors.snowflake; package org.apache.nifi.processors.snowflake;
import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import net.snowflake.ingest.SimpleIngestManager; import net.snowflake.ingest.SimpleIngestManager;
import net.snowflake.ingest.connection.IngestResponseException; import net.snowflake.ingest.connection.IngestResponseException;
import net.snowflake.ingest.utils.StagedFileWrapper; import net.snowflake.ingest.utils.StagedFileWrapper;
@ -44,6 +35,13 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Set;
import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH;
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@ReadsAttributes({ @ReadsAttributes({
@ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description = "Staged file path") @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description = "Staged file path")
@ -74,14 +72,9 @@ public class StartSnowflakeIngest extends AbstractProcessor {
.description("For FlowFiles of failed ingest request") .description("For FlowFiles of failed ingest request")
.build(); .build();
static final List<PropertyDescriptor> PROPERTIES = Collections.singletonList( static final List<PropertyDescriptor> PROPERTIES = List.of(INGEST_MANAGER_PROVIDER);
INGEST_MANAGER_PROVIDER
);
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);
REL_SUCCESS,
REL_FAILURE
)));
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

View File

@ -17,10 +17,10 @@
package org.apache.nifi.processors.snowflake.util; package org.apache.nifi.processors.snowflake.util;
import org.apache.nifi.components.DescribedValue;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Stream;
import org.apache.nifi.components.DescribedValue;
public enum SnowflakeInternalStageType implements DescribedValue { public enum SnowflakeInternalStageType implements DescribedValue {
USER("user", "User", "Use the user's internal stage") { USER("user", "User", "Use the user's internal stage") {
@ -33,12 +33,12 @@ public enum SnowflakeInternalStageType implements DescribedValue {
@Override @Override
public String getStage(final SnowflakeInternalStageTypeParameters parameters) { public String getStage(final SnowflakeInternalStageTypeParameters parameters) {
final StringBuilder stringBuilder = new StringBuilder("@"); final StringBuilder stringBuilder = new StringBuilder("@");
Optional.ofNullable(parameters.getDatabase()) Optional.ofNullable(parameters.database())
.ifPresent(database -> stringBuilder.append(database).append(".")); .ifPresent(database -> stringBuilder.append(database).append("."));
Optional.ofNullable(parameters.getSchema()) Optional.ofNullable(parameters.schema())
.ifPresent(schema -> stringBuilder.append(schema).append(".")); .ifPresent(schema -> stringBuilder.append(schema).append("."));
stringBuilder.append("%").append(Objects.requireNonNull(parameters.getTable())); stringBuilder.append("%").append(Objects.requireNonNull(parameters.table()));
return stringBuilder.toString(); return stringBuilder.toString();
} }
}, },
@ -46,11 +46,11 @@ public enum SnowflakeInternalStageType implements DescribedValue {
@Override @Override
public String getStage(final SnowflakeInternalStageTypeParameters parameters) { public String getStage(final SnowflakeInternalStageTypeParameters parameters) {
final StringBuilder stringBuilder = new StringBuilder("@"); final StringBuilder stringBuilder = new StringBuilder("@");
Optional.ofNullable(parameters.getDatabase()) Optional.ofNullable(parameters.database())
.ifPresent(database -> stringBuilder.append(database).append(".")); .ifPresent(database -> stringBuilder.append(database).append("."));
Optional.ofNullable(parameters.getSchema()) Optional.ofNullable(parameters.schema())
.ifPresent(schema -> stringBuilder.append(schema).append(".")); .ifPresent(schema -> stringBuilder.append(schema).append("."));
stringBuilder.append(Objects.requireNonNull(parameters.getStageName())); stringBuilder.append(Objects.requireNonNull(parameters.stageName()));
return stringBuilder.toString(); return stringBuilder.toString();
} }
}; };
@ -82,11 +82,4 @@ public enum SnowflakeInternalStageType implements DescribedValue {
public abstract String getStage(final SnowflakeInternalStageTypeParameters parameters); public abstract String getStage(final SnowflakeInternalStageTypeParameters parameters);
public static SnowflakeInternalStageType forName(String stageType) {
return Stream.of(values())
.filter(internalStageType -> internalStageType.getValue().equalsIgnoreCase(stageType))
.findFirst()
.orElseThrow(
() -> new IllegalArgumentException("Invalid SnowflakeInternalStageType: " + stageType));
}
} }

View File

@ -17,33 +17,5 @@
package org.apache.nifi.processors.snowflake.util; package org.apache.nifi.processors.snowflake.util;
public class SnowflakeInternalStageTypeParameters { public record SnowflakeInternalStageTypeParameters(String database, String schema, String table, String stageName) {
}
private final String database;
private final String schema;
private final String table;
private final String stageName;
public SnowflakeInternalStageTypeParameters(final String database, final String schema, final String table, final String stageName) {
this.database = database;
this.schema = schema;
this.table = table;
this.stageName = stageName;
}
public String getDatabase() {
return database;
}
public String getSchema() {
return schema;
}
public String getTable() {
return table;
}
public String getStageName() {
return stageName;
}
}

View File

@ -17,17 +17,18 @@
package org.apache.nifi.processors.snowflake; package org.apache.nifi.processors.snowflake;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.nifi.key.service.StandardPrivateKeyService; import org.apache.nifi.key.service.StandardPrivateKeyService;
import org.apache.nifi.key.service.api.PrivateKeyService; import org.apache.nifi.key.service.api.PrivateKeyService;
import org.apache.nifi.processors.snowflake.util.SnowflakeProperties;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat;
import org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool; import org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool;
import org.apache.nifi.snowflake.service.StandardSnowflakeIngestManagerProviderService; import org.apache.nifi.snowflake.service.StandardSnowflakeIngestManagerProviderService;
import org.apache.nifi.processors.snowflake.util.SnowflakeProperties; import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import java.nio.file.Path;
import java.nio.file.Paths;
public interface SnowflakeConfigAware { public interface SnowflakeConfigAware {
Path filePath = Paths.get("???"); Path filePath = Paths.get("???");
@ -67,7 +68,7 @@ public interface SnowflakeConfigAware {
runner.setProperty(connectionProviderService, runner.setProperty(connectionProviderService,
SnowflakeComputingConnectionPool.CONNECTION_URL_FORMAT, SnowflakeComputingConnectionPool.CONNECTION_URL_FORMAT,
ConnectionUrlFormat.ACCOUNT_NAME.getValue()); ConnectionUrlFormat.ACCOUNT_NAME);
runner.setProperty(connectionProviderService, runner.setProperty(connectionProviderService,
SnowflakeComputingConnectionPool.SNOWFLAKE_ORGANIZATION_NAME, SnowflakeComputingConnectionPool.SNOWFLAKE_ORGANIZATION_NAME,
organizationName); organizationName);

View File

@ -17,10 +17,6 @@
package org.apache.nifi.processors.snowflake; package org.apache.nifi.processors.snowflake;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import net.snowflake.ingest.utils.StagedFileWrapper; import net.snowflake.ingest.utils.StagedFileWrapper;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -30,6 +26,11 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
class SnowflakePipeIT implements SnowflakeConfigAware { class SnowflakePipeIT implements SnowflakeConfigAware {
@Test @Test
@ -40,7 +41,7 @@ class SnowflakePipeIT implements SnowflakeConfigAware {
final SnowflakeConnectionProviderService connectionProviderService = createConnectionProviderService(runner); final SnowflakeConnectionProviderService connectionProviderService = createConnectionProviderService(runner);
runner.setProperty(PutSnowflakeInternalStage.SNOWFLAKE_CONNECTION_PROVIDER, connectionProviderService.getIdentifier()); runner.setProperty(PutSnowflakeInternalStage.SNOWFLAKE_CONNECTION_PROVIDER, connectionProviderService.getIdentifier());
runner.setProperty(PutSnowflakeInternalStage.INTERNAL_STAGE_TYPE, SnowflakeInternalStageType.NAMED.getValue()); runner.setProperty(PutSnowflakeInternalStage.INTERNAL_STAGE_TYPE, SnowflakeInternalStageType.NAMED);
runner.setProperty(PutSnowflakeInternalStage.INTERNAL_STAGE, internalStageName); runner.setProperty(PutSnowflakeInternalStage.INTERNAL_STAGE, internalStageName);
final String uuid = UUID.randomUUID().toString(); final String uuid = UUID.randomUUID().toString();

View File

@ -17,9 +17,10 @@
package org.apache.nifi.processors.snowflake; package org.apache.nifi.processors.snowflake;
import net.snowflake.client.jdbc.SnowflakeConnection;
import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import net.snowflake.client.jdbc.SnowflakeConnection;
public class SnowflakeConnectionWrapper implements AutoCloseable { public class SnowflakeConnectionWrapper implements AutoCloseable {

View File

@ -45,7 +45,6 @@ import org.apache.nifi.snowflake.service.util.ConnectionUrlFormatParameters;
import java.sql.Driver; import java.sql.Driver;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -86,7 +85,7 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool
.description("The format of the connection URL.") .description("The format of the connection URL.")
.allowableValues(ConnectionUrlFormat.class) .allowableValues(ConnectionUrlFormat.class)
.required(true) .required(true)
.defaultValue(ConnectionUrlFormat.FULL_URL.getValue()) .defaultValue(ConnectionUrlFormat.FULL_URL)
.build(); .build();
public static final PropertyDescriptor SNOWFLAKE_URL = new PropertyDescriptor.Builder() public static final PropertyDescriptor SNOWFLAKE_URL = new PropertyDescriptor.Builder()
@ -143,35 +142,30 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build(); .build();
private static final List<PropertyDescriptor> PROPERTIES; private static final List<PropertyDescriptor> PROPERTIES = List.of(
CONNECTION_URL_FORMAT,
static { SNOWFLAKE_URL,
final List<PropertyDescriptor> props = new ArrayList<>(); SNOWFLAKE_ACCOUNT_LOCATOR,
props.add(CONNECTION_URL_FORMAT); SNOWFLAKE_CLOUD_REGION,
props.add(SNOWFLAKE_URL); SNOWFLAKE_CLOUD_TYPE,
props.add(SNOWFLAKE_ACCOUNT_LOCATOR); SNOWFLAKE_ORGANIZATION_NAME,
props.add(SNOWFLAKE_CLOUD_REGION); SNOWFLAKE_ACCOUNT_NAME,
props.add(SNOWFLAKE_CLOUD_TYPE); SNOWFLAKE_USER,
props.add(SNOWFLAKE_ORGANIZATION_NAME); SNOWFLAKE_PASSWORD,
props.add(SNOWFLAKE_ACCOUNT_NAME); SnowflakeProperties.DATABASE,
props.add(SNOWFLAKE_USER); SnowflakeProperties.SCHEMA,
props.add(SNOWFLAKE_PASSWORD); SNOWFLAKE_WAREHOUSE,
props.add(SnowflakeProperties.DATABASE); ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE,
props.add(SnowflakeProperties.SCHEMA); VALIDATION_QUERY,
props.add(SNOWFLAKE_WAREHOUSE); MAX_WAIT_TIME,
props.add(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE); MAX_TOTAL_CONNECTIONS,
props.add(VALIDATION_QUERY); MIN_IDLE,
props.add(MAX_WAIT_TIME); MAX_IDLE,
props.add(MAX_TOTAL_CONNECTIONS); MAX_CONN_LIFETIME,
props.add(MIN_IDLE); EVICTION_RUN_PERIOD,
props.add(MAX_IDLE); MIN_EVICTABLE_IDLE_TIME,
props.add(MAX_CONN_LIFETIME); SOFT_MIN_EVICTABLE_IDLE_TIME
props.add(EVICTION_RUN_PERIOD); );
props.add(MIN_EVICTABLE_IDLE_TIME);
props.add(SOFT_MIN_EVICTABLE_IDLE_TIME);
PROPERTIES = Collections.unmodifiableList(props);
}
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -226,8 +220,7 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool
} }
protected String getUrl(final ConfigurationContext context) { protected String getUrl(final ConfigurationContext context) {
final ConnectionUrlFormat connectionUrlFormat = ConnectionUrlFormat.forName(context.getProperty(CONNECTION_URL_FORMAT) final ConnectionUrlFormat connectionUrlFormat = context.getProperty(CONNECTION_URL_FORMAT).asDescribedValue(ConnectionUrlFormat.class);
.getValue());
final ConnectionUrlFormatParameters parameters = getConnectionUrlFormatParameters(context); final ConnectionUrlFormatParameters parameters = getConnectionUrlFormatParameters(context);
return connectionUrlFormat.buildConnectionUrl(parameters); return connectionUrlFormat.buildConnectionUrl(parameters);
@ -291,28 +284,13 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool
} }
private ConnectionUrlFormatParameters getConnectionUrlFormatParameters(ConfigurationContext context) { private ConnectionUrlFormatParameters getConnectionUrlFormatParameters(ConfigurationContext context) {
final String snowflakeUrl = context.getProperty(SNOWFLAKE_URL).evaluateAttributeExpressions().getValue();
final String organizationName = context.getProperty(SNOWFLAKE_ORGANIZATION_NAME)
.evaluateAttributeExpressions()
.getValue();
final String accountName = context.getProperty(SNOWFLAKE_ACCOUNT_NAME)
.evaluateAttributeExpressions()
.getValue();
final String accountLocator = context.getProperty(SNOWFLAKE_ACCOUNT_LOCATOR)
.evaluateAttributeExpressions()
.getValue();
final String cloudRegion = context.getProperty(SNOWFLAKE_CLOUD_REGION)
.evaluateAttributeExpressions()
.getValue();
final String cloudType = context.getProperty(SNOWFLAKE_CLOUD_TYPE)
.evaluateAttributeExpressions()
.getValue();
return new ConnectionUrlFormatParameters( return new ConnectionUrlFormatParameters(
snowflakeUrl, context.getProperty(SNOWFLAKE_URL).evaluateAttributeExpressions().getValue(),
organizationName, context.getProperty(SNOWFLAKE_ORGANIZATION_NAME).evaluateAttributeExpressions().getValue(),
accountName, context.getProperty(SNOWFLAKE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue(),
accountLocator, context.getProperty(SNOWFLAKE_ACCOUNT_LOCATOR).evaluateAttributeExpressions().getValue(),
cloudRegion, context.getProperty(SNOWFLAKE_CLOUD_REGION).evaluateAttributeExpressions().getValue(),
cloudType); context.getProperty(SNOWFLAKE_CLOUD_TYPE).evaluateAttributeExpressions().getValue()
);
} }
} }

View File

@ -17,12 +17,6 @@
package org.apache.nifi.snowflake.service; package org.apache.nifi.snowflake.service;
import java.security.NoSuchAlgorithmException;
import java.security.PrivateKey;
import java.security.spec.InvalidKeySpecException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import net.snowflake.ingest.SimpleIngestManager; import net.snowflake.ingest.SimpleIngestManager;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
@ -35,11 +29,16 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.key.service.api.PrivateKeyService; import org.apache.nifi.key.service.api.PrivateKeyService;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.snowflake.SnowflakeIngestManagerProviderService; import org.apache.nifi.processors.snowflake.SnowflakeIngestManagerProviderService;
import org.apache.nifi.processors.snowflake.util.SnowflakeProperties;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.snowflake.service.util.AccountIdentifierFormat; import org.apache.nifi.snowflake.service.util.AccountIdentifierFormat;
import org.apache.nifi.snowflake.service.util.AccountIdentifierFormatParameters; import org.apache.nifi.snowflake.service.util.AccountIdentifierFormatParameters;
import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat; import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat;
import org.apache.nifi.processors.snowflake.util.SnowflakeProperties;
import java.security.NoSuchAlgorithmException;
import java.security.PrivateKey;
import java.security.spec.InvalidKeySpecException;
import java.util.List;
@Tags({"snowflake", "jdbc", "database", "connection"}) @Tags({"snowflake", "jdbc", "database", "connection"})
@CapabilityDescription("Provides a Snowflake Ingest Manager for Snowflake pipe processors") @CapabilityDescription("Provides a Snowflake Ingest Manager for Snowflake pipe processors")
@ -53,7 +52,7 @@ public class StandardSnowflakeIngestManagerProviderService extends AbstractContr
.addValidator(StandardValidators.NON_BLANK_VALIDATOR) .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.required(true) .required(true)
.allowableValues(AccountIdentifierFormat.class) .allowableValues(AccountIdentifierFormat.class)
.defaultValue(AccountIdentifierFormat.ACCOUNT_NAME.getValue()) .defaultValue(AccountIdentifierFormat.ACCOUNT_NAME)
.build(); .build();
public static final PropertyDescriptor HOST_URL = new PropertyDescriptor.Builder() public static final PropertyDescriptor HOST_URL = new PropertyDescriptor.Builder()
@ -127,7 +126,7 @@ public class StandardSnowflakeIngestManagerProviderService extends AbstractContr
.required(true) .required(true)
.build(); .build();
static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( static final List<PropertyDescriptor> PROPERTIES = List.of(
ACCOUNT_IDENTIFIER_FORMAT, ACCOUNT_IDENTIFIER_FORMAT,
HOST_URL, HOST_URL,
ACCOUNT_LOCATOR, ACCOUNT_LOCATOR,
@ -140,7 +139,7 @@ public class StandardSnowflakeIngestManagerProviderService extends AbstractContr
DATABASE, DATABASE,
SCHEMA, SCHEMA,
PIPE PIPE
)); );
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -153,20 +152,16 @@ public class StandardSnowflakeIngestManagerProviderService extends AbstractContr
@OnEnabled @OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException { public void onEnabled(final ConfigurationContext context) throws InitializationException {
final String user = context.getProperty(USER_NAME).evaluateAttributeExpressions().getValue(); final String user = context.getProperty(USER_NAME).evaluateAttributeExpressions().getValue();
final String database = context.getProperty(DATABASE) final String database = context.getProperty(DATABASE).evaluateAttributeExpressions().getValue();
.evaluateAttributeExpressions() final String schema = context.getProperty(SCHEMA).evaluateAttributeExpressions().getValue();
.getValue();
final String schema = context.getProperty(SCHEMA)
.evaluateAttributeExpressions()
.getValue();
final String pipe = context.getProperty(PIPE).evaluateAttributeExpressions().getValue(); final String pipe = context.getProperty(PIPE).evaluateAttributeExpressions().getValue();
fullyQualifiedPipeName = database + "." + schema + "." + pipe; fullyQualifiedPipeName = database + "." + schema + "." + pipe;
final PrivateKeyService privateKeyService = context.getProperty(PRIVATE_KEY_SERVICE) final PrivateKeyService privateKeyService = context.getProperty(PRIVATE_KEY_SERVICE)
.asControllerService(PrivateKeyService.class); .asControllerService(PrivateKeyService.class);
final PrivateKey privateKey = privateKeyService.getPrivateKey(); final PrivateKey privateKey = privateKeyService.getPrivateKey();
final AccountIdentifierFormat accountIdentifierFormat = AccountIdentifierFormat.forName(context.getProperty(ACCOUNT_IDENTIFIER_FORMAT) final AccountIdentifierFormat accountIdentifierFormat = context.getProperty(ACCOUNT_IDENTIFIER_FORMAT)
.getValue()); .asDescribedValue(AccountIdentifierFormat.class);
final AccountIdentifierFormatParameters parameters = getAccountIdentifierFormatParameters(context); final AccountIdentifierFormatParameters parameters = getAccountIdentifierFormatParameters(context);
final String account = accountIdentifierFormat.getAccount(parameters); final String account = accountIdentifierFormat.getAccount(parameters);
final String host = accountIdentifierFormat.getHostname(parameters); final String host = accountIdentifierFormat.getHostname(parameters);
@ -196,29 +191,13 @@ public class StandardSnowflakeIngestManagerProviderService extends AbstractContr
} }
private AccountIdentifierFormatParameters getAccountIdentifierFormatParameters(ConfigurationContext context) { private AccountIdentifierFormatParameters getAccountIdentifierFormatParameters(ConfigurationContext context) {
final String hostUrl = context.getProperty(HOST_URL) return new AccountIdentifierFormatParameters(
.evaluateAttributeExpressions() context.getProperty(HOST_URL).evaluateAttributeExpressions().getValue(),
.getValue(); context.getProperty(ORGANIZATION_NAME).evaluateAttributeExpressions().getValue(),
final String organizationName = context.getProperty(ORGANIZATION_NAME) context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions().getValue(),
.evaluateAttributeExpressions() context.getProperty(ACCOUNT_LOCATOR).evaluateAttributeExpressions().getValue(),
.getValue(); context.getProperty(CLOUD_REGION).evaluateAttributeExpressions().getValue(),
final String accountName = context.getProperty(ACCOUNT_NAME) context.getProperty(CLOUD_TYPE).evaluateAttributeExpressions().getValue()
.evaluateAttributeExpressions() );
.getValue();
final String accountLocator = context.getProperty(ACCOUNT_LOCATOR)
.evaluateAttributeExpressions()
.getValue();
final String cloudRegion = context.getProperty(CLOUD_REGION)
.evaluateAttributeExpressions()
.getValue();
final String cloudType = context.getProperty(CLOUD_TYPE)
.evaluateAttributeExpressions()
.getValue();
return new AccountIdentifierFormatParameters(hostUrl,
organizationName,
accountName,
accountLocator,
cloudRegion,
cloudType);
} }
} }

View File

@ -17,10 +17,10 @@
package org.apache.nifi.snowflake.service.util; package org.apache.nifi.snowflake.service.util;
import org.apache.nifi.components.DescribedValue;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Stream;
import org.apache.nifi.components.DescribedValue;
public enum AccountIdentifierFormat implements DescribedValue { public enum AccountIdentifierFormat implements DescribedValue {
FULL_URL("full-url", "Full URL", "Provide an account identifier in a single property") { FULL_URL("full-url", "Full URL", "Provide an account identifier in a single property") {
@ -100,10 +100,4 @@ public enum AccountIdentifierFormat implements DescribedValue {
public abstract String getAccount(final AccountIdentifierFormatParameters parameters); public abstract String getAccount(final AccountIdentifierFormatParameters parameters);
public abstract String getHostname(final AccountIdentifierFormatParameters parameters); public abstract String getHostname(final AccountIdentifierFormatParameters parameters);
public static AccountIdentifierFormat forName(String provideMethod) {
return Stream.of(values()).filter(provider -> provider.getValue().equalsIgnoreCase(provideMethod))
.findFirst()
.orElseThrow(
() -> new IllegalArgumentException("Invalid AccountIdentifierFormat: " + provideMethod));
}
} }

View File

@ -17,10 +17,10 @@
package org.apache.nifi.snowflake.service.util; package org.apache.nifi.snowflake.service.util;
import org.apache.nifi.components.DescribedValue;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Stream;
import org.apache.nifi.components.DescribedValue;
public enum ConnectionUrlFormat implements DescribedValue { public enum ConnectionUrlFormat implements DescribedValue {
FULL_URL("full-url", "Full URL", "Provide connection URL in a single property") { FULL_URL("full-url", "Full URL", "Provide connection URL in a single property") {
@ -90,10 +90,4 @@ public enum ConnectionUrlFormat implements DescribedValue {
public abstract String buildConnectionUrl(final ConnectionUrlFormatParameters parameters); public abstract String buildConnectionUrl(final ConnectionUrlFormatParameters parameters);
public static ConnectionUrlFormat forName(String provideMethod) {
return Stream.of(values()).filter(provider -> provider.getValue().equalsIgnoreCase(provideMethod))
.findFirst()
.orElseThrow(
() -> new IllegalArgumentException("Invalid ConnectionUrlFormat: " + provideMethod));
}
} }