From ccf3f3507624d6df17321d1a41a86b8d0405d8cf Mon Sep 17 00:00:00 2001 From: EndzeitBegins <16666115+EndzeitBegins@users.noreply.github.com> Date: Sat, 23 Dec 2023 21:48:49 +0100 Subject: [PATCH] NIFI-12546 Updated nifi-snowflake-bundle using current API methods This closes #8187 Signed-off-by: David Handermann --- .../snowflake/GetSnowflakeIngestStatus.java | 31 +++---- .../snowflake/PutSnowflakeInternalStage.java | 32 +++---- .../snowflake/StartSnowflakeIngest.java | 25 ++---- .../util/SnowflakeInternalStageType.java | 23 ++--- .../SnowflakeInternalStageTypeParameters.java | 32 +------ .../snowflake/SnowflakeConfigAware.java | 11 +-- .../processors/snowflake/SnowflakePipeIT.java | 11 +-- .../snowflake/SnowflakeConnectionWrapper.java | 3 +- .../SnowflakeComputingConnectionPool.java | 88 +++++++------------ ...SnowflakeIngestManagerProviderService.java | 63 +++++-------- .../service/util/AccountIdentifierFormat.java | 10 +-- .../service/util/ConnectionUrlFormat.java | 10 +-- 12 files changed, 115 insertions(+), 224 deletions(-) diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/GetSnowflakeIngestStatus.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/GetSnowflakeIngestStatus.java index c1d1f28181..76997da03e 100644 --- a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/GetSnowflakeIngestStatus.java +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/GetSnowflakeIngestStatus.java @@ -17,16 +17,6 @@ package org.apache.nifi.processors.snowflake; -import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Optional; -import java.util.Set; import net.snowflake.ingest.SimpleIngestManager; import net.snowflake.ingest.connection.HistoryResponse; import net.snowflake.ingest.connection.HistoryResponse.FileEntry; @@ -47,6 +37,14 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH; + @InputRequirement(Requirement.INPUT_REQUIRED) @DefaultSettings(penaltyDuration = "5 sec") @ReadsAttributes({ @@ -81,15 +79,9 @@ public class GetSnowflakeIngestStatus extends AbstractProcessor { .description("For FlowFiles whose file is still not ingested. These FlowFiles should be routed back to this processor to try again later") .build(); - static final List PROPERTIES = Collections.singletonList( - INGEST_MANAGER_PROVIDER - ); + static final List PROPERTIES = List.of(INGEST_MANAGER_PROVIDER); - private static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( - REL_SUCCESS, - REL_RETRY, - REL_FAILURE - ))); + private static final Set RELATIONSHIPS = Set.of(REL_SUCCESS, REL_RETRY, REL_FAILURE); @Override protected List getSupportedPropertyDescriptors() { @@ -135,7 +127,7 @@ public class GetSnowflakeIngestStatus extends AbstractProcessor { .filter(entry -> entry.getPath().equals(stagedFilePath) && entry.isComplete()) .findFirst()); - if (!fileEntry.isPresent()) { + if (fileEntry.isEmpty()) { session.transfer(session.penalize(flowFile), REL_RETRY); return; } @@ -147,6 +139,5 @@ public class GetSnowflakeIngestStatus extends AbstractProcessor { return; } session.transfer(flowFile, REL_SUCCESS); - } } diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java index e2edf12adf..4053b6ab72 100644 --- a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java @@ -17,17 +17,6 @@ package org.apache.nifi.processors.snowflake; -import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH; - -import java.io.IOException; -import java.io.InputStream; -import java.sql.SQLException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -48,6 +37,15 @@ import org.apache.nifi.processors.snowflake.util.SnowflakeInternalStageType; import org.apache.nifi.processors.snowflake.util.SnowflakeInternalStageTypeParameters; import org.apache.nifi.processors.snowflake.util.SnowflakeProperties; +import java.io.IOException; +import java.io.InputStream; +import java.sql.SQLException; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH; + @InputRequirement(Requirement.INPUT_REQUIRED) @WritesAttributes({ @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, @@ -115,19 +113,16 @@ public class PutSnowflakeInternalStage extends AbstractProcessor { .description("For FlowFiles of failed PUT operation") .build(); - static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( + static final List PROPERTIES = List.of( SNOWFLAKE_CONNECTION_PROVIDER, INTERNAL_STAGE_TYPE, DATABASE, SCHEMA, TABLE, INTERNAL_STAGE - )); + ); - private static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( - REL_SUCCESS, - REL_FAILURE - ))); + private static final Set RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE); @Override protected List getSupportedPropertyDescriptors() { @@ -146,8 +141,7 @@ public class PutSnowflakeInternalStage extends AbstractProcessor { return; } - final SnowflakeInternalStageType internalStageType = SnowflakeInternalStageType.forName(context.getProperty(INTERNAL_STAGE_TYPE) - .getValue()); + final SnowflakeInternalStageType internalStageType = context.getProperty(INTERNAL_STAGE_TYPE).asDescribedValue(SnowflakeInternalStageType.class); final SnowflakeInternalStageTypeParameters parameters = getSnowflakeInternalStageTypeParameters(context, flowFile); final String internalStageName = internalStageType.getStage(parameters); final SnowflakeConnectionProviderService connectionProviderService = diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/StartSnowflakeIngest.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/StartSnowflakeIngest.java index 0980f2b627..4fbede80db 100644 --- a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/StartSnowflakeIngest.java +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/StartSnowflakeIngest.java @@ -17,15 +17,6 @@ package org.apache.nifi.processors.snowflake; -import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; import net.snowflake.ingest.SimpleIngestManager; import net.snowflake.ingest.connection.IngestResponseException; import net.snowflake.ingest.utils.StagedFileWrapper; @@ -44,6 +35,13 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Set; + +import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH; + @InputRequirement(Requirement.INPUT_REQUIRED) @ReadsAttributes({ @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description = "Staged file path") @@ -74,14 +72,9 @@ public class StartSnowflakeIngest extends AbstractProcessor { .description("For FlowFiles of failed ingest request") .build(); - static final List PROPERTIES = Collections.singletonList( - INGEST_MANAGER_PROVIDER - ); + static final List PROPERTIES = List.of(INGEST_MANAGER_PROVIDER); - private static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( - REL_SUCCESS, - REL_FAILURE - ))); + private static final Set RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE); @Override protected List getSupportedPropertyDescriptors() { diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeInternalStageType.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeInternalStageType.java index db4d857e15..fbc5ecc499 100644 --- a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeInternalStageType.java +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeInternalStageType.java @@ -17,10 +17,10 @@ package org.apache.nifi.processors.snowflake.util; +import org.apache.nifi.components.DescribedValue; + import java.util.Objects; import java.util.Optional; -import java.util.stream.Stream; -import org.apache.nifi.components.DescribedValue; public enum SnowflakeInternalStageType implements DescribedValue { USER("user", "User", "Use the user's internal stage") { @@ -33,12 +33,12 @@ public enum SnowflakeInternalStageType implements DescribedValue { @Override public String getStage(final SnowflakeInternalStageTypeParameters parameters) { final StringBuilder stringBuilder = new StringBuilder("@"); - Optional.ofNullable(parameters.getDatabase()) + Optional.ofNullable(parameters.database()) .ifPresent(database -> stringBuilder.append(database).append(".")); - Optional.ofNullable(parameters.getSchema()) + Optional.ofNullable(parameters.schema()) .ifPresent(schema -> stringBuilder.append(schema).append(".")); - stringBuilder.append("%").append(Objects.requireNonNull(parameters.getTable())); + stringBuilder.append("%").append(Objects.requireNonNull(parameters.table())); return stringBuilder.toString(); } }, @@ -46,11 +46,11 @@ public enum SnowflakeInternalStageType implements DescribedValue { @Override public String getStage(final SnowflakeInternalStageTypeParameters parameters) { final StringBuilder stringBuilder = new StringBuilder("@"); - Optional.ofNullable(parameters.getDatabase()) + Optional.ofNullable(parameters.database()) .ifPresent(database -> stringBuilder.append(database).append(".")); - Optional.ofNullable(parameters.getSchema()) + Optional.ofNullable(parameters.schema()) .ifPresent(schema -> stringBuilder.append(schema).append(".")); - stringBuilder.append(Objects.requireNonNull(parameters.getStageName())); + stringBuilder.append(Objects.requireNonNull(parameters.stageName())); return stringBuilder.toString(); } }; @@ -82,11 +82,4 @@ public enum SnowflakeInternalStageType implements DescribedValue { public abstract String getStage(final SnowflakeInternalStageTypeParameters parameters); - public static SnowflakeInternalStageType forName(String stageType) { - return Stream.of(values()) - .filter(internalStageType -> internalStageType.getValue().equalsIgnoreCase(stageType)) - .findFirst() - .orElseThrow( - () -> new IllegalArgumentException("Invalid SnowflakeInternalStageType: " + stageType)); - } } diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeInternalStageTypeParameters.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeInternalStageTypeParameters.java index 6d26151b33..61ad9f636e 100644 --- a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeInternalStageTypeParameters.java +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeInternalStageTypeParameters.java @@ -17,33 +17,5 @@ package org.apache.nifi.processors.snowflake.util; -public class SnowflakeInternalStageTypeParameters { - - private final String database; - private final String schema; - private final String table; - private final String stageName; - - public SnowflakeInternalStageTypeParameters(final String database, final String schema, final String table, final String stageName) { - this.database = database; - this.schema = schema; - this.table = table; - this.stageName = stageName; - } - - public String getDatabase() { - return database; - } - - public String getSchema() { - return schema; - } - - public String getTable() { - return table; - } - - public String getStageName() { - return stageName; - } -} +public record SnowflakeInternalStageTypeParameters(String database, String schema, String table, String stageName) { +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/test/java/org/apache/nifi/processors/snowflake/SnowflakeConfigAware.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/test/java/org/apache/nifi/processors/snowflake/SnowflakeConfigAware.java index 2e892835dd..9a7e58beb1 100644 --- a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/test/java/org/apache/nifi/processors/snowflake/SnowflakeConfigAware.java +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/test/java/org/apache/nifi/processors/snowflake/SnowflakeConfigAware.java @@ -17,17 +17,18 @@ package org.apache.nifi.processors.snowflake; -import java.nio.file.Path; -import java.nio.file.Paths; import org.apache.nifi.key.service.StandardPrivateKeyService; import org.apache.nifi.key.service.api.PrivateKeyService; +import org.apache.nifi.processors.snowflake.util.SnowflakeProperties; import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat; import org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool; import org.apache.nifi.snowflake.service.StandardSnowflakeIngestManagerProviderService; -import org.apache.nifi.processors.snowflake.util.SnowflakeProperties; +import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat; import org.apache.nifi.util.TestRunner; +import java.nio.file.Path; +import java.nio.file.Paths; + public interface SnowflakeConfigAware { Path filePath = Paths.get("???"); @@ -67,7 +68,7 @@ public interface SnowflakeConfigAware { runner.setProperty(connectionProviderService, SnowflakeComputingConnectionPool.CONNECTION_URL_FORMAT, - ConnectionUrlFormat.ACCOUNT_NAME.getValue()); + ConnectionUrlFormat.ACCOUNT_NAME); runner.setProperty(connectionProviderService, SnowflakeComputingConnectionPool.SNOWFLAKE_ORGANIZATION_NAME, organizationName); diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/test/java/org/apache/nifi/processors/snowflake/SnowflakePipeIT.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/test/java/org/apache/nifi/processors/snowflake/SnowflakePipeIT.java index ce602d6990..91eed724b7 100644 --- a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/test/java/org/apache/nifi/processors/snowflake/SnowflakePipeIT.java +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/test/java/org/apache/nifi/processors/snowflake/SnowflakePipeIT.java @@ -17,10 +17,6 @@ package org.apache.nifi.processors.snowflake; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; import net.snowflake.ingest.utils.StagedFileWrapper; import org.apache.commons.io.FileUtils; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -30,6 +26,11 @@ import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.Test; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + class SnowflakePipeIT implements SnowflakeConfigAware { @Test @@ -40,7 +41,7 @@ class SnowflakePipeIT implements SnowflakeConfigAware { final SnowflakeConnectionProviderService connectionProviderService = createConnectionProviderService(runner); runner.setProperty(PutSnowflakeInternalStage.SNOWFLAKE_CONNECTION_PROVIDER, connectionProviderService.getIdentifier()); - runner.setProperty(PutSnowflakeInternalStage.INTERNAL_STAGE_TYPE, SnowflakeInternalStageType.NAMED.getValue()); + runner.setProperty(PutSnowflakeInternalStage.INTERNAL_STAGE_TYPE, SnowflakeInternalStageType.NAMED); runner.setProperty(PutSnowflakeInternalStage.INTERNAL_STAGE, internalStageName); final String uuid = UUID.randomUUID().toString(); diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/SnowflakeConnectionWrapper.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/SnowflakeConnectionWrapper.java index 2c66a81a70..a754593215 100644 --- a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/SnowflakeConnectionWrapper.java +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/SnowflakeConnectionWrapper.java @@ -17,9 +17,10 @@ package org.apache.nifi.processors.snowflake; +import net.snowflake.client.jdbc.SnowflakeConnection; + import java.sql.Connection; import java.sql.SQLException; -import net.snowflake.client.jdbc.SnowflakeConnection; public class SnowflakeConnectionWrapper implements AutoCloseable { diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java index 60b6cb001a..1d384642ec 100644 --- a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java @@ -45,7 +45,6 @@ import org.apache.nifi.snowflake.service.util.ConnectionUrlFormatParameters; import java.sql.Driver; import java.sql.DriverManager; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -86,7 +85,7 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool .description("The format of the connection URL.") .allowableValues(ConnectionUrlFormat.class) .required(true) - .defaultValue(ConnectionUrlFormat.FULL_URL.getValue()) + .defaultValue(ConnectionUrlFormat.FULL_URL) .build(); public static final PropertyDescriptor SNOWFLAKE_URL = new PropertyDescriptor.Builder() @@ -143,35 +142,30 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) .build(); - private static final List PROPERTIES; - - static { - final List props = new ArrayList<>(); - props.add(CONNECTION_URL_FORMAT); - props.add(SNOWFLAKE_URL); - props.add(SNOWFLAKE_ACCOUNT_LOCATOR); - props.add(SNOWFLAKE_CLOUD_REGION); - props.add(SNOWFLAKE_CLOUD_TYPE); - props.add(SNOWFLAKE_ORGANIZATION_NAME); - props.add(SNOWFLAKE_ACCOUNT_NAME); - props.add(SNOWFLAKE_USER); - props.add(SNOWFLAKE_PASSWORD); - props.add(SnowflakeProperties.DATABASE); - props.add(SnowflakeProperties.SCHEMA); - props.add(SNOWFLAKE_WAREHOUSE); - props.add(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE); - props.add(VALIDATION_QUERY); - props.add(MAX_WAIT_TIME); - props.add(MAX_TOTAL_CONNECTIONS); - props.add(MIN_IDLE); - props.add(MAX_IDLE); - props.add(MAX_CONN_LIFETIME); - props.add(EVICTION_RUN_PERIOD); - props.add(MIN_EVICTABLE_IDLE_TIME); - props.add(SOFT_MIN_EVICTABLE_IDLE_TIME); - - PROPERTIES = Collections.unmodifiableList(props); - } + private static final List PROPERTIES = List.of( + CONNECTION_URL_FORMAT, + SNOWFLAKE_URL, + SNOWFLAKE_ACCOUNT_LOCATOR, + SNOWFLAKE_CLOUD_REGION, + SNOWFLAKE_CLOUD_TYPE, + SNOWFLAKE_ORGANIZATION_NAME, + SNOWFLAKE_ACCOUNT_NAME, + SNOWFLAKE_USER, + SNOWFLAKE_PASSWORD, + SnowflakeProperties.DATABASE, + SnowflakeProperties.SCHEMA, + SNOWFLAKE_WAREHOUSE, + ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE, + VALIDATION_QUERY, + MAX_WAIT_TIME, + MAX_TOTAL_CONNECTIONS, + MIN_IDLE, + MAX_IDLE, + MAX_CONN_LIFETIME, + EVICTION_RUN_PERIOD, + MIN_EVICTABLE_IDLE_TIME, + SOFT_MIN_EVICTABLE_IDLE_TIME + ); @Override protected List getSupportedPropertyDescriptors() { @@ -226,8 +220,7 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool } protected String getUrl(final ConfigurationContext context) { - final ConnectionUrlFormat connectionUrlFormat = ConnectionUrlFormat.forName(context.getProperty(CONNECTION_URL_FORMAT) - .getValue()); + final ConnectionUrlFormat connectionUrlFormat = context.getProperty(CONNECTION_URL_FORMAT).asDescribedValue(ConnectionUrlFormat.class); final ConnectionUrlFormatParameters parameters = getConnectionUrlFormatParameters(context); return connectionUrlFormat.buildConnectionUrl(parameters); @@ -291,28 +284,13 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool } private ConnectionUrlFormatParameters getConnectionUrlFormatParameters(ConfigurationContext context) { - final String snowflakeUrl = context.getProperty(SNOWFLAKE_URL).evaluateAttributeExpressions().getValue(); - final String organizationName = context.getProperty(SNOWFLAKE_ORGANIZATION_NAME) - .evaluateAttributeExpressions() - .getValue(); - final String accountName = context.getProperty(SNOWFLAKE_ACCOUNT_NAME) - .evaluateAttributeExpressions() - .getValue(); - final String accountLocator = context.getProperty(SNOWFLAKE_ACCOUNT_LOCATOR) - .evaluateAttributeExpressions() - .getValue(); - final String cloudRegion = context.getProperty(SNOWFLAKE_CLOUD_REGION) - .evaluateAttributeExpressions() - .getValue(); - final String cloudType = context.getProperty(SNOWFLAKE_CLOUD_TYPE) - .evaluateAttributeExpressions() - .getValue(); return new ConnectionUrlFormatParameters( - snowflakeUrl, - organizationName, - accountName, - accountLocator, - cloudRegion, - cloudType); + context.getProperty(SNOWFLAKE_URL).evaluateAttributeExpressions().getValue(), + context.getProperty(SNOWFLAKE_ORGANIZATION_NAME).evaluateAttributeExpressions().getValue(), + context.getProperty(SNOWFLAKE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue(), + context.getProperty(SNOWFLAKE_ACCOUNT_LOCATOR).evaluateAttributeExpressions().getValue(), + context.getProperty(SNOWFLAKE_CLOUD_REGION).evaluateAttributeExpressions().getValue(), + context.getProperty(SNOWFLAKE_CLOUD_TYPE).evaluateAttributeExpressions().getValue() + ); } } diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java index 07bbe55a6b..20648cd8a1 100644 --- a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java @@ -17,12 +17,6 @@ package org.apache.nifi.snowflake.service; -import java.security.NoSuchAlgorithmException; -import java.security.PrivateKey; -import java.security.spec.InvalidKeySpecException; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; import net.snowflake.ingest.SimpleIngestManager; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -35,11 +29,16 @@ import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.key.service.api.PrivateKeyService; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.snowflake.SnowflakeIngestManagerProviderService; +import org.apache.nifi.processors.snowflake.util.SnowflakeProperties; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.snowflake.service.util.AccountIdentifierFormat; import org.apache.nifi.snowflake.service.util.AccountIdentifierFormatParameters; import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat; -import org.apache.nifi.processors.snowflake.util.SnowflakeProperties; + +import java.security.NoSuchAlgorithmException; +import java.security.PrivateKey; +import java.security.spec.InvalidKeySpecException; +import java.util.List; @Tags({"snowflake", "jdbc", "database", "connection"}) @CapabilityDescription("Provides a Snowflake Ingest Manager for Snowflake pipe processors") @@ -53,7 +52,7 @@ public class StandardSnowflakeIngestManagerProviderService extends AbstractContr .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .required(true) .allowableValues(AccountIdentifierFormat.class) - .defaultValue(AccountIdentifierFormat.ACCOUNT_NAME.getValue()) + .defaultValue(AccountIdentifierFormat.ACCOUNT_NAME) .build(); public static final PropertyDescriptor HOST_URL = new PropertyDescriptor.Builder() @@ -127,7 +126,7 @@ public class StandardSnowflakeIngestManagerProviderService extends AbstractContr .required(true) .build(); - static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( + static final List PROPERTIES = List.of( ACCOUNT_IDENTIFIER_FORMAT, HOST_URL, ACCOUNT_LOCATOR, @@ -140,7 +139,7 @@ public class StandardSnowflakeIngestManagerProviderService extends AbstractContr DATABASE, SCHEMA, PIPE - )); + ); @Override protected List getSupportedPropertyDescriptors() { @@ -153,20 +152,16 @@ public class StandardSnowflakeIngestManagerProviderService extends AbstractContr @OnEnabled public void onEnabled(final ConfigurationContext context) throws InitializationException { final String user = context.getProperty(USER_NAME).evaluateAttributeExpressions().getValue(); - final String database = context.getProperty(DATABASE) - .evaluateAttributeExpressions() - .getValue(); - final String schema = context.getProperty(SCHEMA) - .evaluateAttributeExpressions() - .getValue(); + final String database = context.getProperty(DATABASE).evaluateAttributeExpressions().getValue(); + final String schema = context.getProperty(SCHEMA).evaluateAttributeExpressions().getValue(); final String pipe = context.getProperty(PIPE).evaluateAttributeExpressions().getValue(); fullyQualifiedPipeName = database + "." + schema + "." + pipe; final PrivateKeyService privateKeyService = context.getProperty(PRIVATE_KEY_SERVICE) .asControllerService(PrivateKeyService.class); final PrivateKey privateKey = privateKeyService.getPrivateKey(); - final AccountIdentifierFormat accountIdentifierFormat = AccountIdentifierFormat.forName(context.getProperty(ACCOUNT_IDENTIFIER_FORMAT) - .getValue()); + final AccountIdentifierFormat accountIdentifierFormat = context.getProperty(ACCOUNT_IDENTIFIER_FORMAT) + .asDescribedValue(AccountIdentifierFormat.class); final AccountIdentifierFormatParameters parameters = getAccountIdentifierFormatParameters(context); final String account = accountIdentifierFormat.getAccount(parameters); final String host = accountIdentifierFormat.getHostname(parameters); @@ -196,29 +191,13 @@ public class StandardSnowflakeIngestManagerProviderService extends AbstractContr } private AccountIdentifierFormatParameters getAccountIdentifierFormatParameters(ConfigurationContext context) { - final String hostUrl = context.getProperty(HOST_URL) - .evaluateAttributeExpressions() - .getValue(); - final String organizationName = context.getProperty(ORGANIZATION_NAME) - .evaluateAttributeExpressions() - .getValue(); - final String accountName = context.getProperty(ACCOUNT_NAME) - .evaluateAttributeExpressions() - .getValue(); - final String accountLocator = context.getProperty(ACCOUNT_LOCATOR) - .evaluateAttributeExpressions() - .getValue(); - final String cloudRegion = context.getProperty(CLOUD_REGION) - .evaluateAttributeExpressions() - .getValue(); - final String cloudType = context.getProperty(CLOUD_TYPE) - .evaluateAttributeExpressions() - .getValue(); - return new AccountIdentifierFormatParameters(hostUrl, - organizationName, - accountName, - accountLocator, - cloudRegion, - cloudType); + return new AccountIdentifierFormatParameters( + context.getProperty(HOST_URL).evaluateAttributeExpressions().getValue(), + context.getProperty(ORGANIZATION_NAME).evaluateAttributeExpressions().getValue(), + context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions().getValue(), + context.getProperty(ACCOUNT_LOCATOR).evaluateAttributeExpressions().getValue(), + context.getProperty(CLOUD_REGION).evaluateAttributeExpressions().getValue(), + context.getProperty(CLOUD_TYPE).evaluateAttributeExpressions().getValue() + ); } } diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/AccountIdentifierFormat.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/AccountIdentifierFormat.java index 4b52dd69db..98d500d530 100644 --- a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/AccountIdentifierFormat.java +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/AccountIdentifierFormat.java @@ -17,10 +17,10 @@ package org.apache.nifi.snowflake.service.util; +import org.apache.nifi.components.DescribedValue; + import java.util.Objects; import java.util.Optional; -import java.util.stream.Stream; -import org.apache.nifi.components.DescribedValue; public enum AccountIdentifierFormat implements DescribedValue { FULL_URL("full-url", "Full URL", "Provide an account identifier in a single property") { @@ -100,10 +100,4 @@ public enum AccountIdentifierFormat implements DescribedValue { public abstract String getAccount(final AccountIdentifierFormatParameters parameters); public abstract String getHostname(final AccountIdentifierFormatParameters parameters); - public static AccountIdentifierFormat forName(String provideMethod) { - return Stream.of(values()).filter(provider -> provider.getValue().equalsIgnoreCase(provideMethod)) - .findFirst() - .orElseThrow( - () -> new IllegalArgumentException("Invalid AccountIdentifierFormat: " + provideMethod)); - } } diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/ConnectionUrlFormat.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/ConnectionUrlFormat.java index 04ea51eb44..47d2f01461 100644 --- a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/ConnectionUrlFormat.java +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/ConnectionUrlFormat.java @@ -17,10 +17,10 @@ package org.apache.nifi.snowflake.service.util; +import org.apache.nifi.components.DescribedValue; + import java.util.Objects; import java.util.Optional; -import java.util.stream.Stream; -import org.apache.nifi.components.DescribedValue; public enum ConnectionUrlFormat implements DescribedValue { FULL_URL("full-url", "Full URL", "Provide connection URL in a single property") { @@ -90,10 +90,4 @@ public enum ConnectionUrlFormat implements DescribedValue { public abstract String buildConnectionUrl(final ConnectionUrlFormatParameters parameters); - public static ConnectionUrlFormat forName(String provideMethod) { - return Stream.of(values()).filter(provider -> provider.getValue().equalsIgnoreCase(provideMethod)) - .findFirst() - .orElseThrow( - () -> new IllegalArgumentException("Invalid ConnectionUrlFormat: " + provideMethod)); - } }