NIFI-10659: Extracted DBCP common code to nifi-dbcp-base module for use in Snowflake NAR

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #6543
This commit is contained in:
Peter Turcsanyi 2022-10-16 23:05:14 +02:00 committed by Matthew Burgess
parent 808d3d6664
commit 2be5c26f28
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
9 changed files with 656 additions and 577 deletions

View File

@ -0,0 +1,57 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>nifi-extension-utils</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.19.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-dbcp-base</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-kerberos</artifactId>
<version>1.19.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-user-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,569 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.dbcp;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.VerifiableControllerService;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.krb.KerberosAction;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosLoginException;
import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;
import javax.security.auth.login.LoginException;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED;
import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL;
/**
* Abstract base class for Database Connection Pooling Services using Apache Commons DBCP as the underlying connection pool implementation.
*
*/
public abstract class AbstractDBCPConnectionPool extends AbstractControllerService implements DBCPService, VerifiableControllerService {
/** Property Name Prefix for Sensitive Dynamic Properties */
protected static final String SENSITIVE_PROPERTY_PREFIX = "SENSITIVE.";
/**
* Copied from {@link GenericObjectPoolConfig#DEFAULT_MIN_IDLE} in Commons-DBCP 2.7.0
*/
private static final String DEFAULT_MIN_IDLE = "0";
/**
* Copied from {@link GenericObjectPoolConfig#DEFAULT_MAX_IDLE} in Commons-DBCP 2.7.0
*/
private static final String DEFAULT_MAX_IDLE = "8";
/**
* Copied from private variable {@link BasicDataSource#maxConnLifetimeMillis} in Commons-DBCP 2.7.0
*/
private static final String DEFAULT_MAX_CONN_LIFETIME = "-1";
/**
* Copied from {@link GenericObjectPoolConfig#DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS} in Commons-DBCP 2.7.0
*/
private static final String DEFAULT_EVICTION_RUN_PERIOD = String.valueOf(-1L);
/**
* Copied from {@link GenericObjectPoolConfig#DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS} in Commons-DBCP 2.7.0
* and converted from 1800000L to "1800000 millis" to "30 mins"
*/
private static final String DEFAULT_MIN_EVICTABLE_IDLE_TIME = "30 mins";
/**
* Copied from {@link GenericObjectPoolConfig#DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME_MILLIS} in Commons-DBCP 2.7.0
*/
private static final String DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME = String.valueOf(-1L);
public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
.name("Database Connection URL")
.description("A database connection URL used to connect to a database. May contain database system name, host, port, database name and some parameters."
+ " The exact syntax of a database connection URL is specified by your DBMS.")
.defaultValue(null)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder()
.name("Database Driver Class Name")
.description("Database driver class name")
.defaultValue(null)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor DB_DRIVER_LOCATION = new PropertyDescriptor.Builder()
.name("database-driver-locations")
.displayName("Database Driver Location(s)")
.description("Comma-separated list of files/folders and/or URLs containing the driver JAR and its dependencies (if any). For example '/var/tmp/mariadb-java-client-1.1.7.jar'")
.defaultValue(null)
.required(false)
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY, ResourceType.URL)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dynamicallyModifiesClasspath(true)
.build();
public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
.name("Database User")
.description("Database user name")
.defaultValue(null)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.description("The password for the database user")
.defaultValue(null)
.required(false)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder()
.name("Max Wait Time")
.description("The maximum amount of time that the pool will wait (when there are no available connections) "
+ " for a connection to be returned before failing, or -1 to wait indefinitely. ")
.defaultValue("500 millis")
.required(true)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.sensitive(false)
.build();
public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder()
.name("Max Total Connections")
.description("The maximum number of active connections that can be allocated from this pool at the same time, "
+ " or negative for no limit.")
.defaultValue("8")
.required(true)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.sensitive(false)
.build();
public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder()
.name("Validation-query")
.displayName("Validation query")
.description("Validation query used to validate connections before returning them. "
+ "When connection is invalid, it gets dropped and new valid connection will be returned. "
+ "Note!! Using validation might have some performance penalty.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder()
.displayName("Minimum Idle Connections")
.name("dbcp-min-idle-conns")
.description("The minimum number of connections that can remain idle in the pool without extra ones being " +
"created. Set to or zero to allow no idle connections.")
.defaultValue(DEFAULT_MIN_IDLE)
.required(false)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MAX_IDLE = new PropertyDescriptor.Builder()
.displayName("Max Idle Connections")
.name("dbcp-max-idle-conns")
.description("The maximum number of connections that can remain idle in the pool without extra ones being " +
"released. Set to any negative value to allow unlimited idle connections.")
.defaultValue(DEFAULT_MAX_IDLE)
.required(false)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder()
.displayName("Max Connection Lifetime")
.name("dbcp-max-conn-lifetime")
.description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " +
"connection will fail the next activation, passivation or validation test. A value of zero or less " +
"means the connection has an infinite lifetime.")
.defaultValue(DEFAULT_MAX_CONN_LIFETIME)
.required(false)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor EVICTION_RUN_PERIOD = new PropertyDescriptor.Builder()
.displayName("Time Between Eviction Runs")
.name("dbcp-time-between-eviction-runs")
.description("The number of milliseconds to sleep between runs of the idle connection evictor thread. When " +
"non-positive, no idle connection evictor thread will be run.")
.defaultValue(DEFAULT_EVICTION_RUN_PERIOD)
.required(false)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder()
.displayName("Minimum Evictable Idle Time")
.name("dbcp-min-evictable-idle-time")
.description("The minimum amount of time a connection may sit idle in the pool before it is eligible for eviction.")
.defaultValue(DEFAULT_MIN_EVICTABLE_IDLE_TIME)
.required(false)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor SOFT_MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder()
.displayName("Soft Minimum Evictable Idle Time")
.name("dbcp-soft-min-evictable-idle-time")
.description("The minimum amount of time a connection may sit idle in the pool before it is eligible for " +
"eviction by the idle connection evictor, with the extra condition that at least a minimum number of" +
" idle connections remain in the pool. When the not-soft version of this option is set to a positive" +
" value, it is examined first by the idle connection evictor: when idle connections are visited by " +
"the evictor, idle time is first compared against it (without considering the number of idle " +
"connections in the pool) and then against this soft option, including the minimum idle connections " +
"constraint.")
.defaultValue(DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME)
.required(false)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
.description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
.identifiesControllerService(KerberosCredentialsService.class)
.required(false)
.build();
public static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-user-service")
.displayName("Kerberos User Service")
.description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
.identifiesControllerService(KerberosUserService.class)
.required(false)
.build();
public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
.name("kerberos-principal")
.displayName("Kerberos Principal")
.description("The principal to use when specifying the principal and password directly in the processor for authenticating via Kerberos.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder()
.name("kerberos-password")
.displayName("Kerberos Password")
.description("The password to use when specifying the principal and password directly in the processor for authenticating via Kerberos.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
protected volatile BasicDataSource dataSource;
protected volatile KerberosUser kerberosUser;
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
final PropertyDescriptor.Builder builder = new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.dynamic(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR);
if (propertyDescriptorName.startsWith(SENSITIVE_PROPERTY_PREFIX)) {
builder.sensitive(true).expressionLanguageSupported(ExpressionLanguageScope.NONE);
} else {
builder.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY);
}
return builder.build();
}
@Override
public List<ConfigVerificationResult> verify(final ConfigurationContext context, final ComponentLog verificationLogger, final Map<String, String> variables) {
List<ConfigVerificationResult> results = new ArrayList<>();
KerberosUser kerberosUser = null;
try {
kerberosUser = getKerberosUser(context);
if (kerberosUser != null) {
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Configure Kerberos User")
.outcome(SUCCESSFUL)
.explanation("Successfully configured Kerberos user")
.build());
}
} catch (final Exception e) {
verificationLogger.error("Failed to configure Kerberos user", e);
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Configure Kerberos User")
.outcome(FAILED)
.explanation("Failed to configure Kerberos user: " + e.getMessage())
.build());
}
final BasicDataSource dataSource = new BasicDataSource();
try {
configureDataSource(dataSource, kerberosUser, context);
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Configure Data Source")
.outcome(SUCCESSFUL)
.explanation("Successfully configured data source")
.build());
try (final Connection conn = getConnection(dataSource, kerberosUser)) {
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Establish Connection")
.outcome(SUCCESSFUL)
.explanation("Successfully established Database Connection")
.build());
} catch (final Exception e) {
verificationLogger.error("Failed to establish Database Connection", e);
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Establish Connection")
.outcome(FAILED)
.explanation("Failed to establish Database Connection: " + e.getMessage())
.build());
}
} catch (final Exception e) {
String message = "Failed to configure Data Source.";
if (e.getCause() instanceof ClassNotFoundException) {
message += String.format(" Ensure changes to the '%s' property are applied before verifying",
DB_DRIVER_LOCATION.getDisplayName());
}
verificationLogger.error(message, e);
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Configure Data Source")
.outcome(FAILED)
.explanation(message + ": " + e.getMessage())
.build());
} finally {
try {
shutdown(dataSource, kerberosUser);
} catch (final SQLException e) {
verificationLogger.error("Failed to shut down data source", e);
}
}
return results;
}
/**
* Configures connection pool by creating an instance of the
* {@link BasicDataSource} based on configuration provided with
* {@link ConfigurationContext}.
*
* This operation makes no guarantees that the actual connection could be
* made since the underlying system may still go off-line during normal
* operation of the connection pool.
*
* @param context
* the configuration context
* @throws InitializationException
* if unable to create a database connection
*/
@OnEnabled
public void onConfigured(final ConfigurationContext context) throws InitializationException {
kerberosUser = getKerberosUser(context);
dataSource = new BasicDataSource();
configureDataSource(dataSource, kerberosUser, context);
}
private void configureDataSource(final BasicDataSource dataSource, final KerberosUser kerberosUser,
final ConfigurationContext context) throws InitializationException {
final String dburl = getUrl(context);
final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue();
final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
final Long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions());
final Integer minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger();
final Integer maxIdle = context.getProperty(MAX_IDLE).evaluateAttributeExpressions().asInteger();
final Long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD).evaluateAttributeExpressions());
final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
if (kerberosUser != null) {
try {
kerberosUser.login();
} catch (KerberosLoginException e) {
throw new InitializationException("Unable to authenticate Kerberos principal", e);
}
}
dataSource.setDriver(getDriver(driverName, dburl));
dataSource.setMaxWaitMillis(maxWaitMillis);
dataSource.setMaxTotal(maxTotal);
dataSource.setMinIdle(minIdle);
dataSource.setMaxIdle(maxIdle);
dataSource.setMaxConnLifetimeMillis(maxConnLifetimeMillis);
dataSource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
dataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
dataSource.setSoftMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis);
if (validationQuery != null && !validationQuery.isEmpty()) {
dataSource.setValidationQuery(validationQuery);
dataSource.setTestOnBorrow(true);
}
dataSource.setUrl(dburl);
dataSource.setUsername(user);
dataSource.setPassword(passw);
final List<PropertyDescriptor> dynamicProperties = context.getProperties()
.keySet()
.stream()
.filter(PropertyDescriptor::isDynamic)
.collect(Collectors.toList());
dynamicProperties.forEach((descriptor) -> {
final PropertyValue propertyValue = context.getProperty(descriptor);
if (descriptor.isSensitive()) {
final String propertyName = StringUtils.substringAfter(descriptor.getName(), SENSITIVE_PROPERTY_PREFIX);
dataSource.addConnectionProperty(propertyName, propertyValue.getValue());
} else {
dataSource.addConnectionProperty(descriptor.getName(), propertyValue.evaluateAttributeExpressions().getValue());
}
});
}
private KerberosUser getKerberosUser(final ConfigurationContext context) {
KerberosUser kerberosUser = null;
final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
if (kerberosUserService != null) {
kerberosUser = kerberosUserService.createKerberosUser();
} else if (kerberosCredentialsService != null) {
kerberosUser = new KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), kerberosCredentialsService.getKeytab());
} else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
kerberosUser = new KerberosPasswordUser(kerberosPrincipal, kerberosPassword);
}
return kerberosUser;
}
protected String getUrl(ConfigurationContext context) {
return context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
}
protected Driver getDriver(final String driverName, final String url) {
final Class<?> clazz;
try {
clazz = Class.forName(driverName);
} catch (final ClassNotFoundException e) {
throw new ProcessException("Driver class " + driverName + " is not found", e);
}
try {
return DriverManager.getDriver(url);
} catch (final SQLException e) {
// In case the driver is not registered by the implementation, we explicitly try to register it.
try {
final Driver driver = (Driver) clazz.newInstance();
DriverManager.registerDriver(driver);
return DriverManager.getDriver(url);
} catch (final SQLException e2) {
throw new ProcessException("No suitable driver for the given Database Connection URL", e2);
} catch (final IllegalAccessException | InstantiationException e2) {
throw new ProcessException("Creating driver instance is failed", e2);
}
}
}
protected Long extractMillisWithInfinite(PropertyValue prop) {
return "-1".equals(prop.getValue()) ? -1 : prop.asTimePeriod(TimeUnit.MILLISECONDS);
}
/**
* Shutdown pool, close all open connections.
* If a principal is authenticated with a KDC, that principal is logged out.
*
* If a @{@link LoginException} occurs while attempting to log out the @{@link org.apache.nifi.security.krb.KerberosUser},
* an attempt will still be made to shut down the pool and close open connections.
*
* @throws SQLException if there is an error while closing open connections
* @throws LoginException if there is an error during the principal log out, and will only be thrown if there was
* no exception while closing open connections
*/
@OnDisabled
public void shutdown() throws SQLException {
try {
this.shutdown(dataSource, kerberosUser);
} finally {
kerberosUser = null;
dataSource = null;
}
}
private void shutdown(final BasicDataSource dataSource, final KerberosUser kerberosUser) throws SQLException {
try {
if (kerberosUser != null) {
kerberosUser.logout();
}
} finally {
if (dataSource != null) {
dataSource.close();
}
}
}
@Override
public Connection getConnection() throws ProcessException {
return getConnection(dataSource, kerberosUser);
}
private Connection getConnection(final BasicDataSource dataSource, final KerberosUser kerberosUser) {
try {
final Connection con;
if (kerberosUser != null) {
KerberosAction<Connection> kerberosAction = new KerberosAction<>(kerberosUser, () -> dataSource.getConnection(), getLogger());
con = kerberosAction.execute();
} else {
con = dataSource.getConnection();
}
return con;
} catch (final SQLException e) {
// If using Kerberos, attempt to re-login
if (kerberosUser != null) {
try {
getLogger().info("Error getting connection, performing Kerberos re-login");
kerberosUser.login();
} catch (KerberosLoginException le) {
throw new ProcessException("Unable to authenticate Kerberos principal", le);
}
}
throw new ProcessException(e);
}
}
@Override
public String toString() {
return this.getClass().getSimpleName() + "[id=" + getIdentifier() + "]";
}
}

