mirror of https://github.com/apache/nifi.git
NIFI-11151: Improving code reusability of DBCP services
This closes #6935. Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
parent
5cf909ccf6
commit
13d343d5ee
|
@ -16,290 +16,43 @@
|
|||
*/
|
||||
package org.apache.nifi.dbcp;
|
||||
|
||||
import java.util.HashMap;
|
||||
import org.apache.commons.dbcp2.BasicDataSource;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
|
||||
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.ConfigVerificationResult;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
import org.apache.nifi.components.resource.ResourceCardinality;
|
||||
import org.apache.nifi.components.resource.ResourceType;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.controller.VerifiableControllerService;
|
||||
import org.apache.nifi.expression.AttributeExpression;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.kerberos.KerberosCredentialsService;
|
||||
import org.apache.nifi.dbcp.utils.DataSourceConfiguration;
|
||||
import org.apache.nifi.kerberos.KerberosUserService;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.security.krb.KerberosAction;
|
||||
import org.apache.nifi.security.krb.KerberosKeytabUser;
|
||||
import org.apache.nifi.security.krb.KerberosLoginException;
|
||||
import org.apache.nifi.security.krb.KerberosPasswordUser;
|
||||
import org.apache.nifi.security.krb.KerberosUser;
|
||||
|
||||
import javax.security.auth.login.LoginException;
|
||||
import java.sql.Connection;
|
||||
import java.sql.Driver;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED;
|
||||
import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVER_LOCATION;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.KERBEROS_USER_SERVICE;
|
||||
|
||||
/**
|
||||
* Abstract base class for Database Connection Pooling Services using Apache Commons DBCP as the underlying connection pool implementation.
|
||||
*
|
||||
*/
|
||||
public abstract class AbstractDBCPConnectionPool extends AbstractControllerService implements DBCPService, VerifiableControllerService {
|
||||
/** Property Name Prefix for Sensitive Dynamic Properties */
|
||||
protected static final String SENSITIVE_PROPERTY_PREFIX = "SENSITIVE.";
|
||||
|
||||
/**
|
||||
* Copied from {@link GenericObjectPoolConfig#DEFAULT_MIN_IDLE} in Commons-DBCP 2.7.0
|
||||
*/
|
||||
private static final String DEFAULT_MIN_IDLE = "0";
|
||||
/**
|
||||
* Copied from {@link GenericObjectPoolConfig#DEFAULT_MAX_IDLE} in Commons-DBCP 2.7.0
|
||||
*/
|
||||
private static final String DEFAULT_MAX_IDLE = "8";
|
||||
/**
|
||||
* Copied from private variable {@link BasicDataSource#maxConnLifetimeMillis} in Commons-DBCP 2.7.0
|
||||
*/
|
||||
private static final String DEFAULT_MAX_CONN_LIFETIME = "-1";
|
||||
/**
|
||||
* Copied from {@link GenericObjectPoolConfig#DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS} in Commons-DBCP 2.7.0
|
||||
*/
|
||||
private static final String DEFAULT_EVICTION_RUN_PERIOD = String.valueOf(-1L);
|
||||
/**
|
||||
* Copied from {@link GenericObjectPoolConfig#DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS} in Commons-DBCP 2.7.0
|
||||
* and converted from 1800000L to "1800000 millis" to "30 mins"
|
||||
*/
|
||||
private static final String DEFAULT_MIN_EVICTABLE_IDLE_TIME = "30 mins";
|
||||
/**
|
||||
* Copied from {@link GenericObjectPoolConfig#DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME_MILLIS} in Commons-DBCP 2.7.0
|
||||
*/
|
||||
private static final String DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME = String.valueOf(-1L);
|
||||
|
||||
public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
|
||||
.name("Database Connection URL")
|
||||
.description("A database connection URL used to connect to a database. May contain database system name, host, port, database name and some parameters."
|
||||
+ " The exact syntax of a database connection URL is specified by your DBMS.")
|
||||
.defaultValue(null)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.required(true)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder()
|
||||
.name("Database Driver Class Name")
|
||||
.description("Database driver class name")
|
||||
.defaultValue(null)
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor DB_DRIVER_LOCATION = new PropertyDescriptor.Builder()
|
||||
.name("database-driver-locations")
|
||||
.displayName("Database Driver Location(s)")
|
||||
.description("Comma-separated list of files/folders and/or URLs containing the driver JAR and its dependencies (if any). For example '/var/tmp/mariadb-java-client-1.1.7.jar'")
|
||||
.defaultValue(null)
|
||||
.required(false)
|
||||
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY, ResourceType.URL)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.dynamicallyModifiesClasspath(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
|
||||
.name("Database User")
|
||||
.description("Database user name")
|
||||
.defaultValue(null)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder()
|
||||
.name("Password")
|
||||
.description("The password for the database user")
|
||||
.defaultValue(null)
|
||||
.required(false)
|
||||
.sensitive(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder()
|
||||
.name("Max Wait Time")
|
||||
.description("The maximum amount of time that the pool will wait (when there are no available connections) "
|
||||
+ " for a connection to be returned before failing, or -1 to wait indefinitely. ")
|
||||
.defaultValue("500 millis")
|
||||
.required(true)
|
||||
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.sensitive(false)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder()
|
||||
.name("Max Total Connections")
|
||||
.description("The maximum number of active connections that can be allocated from this pool at the same time, "
|
||||
+ " or negative for no limit.")
|
||||
.defaultValue("8")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.INTEGER_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.sensitive(false)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder()
|
||||
.name("Validation-query")
|
||||
.displayName("Validation query")
|
||||
.description("Validation query used to validate connections before returning them. "
|
||||
+ "When connection is invalid, it gets dropped and new valid connection will be returned. "
|
||||
+ "Note!! Using validation might have some performance penalty.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder()
|
||||
.displayName("Minimum Idle Connections")
|
||||
.name("dbcp-min-idle-conns")
|
||||
.description("The minimum number of connections that can remain idle in the pool without extra ones being " +
|
||||
"created. Set to or zero to allow no idle connections.")
|
||||
.defaultValue(DEFAULT_MIN_IDLE)
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MAX_IDLE = new PropertyDescriptor.Builder()
|
||||
.displayName("Max Idle Connections")
|
||||
.name("dbcp-max-idle-conns")
|
||||
.description("The maximum number of connections that can remain idle in the pool without extra ones being " +
|
||||
"released. Set to any negative value to allow unlimited idle connections.")
|
||||
.defaultValue(DEFAULT_MAX_IDLE)
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.INTEGER_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder()
|
||||
.displayName("Max Connection Lifetime")
|
||||
.name("dbcp-max-conn-lifetime")
|
||||
.description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " +
|
||||
"connection will fail the next activation, passivation or validation test. A value of zero or less " +
|
||||
"means the connection has an infinite lifetime.")
|
||||
.defaultValue(DEFAULT_MAX_CONN_LIFETIME)
|
||||
.required(false)
|
||||
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor EVICTION_RUN_PERIOD = new PropertyDescriptor.Builder()
|
||||
.displayName("Time Between Eviction Runs")
|
||||
.name("dbcp-time-between-eviction-runs")
|
||||
.description("The number of milliseconds to sleep between runs of the idle connection evictor thread. When " +
|
||||
"non-positive, no idle connection evictor thread will be run.")
|
||||
.defaultValue(DEFAULT_EVICTION_RUN_PERIOD)
|
||||
.required(false)
|
||||
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder()
|
||||
.displayName("Minimum Evictable Idle Time")
|
||||
.name("dbcp-min-evictable-idle-time")
|
||||
.description("The minimum amount of time a connection may sit idle in the pool before it is eligible for eviction.")
|
||||
.defaultValue(DEFAULT_MIN_EVICTABLE_IDLE_TIME)
|
||||
.required(false)
|
||||
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor SOFT_MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder()
|
||||
.displayName("Soft Minimum Evictable Idle Time")
|
||||
.name("dbcp-soft-min-evictable-idle-time")
|
||||
.description("The minimum amount of time a connection may sit idle in the pool before it is eligible for " +
|
||||
"eviction by the idle connection evictor, with the extra condition that at least a minimum number of" +
|
||||
" idle connections remain in the pool. When the not-soft version of this option is set to a positive" +
|
||||
" value, it is examined first by the idle connection evictor: when idle connections are visited by " +
|
||||
"the evictor, idle time is first compared against it (without considering the number of idle " +
|
||||
"connections in the pool) and then against this soft option, including the minimum idle connections " +
|
||||
"constraint.")
|
||||
.defaultValue(DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME)
|
||||
.required(false)
|
||||
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("kerberos-credentials-service")
|
||||
.displayName("Kerberos Credentials Service")
|
||||
.description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
|
||||
.identifiesControllerService(KerberosCredentialsService.class)
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("kerberos-user-service")
|
||||
.displayName("Kerberos User Service")
|
||||
.description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
|
||||
.identifiesControllerService(KerberosUserService.class)
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
|
||||
.name("kerberos-principal")
|
||||
.displayName("Kerberos Principal")
|
||||
.description("The principal to use when specifying the principal and password directly in the processor for authenticating via Kerberos.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder()
|
||||
.name("kerberos-password")
|
||||
.displayName("Kerberos Password")
|
||||
.description("The password to use when specifying the principal and password directly in the processor for authenticating via Kerberos.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.sensitive(true)
|
||||
.build();
|
||||
|
||||
protected volatile BasicDataSource dataSource;
|
||||
protected volatile KerberosUser kerberosUser;
|
||||
|
||||
@Override
|
||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||
final PropertyDescriptor.Builder builder = new PropertyDescriptor.Builder()
|
||||
.name(propertyDescriptorName)
|
||||
.required(false)
|
||||
.dynamic(true)
|
||||
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
|
||||
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR);
|
||||
|
||||
if (propertyDescriptorName.startsWith(SENSITIVE_PROPERTY_PREFIX)) {
|
||||
builder.sensitive(true).expressionLanguageSupported(ExpressionLanguageScope.NONE);
|
||||
} else {
|
||||
builder.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY);
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ConfigVerificationResult> verify(final ConfigurationContext context, final ComponentLog verificationLogger, final Map<String, String> variables) {
|
||||
List<ConfigVerificationResult> results = new ArrayList<>();
|
||||
|
@ -325,7 +78,8 @@ public abstract class AbstractDBCPConnectionPool extends AbstractControllerServi
|
|||
|
||||
final BasicDataSource dataSource = new BasicDataSource();
|
||||
try {
|
||||
configureDataSource(dataSource, kerberosUser, context);
|
||||
final DataSourceConfiguration configuration = getDataSourceConfiguration(context);
|
||||
configureDataSource(context, configuration);
|
||||
results.add(new ConfigVerificationResult.Builder()
|
||||
.verificationStepName("Configure Data Source")
|
||||
.outcome(SUCCESSFUL)
|
||||
|
@ -365,7 +119,6 @@ public abstract class AbstractDBCPConnectionPool extends AbstractControllerServi
|
|||
verificationLogger.error("Failed to shut down data source", e);
|
||||
}
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
|
@ -373,40 +126,24 @@ public abstract class AbstractDBCPConnectionPool extends AbstractControllerServi
|
|||
* Configures connection pool by creating an instance of the
|
||||
* {@link BasicDataSource} based on configuration provided with
|
||||
* {@link ConfigurationContext}.
|
||||
*
|
||||
* <p>
|
||||
* This operation makes no guarantees that the actual connection could be
|
||||
* made since the underlying system may still go off-line during normal
|
||||
* operation of the connection pool.
|
||||
*
|
||||
* @param context
|
||||
* the configuration context
|
||||
* @throws InitializationException
|
||||
* if unable to create a database connection
|
||||
* @param context the configuration context
|
||||
* @throws InitializationException if unable to create a database connection
|
||||
*/
|
||||
@OnEnabled
|
||||
public void onConfigured(final ConfigurationContext context) throws InitializationException {
|
||||
kerberosUser = getKerberosUser(context);
|
||||
dataSource = new BasicDataSource();
|
||||
configureDataSource(dataSource, kerberosUser, context);
|
||||
kerberosUser = getKerberosUser(context);
|
||||
loginKerberos(kerberosUser);
|
||||
final DataSourceConfiguration configuration = getDataSourceConfiguration(context);
|
||||
configureDataSource(context, configuration);
|
||||
}
|
||||
|
||||
private void configureDataSource(final BasicDataSource dataSource, final KerberosUser kerberosUser,
|
||||
final ConfigurationContext context) throws InitializationException {
|
||||
final String dburl = getUrl(context);
|
||||
|
||||
final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue();
|
||||
final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
|
||||
final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
|
||||
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
|
||||
final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
|
||||
final Long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions());
|
||||
final Integer minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger();
|
||||
final Integer maxIdle = context.getProperty(MAX_IDLE).evaluateAttributeExpressions().asInteger();
|
||||
final Long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
|
||||
final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD).evaluateAttributeExpressions());
|
||||
final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
|
||||
final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
|
||||
|
||||
private void loginKerberos(KerberosUser kerberosUser) throws InitializationException {
|
||||
if (kerberosUser != null) {
|
||||
try {
|
||||
kerberosUser.login();
|
||||
|
@ -414,136 +151,71 @@ public abstract class AbstractDBCPConnectionPool extends AbstractControllerServi
|
|||
throw new InitializationException("Unable to authenticate Kerberos principal", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
dataSource.setDriver(getDriver(driverName, dburl));
|
||||
dataSource.setMaxWaitMillis(maxWaitMillis);
|
||||
dataSource.setMaxTotal(maxTotal);
|
||||
dataSource.setMinIdle(minIdle);
|
||||
dataSource.setMaxIdle(maxIdle);
|
||||
dataSource.setMaxConnLifetimeMillis(maxConnLifetimeMillis);
|
||||
dataSource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
|
||||
dataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
|
||||
dataSource.setSoftMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis);
|
||||
protected abstract Driver getDriver(final String driverName, final String url);
|
||||
|
||||
if (validationQuery != null && !validationQuery.isEmpty()) {
|
||||
protected abstract DataSourceConfiguration getDataSourceConfiguration(final ConfigurationContext context);
|
||||
|
||||
protected void configureDataSource(final ConfigurationContext context, final DataSourceConfiguration configuration) {
|
||||
final Driver driver = getDriver(configuration.getDriverName(), configuration.getUrl());
|
||||
|
||||
dataSource.setDriver(driver);
|
||||
dataSource.setMaxWaitMillis(configuration.getMaxWaitMillis());
|
||||
dataSource.setMaxTotal(configuration.getMaxTotal());
|
||||
dataSource.setMinIdle(configuration.getMinIdle());
|
||||
dataSource.setMaxIdle(configuration.getMaxIdle());
|
||||
dataSource.setMaxConnLifetimeMillis(configuration.getMaxConnLifetimeMillis());
|
||||
dataSource.setTimeBetweenEvictionRunsMillis(configuration.getTimeBetweenEvictionRunsMillis());
|
||||
dataSource.setMinEvictableIdleTimeMillis(configuration.getMinEvictableIdleTimeMillis());
|
||||
dataSource.setSoftMinEvictableIdleTimeMillis(configuration.getSoftMinEvictableIdleTimeMillis());
|
||||
|
||||
final String validationQuery = configuration.getValidationQuery();
|
||||
if (StringUtils.isNotBlank(validationQuery)) {
|
||||
dataSource.setValidationQuery(validationQuery);
|
||||
dataSource.setTestOnBorrow(true);
|
||||
}
|
||||
|
||||
dataSource.setUrl(dburl);
|
||||
dataSource.setUsername(user);
|
||||
dataSource.setPassword(passw);
|
||||
|
||||
final List<PropertyDescriptor> dynamicProperties = context.getProperties()
|
||||
.keySet()
|
||||
.stream()
|
||||
.filter(PropertyDescriptor::isDynamic)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
dynamicProperties.forEach((descriptor) -> {
|
||||
final PropertyValue propertyValue = context.getProperty(descriptor);
|
||||
if (descriptor.isSensitive()) {
|
||||
final String propertyName = StringUtils.substringAfter(descriptor.getName(), SENSITIVE_PROPERTY_PREFIX);
|
||||
dataSource.addConnectionProperty(propertyName, propertyValue.getValue());
|
||||
} else {
|
||||
dataSource.addConnectionProperty(descriptor.getName(), propertyValue.evaluateAttributeExpressions().getValue());
|
||||
}
|
||||
});
|
||||
dataSource.setUrl(configuration.getUrl());
|
||||
dataSource.setUsername(configuration.getUserName());
|
||||
dataSource.setPassword(configuration.getPassword());
|
||||
|
||||
getConnectionProperties(context).forEach(dataSource::addConnectionProperty);
|
||||
}
|
||||
|
||||
private KerberosUser getKerberosUser(final ConfigurationContext context) {
|
||||
KerberosUser kerberosUser = null;
|
||||
final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||
protected Map<String, String> getConnectionProperties(final ConfigurationContext context) {
|
||||
return getDynamicProperties(context)
|
||||
.stream()
|
||||
.collect(Collectors.toMap(PropertyDescriptor::getName, s -> {
|
||||
final PropertyValue propertyValue = context.getProperty(s);
|
||||
return propertyValue.evaluateAttributeExpressions().getValue();
|
||||
}));
|
||||
}
|
||||
|
||||
protected List<PropertyDescriptor> getDynamicProperties(final ConfigurationContext context) {
|
||||
return context.getProperties()
|
||||
.keySet()
|
||||
.stream()
|
||||
.filter(PropertyDescriptor::isDynamic)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
protected KerberosUser getKerberosUser(final ConfigurationContext context) {
|
||||
final KerberosUser kerberosUser;
|
||||
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
|
||||
final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
|
||||
final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
|
||||
|
||||
if (kerberosUserService != null) {
|
||||
kerberosUser = kerberosUserService.createKerberosUser();
|
||||
} else if (kerberosCredentialsService != null) {
|
||||
kerberosUser = new KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), kerberosCredentialsService.getKeytab());
|
||||
} else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
|
||||
kerberosUser = new KerberosPasswordUser(kerberosPrincipal, kerberosPassword);
|
||||
} else {
|
||||
kerberosUser = getKerberosUserByCredentials(context);
|
||||
}
|
||||
return kerberosUser;
|
||||
}
|
||||
|
||||
protected String getUrl(ConfigurationContext context) {
|
||||
return context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
|
||||
protected KerberosUser getKerberosUserByCredentials(final ConfigurationContext context) {
|
||||
return null;
|
||||
}
|
||||
|
||||
protected Driver getDriver(final String driverName, final String url) {
|
||||
final Class<?> clazz;
|
||||
|
||||
try {
|
||||
clazz = Class.forName(driverName);
|
||||
} catch (final ClassNotFoundException e) {
|
||||
throw new ProcessException("Driver class " + driverName + " is not found", e);
|
||||
}
|
||||
|
||||
try {
|
||||
return DriverManager.getDriver(url);
|
||||
} catch (final SQLException e) {
|
||||
// In case the driver is not registered by the implementation, we explicitly try to register it.
|
||||
try {
|
||||
final Driver driver = (Driver) clazz.newInstance();
|
||||
DriverManager.registerDriver(driver);
|
||||
return DriverManager.getDriver(url);
|
||||
} catch (final SQLException e2) {
|
||||
throw new ProcessException("No suitable driver for the given Database Connection URL", e2);
|
||||
} catch (final IllegalAccessException | InstantiationException e2) {
|
||||
throw new ProcessException("Creating driver instance is failed", e2);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Override in subclasses to provide connection properties to the data source
|
||||
*
|
||||
* @return Key-value pairs that will be added as connection properties
|
||||
*/
|
||||
protected Map<String, String> getConnectionProperties(final ConfigurationContext context) {
|
||||
return new HashMap<>();
|
||||
}
|
||||
|
||||
protected Long extractMillisWithInfinite(PropertyValue prop) {
|
||||
return "-1".equals(prop.getValue()) ? -1 : prop.asTimePeriod(TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown pool, close all open connections.
|
||||
* If a principal is authenticated with a KDC, that principal is logged out.
|
||||
*
|
||||
* If a @{@link LoginException} occurs while attempting to log out the @{@link org.apache.nifi.security.krb.KerberosUser},
|
||||
* an attempt will still be made to shut down the pool and close open connections.
|
||||
*
|
||||
* @throws SQLException if there is an error while closing open connections
|
||||
* @throws LoginException if there is an error during the principal log out, and will only be thrown if there was
|
||||
* no exception while closing open connections
|
||||
*/
|
||||
@OnDisabled
|
||||
public void shutdown() throws SQLException {
|
||||
try {
|
||||
this.shutdown(dataSource, kerberosUser);
|
||||
} finally {
|
||||
kerberosUser = null;
|
||||
dataSource = null;
|
||||
}
|
||||
}
|
||||
|
||||
private void shutdown(final BasicDataSource dataSource, final KerberosUser kerberosUser) throws SQLException {
|
||||
try {
|
||||
if (kerberosUser != null) {
|
||||
kerberosUser.logout();
|
||||
}
|
||||
} finally {
|
||||
if (dataSource != null) {
|
||||
dataSource.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection getConnection() throws ProcessException {
|
||||
|
@ -554,7 +226,7 @@ public abstract class AbstractDBCPConnectionPool extends AbstractControllerServi
|
|||
try {
|
||||
final Connection con;
|
||||
if (kerberosUser != null) {
|
||||
KerberosAction<Connection> kerberosAction = new KerberosAction<>(kerberosUser, () -> dataSource.getConnection(), getLogger());
|
||||
KerberosAction<Connection> kerberosAction = new KerberosAction<>(kerberosUser, dataSource::getConnection, getLogger());
|
||||
con = kerberosAction.execute();
|
||||
} else {
|
||||
con = dataSource.getConnection();
|
||||
|
@ -574,8 +246,31 @@ public abstract class AbstractDBCPConnectionPool extends AbstractControllerServi
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return this.getClass().getSimpleName() + "[id=" + getIdentifier() + "]";
|
||||
/**
|
||||
* Shutdown pool, close all open connections.
|
||||
* If a principal is authenticated with a KDC, that principal is logged out.
|
||||
*
|
||||
* @throws SQLException if there is an error while closing open connections
|
||||
*/
|
||||
@OnDisabled
|
||||
public void shutdown() throws SQLException {
|
||||
try {
|
||||
shutdown(dataSource, kerberosUser);
|
||||
} finally {
|
||||
kerberosUser = null;
|
||||
dataSource = null;
|
||||
}
|
||||
}
|
||||
|
||||
private void shutdown(final BasicDataSource dataSource, final KerberosUser kerberosUser) throws SQLException {
|
||||
try {
|
||||
if (kerberosUser != null) {
|
||||
kerberosUser.logout();
|
||||
}
|
||||
} finally {
|
||||
if (dataSource != null) {
|
||||
dataSource.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,199 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.dbcp.utils;
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
import org.apache.nifi.components.resource.ResourceCardinality;
|
||||
import org.apache.nifi.components.resource.ResourceType;
|
||||
import org.apache.nifi.dbcp.DBCPValidator;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.kerberos.KerberosUserService;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public final class DBCPProperties {
|
||||
|
||||
private DBCPProperties() {
|
||||
}
|
||||
|
||||
public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
|
||||
.name("Database Connection URL")
|
||||
.description("A database connection URL used to connect to a database. May contain database system name, host, port, database name and some parameters."
|
||||
+ " The exact syntax of a database connection URL is specified by your DBMS.")
|
||||
.defaultValue(null)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.required(true)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
|
||||
.name("Database User")
|
||||
.description("Database user name")
|
||||
.defaultValue(null)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder()
|
||||
.name("Password")
|
||||
.description("The password for the database user")
|
||||
.defaultValue(null)
|
||||
.required(false)
|
||||
.sensitive(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
|
||||
public static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder()
|
||||
.name("Database Driver Class Name")
|
||||
.description("Database driver class name")
|
||||
.defaultValue(null)
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor DB_DRIVER_LOCATION = new PropertyDescriptor.Builder()
|
||||
.name("database-driver-locations")
|
||||
.displayName("Database Driver Location(s)")
|
||||
.description("Comma-separated list of files/folders and/or URLs containing the driver JAR and its dependencies (if any). For example '/var/tmp/mariadb-java-client-1.1.7.jar'")
|
||||
.defaultValue(null)
|
||||
.required(false)
|
||||
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY, ResourceType.URL)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.dynamicallyModifiesClasspath(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder()
|
||||
.name("Max Wait Time")
|
||||
.description("The maximum amount of time that the pool will wait (when there are no available connections) "
|
||||
+ " for a connection to be returned before failing, or -1 to wait indefinitely. ")
|
||||
.defaultValue(DefaultDataSourceValues.MAX_WAIT_TIME.getValue())
|
||||
.required(true)
|
||||
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.sensitive(false)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder()
|
||||
.name("Max Total Connections")
|
||||
.description("The maximum number of active connections that can be allocated from this pool at the same time, "
|
||||
+ " or negative for no limit.")
|
||||
.defaultValue(DefaultDataSourceValues.MAX_TOTAL_CONNECTIONS.getValue())
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.INTEGER_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.sensitive(false)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder()
|
||||
.name("Validation-query")
|
||||
.displayName("Validation query")
|
||||
.description("Validation query used to validate connections before returning them. "
|
||||
+ "When connection is invalid, it gets dropped and new valid connection will be returned. "
|
||||
+ "Note!! Using validation might have some performance penalty.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder()
|
||||
.displayName("Minimum Idle Connections")
|
||||
.name("dbcp-min-idle-conns")
|
||||
.description("The minimum number of connections that can remain idle in the pool without extra ones being " +
|
||||
"created. Set to or zero to allow no idle connections.")
|
||||
.defaultValue(DefaultDataSourceValues.MIN_IDLE.getValue())
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MAX_IDLE = new PropertyDescriptor.Builder()
|
||||
.displayName("Max Idle Connections")
|
||||
.name("dbcp-max-idle-conns")
|
||||
.description("The maximum number of connections that can remain idle in the pool without extra ones being " +
|
||||
"released. Set to any negative value to allow unlimited idle connections.")
|
||||
.defaultValue(DefaultDataSourceValues.MAX_IDLE.getValue())
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.INTEGER_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder()
|
||||
.displayName("Max Connection Lifetime")
|
||||
.name("dbcp-max-conn-lifetime")
|
||||
.description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " +
|
||||
"connection will fail the next activation, passivation or validation test. A value of zero or less " +
|
||||
"means the connection has an infinite lifetime.")
|
||||
.defaultValue(DefaultDataSourceValues.MAX_CONN_LIFETIME.getValue())
|
||||
.required(false)
|
||||
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor EVICTION_RUN_PERIOD = new PropertyDescriptor.Builder()
|
||||
.displayName("Time Between Eviction Runs")
|
||||
.name("dbcp-time-between-eviction-runs")
|
||||
.description("The number of milliseconds to sleep between runs of the idle connection evictor thread. When " +
|
||||
"non-positive, no idle connection evictor thread will be run.")
|
||||
.defaultValue(DefaultDataSourceValues.EVICTION_RUN_PERIOD.getValue())
|
||||
.required(false)
|
||||
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder()
|
||||
.displayName("Minimum Evictable Idle Time")
|
||||
.name("dbcp-min-evictable-idle-time")
|
||||
.description("The minimum amount of time a connection may sit idle in the pool before it is eligible for eviction.")
|
||||
.defaultValue(DefaultDataSourceValues.MIN_EVICTABLE_IDLE_TIME.getValue())
|
||||
.required(false)
|
||||
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor SOFT_MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder()
|
||||
.displayName("Soft Minimum Evictable Idle Time")
|
||||
.name("dbcp-soft-min-evictable-idle-time")
|
||||
.description("The minimum amount of time a connection may sit idle in the pool before it is eligible for " +
|
||||
"eviction by the idle connection evictor, with the extra condition that at least a minimum number of" +
|
||||
" idle connections remain in the pool. When the not-soft version of this option is set to a positive" +
|
||||
" value, it is examined first by the idle connection evictor: when idle connections are visited by " +
|
||||
"the evictor, idle time is first compared against it (without considering the number of idle " +
|
||||
"connections in the pool) and then against this soft option, including the minimum idle connections " +
|
||||
"constraint.")
|
||||
.defaultValue(DefaultDataSourceValues.SOFT_MIN_EVICTABLE_IDLE_TIME.getValue())
|
||||
.required(false)
|
||||
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("kerberos-user-service")
|
||||
.displayName("Kerberos User Service")
|
||||
.description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
|
||||
.identifiesControllerService(KerberosUserService.class)
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
public static Long extractMillisWithInfinite(PropertyValue prop) {
|
||||
return "-1".equals(prop.getValue()) ? -1 : prop.asTimePeriod(TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,174 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.dbcp.utils;
|
||||
|
||||
public class DataSourceConfiguration {
|
||||
|
||||
private final String url;
|
||||
private final String driverName;
|
||||
private final String userName;
|
||||
private final String password;
|
||||
private final long maxWaitMillis;
|
||||
private final int maxTotal;
|
||||
private final int minIdle;
|
||||
private final int maxIdle;
|
||||
private final long maxConnLifetimeMillis;
|
||||
private final long timeBetweenEvictionRunsMillis;
|
||||
private final long minEvictableIdleTimeMillis;
|
||||
private final long softMinEvictableIdleTimeMillis;
|
||||
private final String validationQuery;
|
||||
|
||||
public String getUrl() {
|
||||
return url;
|
||||
}
|
||||
|
||||
public String getDriverName() {
|
||||
return driverName;
|
||||
}
|
||||
|
||||
public String getUserName() {
|
||||
return userName;
|
||||
}
|
||||
|
||||
public String getPassword() {
|
||||
return password;
|
||||
}
|
||||
|
||||
public long getMaxWaitMillis() {
|
||||
return maxWaitMillis;
|
||||
}
|
||||
|
||||
public int getMaxTotal() {
|
||||
return maxTotal;
|
||||
}
|
||||
|
||||
public int getMinIdle() {
|
||||
return minIdle;
|
||||
}
|
||||
|
||||
public int getMaxIdle() {
|
||||
return maxIdle;
|
||||
}
|
||||
|
||||
public long getMaxConnLifetimeMillis() {
|
||||
return maxConnLifetimeMillis;
|
||||
}
|
||||
|
||||
public long getTimeBetweenEvictionRunsMillis() {
|
||||
return timeBetweenEvictionRunsMillis;
|
||||
}
|
||||
|
||||
public long getMinEvictableIdleTimeMillis() {
|
||||
return minEvictableIdleTimeMillis;
|
||||
}
|
||||
|
||||
public long getSoftMinEvictableIdleTimeMillis() {
|
||||
return softMinEvictableIdleTimeMillis;
|
||||
}
|
||||
|
||||
public String getValidationQuery() {
|
||||
return validationQuery;
|
||||
}
|
||||
|
||||
public DataSourceConfiguration(final Builder builder) {
|
||||
this.url = builder.url;
|
||||
this.driverName = builder.driverName;
|
||||
this.userName = builder.userName;
|
||||
this.password = builder.password;
|
||||
this.maxWaitMillis = builder.maxWaitMillis;
|
||||
this.maxTotal = builder.maxTotal;
|
||||
this.minIdle = builder.minIdle;
|
||||
this.maxIdle = builder.maxIdle;
|
||||
this.maxConnLifetimeMillis = builder.maxConnLifetimeMillis;
|
||||
this.timeBetweenEvictionRunsMillis = builder.timeBetweenEvictionRunsMillis;
|
||||
this.minEvictableIdleTimeMillis = builder.minEvictableIdleTimeMillis;
|
||||
this.softMinEvictableIdleTimeMillis = builder.softMinEvictableIdleTimeMillis;
|
||||
this.validationQuery = builder.validationQuery;
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
private final String url;
|
||||
private final String driverName;
|
||||
private final String userName;
|
||||
private final String password;
|
||||
private long maxWaitMillis = DefaultDataSourceValues.MAX_WAIT_TIME.getLongValue();
|
||||
private int maxTotal = DefaultDataSourceValues.MAX_TOTAL_CONNECTIONS.getLongValue().intValue();
|
||||
private int minIdle = DefaultDataSourceValues.MIN_IDLE.getLongValue().intValue();
|
||||
private int maxIdle = DefaultDataSourceValues.MAX_IDLE.getLongValue().intValue();
|
||||
private long maxConnLifetimeMillis = DefaultDataSourceValues.MAX_CONN_LIFETIME.getLongValue();
|
||||
private long timeBetweenEvictionRunsMillis = DefaultDataSourceValues.EVICTION_RUN_PERIOD.getLongValue();
|
||||
private long minEvictableIdleTimeMillis = DefaultDataSourceValues.MIN_EVICTABLE_IDLE_TIME.getLongValue();
|
||||
private long softMinEvictableIdleTimeMillis = DefaultDataSourceValues.SOFT_MIN_EVICTABLE_IDLE_TIME.getLongValue();
|
||||
private String validationQuery;
|
||||
|
||||
public Builder(final String url, final String driverName, final String userName, final String password) {
|
||||
this.url = url;
|
||||
this.driverName = driverName;
|
||||
this.userName = userName;
|
||||
this.password = password;
|
||||
}
|
||||
|
||||
public Builder maxWaitMillis(long maxWaitMillis) {
|
||||
this.maxWaitMillis = maxWaitMillis;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder maxTotal(int maxTotal) {
|
||||
this.maxTotal = maxTotal;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder minIdle(int minIdle) {
|
||||
this.minIdle = minIdle;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder maxIdle(int maxIdle) {
|
||||
this.maxIdle = maxIdle;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder maxConnLifetimeMillis(long maxConnLifetimeMillis) {
|
||||
this.maxConnLifetimeMillis = maxConnLifetimeMillis;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder timeBetweenEvictionRunsMillis(long timeBetweenEvictionRunsMillis) {
|
||||
this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder minEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) {
|
||||
this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder softMinEvictableIdleTimeMillis(long softMinEvictableIdleTimeMillis) {
|
||||
this.softMinEvictableIdleTimeMillis = softMinEvictableIdleTimeMillis;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder validationQuery(String validationQuery) {
|
||||
this.validationQuery = validationQuery;
|
||||
return this;
|
||||
}
|
||||
|
||||
public DataSourceConfiguration build() {
|
||||
return new DataSourceConfiguration(this);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.dbcp.utils;
|
||||
|
||||
import org.apache.commons.dbcp2.BasicDataSource;
|
||||
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
|
||||
import org.apache.nifi.util.FormatUtils;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public enum DefaultDataSourceValues {
|
||||
|
||||
MAX_WAIT_TIME("500 millis") {
|
||||
@Override
|
||||
public Long getLongValue() {
|
||||
return (long) FormatUtils.getPreciseTimeDuration(MAX_WAIT_TIME.value, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
},
|
||||
MAX_TOTAL_CONNECTIONS("8"),
|
||||
/**
|
||||
* Copied from {@link GenericObjectPoolConfig#DEFAULT_MIN_IDLE} in Commons-DBCP 2.7.0
|
||||
*/
|
||||
MIN_IDLE("0"),
|
||||
/**
|
||||
* Copied from {@link GenericObjectPoolConfig#DEFAULT_MAX_IDLE} in Commons-DBCP 2.7.0
|
||||
*/
|
||||
MAX_IDLE("8"),
|
||||
/**
|
||||
* Copied from private variable {@link BasicDataSource#maxConnLifetimeMillis} in Commons-DBCP 2.7.0
|
||||
*/
|
||||
MAX_CONN_LIFETIME("-1"),
|
||||
/**
|
||||
* Copied from {@link GenericObjectPoolConfig#DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS} in Commons-DBCP 2.7.0
|
||||
*/
|
||||
EVICTION_RUN_PERIOD("-1"),
|
||||
/**
|
||||
* Copied from {@link GenericObjectPoolConfig#DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS} in Commons-DBCP 2.7.0
|
||||
* and converted from 1800000L to "1800000 millis" to "30 mins"
|
||||
*/
|
||||
MIN_EVICTABLE_IDLE_TIME("30 mins") {
|
||||
@Override
|
||||
public Long getLongValue() {
|
||||
return (long) FormatUtils.getPreciseTimeDuration(MAX_WAIT_TIME.value, TimeUnit.MINUTES);
|
||||
}
|
||||
},
|
||||
/**
|
||||
* Copied from {@link GenericObjectPoolConfig#DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME_MILLIS} in Commons-DBCP 2.7.0
|
||||
*/
|
||||
SOFT_MIN_EVICTABLE_IDLE_TIME("-1");
|
||||
|
||||
|
||||
private final String value;
|
||||
|
||||
DefaultDataSourceValues(String value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public String getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public Long getLongValue() {
|
||||
return Long.parseLong(value);
|
||||
}
|
||||
}
|
|
@ -16,14 +16,6 @@
|
|||
*/
|
||||
package org.apache.nifi.snowflake.service;
|
||||
|
||||
import java.sql.Driver;
|
||||
import java.sql.DriverManager;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import net.snowflake.client.core.SFSessionProperty;
|
||||
import net.snowflake.client.jdbc.SnowflakeDriver;
|
||||
import org.apache.nifi.annotation.behavior.DynamicProperties;
|
||||
|
@ -36,16 +28,40 @@ import org.apache.nifi.components.ValidationContext;
|
|||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.dbcp.AbstractDBCPConnectionPool;
|
||||
import org.apache.nifi.dbcp.utils.DBCPProperties;
|
||||
import org.apache.nifi.dbcp.utils.DataSourceConfiguration;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.snowflake.SnowflakeConnectionProviderService;
|
||||
import org.apache.nifi.processors.snowflake.SnowflakeConnectionWrapper;
|
||||
import org.apache.nifi.processors.snowflake.util.SnowflakeProperties;
|
||||
import org.apache.nifi.proxy.ProxyConfiguration;
|
||||
import org.apache.nifi.proxy.ProxyConfigurationService;
|
||||
import org.apache.nifi.snowflake.service.util.ConnectionUrlFormatParameters;
|
||||
import org.apache.nifi.processors.snowflake.util.SnowflakeProperties;
|
||||
import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat;
|
||||
import org.apache.nifi.snowflake.service.util.ConnectionUrlFormatParameters;
|
||||
|
||||
import java.sql.Driver;
|
||||
import java.sql.DriverManager;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVERNAME;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_PASSWORD;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_USER;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.EVICTION_RUN_PERIOD;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_CONN_LIFETIME;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_IDLE;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_TOTAL_CONNECTIONS;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_WAIT_TIME;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_EVICTABLE_IDLE_TIME;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_IDLE;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.SOFT_MIN_EVICTABLE_IDLE_TIME;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.VALIDATION_QUERY;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.extractMillisWithInfinite;
|
||||
|
||||
/**
|
||||
* Implementation of Database Connection Pooling Service for Snowflake. Apache DBCP is used for connection pooling
|
||||
|
@ -54,14 +70,14 @@ import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat;
|
|||
@Tags({"snowflake", "dbcp", "jdbc", "database", "connection", "pooling", "store"})
|
||||
@CapabilityDescription("Provides Snowflake Connection Pooling Service. Connections can be asked from pool and returned after usage.")
|
||||
@DynamicProperties({
|
||||
@DynamicProperty(name = "JDBC property name",
|
||||
value = "Snowflake JDBC property value",
|
||||
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY,
|
||||
description = "Snowflake JDBC driver property name and value applied to JDBC connections."),
|
||||
@DynamicProperty(name = "SENSITIVE.JDBC property name",
|
||||
value = "Snowflake JDBC property value",
|
||||
expressionLanguageScope = ExpressionLanguageScope.NONE,
|
||||
description = "Snowflake JDBC driver property name prefixed with 'SENSITIVE.' handled as a sensitive property.")
|
||||
@DynamicProperty(name = "JDBC property name",
|
||||
value = "Snowflake JDBC property value",
|
||||
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY,
|
||||
description = "Snowflake JDBC driver property name and value applied to JDBC connections."),
|
||||
@DynamicProperty(name = "SENSITIVE.JDBC property name",
|
||||
value = "Snowflake JDBC property value",
|
||||
expressionLanguageScope = ExpressionLanguageScope.NONE,
|
||||
description = "Snowflake JDBC driver property name prefixed with 'SENSITIVE.' handled as a sensitive property.")
|
||||
})
|
||||
@RequiresInstanceClassLoading
|
||||
public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool implements SnowflakeConnectionProviderService {
|
||||
|
@ -76,7 +92,7 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool
|
|||
.build();
|
||||
|
||||
public static final PropertyDescriptor SNOWFLAKE_URL = new PropertyDescriptor.Builder()
|
||||
.fromPropertyDescriptor(AbstractDBCPConnectionPool.DATABASE_URL)
|
||||
.fromPropertyDescriptor(DBCPProperties.DATABASE_URL)
|
||||
.displayName("Snowflake URL")
|
||||
.description("Example connection string: jdbc:snowflake://[account].[region]" + ConnectionUrlFormat.SNOWFLAKE_HOST_SUFFIX + "/?[connection_params]" +
|
||||
" The connection parameters can include db=DATABASE_NAME to avoid using qualified table names such as DATABASE_NAME.PUBLIC.TABLE_NAME")
|
||||
|
@ -110,13 +126,13 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool
|
|||
.build();
|
||||
|
||||
public static final PropertyDescriptor SNOWFLAKE_USER = new PropertyDescriptor.Builder()
|
||||
.fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_USER)
|
||||
.fromPropertyDescriptor(DBCPProperties.DB_USER)
|
||||
.displayName("Username")
|
||||
.description("The Snowflake user name.")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor SNOWFLAKE_PASSWORD = new PropertyDescriptor.Builder()
|
||||
.fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_PASSWORD)
|
||||
.fromPropertyDescriptor(DBCPProperties.DB_PASSWORD)
|
||||
.displayName("Password")
|
||||
.description("The password for the Snowflake user.")
|
||||
.build();
|
||||
|
@ -170,6 +186,34 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool
|
|||
}
|
||||
|
||||
@Override
|
||||
protected DataSourceConfiguration getDataSourceConfiguration(final ConfigurationContext context) {
|
||||
final String url = getUrl(context);
|
||||
final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue();
|
||||
final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
|
||||
final String password = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
|
||||
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
|
||||
final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
|
||||
final Long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions());
|
||||
final Integer minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger();
|
||||
final Integer maxIdle = context.getProperty(MAX_IDLE).evaluateAttributeExpressions().asInteger();
|
||||
final Long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
|
||||
final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD).evaluateAttributeExpressions());
|
||||
final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
|
||||
final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
|
||||
|
||||
return new DataSourceConfiguration.Builder(url, driverName, user, password)
|
||||
.maxTotal(maxTotal)
|
||||
.validationQuery(validationQuery)
|
||||
.maxWaitMillis(maxWaitMillis)
|
||||
.minIdle(minIdle)
|
||||
.maxIdle(maxIdle)
|
||||
.maxConnLifetimeMillis(maxConnLifetimeMillis)
|
||||
.timeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis)
|
||||
.minEvictableIdleTimeMillis(minEvictableIdleTimeMillis)
|
||||
.softMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis)
|
||||
.build();
|
||||
}
|
||||
|
||||
protected String getUrl(final ConfigurationContext context) {
|
||||
final ConnectionUrlFormat connectionUrlFormat = ConnectionUrlFormat.forName(context.getProperty(CONNECTION_URL_FORMAT)
|
||||
.getValue());
|
||||
|
@ -194,7 +238,7 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool
|
|||
final String schema = context.getProperty(SnowflakeProperties.SCHEMA).evaluateAttributeExpressions().getValue();
|
||||
final String warehouse = context.getProperty(SNOWFLAKE_WAREHOUSE).evaluateAttributeExpressions().getValue();
|
||||
|
||||
final Map<String, String> connectionProperties = new HashMap<>();
|
||||
final Map<String, String> connectionProperties = super.getConnectionProperties(context);
|
||||
if (database != null) {
|
||||
connectionProperties.put("db", database);
|
||||
}
|
||||
|
|
|
@ -25,24 +25,55 @@ import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
|
|||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.controller.VerifiableControllerService;
|
||||
import org.apache.nifi.dbcp.utils.DataSourceConfiguration;
|
||||
import org.apache.nifi.expression.AttributeExpression;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.kerberos.KerberosCredentialsService;
|
||||
import org.apache.nifi.kerberos.KerberosUserService;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.security.krb.KerberosKeytabUser;
|
||||
import org.apache.nifi.security.krb.KerberosPasswordUser;
|
||||
import org.apache.nifi.security.krb.KerberosUser;
|
||||
|
||||
import java.sql.Driver;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.SQLException;
|
||||
import java.util.AbstractMap;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.DATABASE_URL;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVERNAME;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVER_LOCATION;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_PASSWORD;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_USER;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.EVICTION_RUN_PERIOD;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.KERBEROS_USER_SERVICE;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_CONN_LIFETIME;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_IDLE;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_TOTAL_CONNECTIONS;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_WAIT_TIME;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_EVICTABLE_IDLE_TIME;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_IDLE;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.SOFT_MIN_EVICTABLE_IDLE_TIME;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.VALIDATION_QUERY;
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.extractMillisWithInfinite;
|
||||
|
||||
/**
|
||||
* Implementation of for Database Connection Pooling Service. Apache DBCP is used for connection pooling functionality.
|
||||
*
|
||||
*/
|
||||
@SupportsSensitiveDynamicProperties
|
||||
@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store" })
|
||||
@Tags({"dbcp", "jdbc", "database", "connection", "pooling", "store"})
|
||||
@CapabilityDescription("Provides Database Connection Pooling Service. Connections can be asked from pool and returned after usage.")
|
||||
@DynamicProperties({
|
||||
@DynamicProperty(name = "JDBC property name",
|
||||
|
@ -56,9 +87,40 @@ import java.util.List;
|
|||
})
|
||||
@RequiresInstanceClassLoading
|
||||
public class DBCPConnectionPool extends AbstractDBCPConnectionPool implements DBCPService, VerifiableControllerService {
|
||||
/**
|
||||
* Property Name Prefix for Sensitive Dynamic Properties
|
||||
*/
|
||||
protected static final String SENSITIVE_PROPERTY_PREFIX = "SENSITIVE.";
|
||||
|
||||
private static final List<PropertyDescriptor> PROPERTIES;
|
||||
|
||||
public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("kerberos-credentials-service")
|
||||
.displayName("Kerberos Credentials Service")
|
||||
.description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
|
||||
.identifiesControllerService(KerberosCredentialsService.class)
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
|
||||
.name("kerberos-principal")
|
||||
.displayName("Kerberos Principal")
|
||||
.description("The principal to use when specifying the principal and password directly in the processor for authenticating via Kerberos.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder()
|
||||
.name("kerberos-password")
|
||||
.displayName("Kerberos Password")
|
||||
.description("The password to use when specifying the principal and password directly in the processor for authenticating via Kerberos.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.sensitive(true)
|
||||
.build();
|
||||
|
||||
static {
|
||||
final List<PropertyDescriptor> props = new ArrayList<>();
|
||||
props.add(DATABASE_URL);
|
||||
|
@ -144,4 +206,109 @@ public class DBCPConnectionPool extends AbstractDBCPConnectionPool implements DB
|
|||
BasicDataSource getDataSource() {
|
||||
return dataSource;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DataSourceConfiguration getDataSourceConfiguration(ConfigurationContext context) {
|
||||
final String url = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
|
||||
final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue();
|
||||
final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
|
||||
final String password = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
|
||||
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
|
||||
final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
|
||||
final Long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions());
|
||||
final Integer minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger();
|
||||
final Integer maxIdle = context.getProperty(MAX_IDLE).evaluateAttributeExpressions().asInteger();
|
||||
final Long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
|
||||
final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD).evaluateAttributeExpressions());
|
||||
final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
|
||||
final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
|
||||
|
||||
return new DataSourceConfiguration.Builder(url, driverName, user, password)
|
||||
.maxTotal(maxTotal)
|
||||
.validationQuery(validationQuery)
|
||||
.maxWaitMillis(maxWaitMillis)
|
||||
.minIdle(minIdle)
|
||||
.maxIdle(maxIdle)
|
||||
.maxConnLifetimeMillis(maxConnLifetimeMillis)
|
||||
.timeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis)
|
||||
.minEvictableIdleTimeMillis(minEvictableIdleTimeMillis)
|
||||
.softMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||
final PropertyDescriptor.Builder builder = new PropertyDescriptor.Builder()
|
||||
.name(propertyDescriptorName)
|
||||
.required(false)
|
||||
.dynamic(true)
|
||||
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
|
||||
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR);
|
||||
|
||||
if (propertyDescriptorName.startsWith(SENSITIVE_PROPERTY_PREFIX)) {
|
||||
builder.sensitive(true).expressionLanguageSupported(ExpressionLanguageScope.NONE);
|
||||
} else {
|
||||
builder.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY);
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, String> getConnectionProperties(ConfigurationContext context) {
|
||||
return getDynamicProperties(context)
|
||||
.stream()
|
||||
.map(descriptor -> {
|
||||
final PropertyValue propertyValue = context.getProperty(descriptor);
|
||||
if (descriptor.isSensitive()) {
|
||||
final String propertyName = StringUtils.substringAfter(descriptor.getName(), SENSITIVE_PROPERTY_PREFIX);
|
||||
return new AbstractMap.SimpleEntry<>(propertyName, propertyValue.getValue());
|
||||
} else {
|
||||
return new AbstractMap.SimpleEntry<>(descriptor.getName(), propertyValue.evaluateAttributeExpressions().getValue());
|
||||
}
|
||||
})
|
||||
.collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Driver getDriver(final String driverName, final String url) {
|
||||
final Class<?> clazz;
|
||||
|
||||
try {
|
||||
clazz = Class.forName(driverName);
|
||||
} catch (final ClassNotFoundException e) {
|
||||
throw new ProcessException("Driver class " + driverName + " is not found", e);
|
||||
}
|
||||
|
||||
try {
|
||||
return DriverManager.getDriver(url);
|
||||
} catch (final SQLException e) {
|
||||
// In case the driver is not registered by the implementation, we explicitly try to register it.
|
||||
try {
|
||||
final Driver driver = (Driver) clazz.newInstance();
|
||||
DriverManager.registerDriver(driver);
|
||||
return DriverManager.getDriver(url);
|
||||
} catch (final SQLException e2) {
|
||||
throw new ProcessException("No suitable driver for the given Database Connection URL", e2);
|
||||
} catch (final IllegalAccessException | InstantiationException e2) {
|
||||
throw new ProcessException("Creating driver instance is failed", e2);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected KerberosUser getKerberosUserByCredentials(ConfigurationContext context) {
|
||||
KerberosUser kerberosUser = super.getKerberosUserByCredentials(context);
|
||||
if (kerberosUser == null) {
|
||||
final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||
final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
|
||||
final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
|
||||
if (kerberosCredentialsService != null) {
|
||||
kerberosUser = new KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), kerberosCredentialsService.getKeytab());
|
||||
} else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
|
||||
kerberosUser = new KerberosPasswordUser(kerberosPrincipal, kerberosPassword);
|
||||
}
|
||||
}
|
||||
return kerberosUser;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,24 +51,24 @@ import java.sql.SQLException
|
|||
import java.sql.SQLNonTransientConnectionException
|
||||
import java.sql.Statement
|
||||
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.DATABASE_URL
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.DB_DRIVERNAME
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.DB_DRIVER_LOCATION
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.DB_PASSWORD
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.DB_USER
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.EVICTION_RUN_PERIOD
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.DATABASE_URL
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVERNAME
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVER_LOCATION
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_PASSWORD
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_USER
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.EVICTION_RUN_PERIOD
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_PASSWORD
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_PRINCIPAL
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_USER_SERVICE
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.MAX_CONN_LIFETIME
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.MAX_IDLE
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.MAX_TOTAL_CONNECTIONS
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.MAX_WAIT_TIME
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.MIN_EVICTABLE_IDLE_TIME
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.MIN_IDLE
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.SOFT_MIN_EVICTABLE_IDLE_TIME
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.VALIDATION_QUERY
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_PASSWORD
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.KERBEROS_USER_SERVICE
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_CONN_LIFETIME
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_IDLE
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_TOTAL_CONNECTIONS
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_WAIT_TIME
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_EVICTABLE_IDLE_TIME
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_IDLE
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.SOFT_MIN_EVICTABLE_IDLE_TIME
|
||||
import static org.apache.nifi.dbcp.utils.DBCPProperties.VALIDATION_QUERY
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.nifi.dbcp;
|
||||
|
||||
import org.apache.nifi.dbcp.utils.DBCPProperties;
|
||||
import org.apache.nifi.kerberos.KerberosCredentialsService;
|
||||
import org.apache.nifi.kerberos.KerberosUserService;
|
||||
import org.apache.nifi.kerberos.MockKerberosCredentialsService;
|
||||
|
@ -83,10 +84,10 @@ public class DBCPServiceTest {
|
|||
runner.addControllerService(SERVICE_ID, service);
|
||||
|
||||
final String url = String.format("jdbc:derby:%s;create=true", databaseDirectory);
|
||||
runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, url);
|
||||
runner.setProperty(service, DBCPConnectionPool.DB_USER, String.class.getSimpleName());
|
||||
runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, String.class.getName());
|
||||
runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver");
|
||||
runner.setProperty(service, DBCPProperties.DATABASE_URL, url);
|
||||
runner.setProperty(service, DBCPProperties.DB_USER, String.class.getSimpleName());
|
||||
runner.setProperty(service, DBCPProperties.DB_PASSWORD, String.class.getName());
|
||||
runner.setProperty(service, DBCPProperties.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver");
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
|
@ -117,7 +118,7 @@ public class DBCPServiceTest {
|
|||
|
||||
// kerberos credential service with kerberos user service is invalid
|
||||
final KerberosUserService kerberosUserService = enableKerberosUserService(runner);
|
||||
runner.setProperty(service, DBCPConnectionPool.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
|
||||
runner.setProperty(service, DBCPProperties.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
|
||||
runner.assertNotValid(service);
|
||||
|
||||
// kerberos user service by itself is valid
|
||||
|
@ -132,7 +133,7 @@ public class DBCPServiceTest {
|
|||
|
||||
@Test
|
||||
public void testNotValidWithNegativeMinIdleProperty() {
|
||||
runner.setProperty(service, DBCPConnectionPool.MIN_IDLE, "-1");
|
||||
runner.setProperty(service, DBCPProperties.MIN_IDLE, "-1");
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
|
@ -185,9 +186,9 @@ public class DBCPServiceTest {
|
|||
runner.enableControllerService(kerberosCredentialsService);
|
||||
|
||||
// set fake Derby database connection url
|
||||
runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby://localhost:1527/NoDB");
|
||||
runner.setProperty(service, DBCPProperties.DATABASE_URL, "jdbc:derby://localhost:1527/NoDB");
|
||||
// Use the client driver here rather than the embedded one, as it will generate a ConnectException for the test
|
||||
runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.ClientDriver");
|
||||
runner.setProperty(service, DBCPProperties.DB_DRIVERNAME, "org.apache.derby.jdbc.ClientDriver");
|
||||
runner.setProperty(service, DBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE, kerberosServiceId);
|
||||
|
||||
try {
|
||||
|
@ -202,7 +203,7 @@ public class DBCPServiceTest {
|
|||
|
||||
@Test
|
||||
public void testGetConnection() throws SQLException {
|
||||
runner.setProperty(service, DBCPConnectionPool.MAX_TOTAL_CONNECTIONS, "2");
|
||||
runner.setProperty(service, DBCPProperties.MAX_TOTAL_CONNECTIONS, "2");
|
||||
runner.enableControllerService(service);
|
||||
runner.assertValid(service);
|
||||
|
||||
|
@ -216,8 +217,8 @@ public class DBCPServiceTest {
|
|||
|
||||
@Test
|
||||
public void testGetConnectionMaxTotalConnectionsExceeded() {
|
||||
runner.setProperty(service, DBCPConnectionPool.MAX_TOTAL_CONNECTIONS, "1");
|
||||
runner.setProperty(service, DBCPConnectionPool.MAX_WAIT_TIME, "1 ms");
|
||||
runner.setProperty(service, DBCPProperties.MAX_TOTAL_CONNECTIONS, "1");
|
||||
runner.setProperty(service, DBCPProperties.MAX_WAIT_TIME, "1 ms");
|
||||
runner.enableControllerService(service);
|
||||
runner.assertValid(service);
|
||||
|
||||
|
@ -228,13 +229,13 @@ public class DBCPServiceTest {
|
|||
|
||||
@Test
|
||||
public void testGetDataSourceProperties() throws SQLException {
|
||||
runner.setProperty(service, DBCPConnectionPool.MAX_WAIT_TIME, "-1");
|
||||
runner.setProperty(service, DBCPConnectionPool.MAX_IDLE, "6");
|
||||
runner.setProperty(service, DBCPConnectionPool.MIN_IDLE, "4");
|
||||
runner.setProperty(service, DBCPConnectionPool.MAX_CONN_LIFETIME, "1 secs");
|
||||
runner.setProperty(service, DBCPConnectionPool.EVICTION_RUN_PERIOD, "1 secs");
|
||||
runner.setProperty(service, DBCPConnectionPool.MIN_EVICTABLE_IDLE_TIME, "1 secs");
|
||||
runner.setProperty(service, DBCPConnectionPool.SOFT_MIN_EVICTABLE_IDLE_TIME, "1 secs");
|
||||
runner.setProperty(service, DBCPProperties.MAX_WAIT_TIME, "-1");
|
||||
runner.setProperty(service, DBCPProperties.MAX_IDLE, "6");
|
||||
runner.setProperty(service, DBCPProperties.MIN_IDLE, "4");
|
||||
runner.setProperty(service, DBCPProperties.MAX_CONN_LIFETIME, "1 secs");
|
||||
runner.setProperty(service, DBCPProperties.EVICTION_RUN_PERIOD, "1 secs");
|
||||
runner.setProperty(service, DBCPProperties.MIN_EVICTABLE_IDLE_TIME, "1 secs");
|
||||
runner.setProperty(service, DBCPProperties.SOFT_MIN_EVICTABLE_IDLE_TIME, "1 secs");
|
||||
|
||||
runner.enableControllerService(service);
|
||||
|
||||
|
|
Loading…
Reference in New Issue