From 13d343d5ee45269c7d882b6e55bbf783217bc38b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lehel=20Bo=C3=A9r?= Date: Wed, 8 Feb 2023 20:08:24 +0100 Subject: [PATCH] NIFI-11151: Improving code reusability of DBCP services This closes #6935. Signed-off-by: Peter Turcsanyi --- .../nifi/dbcp/AbstractDBCPConnectionPool.java | 475 ++++-------------- .../nifi/dbcp/utils/DBCPProperties.java | 199 ++++++++ .../dbcp/utils/DataSourceConfiguration.java | 174 +++++++ .../dbcp/utils/DefaultDataSourceValues.java | 79 +++ .../SnowflakeComputingConnectionPool.java | 88 +++- .../apache/nifi/dbcp/DBCPConnectionPool.java | 171 ++++++- .../sink/db/DatabaseRecordSinkTest.groovy | 32 +- .../org/apache/nifi/dbcp/DBCPServiceTest.java | 37 +- 8 files changed, 807 insertions(+), 448 deletions(-) create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/utils/DBCPProperties.java create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/utils/DataSourceConfiguration.java create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/utils/DefaultDataSourceValues.java diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/AbstractDBCPConnectionPool.java b/nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/AbstractDBCPConnectionPool.java index c71ac4766b..cc15e36c11 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/AbstractDBCPConnectionPool.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/AbstractDBCPConnectionPool.java @@ -16,290 +16,43 @@ */ package org.apache.nifi.dbcp; -import java.util.HashMap; import org.apache.commons.dbcp2.BasicDataSource; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; -import org.apache.nifi.components.resource.ResourceCardinality; -import org.apache.nifi.components.resource.ResourceType; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.VerifiableControllerService; -import org.apache.nifi.expression.AttributeExpression; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.kerberos.KerberosCredentialsService; +import org.apache.nifi.dbcp.utils.DataSourceConfiguration; import org.apache.nifi.kerberos.KerberosUserService; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.security.krb.KerberosAction; -import org.apache.nifi.security.krb.KerberosKeytabUser; import org.apache.nifi.security.krb.KerberosLoginException; -import org.apache.nifi.security.krb.KerberosPasswordUser; import org.apache.nifi.security.krb.KerberosUser; -import javax.security.auth.login.LoginException; import java.sql.Connection; import java.sql.Driver; -import java.sql.DriverManager; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED; import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL; +import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVER_LOCATION; +import static org.apache.nifi.dbcp.utils.DBCPProperties.KERBEROS_USER_SERVICE; -/** - * Abstract base class for Database Connection Pooling Services using Apache Commons DBCP as the underlying connection pool implementation. - * - */ public abstract class AbstractDBCPConnectionPool extends AbstractControllerService implements DBCPService, VerifiableControllerService { - /** Property Name Prefix for Sensitive Dynamic Properties */ - protected static final String SENSITIVE_PROPERTY_PREFIX = "SENSITIVE."; - - /** - * Copied from {@link GenericObjectPoolConfig#DEFAULT_MIN_IDLE} in Commons-DBCP 2.7.0 - */ - private static final String DEFAULT_MIN_IDLE = "0"; - /** - * Copied from {@link GenericObjectPoolConfig#DEFAULT_MAX_IDLE} in Commons-DBCP 2.7.0 - */ - private static final String DEFAULT_MAX_IDLE = "8"; - /** - * Copied from private variable {@link BasicDataSource#maxConnLifetimeMillis} in Commons-DBCP 2.7.0 - */ - private static final String DEFAULT_MAX_CONN_LIFETIME = "-1"; - /** - * Copied from {@link GenericObjectPoolConfig#DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS} in Commons-DBCP 2.7.0 - */ - private static final String DEFAULT_EVICTION_RUN_PERIOD = String.valueOf(-1L); - /** - * Copied from {@link GenericObjectPoolConfig#DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS} in Commons-DBCP 2.7.0 - * and converted from 1800000L to "1800000 millis" to "30 mins" - */ - private static final String DEFAULT_MIN_EVICTABLE_IDLE_TIME = "30 mins"; - /** - * Copied from {@link GenericObjectPoolConfig#DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME_MILLIS} in Commons-DBCP 2.7.0 - */ - private static final String DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME = String.valueOf(-1L); - - public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder() - .name("Database Connection URL") - .description("A database connection URL used to connect to a database. May contain database system name, host, port, database name and some parameters." - + " The exact syntax of a database connection URL is specified by your DBMS.") - .defaultValue(null) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .required(true) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .build(); - - public static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder() - .name("Database Driver Class Name") - .description("Database driver class name") - .defaultValue(null) - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .build(); - - public static final PropertyDescriptor DB_DRIVER_LOCATION = new PropertyDescriptor.Builder() - .name("database-driver-locations") - .displayName("Database Driver Location(s)") - .description("Comma-separated list of files/folders and/or URLs containing the driver JAR and its dependencies (if any). For example '/var/tmp/mariadb-java-client-1.1.7.jar'") - .defaultValue(null) - .required(false) - .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY, ResourceType.URL) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .dynamicallyModifiesClasspath(true) - .build(); - - public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder() - .name("Database User") - .description("Database user name") - .defaultValue(null) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .build(); - - public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder() - .name("Password") - .description("The password for the database user") - .defaultValue(null) - .required(false) - .sensitive(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .build(); - - public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder() - .name("Max Wait Time") - .description("The maximum amount of time that the pool will wait (when there are no available connections) " - + " for a connection to be returned before failing, or -1 to wait indefinitely. ") - .defaultValue("500 millis") - .required(true) - .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .sensitive(false) - .build(); - - public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder() - .name("Max Total Connections") - .description("The maximum number of active connections that can be allocated from this pool at the same time, " - + " or negative for no limit.") - .defaultValue("8") - .required(true) - .addValidator(StandardValidators.INTEGER_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .sensitive(false) - .build(); - - public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder() - .name("Validation-query") - .displayName("Validation query") - .description("Validation query used to validate connections before returning them. " - + "When connection is invalid, it gets dropped and new valid connection will be returned. " - + "Note!! Using validation might have some performance penalty.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .build(); - - public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder() - .displayName("Minimum Idle Connections") - .name("dbcp-min-idle-conns") - .description("The minimum number of connections that can remain idle in the pool without extra ones being " + - "created. Set to or zero to allow no idle connections.") - .defaultValue(DEFAULT_MIN_IDLE) - .required(false) - .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .build(); - - public static final PropertyDescriptor MAX_IDLE = new PropertyDescriptor.Builder() - .displayName("Max Idle Connections") - .name("dbcp-max-idle-conns") - .description("The maximum number of connections that can remain idle in the pool without extra ones being " + - "released. Set to any negative value to allow unlimited idle connections.") - .defaultValue(DEFAULT_MAX_IDLE) - .required(false) - .addValidator(StandardValidators.INTEGER_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .build(); - - public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder() - .displayName("Max Connection Lifetime") - .name("dbcp-max-conn-lifetime") - .description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " + - "connection will fail the next activation, passivation or validation test. A value of zero or less " + - "means the connection has an infinite lifetime.") - .defaultValue(DEFAULT_MAX_CONN_LIFETIME) - .required(false) - .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .build(); - - public static final PropertyDescriptor EVICTION_RUN_PERIOD = new PropertyDescriptor.Builder() - .displayName("Time Between Eviction Runs") - .name("dbcp-time-between-eviction-runs") - .description("The number of milliseconds to sleep between runs of the idle connection evictor thread. When " + - "non-positive, no idle connection evictor thread will be run.") - .defaultValue(DEFAULT_EVICTION_RUN_PERIOD) - .required(false) - .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .build(); - - public static final PropertyDescriptor MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder() - .displayName("Minimum Evictable Idle Time") - .name("dbcp-min-evictable-idle-time") - .description("The minimum amount of time a connection may sit idle in the pool before it is eligible for eviction.") - .defaultValue(DEFAULT_MIN_EVICTABLE_IDLE_TIME) - .required(false) - .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .build(); - - public static final PropertyDescriptor SOFT_MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder() - .displayName("Soft Minimum Evictable Idle Time") - .name("dbcp-soft-min-evictable-idle-time") - .description("The minimum amount of time a connection may sit idle in the pool before it is eligible for " + - "eviction by the idle connection evictor, with the extra condition that at least a minimum number of" + - " idle connections remain in the pool. When the not-soft version of this option is set to a positive" + - " value, it is examined first by the idle connection evictor: when idle connections are visited by " + - "the evictor, idle time is first compared against it (without considering the number of idle " + - "connections in the pool) and then against this soft option, including the minimum idle connections " + - "constraint.") - .defaultValue(DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME) - .required(false) - .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .build(); - - public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() - .name("kerberos-credentials-service") - .displayName("Kerberos Credentials Service") - .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos") - .identifiesControllerService(KerberosCredentialsService.class) - .required(false) - .build(); - - public static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder() - .name("kerberos-user-service") - .displayName("Kerberos User Service") - .description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos") - .identifiesControllerService(KerberosUserService.class) - .required(false) - .build(); - - public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder() - .name("kerberos-principal") - .displayName("Kerberos Principal") - .description("The principal to use when specifying the principal and password directly in the processor for authenticating via Kerberos.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .build(); - - public static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder() - .name("kerberos-password") - .displayName("Kerberos Password") - .description("The password to use when specifying the principal and password directly in the processor for authenticating via Kerberos.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .sensitive(true) - .build(); protected volatile BasicDataSource dataSource; protected volatile KerberosUser kerberosUser; - @Override - protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - final PropertyDescriptor.Builder builder = new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .required(false) - .dynamic(true) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) - .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR); - - if (propertyDescriptorName.startsWith(SENSITIVE_PROPERTY_PREFIX)) { - builder.sensitive(true).expressionLanguageSupported(ExpressionLanguageScope.NONE); - } else { - builder.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY); - } - - return builder.build(); - } - @Override public List verify(final ConfigurationContext context, final ComponentLog verificationLogger, final Map variables) { List results = new ArrayList<>(); @@ -325,7 +78,8 @@ public abstract class AbstractDBCPConnectionPool extends AbstractControllerServi final BasicDataSource dataSource = new BasicDataSource(); try { - configureDataSource(dataSource, kerberosUser, context); + final DataSourceConfiguration configuration = getDataSourceConfiguration(context); + configureDataSource(context, configuration); results.add(new ConfigVerificationResult.Builder() .verificationStepName("Configure Data Source") .outcome(SUCCESSFUL) @@ -365,7 +119,6 @@ public abstract class AbstractDBCPConnectionPool extends AbstractControllerServi verificationLogger.error("Failed to shut down data source", e); } } - return results; } @@ -373,40 +126,24 @@ public abstract class AbstractDBCPConnectionPool extends AbstractControllerServi * Configures connection pool by creating an instance of the * {@link BasicDataSource} based on configuration provided with * {@link ConfigurationContext}. - * + *