View File

@ -30,6 +30,7 @@
<module>nifi-bin-manager</module>
<module>nifi-database-utils</module>
<module>nifi-database-test-utils</module>
<module>nifi-dbcp-base</module>
<module>nifi-event-listen</module>
<module>nifi-event-put</module>
<module>nifi-event-transport</module>

View File

@ -33,7 +33,7 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-nar</artifactId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>1.19.0-SNAPSHOT</version>
<type>nar</type>
</dependency>

View File

@ -29,29 +29,11 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
<version>1.19.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service</artifactId>
<artifactId>nifi-dbcp-base</artifactId>
<version>1.19.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
<version>1.19.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-user-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.snowflake</groupId>

View File

@ -26,7 +26,8 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.dbcp.DBCPConnectionPool;
import org.apache.nifi.dbcp.AbstractDBCPConnectionPool;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException;
@ -54,28 +55,28 @@ import java.util.List;
description = "Snowflake JDBC driver property name prefixed with 'SENSITIVE.' handled as a sensitive property.")
})
@RequiresInstanceClassLoading
public class SnowflakeComputingConnectionPool extends DBCPConnectionPool {
public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool implements DBCPService {
public static final PropertyDescriptor SNOWFLAKE_URL = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(DBCPConnectionPool.DATABASE_URL)
.fromPropertyDescriptor(AbstractDBCPConnectionPool.DATABASE_URL)
.displayName("Snowflake URL")
.description("Example connection string: jdbc:snowflake://[account].[region].snowflakecomputing.com/?[connection_params]" +
" The connection parameters can include db=DATABASE_NAME to avoid using qualified table names such as DATABASE_NAME.PUBLIC.TABLE_NAME")
.build();
public static final PropertyDescriptor SNOWFLAKE_USER = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(DBCPConnectionPool.DB_USER)
.fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_USER)
.displayName("Snowflake User")
.description("The Snowflake user name")
.build();
public static final PropertyDescriptor SNOWFLAKE_PASSWORD = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(DBCPConnectionPool.DB_PASSWORD)
.fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_PASSWORD)
.displayName("Snowflake Password")
.description("The password for the Snowflake user")
.build();
private static final List<PropertyDescriptor> properties;
private static final List<PropertyDescriptor> PROPERTIES;
static {
final List<PropertyDescriptor> props = new ArrayList<>();
@ -92,12 +93,12 @@ public class SnowflakeComputingConnectionPool extends DBCPConnectionPool {
props.add(MIN_EVICTABLE_IDLE_TIME);
props.add(SOFT_MIN_EVICTABLE_IDLE_TIME);
properties = Collections.unmodifiableList(props);
PROPERTIES = Collections.unmodifiableList(props);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
return PROPERTIES;
}
@Override

View File

@ -28,28 +28,21 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-base</artifactId>
<version>1.19.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.19.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-service-utils</artifactId>
<version>1.19.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-kerberos</artifactId>
<version>1.19.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
@ -91,6 +84,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>

View File

@ -18,54 +18,24 @@ package org.apache.nifi.dbcp;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.nifi.annotation.behavior.DynamicProperties;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.VerifiableControllerService;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.krb.KerberosAction;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosLoginException;
import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;
import javax.security.auth.login.LoginException;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED;
import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL;
/**
* Implementation of for Database Connection Pooling Service. Apache DBCP is used for connection pooling functionality.
@ -85,224 +55,9 @@ import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCES
description = "JDBC driver property name prefixed with 'SENSITIVE.' handled as a sensitive property.")
})
@RequiresInstanceClassLoading
public class DBCPConnectionPool extends AbstractControllerService implements DBCPService, VerifiableControllerService {
/** Property Name Prefix for Sensitive Dynamic Properties */
protected static final String SENSITIVE_PROPERTY_PREFIX = "SENSITIVE.";
public class DBCPConnectionPool extends AbstractDBCPConnectionPool implements DBCPService, VerifiableControllerService {
/**
* Copied from {@link GenericObjectPoolConfig#DEFAULT_MIN_IDLE} in Commons-DBCP 2.7.0
*/
private static final String DEFAULT_MIN_IDLE = "0";
/**
* Copied from {@link GenericObjectPoolConfig#DEFAULT_MAX_IDLE} in Commons-DBCP 2.7.0
*/
private static final String DEFAULT_MAX_IDLE = "8";
/**
* Copied from private variable {@link BasicDataSource#maxConnLifetimeMillis} in Commons-DBCP 2.7.0
*/
private static final String DEFAULT_MAX_CONN_LIFETIME = "-1";
/**
* Copied from {@link GenericObjectPoolConfig#DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS} in Commons-DBCP 2.7.0
*/
private static final String DEFAULT_EVICTION_RUN_PERIOD = String.valueOf(-1L);
/**
* Copied from {@link GenericObjectPoolConfig#DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS} in Commons-DBCP 2.7.0
* and converted from 1800000L to "1800000 millis" to "30 mins"
*/
private static final String DEFAULT_MIN_EVICTABLE_IDLE_TIME = "30 mins";
/**
* Copied from {@link GenericObjectPoolConfig#DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME_MILLIS} in Commons-DBCP 2.7.0
*/
private static final String DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME = String.valueOf(-1L);
public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
.name("Database Connection URL")
.description("A database connection URL used to connect to a database. May contain database system name, host, port, database name and some parameters."
+ " The exact syntax of a database connection URL is specified by your DBMS.")
.defaultValue(null)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder()
.name("Database Driver Class Name")
.description("Database driver class name")
.defaultValue(null)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor DB_DRIVER_LOCATION = new PropertyDescriptor.Builder()
.name("database-driver-locations")
.displayName("Database Driver Location(s)")
.description("Comma-separated list of files/folders and/or URLs containing the driver JAR and its dependencies (if any). For example '/var/tmp/mariadb-java-client-1.1.7.jar'")
.defaultValue(null)
.required(false)
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY, ResourceType.URL)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dynamicallyModifiesClasspath(true)
.build();
public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
.name("Database User")
.description("Database user name")
.defaultValue(null)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.description("The password for the database user")
.defaultValue(null)
.required(false)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder()
.name("Max Wait Time")
.description("The maximum amount of time that the pool will wait (when there are no available connections) "
+ " for a connection to be returned before failing, or -1 to wait indefinitely. ")
.defaultValue("500 millis")
.required(true)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.sensitive(false)
.build();
public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder()
.name("Max Total Connections")
.description("The maximum number of active connections that can be allocated from this pool at the same time, "
+ " or negative for no limit.")
.defaultValue("8")
.required(true)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.sensitive(false)
.build();
public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder()
.name("Validation-query")
.displayName("Validation query")
.description("Validation query used to validate connections before returning them. "
+ "When connection is invalid, it gets dropped and new valid connection will be returned. "
+ "Note!! Using validation might have some performance penalty.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder()
.displayName("Minimum Idle Connections")
.name("dbcp-min-idle-conns")
.description("The minimum number of connections that can remain idle in the pool without extra ones being " +
"created. Set to or zero to allow no idle connections.")
.defaultValue(DEFAULT_MIN_IDLE)
.required(false)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MAX_IDLE = new PropertyDescriptor.Builder()
.displayName("Max Idle Connections")
.name("dbcp-max-idle-conns")
.description("The maximum number of connections that can remain idle in the pool without extra ones being " +
"released. Set to any negative value to allow unlimited idle connections.")
.defaultValue(DEFAULT_MAX_IDLE)
.required(false)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder()
.displayName("Max Connection Lifetime")
.name("dbcp-max-conn-lifetime")
.description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " +
"connection will fail the next activation, passivation or validation test. A value of zero or less " +
"means the connection has an infinite lifetime.")
.defaultValue(DEFAULT_MAX_CONN_LIFETIME)
.required(false)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor EVICTION_RUN_PERIOD = new PropertyDescriptor.Builder()
.displayName("Time Between Eviction Runs")
.name("dbcp-time-between-eviction-runs")
.description("The number of milliseconds to sleep between runs of the idle connection evictor thread. When " +
"non-positive, no idle connection evictor thread will be run.")
.defaultValue(DEFAULT_EVICTION_RUN_PERIOD)
.required(false)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder()
.displayName("Minimum Evictable Idle Time")
.name("dbcp-min-evictable-idle-time")
.description("The minimum amount of time a connection may sit idle in the pool before it is eligible for eviction.")
.defaultValue(DEFAULT_MIN_EVICTABLE_IDLE_TIME)
.required(false)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor SOFT_MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder()
.displayName("Soft Minimum Evictable Idle Time")
.name("dbcp-soft-min-evictable-idle-time")
.description("The minimum amount of time a connection may sit idle in the pool before it is eligible for " +
"eviction by the idle connection evictor, with the extra condition that at least a minimum number of" +
" idle connections remain in the pool. When the not-soft version of this option is set to a positive" +
" value, it is examined first by the idle connection evictor: when idle connections are visited by " +
"the evictor, idle time is first compared against it (without considering the number of idle " +
"connections in the pool) and then against this soft option, including the minimum idle connections " +
"constraint.")
.defaultValue(DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME)
.required(false)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
.description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
.identifiesControllerService(KerberosCredentialsService.class)
.required(false)
.build();
public static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-user-service")
.displayName("Kerberos User Service")
.description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
.identifiesControllerService(KerberosUserService.class)
.required(false)
.build();
public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
.name("kerberos-principal")
.displayName("Kerberos Principal")
.description("The principal to use when specifying the principal and password directly in the processor for authenticating via Kerberos.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder()
.name("kerberos-password")
.displayName("Kerberos Password")
.description("The password to use when specifying the principal and password directly in the processor for authenticating via Kerberos.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
private static final List<PropertyDescriptor> properties;
private static final List<PropertyDescriptor> PROPERTIES;
static {
final List<PropertyDescriptor> props = new ArrayList<>();
@ -325,33 +80,12 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
props.add(MIN_EVICTABLE_IDLE_TIME);
props.add(SOFT_MIN_EVICTABLE_IDLE_TIME);
properties = Collections.unmodifiableList(props);
PROPERTIES = Collections.unmodifiableList(props);
}
protected volatile BasicDataSource dataSource;
protected volatile KerberosUser kerberosUser;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
final PropertyDescriptor.Builder builder = new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.dynamic(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR);
if (propertyDescriptorName.startsWith(SENSITIVE_PROPERTY_PREFIX)) {
builder.sensitive(true).expressionLanguageSupported(ExpressionLanguageScope.NONE);
} else {
builder.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY);
}
return builder.build();
return PROPERTIES;
}
@Override
@ -407,274 +141,6 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
return results;
}
@Override
public List<ConfigVerificationResult> verify(final ConfigurationContext context, final ComponentLog verificationLogger, final Map<String, String> variables) {
List<ConfigVerificationResult> results = new ArrayList<>();
KerberosUser kerberosUser = null;
try {
kerberosUser = getKerberosUser(context);
if (kerberosUser != null) {
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Configure Kerberos User")
.outcome(SUCCESSFUL)
.explanation("Successfully configured Kerberos user")
.build());
}
} catch (final Exception e) {
verificationLogger.error("Failed to configure Kerberos user", e);
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Configure Kerberos User")
.outcome(FAILED)
.explanation("Failed to configure Kerberos user: " + e.getMessage())
.build());
}
final BasicDataSource dataSource = new BasicDataSource();
try {
configureDataSource(dataSource, kerberosUser, context);
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Configure Data Source")
.outcome(SUCCESSFUL)
.explanation("Successfully configured data source")
.build());
try (final Connection conn = getConnection(dataSource, kerberosUser)) {
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Establish Connection")
.outcome(SUCCESSFUL)
.explanation("Successfully established Database Connection")
.build());
} catch (final Exception e) {
verificationLogger.error("Failed to establish Database Connection", e);
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Establish Connection")
.outcome(FAILED)
.explanation("Failed to establish Database Connection: " + e.getMessage())
.build());
}
} catch (final Exception e) {
String message = "Failed to configure Data Source.";
if (e.getCause() instanceof ClassNotFoundException) {
message += String.format(" Ensure changes to the '%s' property are applied before verifying",
DB_DRIVER_LOCATION.getDisplayName());
}
verificationLogger.error(message, e);
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Configure Data Source")
.outcome(FAILED)
.explanation(message + ": " + e.getMessage())
.build());
} finally {
try {
shutdown(dataSource, kerberosUser);
} catch (final SQLException e) {
verificationLogger.error("Failed to shut down data source", e);
}
}
return results;
}
/**
* Configures connection pool by creating an instance of the
* {@link BasicDataSource} based on configuration provided with
* {@link ConfigurationContext}.
*
* This operation makes no guarantees that the actual connection could be
* made since the underlying system may still go off-line during normal
* operation of the connection pool.
*
* @param context
* the configuration context
* @throws InitializationException
* if unable to create a database connection
*/
@OnEnabled
public void onConfigured(final ConfigurationContext context) throws InitializationException {
kerberosUser = getKerberosUser(context);
dataSource = new BasicDataSource();
configureDataSource(dataSource, kerberosUser, context);
}
private void configureDataSource(final BasicDataSource dataSource, final KerberosUser kerberosUser,
final ConfigurationContext context) throws InitializationException {
final String dburl = getUrl(context);
final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue();
final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
final Long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions());
final Integer minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger();
final Integer maxIdle = context.getProperty(MAX_IDLE).evaluateAttributeExpressions().asInteger();
final Long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD).evaluateAttributeExpressions());
final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
if (kerberosUser != null) {
try {
kerberosUser.login();
} catch (KerberosLoginException e) {
throw new InitializationException("Unable to authenticate Kerberos principal", e);
}
}
dataSource.setDriver(getDriver(driverName, dburl));
dataSource.setMaxWaitMillis(maxWaitMillis);
dataSource.setMaxTotal(maxTotal);
dataSource.setMinIdle(minIdle);
dataSource.setMaxIdle(maxIdle);
dataSource.setMaxConnLifetimeMillis(maxConnLifetimeMillis);
dataSource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
dataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
dataSource.setSoftMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis);
if (validationQuery != null && !validationQuery.isEmpty()) {
dataSource.setValidationQuery(validationQuery);
dataSource.setTestOnBorrow(true);
}
dataSource.setUrl(dburl);
dataSource.setUsername(user);
dataSource.setPassword(passw);
final List<PropertyDescriptor> dynamicProperties = context.getProperties()
.keySet()
.stream()
.filter(PropertyDescriptor::isDynamic)
.collect(Collectors.toList());
dynamicProperties.forEach((descriptor) -> {
final PropertyValue propertyValue = context.getProperty(descriptor);
if (descriptor.isSensitive()) {
final String propertyName = StringUtils.substringAfter(descriptor.getName(), SENSITIVE_PROPERTY_PREFIX);
dataSource.addConnectionProperty(propertyName, propertyValue.getValue());
} else {
dataSource.addConnectionProperty(descriptor.getName(), propertyValue.evaluateAttributeExpressions().getValue());
}
});
}
private KerberosUser getKerberosUser(final ConfigurationContext context) {
KerberosUser kerberosUser = null;
final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
if (kerberosUserService != null) {
kerberosUser = kerberosUserService.createKerberosUser();
} else if (kerberosCredentialsService != null) {
kerberosUser = new KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), kerberosCredentialsService.getKeytab());
} else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
kerberosUser = new KerberosPasswordUser(kerberosPrincipal, kerberosPassword);
}
return kerberosUser;
}
protected String getUrl(ConfigurationContext context) {
return context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
}
protected Driver getDriver(final String driverName, final String url) {
final Class<?> clazz;
try {
clazz = Class.forName(driverName);
} catch (final ClassNotFoundException e) {
throw new ProcessException("Driver class " + driverName + " is not found", e);
}
try {
return DriverManager.getDriver(url);
} catch (final SQLException e) {
// In case the driver is not registered by the implementation, we explicitly try to register it.
try {
final Driver driver = (Driver) clazz.newInstance();
DriverManager.registerDriver(driver);
return DriverManager.getDriver(url);
} catch (final SQLException e2) {
throw new ProcessException("No suitable driver for the given Database Connection URL", e2);
} catch (final IllegalAccessException | InstantiationException e2) {
throw new ProcessException("Creating driver instance is failed", e2);
}
}
}
protected Long extractMillisWithInfinite(PropertyValue prop) {
return "-1".equals(prop.getValue()) ? -1 : prop.asTimePeriod(TimeUnit.MILLISECONDS);
}
/**
* Shutdown pool, close all open connections.
* If a principal is authenticated with a KDC, that principal is logged out.
*
* If a @{@link LoginException} occurs while attempting to log out the @{@link org.apache.nifi.security.krb.KerberosUser},
* an attempt will still be made to shut down the pool and close open connections.
*
* @throws SQLException if there is an error while closing open connections
* @throws LoginException if there is an error during the principal log out, and will only be thrown if there was
* no exception while closing open connections
*/
@OnDisabled
public void shutdown() throws SQLException {
try {
this.shutdown(dataSource, kerberosUser);
} finally {
kerberosUser = null;
dataSource = null;
}
}
private void shutdown(final BasicDataSource dataSource, final KerberosUser kerberosUser) throws SQLException {
try {
if (kerberosUser != null) {
kerberosUser.logout();
}
} finally {
if (dataSource != null) {
dataSource.close();
}
}
}
@Override
public Connection getConnection() throws ProcessException {
return getConnection(dataSource, kerberosUser);
}
private Connection getConnection(final BasicDataSource dataSource, final KerberosUser kerberosUser) {
try {
final Connection con;
if (kerberosUser != null) {
KerberosAction<Connection> kerberosAction = new KerberosAction<>(kerberosUser, () -> dataSource.getConnection(), getLogger());
con = kerberosAction.execute();
} else {
con = dataSource.getConnection();
}
return con;
} catch (final SQLException e) {
// If using Kerberos, attempt to re-login
if (kerberosUser != null) {
try {
getLogger().info("Error getting connection, performing Kerberos re-login");
kerberosUser.login();
} catch (KerberosLoginException le) {
throw new ProcessException("Unable to authenticate Kerberos principal", le);
}
}
throw new ProcessException(e);
}
}
@Override
public String toString() {
return this.getClass().getSimpleName() + "[id=" + getIdentifier() + "]";
}
BasicDataSource getDataSource() {
return dataSource;
}

View File

@ -277,6 +277,12 @@
<version>1.19.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
<version>1.19.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-protocol</artifactId>