NIFI-12890: Refactor HadoopDBCPConnectionPool to extend AbstractDBCPConnectionPool

This closes #8619.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
lehelb 2024-03-13 14:22:19 -05:00 committed by Peter Turcsanyi
parent d78e817fe8
commit 419a9cc73b
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
3 changed files with 135 additions and 259 deletions

View File

@ -28,6 +28,11 @@
<version>2.0.0-SNAPSHOT</version> <version>2.0.0-SNAPSHOT</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-base</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId> <artifactId>nifi-api</artifactId>

View File

@ -16,21 +16,6 @@
*/ */
package org.apache.nifi.dbcp; package org.apache.nifi.dbcp;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.login.LoginException;
import org.apache.commons.dbcp2.BasicDataSource; import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -45,15 +30,15 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.RequiredPermission; import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality; import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType; import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext; import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.dbcp.utils.DBCPProperties;
import org.apache.nifi.dbcp.utils.DataSourceConfiguration;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil; import org.apache.nifi.hadoop.SecurityUtil;
@ -64,12 +49,42 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosKeytabUser; import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosLoginException; import org.apache.nifi.security.krb.KerberosLoginException;
import org.apache.nifi.security.krb.KerberosPasswordUser; import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;
import javax.security.auth.login.LoginException;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.nifi.dbcp.utils.DBCPProperties.DATABASE_URL;
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVERNAME;
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_PASSWORD;
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_USER;
import static org.apache.nifi.dbcp.utils.DBCPProperties.EVICTION_RUN_PERIOD;
import static org.apache.nifi.dbcp.utils.DBCPProperties.KERBEROS_USER_SERVICE;
import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_CONN_LIFETIME;
import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_IDLE;
import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_TOTAL_CONNECTIONS;
import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_WAIT_TIME;
import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_EVICTABLE_IDLE_TIME;
import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_IDLE;
import static org.apache.nifi.dbcp.utils.DBCPProperties.SOFT_MIN_EVICTABLE_IDLE_TIME;
import static org.apache.nifi.dbcp.utils.DBCPProperties.VALIDATION_QUERY;
import static org.apache.nifi.dbcp.utils.DBCPProperties.extractMillisWithInfinite;
/** /**
* Implementation of Database Connection Pooling Service for Hadoop related JDBC Service. * Implementation of Database Connection Pooling Service for Hadoop related JDBC Service.
* Apache DBCP is used for connection pooling functionality. * Apache DBCP is used for connection pooling functionality.
*
*/ */
@RequiresInstanceClassLoading @RequiresInstanceClassLoading
@Tags({"dbcp", "jdbc", "database", "connection", "pooling", "store", "hadoop"}) @Tags({"dbcp", "jdbc", "database", "connection", "pooling", "store", "hadoop"})
@ -86,47 +101,19 @@ import org.apache.nifi.security.krb.KerberosUser;
) )
} }
) )
public class HadoopDBCPConnectionPool extends AbstractControllerService implements DBCPService { public class HadoopDBCPConnectionPool extends AbstractDBCPConnectionPool {
private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB"; private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
private static final String HADOOP_CONFIGURATION_CLASS = "org.apache.hadoop.conf.Configuration"; private static final String HADOOP_CONFIGURATION_CLASS = "org.apache.hadoop.conf.Configuration";
private static final String HADOOP_UGI_CLASS = "org.apache.hadoop.security.UserGroupInformation"; private static final String HADOOP_UGI_CLASS = "org.apache.hadoop.security.UserGroupInformation";
private static final String DEFAULT_MIN_IDLE = "0";
private static final String DEFAULT_MAX_IDLE = "8";
private static final String DEFAULT_MAX_CONN_LIFETIME = "-1";
private static final String DEFAULT_EVICTION_RUN_PERIOD = String.valueOf(-1L);
private static final String DEFAULT_MIN_EVICTABLE_IDLE_TIME = "30 mins";
private static final String DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME = String.valueOf(-1L);
public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
.name("Database Connection URL")
.description("A database connection URL used to connect to a database. May contain database system name, host, port, database name and some parameters."
+ " The exact syntax of a database connection URL is specified by your DBMS.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
public static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder()
.name("Database Driver Class Name")
.description("Database driver class name")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
public static final PropertyDescriptor DB_DRIVER_LOCATION = new PropertyDescriptor.Builder() public static final PropertyDescriptor DB_DRIVER_LOCATION = new PropertyDescriptor.Builder()
.name("database-driver-locations") .fromPropertyDescriptor(DBCPProperties.DB_DRIVER_LOCATION)
.displayName("Database Driver Location(s)") .description("Comma-separated list of files/folders and/or URLs containing the driver JAR and its dependencies. " +
.description("Comma-separated list of files/folders and/or URLs containing the driver JAR and its dependencies (if any). " +
"For example '/var/tmp/phoenix-client.jar'. NOTE: It is required that the resources specified by this property provide " + "For example '/var/tmp/phoenix-client.jar'. NOTE: It is required that the resources specified by this property provide " +
"the classes from hadoop-common, such as Configuration and UserGroupInformation.") "the classes from hadoop-common, such as Configuration and UserGroupInformation.")
.required(true) .required(true)
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY, ResourceType.URL)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.dynamicallyModifiesClasspath(true)
.build(); .build();
static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder() static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
@ -141,126 +128,6 @@ public class HadoopDBCPConnectionPool extends AbstractControllerService implemen
.dynamicallyModifiesClasspath(true) .dynamicallyModifiesClasspath(true)
.build(); .build();
public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
.name("Database User")
.description("The user for the database")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.description("The password for the database user")
.required(false)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder()
.name("Max Wait Time")
.description("The maximum amount of time that the pool will wait (when there are no available connections) "
+ " for a connection to be returned before failing, or -1 to wait indefinitely. ")
.defaultValue("500 millis")
.required(true)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.sensitive(false)
.build();
public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder()
.name("Max Total Connections")
.description("The maximum number of active connections that can be allocated from this pool at the same time, "
+ " or negative for no limit.")
.defaultValue("8")
.required(true)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.sensitive(false)
.build();
public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder()
.name("Validation-query")
.displayName("Validation query")
.description("Validation query used to validate connections before returning them. "
+ "When connection is invalid, it get's dropped and new valid connection will be returned. "
+ "Note!! Using validation might have some performance penalty.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder()
.displayName("Minimum Idle Connections")
.name("dbcp-min-idle-conns")
.description("The minimum number of connections that can remain idle in the pool, without extra ones being " +
"created, or zero to create none.")
.defaultValue(DEFAULT_MIN_IDLE)
.required(false)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
public static final PropertyDescriptor MAX_IDLE = new PropertyDescriptor.Builder()
.displayName("Max Idle Connections")
.name("dbcp-max-idle-conns")
.description("The maximum number of connections that can remain idle in the pool, without extra ones being " +
"released, or negative for no limit.")
.defaultValue(DEFAULT_MAX_IDLE)
.required(false)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder()
.displayName("Max Connection Lifetime")
.name("dbcp-max-conn-lifetime")
.description("The maximum lifetime of a connection. After this time is exceeded the " +
"connection will fail the next activation, passivation or validation test. A value of zero or less " +
"means the connection has an infinite lifetime.")
.defaultValue(DEFAULT_MAX_CONN_LIFETIME)
.required(false)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
public static final PropertyDescriptor EVICTION_RUN_PERIOD = new PropertyDescriptor.Builder()
.displayName("Time Between Eviction Runs")
.name("dbcp-time-between-eviction-runs")
.description("The time period to sleep between runs of the idle connection evictor thread. When " +
"non-positive, no idle connection evictor thread will be run.")
.defaultValue(DEFAULT_EVICTION_RUN_PERIOD)
.required(false)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
public static final PropertyDescriptor MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder()
.displayName("Minimum Evictable Idle Time")
.name("dbcp-min-evictable-idle-time")
.description("The minimum amount of time a connection may sit idle in the pool before it is eligible for eviction.")
.defaultValue(DEFAULT_MIN_EVICTABLE_IDLE_TIME)
.required(false)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
public static final PropertyDescriptor SOFT_MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder()
.displayName("Soft Minimum Evictable Idle Time")
.name("dbcp-soft-min-evictable-idle-time")
.description("The minimum amount of time a connection may sit idle in the pool before it is eligible for " +
"eviction by the idle connection evictor, with the extra condition that at least a minimum number of" +
" idle connections remain in the pool. When the not-soft version of this option is set to a positive" +
" value, it is examined first by the idle connection evictor: when idle connections are visited by " +
"the evictor, idle time is first compared against it (without considering the number of idle " +
"connections in the pool) and then against this soft option, including the minimum idle connections " +
"constraint.")
.defaultValue(DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME)
.required(false)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-credentials-service") .name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service") .displayName("Kerberos Credentials Service")
@ -269,21 +136,10 @@ public class HadoopDBCPConnectionPool extends AbstractControllerService implemen
.required(false) .required(false)
.build(); .build();
public static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-user-service")
.displayName("Kerberos User Service")
.description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
.identifiesControllerService(KerberosUserService.class)
.required(false)
.build();
private KerberosProperties kerberosProperties; private KerberosProperties kerberosProperties;
private List<PropertyDescriptor> properties; private List<PropertyDescriptor> properties;
private volatile BasicDataSource dataSource;
private volatile UserGroupInformation ugi; private volatile UserGroupInformation ugi;
private volatile KerberosUser kerberosUser;
private volatile Boolean foundHadoopDependencies; private volatile Boolean foundHadoopDependencies;
// Holder of cached Configuration information so validation does not reload the same config over and over // Holder of cached Configuration information so validation does not reload the same config over and over
@ -294,29 +150,28 @@ public class HadoopDBCPConnectionPool extends AbstractControllerService implemen
File kerberosConfigFile = context.getKerberosConfigurationFile(); File kerberosConfigFile = context.getKerberosConfigurationFile();
kerberosProperties = getKerberosProperties(kerberosConfigFile); kerberosProperties = getKerberosProperties(kerberosConfigFile);
final List<PropertyDescriptor> props = new ArrayList<>(); properties = Arrays.asList(
props.add(DATABASE_URL); DATABASE_URL,
props.add(DB_DRIVERNAME); DB_DRIVERNAME,
props.add(DB_DRIVER_LOCATION); DB_DRIVER_LOCATION,
props.add(HADOOP_CONFIGURATION_RESOURCES); HADOOP_CONFIGURATION_RESOURCES,
props.add(KERBEROS_USER_SERVICE); KERBEROS_USER_SERVICE,
props.add(KERBEROS_CREDENTIALS_SERVICE); KERBEROS_CREDENTIALS_SERVICE,
props.add(kerberosProperties.getKerberosPrincipal()); kerberosProperties.getKerberosPrincipal(),
props.add(kerberosProperties.getKerberosKeytab()); kerberosProperties.getKerberosKeytab(),
props.add(kerberosProperties.getKerberosPassword()); kerberosProperties.getKerberosPassword(),
props.add(DB_USER); DB_USER,
props.add(DB_PASSWORD); DB_PASSWORD,
props.add(MAX_WAIT_TIME); MAX_WAIT_TIME,
props.add(MAX_TOTAL_CONNECTIONS); MAX_TOTAL_CONNECTIONS,
props.add(VALIDATION_QUERY); VALIDATION_QUERY,
props.add(MIN_IDLE); MIN_IDLE,
props.add(MAX_IDLE); MAX_IDLE,
props.add(MAX_CONN_LIFETIME); MAX_CONN_LIFETIME,
props.add(EVICTION_RUN_PERIOD); EVICTION_RUN_PERIOD,
props.add(MIN_EVICTABLE_IDLE_TIME); MIN_EVICTABLE_IDLE_TIME,
props.add(SOFT_MIN_EVICTABLE_IDLE_TIME); SOFT_MIN_EVICTABLE_IDLE_TIME
);
properties = Collections.unmodifiableList(props);
} }
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) { protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
@ -458,7 +313,7 @@ public class HadoopDBCPConnectionPool extends AbstractControllerService implemen
* Configures connection pool by creating an instance of the * Configures connection pool by creating an instance of the
* {@link BasicDataSource} based on configuration provided with * {@link BasicDataSource} based on configuration provided with
* {@link ConfigurationContext}. * {@link ConfigurationContext}.
* * <p>
* This operation makes no guarantees that the actual connection could be * This operation makes no guarantees that the actual connection could be
* made since the underlying system may still go off-line during normal * made since the underlying system may still go off-line during normal
* operation of the connection pool. * operation of the connection pool.
@ -498,64 +353,25 @@ public class HadoopDBCPConnectionPool extends AbstractControllerService implemen
if (resolvedKeytab != null) { if (resolvedKeytab != null) {
kerberosUser = new KerberosKeytabUser(resolvedPrincipal, resolvedKeytab); kerberosUser = new KerberosKeytabUser(resolvedPrincipal, resolvedKeytab);
getLogger().info("Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab}); getLogger().info("Security Enabled, logging in as principal {} with keytab {}", resolvedPrincipal, resolvedKeytab);
} else if (explicitPassword != null) { } else if (explicitPassword != null) {
kerberosUser = new KerberosPasswordUser(resolvedPrincipal, explicitPassword); kerberosUser = new KerberosPasswordUser(resolvedPrincipal, explicitPassword);
getLogger().info("Security Enabled, logging in as principal {} with password", new Object[] {resolvedPrincipal}); getLogger().info("Security Enabled, logging in as principal {} with password", resolvedPrincipal);
} else { } else {
throw new IOException("Unable to authenticate with Kerberos, no keytab or password was provided"); throw new IOException("Unable to authenticate with Kerberos, no keytab or password was provided");
} }
ugi = SecurityUtil.getUgiForKerberosUser(hadoopConfig, kerberosUser); ugi = SecurityUtil.getUgiForKerberosUser(hadoopConfig, kerberosUser);
getLogger().info("Successfully logged in as principal " + resolvedPrincipal); getLogger().info("Successfully logged in as principal {}", resolvedPrincipal);
} else { } else {
getLogger().info("Simple Authentication"); getLogger().info("Simple Authentication");
} }
// Initialize the DataSource...
final String dbUrl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue();
final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
final Long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions());
final Integer minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger();
final Integer maxIdle = context.getProperty(MAX_IDLE).evaluateAttributeExpressions().asInteger();
final Long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD).evaluateAttributeExpressions());
final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
dataSource = new BasicDataSource();
dataSource.setDriverClassName(driverName);
dataSource.setDriverClassLoader(this.getClass().getClassLoader());
dataSource.setUrl(dbUrl);
dataSource.setUsername(user);
dataSource.setPassword(passw);
dataSource.setMaxWait(Duration.ofMillis(maxWaitMillis));
dataSource.setMaxTotal(maxTotal);
dataSource.setMinIdle(minIdle);
dataSource.setMaxIdle(maxIdle);
dataSource.setMaxConn(Duration.ofMillis(maxConnLifetimeMillis));
dataSource.setDurationBetweenEvictionRuns(Duration.ofMillis(timeBetweenEvictionRunsMillis));
dataSource.setMinEvictableIdle(Duration.ofMillis(minEvictableIdleTimeMillis));
dataSource.setSoftMinEvictableIdle(Duration.ofMillis(softMinEvictableIdleTimeMillis));
if (StringUtils.isEmpty(validationQuery)) {
dataSource.setValidationQuery(validationQuery);
dataSource.setTestOnBorrow(true);
}
}
private Long extractMillisWithInfinite(PropertyValue prop) {
return "-1".equals(prop.getValue()) ? -1 : prop.asTimePeriod(TimeUnit.MILLISECONDS);
} }
/** /**
* Shutdown pool, close all open connections. * Shutdown pool, close all open connections.
* If a principal is authenticated with a KDC, that principal is logged out. * If a principal is authenticated with a KDC, that principal is logged out.
* * <p>
* If a @{@link LoginException} occurs while attempting to log out the @{@link org.apache.nifi.security.krb.KerberosUser}, * If a @{@link LoginException} occurs while attempting to log out the @{@link org.apache.nifi.security.krb.KerberosUser},
* an attempt will still be made to shut down the pool and close open connections. * an attempt will still be made to shut down the pool and close open connections.
* *
@ -582,6 +398,61 @@ public class HadoopDBCPConnectionPool extends AbstractControllerService implemen
} }
} }
@Override
protected Driver getDriver(String driverName, String url) {
final Class<?> clazz;
try {
clazz = Class.forName(driverName);
} catch (final ClassNotFoundException e) {
throw new ProcessException("Driver class " + driverName + " is not found", e);
}
try {
return DriverManager.getDriver(url);
} catch (final SQLException e) {
// In case the driver is not registered by the implementation, we explicitly try to register it.
try {
final Driver driver = (Driver) clazz.getDeclaredConstructor().newInstance();
DriverManager.registerDriver(driver);
return DriverManager.getDriver(url);
} catch (final SQLException e2) {
throw new ProcessException("No suitable driver for the given Database Connection URL", e2);
} catch (final Exception e2) {
throw new ProcessException("Creating driver instance is failed", e2);
}
}
}
@Override
protected DataSourceConfiguration getDataSourceConfiguration(ConfigurationContext context) {
final String url = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue();
final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
final String password = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
final Long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions());
final Integer minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger();
final Integer maxIdle = context.getProperty(MAX_IDLE).evaluateAttributeExpressions().asInteger();
final Long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD).evaluateAttributeExpressions());
final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
return new DataSourceConfiguration.Builder(url, driverName, user, password)
.validationQuery(validationQuery)
.maxWaitMillis(maxWaitMillis)
.maxTotal(maxTotal)
.minIdle(minIdle)
.maxIdle(maxIdle)
.maxConnLifetimeMillis(maxConnLifetimeMillis)
.timeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis)
.minEvictableIdleTimeMillis(minEvictableIdleTimeMillis)
.softMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis)
.build();
}
@Override @Override
public Connection getConnection() throws ProcessException { public Connection getConnection() throws ProcessException {
try { try {
@ -591,9 +462,9 @@ public class HadoopDBCPConnectionPool extends AbstractControllerService implemen
getLogger().trace("getting UGI instance"); getLogger().trace("getting UGI instance");
if (kerberosUser != null) { if (kerberosUser != null) {
// if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring // if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring
getLogger().debug("kerberosUser is " + kerberosUser); getLogger().debug("kerberosUser is {}", kerberosUser);
try { try {
getLogger().debug("checking TGT on kerberosUser " + kerberosUser); getLogger().debug("checking TGT on kerberosUser {}", kerberosUser);
kerberosUser.checkTGTAndRelogin(); kerberosUser.checkTGTAndRelogin();
} catch (final KerberosLoginException e) { } catch (final KerberosLoginException e) {
throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e); throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
@ -619,7 +490,6 @@ public class HadoopDBCPConnectionPool extends AbstractControllerService implemen
return dataSource.getConnection(); return dataSource.getConnection();
} }
} catch (SQLException | IOException | InterruptedException e) { } catch (SQLException | IOException | InterruptedException e) {
getLogger().error("Error getting Connection: " + e.getMessage(), e);
throw new ProcessException(e); throw new ProcessException(e);
} }
} }

View File

@ -17,6 +17,7 @@
package org.apache.nifi.dbcp; package org.apache.nifi.dbcp;
import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.utils.DBCPProperties;
import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.kerberos.KerberosContext; import org.apache.nifi.kerberos.KerberosContext;
import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.kerberos.KerberosCredentialsService;
@ -55,9 +56,9 @@ public class HadoopDBCPConnectionPoolTest {
// Configure minimum required properties.. // Configure minimum required properties..
final HadoopDBCPConnectionPool hadoopDBCPService = new TestableHadoopDBCPConnectionPool(true); final HadoopDBCPConnectionPool hadoopDBCPService = new TestableHadoopDBCPConnectionPool(true);
runner.addControllerService("hadoop-dbcp-service", hadoopDBCPService); runner.addControllerService("hadoop-dbcp-service", hadoopDBCPService);
runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DATABASE_URL, "jdbc:phoenix:zk-host1,zk-host2:2181:/hbase"); runner.setProperty(hadoopDBCPService, DBCPProperties.DATABASE_URL, "jdbc:phoenix:zk-host1,zk-host2:2181:/hbase");
runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DB_DRIVERNAME, "org.apache.phoenix.jdbc.PhoenixDriver"); runner.setProperty(hadoopDBCPService, DBCPProperties.DB_DRIVERNAME, "org.apache.phoenix.jdbc.PhoenixDriver");
runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DB_DRIVER_LOCATION, "target"); runner.setProperty(hadoopDBCPService, DBCPProperties.DB_DRIVER_LOCATION, "target");
// Security is not enabled yet since no conf files provided, so should be valid // Security is not enabled yet since no conf files provided, so should be valid
runner.assertValid(hadoopDBCPService); runner.assertValid(hadoopDBCPService);
@ -100,7 +101,7 @@ public class HadoopDBCPConnectionPoolTest {
when(kerberosUserService.getIdentifier()).thenReturn("userService1"); when(kerberosUserService.getIdentifier()).thenReturn("userService1");
runner.addControllerService(kerberosUserService.getIdentifier(), kerberosUserService); runner.addControllerService(kerberosUserService.getIdentifier(), kerberosUserService);
runner.enableControllerService(kerberosUserService); runner.enableControllerService(kerberosUserService);
runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier()); runner.setProperty(hadoopDBCPService, DBCPProperties.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
runner.assertNotValid(hadoopDBCPService); runner.assertNotValid(hadoopDBCPService);
// Remove KerberosCredentialService, should be valid with only KerberosUserService // Remove KerberosCredentialService, should be valid with only KerberosUserService
@ -118,7 +119,7 @@ public class HadoopDBCPConnectionPoolTest {
runner.assertNotValid(hadoopDBCPService); runner.assertNotValid(hadoopDBCPService);
// Remove kerberos user service, should be valid // Remove kerberos user service, should be valid
runner.removeProperty(hadoopDBCPService, HadoopDBCPConnectionPool.KERBEROS_USER_SERVICE); runner.removeProperty(hadoopDBCPService, DBCPProperties.KERBEROS_USER_SERVICE);
runner.assertValid(hadoopDBCPService); runner.assertValid(hadoopDBCPService);
} }
@ -130,8 +131,8 @@ public class HadoopDBCPConnectionPoolTest {
// Configure minimum required properties.. // Configure minimum required properties..
final HadoopDBCPConnectionPool hadoopDBCPService = new TestableHadoopDBCPConnectionPool(false); final HadoopDBCPConnectionPool hadoopDBCPService = new TestableHadoopDBCPConnectionPool(false);
runner.addControllerService("hadoop-dbcp-service", hadoopDBCPService); runner.addControllerService("hadoop-dbcp-service", hadoopDBCPService);
runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DATABASE_URL, "jdbc:phoenix:zk-host1,zk-host2:2181:/hbase"); runner.setProperty(hadoopDBCPService, DBCPProperties.DATABASE_URL, "jdbc:phoenix:zk-host1,zk-host2:2181:/hbase");
runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DB_DRIVERNAME, "org.apache.phoenix.jdbc.PhoenixDriver"); runner.setProperty(hadoopDBCPService, DBCPProperties.DB_DRIVERNAME, "org.apache.phoenix.jdbc.PhoenixDriver");
runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DB_DRIVER_LOCATION, "target"); runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DB_DRIVER_LOCATION, "target");
// Security is not enabled yet since no conf files provided, so should be valid // Security is not enabled yet since no conf files provided, so should be valid