* This operation makes no guarantees that the actual connection could be * made since the underlying system may still go off-line during normal * operation of the connection pool. * - * @param context - * the configuration context - * @throws InitializationException - * if unable to create a database connection + * @param context the configuration context + * @throws InitializationException if unable to create a database connection */ @OnEnabled public void onConfigured(final ConfigurationContext context) throws InitializationException { - kerberosUser = getKerberosUser(context); dataSource = new BasicDataSource(); - configureDataSource(dataSource, kerberosUser, context); + kerberosUser = getKerberosUser(context); + loginKerberos(kerberosUser); + final DataSourceConfiguration configuration = getDataSourceConfiguration(context); + configureDataSource(context, configuration); } - private void configureDataSource(final BasicDataSource dataSource, final KerberosUser kerberosUser, - final ConfigurationContext context) throws InitializationException { - final String dburl = getUrl(context); - - final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue(); - final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue(); - final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue(); - final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger(); - final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue(); - final Long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions()); - final Integer minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger(); - final Integer maxIdle = context.getProperty(MAX_IDLE).evaluateAttributeExpressions().asInteger(); - final Long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions()); - final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD).evaluateAttributeExpressions()); - final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions()); - final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions()); - + private void loginKerberos(KerberosUser kerberosUser) throws InitializationException { if (kerberosUser != null) { try { kerberosUser.login(); @@ -414,136 +151,71 @@ public abstract class AbstractDBCPConnectionPool extends AbstractControllerServi throw new InitializationException("Unable to authenticate Kerberos principal", e); } } + } - dataSource.setDriver(getDriver(driverName, dburl)); - dataSource.setMaxWaitMillis(maxWaitMillis); - dataSource.setMaxTotal(maxTotal); - dataSource.setMinIdle(minIdle); - dataSource.setMaxIdle(maxIdle); - dataSource.setMaxConnLifetimeMillis(maxConnLifetimeMillis); - dataSource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis); - dataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis); - dataSource.setSoftMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis); + protected abstract Driver getDriver(final String driverName, final String url); - if (validationQuery != null && !validationQuery.isEmpty()) { + protected abstract DataSourceConfiguration getDataSourceConfiguration(final ConfigurationContext context); + + protected void configureDataSource(final ConfigurationContext context, final DataSourceConfiguration configuration) { + final Driver driver = getDriver(configuration.getDriverName(), configuration.getUrl()); + + dataSource.setDriver(driver); + dataSource.setMaxWaitMillis(configuration.getMaxWaitMillis()); + dataSource.setMaxTotal(configuration.getMaxTotal()); + dataSource.setMinIdle(configuration.getMinIdle()); + dataSource.setMaxIdle(configuration.getMaxIdle()); + dataSource.setMaxConnLifetimeMillis(configuration.getMaxConnLifetimeMillis()); + dataSource.setTimeBetweenEvictionRunsMillis(configuration.getTimeBetweenEvictionRunsMillis()); + dataSource.setMinEvictableIdleTimeMillis(configuration.getMinEvictableIdleTimeMillis()); + dataSource.setSoftMinEvictableIdleTimeMillis(configuration.getSoftMinEvictableIdleTimeMillis()); + + final String validationQuery = configuration.getValidationQuery(); + if (StringUtils.isNotBlank(validationQuery)) { dataSource.setValidationQuery(validationQuery); dataSource.setTestOnBorrow(true); } - dataSource.setUrl(dburl); - dataSource.setUsername(user); - dataSource.setPassword(passw); - - final List dynamicProperties = context.getProperties() - .keySet() - .stream() - .filter(PropertyDescriptor::isDynamic) - .collect(Collectors.toList()); - - dynamicProperties.forEach((descriptor) -> { - final PropertyValue propertyValue = context.getProperty(descriptor); - if (descriptor.isSensitive()) { - final String propertyName = StringUtils.substringAfter(descriptor.getName(), SENSITIVE_PROPERTY_PREFIX); - dataSource.addConnectionProperty(propertyName, propertyValue.getValue()); - } else { - dataSource.addConnectionProperty(descriptor.getName(), propertyValue.evaluateAttributeExpressions().getValue()); - } - }); + dataSource.setUrl(configuration.getUrl()); + dataSource.setUsername(configuration.getUserName()); + dataSource.setPassword(configuration.getPassword()); getConnectionProperties(context).forEach(dataSource::addConnectionProperty); } - private KerberosUser getKerberosUser(final ConfigurationContext context) { - KerberosUser kerberosUser = null; - final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + protected Map getConnectionProperties(final ConfigurationContext context) { + return getDynamicProperties(context) + .stream() + .collect(Collectors.toMap(PropertyDescriptor::getName, s -> { + final PropertyValue propertyValue = context.getProperty(s); + return propertyValue.evaluateAttributeExpressions().getValue(); + })); + } + + protected List getDynamicProperties(final ConfigurationContext context) { + return context.getProperties() + .keySet() + .stream() + .filter(PropertyDescriptor::isDynamic) + .collect(Collectors.toList()); + } + + protected KerberosUser getKerberosUser(final ConfigurationContext context) { + final KerberosUser kerberosUser; final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class); - final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue(); - final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue(); if (kerberosUserService != null) { kerberosUser = kerberosUserService.createKerberosUser(); - } else if (kerberosCredentialsService != null) { - kerberosUser = new KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), kerberosCredentialsService.getKeytab()); - } else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) { - kerberosUser = new KerberosPasswordUser(kerberosPrincipal, kerberosPassword); + } else { + kerberosUser = getKerberosUserByCredentials(context); } return kerberosUser; } - protected String getUrl(ConfigurationContext context) { - return context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue(); + protected KerberosUser getKerberosUserByCredentials(final ConfigurationContext context) { + return null; } - protected Driver getDriver(final String driverName, final String url) { - final Class clazz; - - try { - clazz = Class.forName(driverName); - } catch (final ClassNotFoundException e) { - throw new ProcessException("Driver class " + driverName + " is not found", e); - } - - try { - return DriverManager.getDriver(url); - } catch (final SQLException e) { - // In case the driver is not registered by the implementation, we explicitly try to register it. - try { - final Driver driver = (Driver) clazz.newInstance(); - DriverManager.registerDriver(driver); - return DriverManager.getDriver(url); - } catch (final SQLException e2) { - throw new ProcessException("No suitable driver for the given Database Connection URL", e2); - } catch (final IllegalAccessException | InstantiationException e2) { - throw new ProcessException("Creating driver instance is failed", e2); - } - } - } - - /** - * Override in subclasses to provide connection properties to the data source - * - * @return Key-value pairs that will be added as connection properties - */ - protected Map getConnectionProperties(final ConfigurationContext context) { - return new HashMap<>(); - } - - protected Long extractMillisWithInfinite(PropertyValue prop) { - return "-1".equals(prop.getValue()) ? -1 : prop.asTimePeriod(TimeUnit.MILLISECONDS); - } - - /** - * Shutdown pool, close all open connections. - * If a principal is authenticated with a KDC, that principal is logged out. - * - * If a @{@link LoginException} occurs while attempting to log out the @{@link org.apache.nifi.security.krb.KerberosUser}, - * an attempt will still be made to shut down the pool and close open connections. - * - * @throws SQLException if there is an error while closing open connections - * @throws LoginException if there is an error during the principal log out, and will only be thrown if there was - * no exception while closing open connections - */ - @OnDisabled - public void shutdown() throws SQLException { - try { - this.shutdown(dataSource, kerberosUser); - } finally { - kerberosUser = null; - dataSource = null; - } - } - - private void shutdown(final BasicDataSource dataSource, final KerberosUser kerberosUser) throws SQLException { - try { - if (kerberosUser != null) { - kerberosUser.logout(); - } - } finally { - if (dataSource != null) { - dataSource.close(); - } - } - } @Override public Connection getConnection() throws ProcessException { @@ -554,7 +226,7 @@ public abstract class AbstractDBCPConnectionPool extends AbstractControllerServi try { final Connection con; if (kerberosUser != null) { - KerberosAction kerberosAction = new KerberosAction<>(kerberosUser, () -> dataSource.getConnection(), getLogger()); + KerberosAction kerberosAction = new KerberosAction<>(kerberosUser, dataSource::getConnection, getLogger()); con = kerberosAction.execute(); } else { con = dataSource.getConnection(); @@ -574,8 +246,31 @@ public abstract class AbstractDBCPConnectionPool extends AbstractControllerServi } } - @Override - public String toString() { - return this.getClass().getSimpleName() + "[id=" + getIdentifier() + "]"; + /** + * Shutdown pool, close all open connections. + * If a principal is authenticated with a KDC, that principal is logged out. + * + * @throws SQLException if there is an error while closing open connections + */ + @OnDisabled + public void shutdown() throws SQLException { + try { + shutdown(dataSource, kerberosUser); + } finally { + kerberosUser = null; + dataSource = null; + } + } + + private void shutdown(final BasicDataSource dataSource, final KerberosUser kerberosUser) throws SQLException { + try { + if (kerberosUser != null) { + kerberosUser.logout(); + } + } finally { + if (dataSource != null) { + dataSource.close(); + } + } } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/utils/DBCPProperties.java b/nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/utils/DBCPProperties.java new file mode 100644 index 0000000000..a59719cdd2 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/utils/DBCPProperties.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.dbcp.utils; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.resource.ResourceCardinality; +import org.apache.nifi.components.resource.ResourceType; +import org.apache.nifi.dbcp.DBCPValidator; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.kerberos.KerberosUserService; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.concurrent.TimeUnit; + +public final class DBCPProperties { + + private DBCPProperties() { + } + + public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder() + .name("Database Connection URL") + .description("A database connection URL used to connect to a database. May contain database system name, host, port, database name and some parameters." + + " The exact syntax of a database connection URL is specified by your DBMS.") + .defaultValue(null) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder() + .name("Database User") + .description("Database user name") + .defaultValue(null) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder() + .name("Password") + .description("The password for the database user") + .defaultValue(null) + .required(false) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + + public static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder() + .name("Database Driver Class Name") + .description("Database driver class name") + .defaultValue(null) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor DB_DRIVER_LOCATION = new PropertyDescriptor.Builder() + .name("database-driver-locations") + .displayName("Database Driver Location(s)") + .description("Comma-separated list of files/folders and/or URLs containing the driver JAR and its dependencies (if any). For example '/var/tmp/mariadb-java-client-1.1.7.jar'") + .defaultValue(null) + .required(false) + .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY, ResourceType.URL) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .dynamicallyModifiesClasspath(true) + .build(); + + public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder() + .name("Max Wait Time") + .description("The maximum amount of time that the pool will wait (when there are no available connections) " + + " for a connection to be returned before failing, or -1 to wait indefinitely. ") + .defaultValue(DefaultDataSourceValues.MAX_WAIT_TIME.getValue()) + .required(true) + .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .sensitive(false) + .build(); + + public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder() + .name("Max Total Connections") + .description("The maximum number of active connections that can be allocated from this pool at the same time, " + + " or negative for no limit.") + .defaultValue(DefaultDataSourceValues.MAX_TOTAL_CONNECTIONS.getValue()) + .required(true) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .sensitive(false) + .build(); + + public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder() + .name("Validation-query") + .displayName("Validation query") + .description("Validation query used to validate connections before returning them. " + + "When connection is invalid, it gets dropped and new valid connection will be returned. " + + "Note!! Using validation might have some performance penalty.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder() + .displayName("Minimum Idle Connections") + .name("dbcp-min-idle-conns") + .description("The minimum number of connections that can remain idle in the pool without extra ones being " + + "created. Set to or zero to allow no idle connections.") + .defaultValue(DefaultDataSourceValues.MIN_IDLE.getValue()) + .required(false) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor MAX_IDLE = new PropertyDescriptor.Builder() + .displayName("Max Idle Connections") + .name("dbcp-max-idle-conns") + .description("The maximum number of connections that can remain idle in the pool without extra ones being " + + "released. Set to any negative value to allow unlimited idle connections.") + .defaultValue(DefaultDataSourceValues.MAX_IDLE.getValue()) + .required(false) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder() + .displayName("Max Connection Lifetime") + .name("dbcp-max-conn-lifetime") + .description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " + + "connection will fail the next activation, passivation or validation test. A value of zero or less " + + "means the connection has an infinite lifetime.") + .defaultValue(DefaultDataSourceValues.MAX_CONN_LIFETIME.getValue()) + .required(false) + .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor EVICTION_RUN_PERIOD = new PropertyDescriptor.Builder() + .displayName("Time Between Eviction Runs") + .name("dbcp-time-between-eviction-runs") + .description("The number of milliseconds to sleep between runs of the idle connection evictor thread. When " + + "non-positive, no idle connection evictor thread will be run.") + .defaultValue(DefaultDataSourceValues.EVICTION_RUN_PERIOD.getValue()) + .required(false) + .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder() + .displayName("Minimum Evictable Idle Time") + .name("dbcp-min-evictable-idle-time") + .description("The minimum amount of time a connection may sit idle in the pool before it is eligible for eviction.") + .defaultValue(DefaultDataSourceValues.MIN_EVICTABLE_IDLE_TIME.getValue()) + .required(false) + .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor SOFT_MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder() + .displayName("Soft Minimum Evictable Idle Time") + .name("dbcp-soft-min-evictable-idle-time") + .description("The minimum amount of time a connection may sit idle in the pool before it is eligible for " + + "eviction by the idle connection evictor, with the extra condition that at least a minimum number of" + + " idle connections remain in the pool. When the not-soft version of this option is set to a positive" + + " value, it is examined first by the idle connection evictor: when idle connections are visited by " + + "the evictor, idle time is first compared against it (without considering the number of idle " + + "connections in the pool) and then against this soft option, including the minimum idle connections " + + "constraint.") + .defaultValue(DefaultDataSourceValues.SOFT_MIN_EVICTABLE_IDLE_TIME.getValue()) + .required(false) + .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder() + .name("kerberos-user-service") + .displayName("Kerberos User Service") + .description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos") + .identifiesControllerService(KerberosUserService.class) + .required(false) + .build(); + + public static Long extractMillisWithInfinite(PropertyValue prop) { + return "-1".equals(prop.getValue()) ? -1 : prop.asTimePeriod(TimeUnit.MILLISECONDS); + } +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/utils/DataSourceConfiguration.java b/nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/utils/DataSourceConfiguration.java new file mode 100644 index 0000000000..416a8783e7 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/utils/DataSourceConfiguration.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.dbcp.utils; + +public class DataSourceConfiguration { + + private final String url; + private final String driverName; + private final String userName; + private final String password; + private final long maxWaitMillis; + private final int maxTotal; + private final int minIdle; + private final int maxIdle; + private final long maxConnLifetimeMillis; + private final long timeBetweenEvictionRunsMillis; + private final long minEvictableIdleTimeMillis; + private final long softMinEvictableIdleTimeMillis; + private final String validationQuery; + + public String getUrl() { + return url; + } + + public String getDriverName() { + return driverName; + } + + public String getUserName() { + return userName; + } + + public String getPassword() { + return password; + } + + public long getMaxWaitMillis() { + return maxWaitMillis; + } + + public int getMaxTotal() { + return maxTotal; + } + + public int getMinIdle() { + return minIdle; + } + + public int getMaxIdle() { + return maxIdle; + } + + public long getMaxConnLifetimeMillis() { + return maxConnLifetimeMillis; + } + + public long getTimeBetweenEvictionRunsMillis() { + return timeBetweenEvictionRunsMillis; + } + + public long getMinEvictableIdleTimeMillis() { + return minEvictableIdleTimeMillis; + } + + public long getSoftMinEvictableIdleTimeMillis() { + return softMinEvictableIdleTimeMillis; + } + + public String getValidationQuery() { + return validationQuery; + } + + public DataSourceConfiguration(final Builder builder) { + this.url = builder.url; + this.driverName = builder.driverName; + this.userName = builder.userName; + this.password = builder.password; + this.maxWaitMillis = builder.maxWaitMillis; + this.maxTotal = builder.maxTotal; + this.minIdle = builder.minIdle; + this.maxIdle = builder.maxIdle; + this.maxConnLifetimeMillis = builder.maxConnLifetimeMillis; + this.timeBetweenEvictionRunsMillis = builder.timeBetweenEvictionRunsMillis; + this.minEvictableIdleTimeMillis = builder.minEvictableIdleTimeMillis; + this.softMinEvictableIdleTimeMillis = builder.softMinEvictableIdleTimeMillis; + this.validationQuery = builder.validationQuery; + } + + public static class Builder { + private final String url; + private final String driverName; + private final String userName; + private final String password; + private long maxWaitMillis = DefaultDataSourceValues.MAX_WAIT_TIME.getLongValue(); + private int maxTotal = DefaultDataSourceValues.MAX_TOTAL_CONNECTIONS.getLongValue().intValue(); + private int minIdle = DefaultDataSourceValues.MIN_IDLE.getLongValue().intValue(); + private int maxIdle = DefaultDataSourceValues.MAX_IDLE.getLongValue().intValue(); + private long maxConnLifetimeMillis = DefaultDataSourceValues.MAX_CONN_LIFETIME.getLongValue(); + private long timeBetweenEvictionRunsMillis = DefaultDataSourceValues.EVICTION_RUN_PERIOD.getLongValue(); + private long minEvictableIdleTimeMillis = DefaultDataSourceValues.MIN_EVICTABLE_IDLE_TIME.getLongValue(); + private long softMinEvictableIdleTimeMillis = DefaultDataSourceValues.SOFT_MIN_EVICTABLE_IDLE_TIME.getLongValue(); + private String validationQuery; + + public Builder(final String url, final String driverName, final String userName, final String password) { + this.url = url; + this.driverName = driverName; + this.userName = userName; + this.password = password; + } + + public Builder maxWaitMillis(long maxWaitMillis) { + this.maxWaitMillis = maxWaitMillis; + return this; + } + + public Builder maxTotal(int maxTotal) { + this.maxTotal = maxTotal; + return this; + } + + public Builder minIdle(int minIdle) { + this.minIdle = minIdle; + return this; + } + + public Builder maxIdle(int maxIdle) { + this.maxIdle = maxIdle; + return this; + } + + public Builder maxConnLifetimeMillis(long maxConnLifetimeMillis) { + this.maxConnLifetimeMillis = maxConnLifetimeMillis; + return this; + } + + public Builder timeBetweenEvictionRunsMillis(long timeBetweenEvictionRunsMillis) { + this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis; + return this; + } + + public Builder minEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) { + this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis; + return this; + } + + public Builder softMinEvictableIdleTimeMillis(long softMinEvictableIdleTimeMillis) { + this.softMinEvictableIdleTimeMillis = softMinEvictableIdleTimeMillis; + return this; + } + + public Builder validationQuery(String validationQuery) { + this.validationQuery = validationQuery; + return this; + } + + public DataSourceConfiguration build() { + return new DataSourceConfiguration(this); + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/utils/DefaultDataSourceValues.java b/nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/utils/DefaultDataSourceValues.java new file mode 100644 index 0000000000..e16fa45b18 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/utils/DefaultDataSourceValues.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.dbcp.utils; + +import org.apache.commons.dbcp2.BasicDataSource; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.nifi.util.FormatUtils; + +import java.util.concurrent.TimeUnit; + +public enum DefaultDataSourceValues { + + MAX_WAIT_TIME("500 millis") { + @Override + public Long getLongValue() { + return (long) FormatUtils.getPreciseTimeDuration(MAX_WAIT_TIME.value, TimeUnit.MILLISECONDS); + } + }, + MAX_TOTAL_CONNECTIONS("8"), + /** + * Copied from {@link GenericObjectPoolConfig#DEFAULT_MIN_IDLE} in Commons-DBCP 2.7.0 + */ + MIN_IDLE("0"), + /** + * Copied from {@link GenericObjectPoolConfig#DEFAULT_MAX_IDLE} in Commons-DBCP 2.7.0 + */ + MAX_IDLE("8"), + /** + * Copied from private variable {@link BasicDataSource#maxConnLifetimeMillis} in Commons-DBCP 2.7.0 + */ + MAX_CONN_LIFETIME("-1"), + /** + * Copied from {@link GenericObjectPoolConfig#DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS} in Commons-DBCP 2.7.0 + */ + EVICTION_RUN_PERIOD("-1"), + /** + * Copied from {@link GenericObjectPoolConfig#DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS} in Commons-DBCP 2.7.0 + * and converted from 1800000L to "1800000 millis" to "30 mins" + */ + MIN_EVICTABLE_IDLE_TIME("30 mins") { + @Override + public Long getLongValue() { + return (long) FormatUtils.getPreciseTimeDuration(MAX_WAIT_TIME.value, TimeUnit.MINUTES); + } + }, + /** + * Copied from {@link GenericObjectPoolConfig#DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME_MILLIS} in Commons-DBCP 2.7.0 + */ + SOFT_MIN_EVICTABLE_IDLE_TIME("-1"); + + + private final String value; + + DefaultDataSourceValues(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + public Long getLongValue() { + return Long.parseLong(value); + } +} diff --git a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java index a3f3c0c55f..492d14340f 100644 --- a/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java +++ b/nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java @@ -16,14 +16,6 @@ */ package org.apache.nifi.snowflake.service; -import java.sql.Driver; -import java.sql.DriverManager; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import net.snowflake.client.core.SFSessionProperty; import net.snowflake.client.jdbc.SnowflakeDriver; import org.apache.nifi.annotation.behavior.DynamicProperties; @@ -36,16 +28,40 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.dbcp.AbstractDBCPConnectionPool; +import org.apache.nifi.dbcp.utils.DBCPProperties; +import org.apache.nifi.dbcp.utils.DataSourceConfiguration; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.snowflake.SnowflakeConnectionProviderService; import org.apache.nifi.processors.snowflake.SnowflakeConnectionWrapper; +import org.apache.nifi.processors.snowflake.util.SnowflakeProperties; import org.apache.nifi.proxy.ProxyConfiguration; import org.apache.nifi.proxy.ProxyConfigurationService; -import org.apache.nifi.snowflake.service.util.ConnectionUrlFormatParameters; -import org.apache.nifi.processors.snowflake.util.SnowflakeProperties; import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat; +import org.apache.nifi.snowflake.service.util.ConnectionUrlFormatParameters; + +import java.sql.Driver; +import java.sql.DriverManager; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVERNAME; +import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_PASSWORD; +import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_USER; +import static org.apache.nifi.dbcp.utils.DBCPProperties.EVICTION_RUN_PERIOD; +import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_CONN_LIFETIME; +import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_IDLE; +import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_TOTAL_CONNECTIONS; +import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_WAIT_TIME; +import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_EVICTABLE_IDLE_TIME; +import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_IDLE; +import static org.apache.nifi.dbcp.utils.DBCPProperties.SOFT_MIN_EVICTABLE_IDLE_TIME; +import static org.apache.nifi.dbcp.utils.DBCPProperties.VALIDATION_QUERY; +import static org.apache.nifi.dbcp.utils.DBCPProperties.extractMillisWithInfinite; /** * Implementation of Database Connection Pooling Service for Snowflake. Apache DBCP is used for connection pooling @@ -54,14 +70,14 @@ import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat; @Tags({"snowflake", "dbcp", "jdbc", "database", "connection", "pooling", "store"}) @CapabilityDescription("Provides Snowflake Connection Pooling Service. Connections can be asked from pool and returned after usage.") @DynamicProperties({ - @DynamicProperty(name = "JDBC property name", - value = "Snowflake JDBC property value", - expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY, - description = "Snowflake JDBC driver property name and value applied to JDBC connections."), - @DynamicProperty(name = "SENSITIVE.JDBC property name", - value = "Snowflake JDBC property value", - expressionLanguageScope = ExpressionLanguageScope.NONE, - description = "Snowflake JDBC driver property name prefixed with 'SENSITIVE.' handled as a sensitive property.") + @DynamicProperty(name = "JDBC property name", + value = "Snowflake JDBC property value", + expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY, + description = "Snowflake JDBC driver property name and value applied to JDBC connections."), + @DynamicProperty(name = "SENSITIVE.JDBC property name", + value = "Snowflake JDBC property value", + expressionLanguageScope = ExpressionLanguageScope.NONE, + description = "Snowflake JDBC driver property name prefixed with 'SENSITIVE.' handled as a sensitive property.") }) @RequiresInstanceClassLoading public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool implements SnowflakeConnectionProviderService { @@ -76,7 +92,7 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool .build(); public static final PropertyDescriptor SNOWFLAKE_URL = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(AbstractDBCPConnectionPool.DATABASE_URL) + .fromPropertyDescriptor(DBCPProperties.DATABASE_URL) .displayName("Snowflake URL") .description("Example connection string: jdbc:snowflake://[account].[region]" + ConnectionUrlFormat.SNOWFLAKE_HOST_SUFFIX + "/?[connection_params]" + " The connection parameters can include db=DATABASE_NAME to avoid using qualified table names such as DATABASE_NAME.PUBLIC.TABLE_NAME") @@ -110,13 +126,13 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool .build(); public static final PropertyDescriptor SNOWFLAKE_USER = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_USER) + .fromPropertyDescriptor(DBCPProperties.DB_USER) .displayName("Username") .description("The Snowflake user name.") .build(); public static final PropertyDescriptor SNOWFLAKE_PASSWORD = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_PASSWORD) + .fromPropertyDescriptor(DBCPProperties.DB_PASSWORD) .displayName("Password") .description("The password for the Snowflake user.") .build(); @@ -170,6 +186,34 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool } @Override + protected DataSourceConfiguration getDataSourceConfiguration(final ConfigurationContext context) { + final String url = getUrl(context); + final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue(); + final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue(); + final String password = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue(); + final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger(); + final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue(); + final Long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions()); + final Integer minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger(); + final Integer maxIdle = context.getProperty(MAX_IDLE).evaluateAttributeExpressions().asInteger(); + final Long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions()); + final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD).evaluateAttributeExpressions()); + final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions()); + final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions()); + + return new DataSourceConfiguration.Builder(url, driverName, user, password) + .maxTotal(maxTotal) + .validationQuery(validationQuery) + .maxWaitMillis(maxWaitMillis) + .minIdle(minIdle) + .maxIdle(maxIdle) + .maxConnLifetimeMillis(maxConnLifetimeMillis) + .timeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis) + .minEvictableIdleTimeMillis(minEvictableIdleTimeMillis) + .softMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis) + .build(); + } + protected String getUrl(final ConfigurationContext context) { final ConnectionUrlFormat connectionUrlFormat = ConnectionUrlFormat.forName(context.getProperty(CONNECTION_URL_FORMAT) .getValue()); @@ -194,7 +238,7 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool final String schema = context.getProperty(SnowflakeProperties.SCHEMA).evaluateAttributeExpressions().getValue(); final String warehouse = context.getProperty(SNOWFLAKE_WAREHOUSE).evaluateAttributeExpressions().getValue(); - final Map connectionProperties = new HashMap<>(); + final Map connectionProperties = super.getConnectionProperties(context); if (database != null) { connectionProperties.put("db", database); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java index 71176aa2d0..904b9229ab 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java @@ -25,24 +25,55 @@ import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.VerifiableControllerService; +import org.apache.nifi.dbcp.utils.DataSourceConfiguration; +import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.kerberos.KerberosUserService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.security.krb.KerberosKeytabUser; +import org.apache.nifi.security.krb.KerberosPasswordUser; +import org.apache.nifi.security.krb.KerberosUser; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.nifi.dbcp.utils.DBCPProperties.DATABASE_URL; +import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVERNAME; +import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVER_LOCATION; +import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_PASSWORD; +import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_USER; +import static org.apache.nifi.dbcp.utils.DBCPProperties.EVICTION_RUN_PERIOD; +import static org.apache.nifi.dbcp.utils.DBCPProperties.KERBEROS_USER_SERVICE; +import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_CONN_LIFETIME; +import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_IDLE; +import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_TOTAL_CONNECTIONS; +import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_WAIT_TIME; +import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_EVICTABLE_IDLE_TIME; +import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_IDLE; +import static org.apache.nifi.dbcp.utils.DBCPProperties.SOFT_MIN_EVICTABLE_IDLE_TIME; +import static org.apache.nifi.dbcp.utils.DBCPProperties.VALIDATION_QUERY; +import static org.apache.nifi.dbcp.utils.DBCPProperties.extractMillisWithInfinite; /** * Implementation of for Database Connection Pooling Service. Apache DBCP is used for connection pooling functionality. - * */ @SupportsSensitiveDynamicProperties -@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store" }) +@Tags({"dbcp", "jdbc", "database", "connection", "pooling", "store"}) @CapabilityDescription("Provides Database Connection Pooling Service. Connections can be asked from pool and returned after usage.") @DynamicProperties({ @DynamicProperty(name = "JDBC property name", @@ -56,9 +87,40 @@ import java.util.List; }) @RequiresInstanceClassLoading public class DBCPConnectionPool extends AbstractDBCPConnectionPool implements DBCPService, VerifiableControllerService { + /** + * Property Name Prefix for Sensitive Dynamic Properties + */ + protected static final String SENSITIVE_PROPERTY_PREFIX = "SENSITIVE."; private static final List PROPERTIES; + public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() + .name("kerberos-credentials-service") + .displayName("Kerberos Credentials Service") + .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos") + .identifiesControllerService(KerberosCredentialsService.class) + .required(false) + .build(); + + public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder() + .name("kerberos-principal") + .displayName("Kerberos Principal") + .description("The principal to use when specifying the principal and password directly in the processor for authenticating via Kerberos.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder() + .name("kerberos-password") + .displayName("Kerberos Password") + .description("The password to use when specifying the principal and password directly in the processor for authenticating via Kerberos.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + static { final List props = new ArrayList<>(); props.add(DATABASE_URL); @@ -144,4 +206,109 @@ public class DBCPConnectionPool extends AbstractDBCPConnectionPool implements DB BasicDataSource getDataSource() { return dataSource; } + + @Override + protected DataSourceConfiguration getDataSourceConfiguration(ConfigurationContext context) { + final String url = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue(); + final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue(); + final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue(); + final String password = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue(); + final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger(); + final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue(); + final Long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions()); + final Integer minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger(); + final Integer maxIdle = context.getProperty(MAX_IDLE).evaluateAttributeExpressions().asInteger(); + final Long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions()); + final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD).evaluateAttributeExpressions()); + final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions()); + final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions()); + + return new DataSourceConfiguration.Builder(url, driverName, user, password) + .maxTotal(maxTotal) + .validationQuery(validationQuery) + .maxWaitMillis(maxWaitMillis) + .minIdle(minIdle) + .maxIdle(maxIdle) + .maxConnLifetimeMillis(maxConnLifetimeMillis) + .timeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis) + .minEvictableIdleTimeMillis(minEvictableIdleTimeMillis) + .softMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis) + .build(); + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + final PropertyDescriptor.Builder builder = new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .dynamic(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR); + + if (propertyDescriptorName.startsWith(SENSITIVE_PROPERTY_PREFIX)) { + builder.sensitive(true).expressionLanguageSupported(ExpressionLanguageScope.NONE); + } else { + builder.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY); + } + + return builder.build(); + } + + @Override + protected Map getConnectionProperties(ConfigurationContext context) { + return getDynamicProperties(context) + .stream() + .map(descriptor -> { + final PropertyValue propertyValue = context.getProperty(descriptor); + if (descriptor.isSensitive()) { + final String propertyName = StringUtils.substringAfter(descriptor.getName(), SENSITIVE_PROPERTY_PREFIX); + return new AbstractMap.SimpleEntry<>(propertyName, propertyValue.getValue()); + } else { + return new AbstractMap.SimpleEntry<>(descriptor.getName(), propertyValue.evaluateAttributeExpressions().getValue()); + } + }) + .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue)); + } + + @Override + protected Driver getDriver(final String driverName, final String url) { + final Class clazz; + + try { + clazz = Class.forName(driverName); + } catch (final ClassNotFoundException e) { + throw new ProcessException("Driver class " + driverName + " is not found", e); + } + + try { + return DriverManager.getDriver(url); + } catch (final SQLException e) { + // In case the driver is not registered by the implementation, we explicitly try to register it. + try { + final Driver driver = (Driver) clazz.newInstance(); + DriverManager.registerDriver(driver); + return DriverManager.getDriver(url); + } catch (final SQLException e2) { + throw new ProcessException("No suitable driver for the given Database Connection URL", e2); + } catch (final IllegalAccessException | InstantiationException e2) { + throw new ProcessException("Creating driver instance is failed", e2); + } + } + } + + @Override + protected KerberosUser getKerberosUserByCredentials(ConfigurationContext context) { + KerberosUser kerberosUser = super.getKerberosUserByCredentials(context); + if (kerberosUser == null) { + final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue(); + final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue(); + if (kerberosCredentialsService != null) { + kerberosUser = new KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), kerberosCredentialsService.getKeytab()); + } else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) { + kerberosUser = new KerberosPasswordUser(kerberosPrincipal, kerberosPassword); + } + } + return kerberosUser; + } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.groovy b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.groovy index 8f373e20ae..5b1c72eed8 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.groovy +++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.groovy @@ -51,24 +51,24 @@ import java.sql.SQLException import java.sql.SQLNonTransientConnectionException import java.sql.Statement -import static org.apache.nifi.dbcp.DBCPConnectionPool.DATABASE_URL -import static org.apache.nifi.dbcp.DBCPConnectionPool.DB_DRIVERNAME -import static org.apache.nifi.dbcp.DBCPConnectionPool.DB_DRIVER_LOCATION -import static org.apache.nifi.dbcp.DBCPConnectionPool.DB_PASSWORD -import static org.apache.nifi.dbcp.DBCPConnectionPool.DB_USER -import static org.apache.nifi.dbcp.DBCPConnectionPool.EVICTION_RUN_PERIOD +import static org.apache.nifi.dbcp.utils.DBCPProperties.DATABASE_URL +import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVERNAME +import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVER_LOCATION +import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_PASSWORD +import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_USER +import static org.apache.nifi.dbcp.utils.DBCPProperties.EVICTION_RUN_PERIOD import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE -import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_PASSWORD import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_PRINCIPAL -import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_USER_SERVICE -import static org.apache.nifi.dbcp.DBCPConnectionPool.MAX_CONN_LIFETIME -import static org.apache.nifi.dbcp.DBCPConnectionPool.MAX_IDLE -import static org.apache.nifi.dbcp.DBCPConnectionPool.MAX_TOTAL_CONNECTIONS -import static org.apache.nifi.dbcp.DBCPConnectionPool.MAX_WAIT_TIME -import static org.apache.nifi.dbcp.DBCPConnectionPool.MIN_EVICTABLE_IDLE_TIME -import static org.apache.nifi.dbcp.DBCPConnectionPool.MIN_IDLE -import static org.apache.nifi.dbcp.DBCPConnectionPool.SOFT_MIN_EVICTABLE_IDLE_TIME -import static org.apache.nifi.dbcp.DBCPConnectionPool.VALIDATION_QUERY +import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_PASSWORD +import static org.apache.nifi.dbcp.utils.DBCPProperties.KERBEROS_USER_SERVICE +import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_CONN_LIFETIME +import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_IDLE +import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_TOTAL_CONNECTIONS +import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_WAIT_TIME +import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_EVICTABLE_IDLE_TIME +import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_IDLE +import static org.apache.nifi.dbcp.utils.DBCPProperties.SOFT_MIN_EVICTABLE_IDLE_TIME +import static org.apache.nifi.dbcp.utils.DBCPProperties.VALIDATION_QUERY import static org.junit.jupiter.api.Assertions.assertEquals import static org.junit.jupiter.api.Assertions.assertFalse import static org.junit.jupiter.api.Assertions.assertNotNull diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java index 4074567dc6..82d6c307d4 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.dbcp; +import org.apache.nifi.dbcp.utils.DBCPProperties; import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.kerberos.KerberosUserService; import org.apache.nifi.kerberos.MockKerberosCredentialsService; @@ -83,10 +84,10 @@ public class DBCPServiceTest { runner.addControllerService(SERVICE_ID, service); final String url = String.format("jdbc:derby:%s;create=true", databaseDirectory); - runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, url); - runner.setProperty(service, DBCPConnectionPool.DB_USER, String.class.getSimpleName()); - runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, String.class.getName()); - runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver"); + runner.setProperty(service, DBCPProperties.DATABASE_URL, url); + runner.setProperty(service, DBCPProperties.DB_USER, String.class.getSimpleName()); + runner.setProperty(service, DBCPProperties.DB_PASSWORD, String.class.getName()); + runner.setProperty(service, DBCPProperties.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver"); } @AfterEach @@ -117,7 +118,7 @@ public class DBCPServiceTest { // kerberos credential service with kerberos user service is invalid final KerberosUserService kerberosUserService = enableKerberosUserService(runner); - runner.setProperty(service, DBCPConnectionPool.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier()); + runner.setProperty(service, DBCPProperties.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier()); runner.assertNotValid(service); // kerberos user service by itself is valid @@ -132,7 +133,7 @@ public class DBCPServiceTest { @Test public void testNotValidWithNegativeMinIdleProperty() { - runner.setProperty(service, DBCPConnectionPool.MIN_IDLE, "-1"); + runner.setProperty(service, DBCPProperties.MIN_IDLE, "-1"); runner.assertNotValid(service); } @@ -185,9 +186,9 @@ public class DBCPServiceTest { runner.enableControllerService(kerberosCredentialsService); // set fake Derby database connection url - runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby://localhost:1527/NoDB"); + runner.setProperty(service, DBCPProperties.DATABASE_URL, "jdbc:derby://localhost:1527/NoDB"); // Use the client driver here rather than the embedded one, as it will generate a ConnectException for the test - runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.ClientDriver"); + runner.setProperty(service, DBCPProperties.DB_DRIVERNAME, "org.apache.derby.jdbc.ClientDriver"); runner.setProperty(service, DBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE, kerberosServiceId); try { @@ -202,7 +203,7 @@ public class DBCPServiceTest { @Test public void testGetConnection() throws SQLException { - runner.setProperty(service, DBCPConnectionPool.MAX_TOTAL_CONNECTIONS, "2"); + runner.setProperty(service, DBCPProperties.MAX_TOTAL_CONNECTIONS, "2"); runner.enableControllerService(service); runner.assertValid(service); @@ -216,8 +217,8 @@ public class DBCPServiceTest { @Test public void testGetConnectionMaxTotalConnectionsExceeded() { - runner.setProperty(service, DBCPConnectionPool.MAX_TOTAL_CONNECTIONS, "1"); - runner.setProperty(service, DBCPConnectionPool.MAX_WAIT_TIME, "1 ms"); + runner.setProperty(service, DBCPProperties.MAX_TOTAL_CONNECTIONS, "1"); + runner.setProperty(service, DBCPProperties.MAX_WAIT_TIME, "1 ms"); runner.enableControllerService(service); runner.assertValid(service); @@ -228,13 +229,13 @@ public class DBCPServiceTest { @Test public void testGetDataSourceProperties() throws SQLException { - runner.setProperty(service, DBCPConnectionPool.MAX_WAIT_TIME, "-1"); - runner.setProperty(service, DBCPConnectionPool.MAX_IDLE, "6"); - runner.setProperty(service, DBCPConnectionPool.MIN_IDLE, "4"); - runner.setProperty(service, DBCPConnectionPool.MAX_CONN_LIFETIME, "1 secs"); - runner.setProperty(service, DBCPConnectionPool.EVICTION_RUN_PERIOD, "1 secs"); - runner.setProperty(service, DBCPConnectionPool.MIN_EVICTABLE_IDLE_TIME, "1 secs"); - runner.setProperty(service, DBCPConnectionPool.SOFT_MIN_EVICTABLE_IDLE_TIME, "1 secs"); + runner.setProperty(service, DBCPProperties.MAX_WAIT_TIME, "-1"); + runner.setProperty(service, DBCPProperties.MAX_IDLE, "6"); + runner.setProperty(service, DBCPProperties.MIN_IDLE, "4"); + runner.setProperty(service, DBCPProperties.MAX_CONN_LIFETIME, "1 secs"); + runner.setProperty(service, DBCPProperties.EVICTION_RUN_PERIOD, "1 secs"); + runner.setProperty(service, DBCPProperties.MIN_EVICTABLE_IDLE_TIME, "1 secs"); + runner.setProperty(service, DBCPProperties.SOFT_MIN_EVICTABLE_IDLE_TIME, "1 secs"); runner.enableControllerService(service);