diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/main/java/org/apache/nifi/dbcp/HikariCPConnectionPool.java b/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/main/java/org/apache/nifi/dbcp/HikariCPConnectionPool.java index 230df5a6f3..59413b71e4 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/main/java/org/apache/nifi/dbcp/HikariCPConnectionPool.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/main/java/org/apache/nifi/dbcp/HikariCPConnectionPool.java @@ -27,6 +27,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.RequiredPermission; @@ -34,12 +35,15 @@ import org.apache.nifi.components.resource.ResourceCardinality; import org.apache.nifi.components.resource.ResourceType; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.VerifiableControllerService; import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.kerberos.KerberosUserService; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.security.krb.KerberosAction; +import org.apache.nifi.security.krb.KerberosLoginException; import org.apache.nifi.security.krb.KerberosUser; import javax.security.auth.login.LoginException; @@ -48,10 +52,14 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED; +import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL; + /** * Implementation of Database Connection Pooling Service. HikariCP is used for connection pooling functionality. */ @@ -71,7 +79,7 @@ import java.util.stream.Collectors; ) } ) -public class HikariCPConnectionPool extends AbstractControllerService implements DBCPService { +public class HikariCPConnectionPool extends AbstractControllerService implements DBCPService, VerifiableControllerService { /** * Property Name Prefix for Sensitive Dynamic Properties */ @@ -81,6 +89,8 @@ public class HikariCPConnectionPool extends AbstractControllerService implements private static final String DEFAULT_TOTAL_CONNECTIONS = "10"; private static final String DEFAULT_MAX_CONN_LIFETIME = "-1"; + private static final int DEFAULT_MIN_VALIDATION_TIMEOUT = 250; + public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder() .name("hikaricp-connection-url") .displayName("Database Connection URL") @@ -254,58 +264,8 @@ public class HikariCPConnectionPool extends AbstractControllerService implements */ @OnEnabled public void onConfigured(final ConfigurationContext context) { - - final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue(); - final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue(); - final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue(); - final String dburl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue(); - final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger(); - final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue(); - final long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions()); - final int minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger(); - final long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions()); - final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class); - - if (kerberosUserService != null) { - kerberosUser = kerberosUserService.createKerberosUser(); - if (kerberosUser != null) { - kerberosUser.login(); - } - } - dataSource = new HikariDataSource(); - dataSource.setDriverClassName(driverName); - dataSource.setConnectionTimeout(maxWaitMillis); - dataSource.setMaximumPoolSize(maxTotal); - dataSource.setMinimumIdle(minIdle); - dataSource.setMaxLifetime(maxConnLifetimeMillis); - - if (validationQuery != null && !validationQuery.isEmpty()) { - dataSource.setConnectionTestQuery(validationQuery); - } - - dataSource.setJdbcUrl(dburl); - dataSource.setUsername(user); - dataSource.setPassword(passw); - - final List dynamicProperties = context.getProperties() - .keySet() - .stream() - .filter(PropertyDescriptor::isDynamic) - .collect(Collectors.toList()); - - Properties properties = dataSource.getDataSourceProperties(); - dynamicProperties.forEach((descriptor) -> { - final PropertyValue propertyValue = context.getProperty(descriptor); - if (descriptor.isSensitive()) { - final String propertyName = StringUtils.substringAfter(descriptor.getName(), SENSITIVE_PROPERTY_PREFIX); - properties.setProperty(propertyName, propertyValue.getValue()); - } else { - properties.setProperty(descriptor.getName(), propertyValue.evaluateAttributeExpressions().getValue()); - } - }); - dataSource.setDataSourceProperties(properties); - dataSource.setPoolName(toString()); + configureDataSource(context, dataSource); } private long extractMillisWithInfinite(PropertyValue prop) { @@ -343,7 +303,7 @@ public class HikariCPConnectionPool extends AbstractControllerService implements try { final Connection con; if (kerberosUser != null) { - KerberosAction kerberosAction = new KerberosAction<>(kerberosUser, () -> dataSource.getConnection(), getLogger()); + KerberosAction kerberosAction = new KerberosAction<>(kerberosUser, dataSource::getConnection, getLogger()); con = kerberosAction.execute(); } else { con = dataSource.getConnection(); @@ -360,6 +320,167 @@ public class HikariCPConnectionPool extends AbstractControllerService implements } @Override + public List verify(final ConfigurationContext context, final ComponentLog verificationLogger, final Map variables) { + List results = new ArrayList<>(); + final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class); + KerberosUser kerberosUser = null; + try { + if (kerberosUserService != null) { + kerberosUser = kerberosUserService.createKerberosUser(); + if (kerberosUser != null) { + results.add(new ConfigVerificationResult.Builder() + .verificationStepName("Configure Kerberos User") + .outcome(SUCCESSFUL) + .explanation("Successfully configured Kerberos user") + .build()); + } + } + } catch (final Exception e) { + verificationLogger.error("Failed to configure Kerberos user", e); + results.add(new ConfigVerificationResult.Builder() + .verificationStepName("Configure Kerberos User") + .outcome(FAILED) + .explanation("Failed to configure Kerberos user: " + e.getMessage()) + .build()); + } + + final HikariDataSource hikariDataSource = new HikariDataSource(); + try { + configureDataSource(context, hikariDataSource); + results.add(new ConfigVerificationResult.Builder() + .verificationStepName("Configure Data Source") + .outcome(SUCCESSFUL) + .explanation("Successfully configured data source") + .build()); + + try (final Connection conn = getConnection(hikariDataSource, kerberosUser)) { + results.add(new ConfigVerificationResult.Builder() + .verificationStepName("Establish Connection") + .outcome(SUCCESSFUL) + .explanation("Successfully established Database Connection") + .build()); + } catch (final Exception e) { + verificationLogger.error("Failed to establish Database Connection", e); + results.add(new ConfigVerificationResult.Builder() + .verificationStepName("Establish Connection") + .outcome(FAILED) + .explanation("Failed to establish Database Connection: " + e.getMessage()) + .build()); + } + } catch (final Exception e) { + String message = "Failed to configure Data Source."; + if (e.getCause() instanceof ClassNotFoundException) { + message += String.format(" Ensure changes to the '%s' property are applied before verifying", + DB_DRIVER_LOCATION.getDisplayName()); + } + verificationLogger.error(message, e); + results.add(new ConfigVerificationResult.Builder() + .verificationStepName("Configure Data Source") + .outcome(FAILED) + .explanation(message + ": " + e.getMessage()) + .build()); + } finally { + try { + shutdown(dataSource, kerberosUser); + } catch (final SQLException e) { + verificationLogger.error("Failed to shut down data source", e); + } + } + return results; + } + + protected void configureDataSource(final ConfigurationContext context, final HikariDataSource dataSource) { + final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue(); + final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue(); + final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue(); + final String dburl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue(); + final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger(); + final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue(); + final long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions()); + final int minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger(); + final long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions()); + + final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class); + + if (kerberosUserService != null) { + kerberosUser = kerberosUserService.createKerberosUser(); + if (kerberosUser != null) { + kerberosUser.login(); + } + } + + dataSource.setConnectionTimeout(maxWaitMillis); + dataSource.setValidationTimeout(Math.max(maxWaitMillis, DEFAULT_MIN_VALIDATION_TIMEOUT)); + dataSource.setMaximumPoolSize(maxTotal); + dataSource.setMinimumIdle(minIdle); + dataSource.setMaxLifetime(maxConnLifetimeMillis); + + if (validationQuery != null && !validationQuery.isEmpty()) { + dataSource.setConnectionTestQuery(validationQuery); + } + + dataSource.setDriverClassName(driverName); + dataSource.setJdbcUrl(dburl); + dataSource.setUsername(user); + dataSource.setPassword(passw); + + final List dynamicProperties = context.getProperties() + .keySet() + .stream() + .filter(PropertyDescriptor::isDynamic) + .collect(Collectors.toList()); + + Properties properties = dataSource.getDataSourceProperties(); + dynamicProperties.forEach((descriptor) -> { + final PropertyValue propertyValue = context.getProperty(descriptor); + if (descriptor.isSensitive()) { + final String propertyName = StringUtils.substringAfter(descriptor.getName(), SENSITIVE_PROPERTY_PREFIX); + properties.setProperty(propertyName, propertyValue.getValue()); + } else { + properties.setProperty(descriptor.getName(), propertyValue.evaluateAttributeExpressions().getValue()); + } + }); + dataSource.setDataSourceProperties(properties); + dataSource.setPoolName(toString()); + } + + private Connection getConnection(final HikariDataSource dataSource, final KerberosUser kerberosUser) { + try { + final Connection con; + if (kerberosUser != null) { + KerberosAction kerberosAction = new KerberosAction<>(kerberosUser, dataSource::getConnection, getLogger()); + con = kerberosAction.execute(); + } else { + con = dataSource.getConnection(); + } + return con; + } catch (final SQLException e) { + // If using Kerberos, attempt to re-login + if (kerberosUser != null) { + try { + getLogger().info("Error getting connection, performing Kerberos re-login", e); + kerberosUser.login(); + } catch (KerberosLoginException le) { + throw new ProcessException("Unable to authenticate Kerberos principal", le); + } + } + throw new ProcessException(e); + } + } + + private void shutdown(final HikariDataSource dataSource, final KerberosUser kerberosUser) throws SQLException { + try { + if (kerberosUser != null) { + kerberosUser.logout(); + } + } finally { + if (dataSource != null) { + dataSource.close(); + } + } + } + + @Override public String toString() { return String.format("%s[id=%s]", getClass().getSimpleName(), getIdentifier()); } diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/test/java/org/apache/nifi/dbcp/HikariCPConnectionPoolTest.java b/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/test/java/org/apache/nifi/dbcp/HikariCPConnectionPoolTest.java index 3abc75cb19..5d2f925a0a 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/test/java/org/apache/nifi/dbcp/HikariCPConnectionPoolTest.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/test/java/org/apache/nifi/dbcp/HikariCPConnectionPoolTest.java @@ -16,7 +16,11 @@ */ package org.apache.nifi.dbcp; +import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.ControllerServiceConfiguration; +import org.apache.nifi.util.MockConfigurationContext; +import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.NoOpProcessor; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -26,8 +30,14 @@ import org.junit.jupiter.api.Test; import java.sql.Connection; import java.sql.SQLException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; public class HikariCPConnectionPoolTest { @@ -35,6 +45,10 @@ public class HikariCPConnectionPoolTest { private static final String INVALID_CONNECTION_URL = "jdbc:h2"; + private static final String DB_DRIVERNAME_VALUE = "jdbc:mock"; + + private static final String MAX_WAIT_TIME_VALUE = "5 s"; + private TestRunner runner; @BeforeEach @@ -134,11 +148,44 @@ public class HikariCPConnectionPoolTest { } } + @Test + void testVerifySuccessful() throws Exception { + final HikariCPConnectionPool service = new HikariCPConnectionPool(); + runner.addControllerService(SERVICE_ID, service); + final Connection mockConnection = mock(Connection.class); + MockDriver.setConnection(mockConnection); + setDatabaseProperties(service); + runner.setProperty(service, HikariCPConnectionPool.MAX_TOTAL_CONNECTIONS, "2"); + runner.enableControllerService(service); + runner.assertValid(service); + MockProcessContext processContext = (MockProcessContext) runner.getProcessContext(); + final ControllerServiceConfiguration configuration = processContext.getConfiguration(service.getIdentifier()); + final MockConfigurationContext configContext = new MockConfigurationContext(service, configuration.getProperties(), processContext, Collections.emptyMap()); + final List results = service.verify(configContext, runner.getLogger(), configContext.getAllProperties()); + + assertOutcomeSuccessful(results); + } + private void setDatabaseProperties(final HikariCPConnectionPool service) { - runner.setProperty(service, HikariCPConnectionPool.DATABASE_URL, "jdbc:mock"); + runner.setProperty(service, HikariCPConnectionPool.DATABASE_URL, DB_DRIVERNAME_VALUE); runner.setProperty(service, HikariCPConnectionPool.DB_DRIVERNAME, MockDriver.class.getName()); - runner.setProperty(service, HikariCPConnectionPool.MAX_WAIT_TIME, "5 s"); + runner.setProperty(service, HikariCPConnectionPool.MAX_WAIT_TIME, MAX_WAIT_TIME_VALUE); runner.setProperty(service, HikariCPConnectionPool.DB_USER, String.class.getSimpleName()); runner.setProperty(service, HikariCPConnectionPool.DB_PASSWORD, String.class.getName()); } + + private void assertOutcomeSuccessful(final List results) { + assertNotNull(results); + final Iterator resultsFound = results.iterator(); + + assertTrue(resultsFound.hasNext()); + final ConfigVerificationResult firstResult = resultsFound.next(); + assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, firstResult.getOutcome(), firstResult.getExplanation()); + + assertTrue(resultsFound.hasNext()); + final ConfigVerificationResult secondResult = resultsFound.next(); + assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, secondResult.getOutcome(), secondResult.getExplanation()); + + assertFalse(resultsFound.hasNext()); + } } \ No newline at end of file diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java index 5bec0ce078..9d5de109d6 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java @@ -55,7 +55,7 @@ public abstract class MockControllerServiceLookup implements ControllerServiceLo this.controllerServiceMap.putAll(other.controllerServiceMap); } - protected ControllerServiceConfiguration getConfiguration(final String identifier) { + public ControllerServiceConfiguration getConfiguration(final String identifier) { return controllerServiceMap.get(identifier); }