* This operation makes no guarantees that the actual connection could be * made since the underlying system may still go off-line during normal * operation of the connection pool. @@ -498,64 +353,25 @@ public class HadoopDBCPConnectionPool extends AbstractControllerService implemen if (resolvedKeytab != null) { kerberosUser = new KerberosKeytabUser(resolvedPrincipal, resolvedKeytab); - getLogger().info("Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab}); + getLogger().info("Security Enabled, logging in as principal {} with keytab {}", resolvedPrincipal, resolvedKeytab); } else if (explicitPassword != null) { kerberosUser = new KerberosPasswordUser(resolvedPrincipal, explicitPassword); - getLogger().info("Security Enabled, logging in as principal {} with password", new Object[] {resolvedPrincipal}); + getLogger().info("Security Enabled, logging in as principal {} with password", resolvedPrincipal); } else { throw new IOException("Unable to authenticate with Kerberos, no keytab or password was provided"); } ugi = SecurityUtil.getUgiForKerberosUser(hadoopConfig, kerberosUser); - getLogger().info("Successfully logged in as principal " + resolvedPrincipal); + getLogger().info("Successfully logged in as principal {}", resolvedPrincipal); } else { getLogger().info("Simple Authentication"); } - - // Initialize the DataSource... - final String dbUrl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue(); - final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue(); - final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue(); - final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue(); - final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger(); - final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue(); - final Long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions()); - final Integer minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger(); - final Integer maxIdle = context.getProperty(MAX_IDLE).evaluateAttributeExpressions().asInteger(); - final Long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions()); - final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD).evaluateAttributeExpressions()); - final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions()); - final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions()); - - dataSource = new BasicDataSource(); - dataSource.setDriverClassName(driverName); - dataSource.setDriverClassLoader(this.getClass().getClassLoader()); - dataSource.setUrl(dbUrl); - dataSource.setUsername(user); - dataSource.setPassword(passw); - dataSource.setMaxWait(Duration.ofMillis(maxWaitMillis)); - dataSource.setMaxTotal(maxTotal); - dataSource.setMinIdle(minIdle); - dataSource.setMaxIdle(maxIdle); - dataSource.setMaxConn(Duration.ofMillis(maxConnLifetimeMillis)); - dataSource.setDurationBetweenEvictionRuns(Duration.ofMillis(timeBetweenEvictionRunsMillis)); - dataSource.setMinEvictableIdle(Duration.ofMillis(minEvictableIdleTimeMillis)); - dataSource.setSoftMinEvictableIdle(Duration.ofMillis(softMinEvictableIdleTimeMillis)); - - if (StringUtils.isEmpty(validationQuery)) { - dataSource.setValidationQuery(validationQuery); - dataSource.setTestOnBorrow(true); - } - } - - private Long extractMillisWithInfinite(PropertyValue prop) { - return "-1".equals(prop.getValue()) ? -1 : prop.asTimePeriod(TimeUnit.MILLISECONDS); } /** * Shutdown pool, close all open connections. * If a principal is authenticated with a KDC, that principal is logged out. - * + *
* If a @{@link LoginException} occurs while attempting to log out the @{@link org.apache.nifi.security.krb.KerberosUser}, * an attempt will still be made to shut down the pool and close open connections. * @@ -582,6 +398,61 @@ public class HadoopDBCPConnectionPool extends AbstractControllerService implemen } } + @Override + protected Driver getDriver(String driverName, String url) { + final Class> clazz; + + try { + clazz = Class.forName(driverName); + } catch (final ClassNotFoundException e) { + throw new ProcessException("Driver class " + driverName + " is not found", e); + } + + try { + return DriverManager.getDriver(url); + } catch (final SQLException e) { + // In case the driver is not registered by the implementation, we explicitly try to register it. + try { + final Driver driver = (Driver) clazz.getDeclaredConstructor().newInstance(); + DriverManager.registerDriver(driver); + return DriverManager.getDriver(url); + } catch (final SQLException e2) { + throw new ProcessException("No suitable driver for the given Database Connection URL", e2); + } catch (final Exception e2) { + throw new ProcessException("Creating driver instance is failed", e2); + } + } + } + + @Override + protected DataSourceConfiguration getDataSourceConfiguration(ConfigurationContext context) { + final String url = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue(); + final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue(); + final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue(); + final String password = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue(); + final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger(); + final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue(); + final Long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions()); + final Integer minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger(); + final Integer maxIdle = context.getProperty(MAX_IDLE).evaluateAttributeExpressions().asInteger(); + final Long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions()); + final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD).evaluateAttributeExpressions()); + final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions()); + final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions()); + + return new DataSourceConfiguration.Builder(url, driverName, user, password) + .validationQuery(validationQuery) + .maxWaitMillis(maxWaitMillis) + .maxTotal(maxTotal) + .minIdle(minIdle) + .maxIdle(maxIdle) + .maxConnLifetimeMillis(maxConnLifetimeMillis) + .timeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis) + .minEvictableIdleTimeMillis(minEvictableIdleTimeMillis) + .softMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis) + .build(); + } + @Override public Connection getConnection() throws ProcessException { try { @@ -591,9 +462,9 @@ public class HadoopDBCPConnectionPool extends AbstractControllerService implemen getLogger().trace("getting UGI instance"); if (kerberosUser != null) { // if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring - getLogger().debug("kerberosUser is " + kerberosUser); + getLogger().debug("kerberosUser is {}", kerberosUser); try { - getLogger().debug("checking TGT on kerberosUser " + kerberosUser); + getLogger().debug("checking TGT on kerberosUser {}", kerberosUser); kerberosUser.checkTGTAndRelogin(); } catch (final KerberosLoginException e) { throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e); @@ -619,7 +490,6 @@ public class HadoopDBCPConnectionPool extends AbstractControllerService implemen return dataSource.getConnection(); } } catch (SQLException | IOException | InterruptedException e) { - getLogger().error("Error getting Connection: " + e.getMessage(), e); throw new ProcessException(e); } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPoolTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPoolTest.java index a6d3b2c259..b1ab4be992 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPoolTest.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPoolTest.java @@ -17,6 +17,7 @@ package org.apache.nifi.dbcp; import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.utils.DBCPProperties; import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.kerberos.KerberosContext; import org.apache.nifi.kerberos.KerberosCredentialsService; @@ -55,9 +56,9 @@ public class HadoopDBCPConnectionPoolTest { // Configure minimum required properties.. final HadoopDBCPConnectionPool hadoopDBCPService = new TestableHadoopDBCPConnectionPool(true); runner.addControllerService("hadoop-dbcp-service", hadoopDBCPService); - runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DATABASE_URL, "jdbc:phoenix:zk-host1,zk-host2:2181:/hbase"); - runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DB_DRIVERNAME, "org.apache.phoenix.jdbc.PhoenixDriver"); - runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DB_DRIVER_LOCATION, "target"); + runner.setProperty(hadoopDBCPService, DBCPProperties.DATABASE_URL, "jdbc:phoenix:zk-host1,zk-host2:2181:/hbase"); + runner.setProperty(hadoopDBCPService, DBCPProperties.DB_DRIVERNAME, "org.apache.phoenix.jdbc.PhoenixDriver"); + runner.setProperty(hadoopDBCPService, DBCPProperties.DB_DRIVER_LOCATION, "target"); // Security is not enabled yet since no conf files provided, so should be valid runner.assertValid(hadoopDBCPService); @@ -100,7 +101,7 @@ public class HadoopDBCPConnectionPoolTest { when(kerberosUserService.getIdentifier()).thenReturn("userService1"); runner.addControllerService(kerberosUserService.getIdentifier(), kerberosUserService); runner.enableControllerService(kerberosUserService); - runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier()); + runner.setProperty(hadoopDBCPService, DBCPProperties.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier()); runner.assertNotValid(hadoopDBCPService); // Remove KerberosCredentialService, should be valid with only KerberosUserService @@ -118,7 +119,7 @@ public class HadoopDBCPConnectionPoolTest { runner.assertNotValid(hadoopDBCPService); // Remove kerberos user service, should be valid - runner.removeProperty(hadoopDBCPService, HadoopDBCPConnectionPool.KERBEROS_USER_SERVICE); + runner.removeProperty(hadoopDBCPService, DBCPProperties.KERBEROS_USER_SERVICE); runner.assertValid(hadoopDBCPService); } @@ -130,8 +131,8 @@ public class HadoopDBCPConnectionPoolTest { // Configure minimum required properties.. final HadoopDBCPConnectionPool hadoopDBCPService = new TestableHadoopDBCPConnectionPool(false); runner.addControllerService("hadoop-dbcp-service", hadoopDBCPService); - runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DATABASE_URL, "jdbc:phoenix:zk-host1,zk-host2:2181:/hbase"); - runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DB_DRIVERNAME, "org.apache.phoenix.jdbc.PhoenixDriver"); + runner.setProperty(hadoopDBCPService, DBCPProperties.DATABASE_URL, "jdbc:phoenix:zk-host1,zk-host2:2181:/hbase"); + runner.setProperty(hadoopDBCPService, DBCPProperties.DB_DRIVERNAME, "org.apache.phoenix.jdbc.PhoenixDriver"); runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DB_DRIVER_LOCATION, "target"); // Security is not enabled yet since no conf files provided, so should be valid