mirror of https://github.com/apache/nifi.git
NIFI-13555 Added Verification to HikariDBCPConnectionPool (#9085)
Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
da6c9c4791
commit
fd8acd57b8
|
@ -27,6 +27,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
|||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.ConfigVerificationResult;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
import org.apache.nifi.components.RequiredPermission;
|
||||
|
@ -34,12 +35,15 @@ import org.apache.nifi.components.resource.ResourceCardinality;
|
|||
import org.apache.nifi.components.resource.ResourceType;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.controller.VerifiableControllerService;
|
||||
import org.apache.nifi.expression.AttributeExpression;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.kerberos.KerberosUserService;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.security.krb.KerberosAction;
|
||||
import org.apache.nifi.security.krb.KerberosLoginException;
|
||||
import org.apache.nifi.security.krb.KerberosUser;
|
||||
|
||||
import javax.security.auth.login.LoginException;
|
||||
|
@ -48,10 +52,14 @@ import java.sql.SQLException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED;
|
||||
import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL;
|
||||
|
||||
/**
|
||||
* Implementation of Database Connection Pooling Service. HikariCP is used for connection pooling functionality.
|
||||
*/
|
||||
|
@ -71,7 +79,7 @@ import java.util.stream.Collectors;
|
|||
)
|
||||
}
|
||||
)
|
||||
public class HikariCPConnectionPool extends AbstractControllerService implements DBCPService {
|
||||
public class HikariCPConnectionPool extends AbstractControllerService implements DBCPService, VerifiableControllerService {
|
||||
/**
|
||||
* Property Name Prefix for Sensitive Dynamic Properties
|
||||
*/
|
||||
|
@ -81,6 +89,8 @@ public class HikariCPConnectionPool extends AbstractControllerService implements
|
|||
private static final String DEFAULT_TOTAL_CONNECTIONS = "10";
|
||||
private static final String DEFAULT_MAX_CONN_LIFETIME = "-1";
|
||||
|
||||
private static final int DEFAULT_MIN_VALIDATION_TIMEOUT = 250;
|
||||
|
||||
public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
|
||||
.name("hikaricp-connection-url")
|
||||
.displayName("Database Connection URL")
|
||||
|
@ -254,58 +264,8 @@ public class HikariCPConnectionPool extends AbstractControllerService implements
|
|||
*/
|
||||
@OnEnabled
|
||||
public void onConfigured(final ConfigurationContext context) {
|
||||
|
||||
final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue();
|
||||
final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
|
||||
final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
|
||||
final String dburl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
|
||||
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
|
||||
final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
|
||||
final long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions());
|
||||
final int minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger();
|
||||
final long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
|
||||
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
|
||||
|
||||
if (kerberosUserService != null) {
|
||||
kerberosUser = kerberosUserService.createKerberosUser();
|
||||
if (kerberosUser != null) {
|
||||
kerberosUser.login();
|
||||
}
|
||||
}
|
||||
|
||||
dataSource = new HikariDataSource();
|
||||
dataSource.setDriverClassName(driverName);
|
||||
dataSource.setConnectionTimeout(maxWaitMillis);
|
||||
dataSource.setMaximumPoolSize(maxTotal);
|
||||
dataSource.setMinimumIdle(minIdle);
|
||||
dataSource.setMaxLifetime(maxConnLifetimeMillis);
|
||||
|
||||
if (validationQuery != null && !validationQuery.isEmpty()) {
|
||||
dataSource.setConnectionTestQuery(validationQuery);
|
||||
}
|
||||
|
||||
dataSource.setJdbcUrl(dburl);
|
||||
dataSource.setUsername(user);
|
||||
dataSource.setPassword(passw);
|
||||
|
||||
final List<PropertyDescriptor> dynamicProperties = context.getProperties()
|
||||
.keySet()
|
||||
.stream()
|
||||
.filter(PropertyDescriptor::isDynamic)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
Properties properties = dataSource.getDataSourceProperties();
|
||||
dynamicProperties.forEach((descriptor) -> {
|
||||
final PropertyValue propertyValue = context.getProperty(descriptor);
|
||||
if (descriptor.isSensitive()) {
|
||||
final String propertyName = StringUtils.substringAfter(descriptor.getName(), SENSITIVE_PROPERTY_PREFIX);
|
||||
properties.setProperty(propertyName, propertyValue.getValue());
|
||||
} else {
|
||||
properties.setProperty(descriptor.getName(), propertyValue.evaluateAttributeExpressions().getValue());
|
||||
}
|
||||
});
|
||||
dataSource.setDataSourceProperties(properties);
|
||||
dataSource.setPoolName(toString());
|
||||
configureDataSource(context, dataSource);
|
||||
}
|
||||
|
||||
private long extractMillisWithInfinite(PropertyValue prop) {
|
||||
|
@ -343,7 +303,7 @@ public class HikariCPConnectionPool extends AbstractControllerService implements
|
|||
try {
|
||||
final Connection con;
|
||||
if (kerberosUser != null) {
|
||||
KerberosAction<Connection> kerberosAction = new KerberosAction<>(kerberosUser, () -> dataSource.getConnection(), getLogger());
|
||||
KerberosAction<Connection> kerberosAction = new KerberosAction<>(kerberosUser, dataSource::getConnection, getLogger());
|
||||
con = kerberosAction.execute();
|
||||
} else {
|
||||
con = dataSource.getConnection();
|
||||
|
@ -360,6 +320,167 @@ public class HikariCPConnectionPool extends AbstractControllerService implements
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<ConfigVerificationResult> verify(final ConfigurationContext context, final ComponentLog verificationLogger, final Map<String, String> variables) {
|
||||
List<ConfigVerificationResult> results = new ArrayList<>();
|
||||
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
|
||||
KerberosUser kerberosUser = null;
|
||||
try {
|
||||
if (kerberosUserService != null) {
|
||||
kerberosUser = kerberosUserService.createKerberosUser();
|
||||
if (kerberosUser != null) {
|
||||
results.add(new ConfigVerificationResult.Builder()
|
||||
.verificationStepName("Configure Kerberos User")
|
||||
.outcome(SUCCESSFUL)
|
||||
.explanation("Successfully configured Kerberos user")
|
||||
.build());
|
||||
}
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
verificationLogger.error("Failed to configure Kerberos user", e);
|
||||
results.add(new ConfigVerificationResult.Builder()
|
||||
.verificationStepName("Configure Kerberos User")
|
||||
.outcome(FAILED)
|
||||
.explanation("Failed to configure Kerberos user: " + e.getMessage())
|
||||
.build());
|
||||
}
|
||||
|
||||
final HikariDataSource hikariDataSource = new HikariDataSource();
|
||||
try {
|
||||
configureDataSource(context, hikariDataSource);
|
||||
results.add(new ConfigVerificationResult.Builder()
|
||||
.verificationStepName("Configure Data Source")
|
||||
.outcome(SUCCESSFUL)
|
||||
.explanation("Successfully configured data source")
|
||||
.build());
|
||||
|
||||
try (final Connection conn = getConnection(hikariDataSource, kerberosUser)) {
|
||||
results.add(new ConfigVerificationResult.Builder()
|
||||
.verificationStepName("Establish Connection")
|
||||
.outcome(SUCCESSFUL)
|
||||
.explanation("Successfully established Database Connection")
|
||||
.build());
|
||||
} catch (final Exception e) {
|
||||
verificationLogger.error("Failed to establish Database Connection", e);
|
||||
results.add(new ConfigVerificationResult.Builder()
|
||||
.verificationStepName("Establish Connection")
|
||||
.outcome(FAILED)
|
||||
.explanation("Failed to establish Database Connection: " + e.getMessage())
|
||||
.build());
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
String message = "Failed to configure Data Source.";
|
||||
if (e.getCause() instanceof ClassNotFoundException) {
|
||||
message += String.format(" Ensure changes to the '%s' property are applied before verifying",
|
||||
DB_DRIVER_LOCATION.getDisplayName());
|
||||
}
|
||||
verificationLogger.error(message, e);
|
||||
results.add(new ConfigVerificationResult.Builder()
|
||||
.verificationStepName("Configure Data Source")
|
||||
.outcome(FAILED)
|
||||
.explanation(message + ": " + e.getMessage())
|
||||
.build());
|
||||
} finally {
|
||||
try {
|
||||
shutdown(dataSource, kerberosUser);
|
||||
} catch (final SQLException e) {
|
||||
verificationLogger.error("Failed to shut down data source", e);
|
||||
}
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
protected void configureDataSource(final ConfigurationContext context, final HikariDataSource dataSource) {
|
||||
final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue();
|
||||
final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
|
||||
final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
|
||||
final String dburl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
|
||||
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
|
||||
final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
|
||||
final long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions());
|
||||
final int minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger();
|
||||
final long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
|
||||
|
||||
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
|
||||
|
||||
if (kerberosUserService != null) {
|
||||
kerberosUser = kerberosUserService.createKerberosUser();
|
||||
if (kerberosUser != null) {
|
||||
kerberosUser.login();
|
||||
}
|
||||
}
|
||||
|
||||
dataSource.setConnectionTimeout(maxWaitMillis);
|
||||
dataSource.setValidationTimeout(Math.max(maxWaitMillis, DEFAULT_MIN_VALIDATION_TIMEOUT));
|
||||
dataSource.setMaximumPoolSize(maxTotal);
|
||||
dataSource.setMinimumIdle(minIdle);
|
||||
dataSource.setMaxLifetime(maxConnLifetimeMillis);
|
||||
|
||||
if (validationQuery != null && !validationQuery.isEmpty()) {
|
||||
dataSource.setConnectionTestQuery(validationQuery);
|
||||
}
|
||||
|
||||
dataSource.setDriverClassName(driverName);
|
||||
dataSource.setJdbcUrl(dburl);
|
||||
dataSource.setUsername(user);
|
||||
dataSource.setPassword(passw);
|
||||
|
||||
final List<PropertyDescriptor> dynamicProperties = context.getProperties()
|
||||
.keySet()
|
||||
.stream()
|
||||
.filter(PropertyDescriptor::isDynamic)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
Properties properties = dataSource.getDataSourceProperties();
|
||||
dynamicProperties.forEach((descriptor) -> {
|
||||
final PropertyValue propertyValue = context.getProperty(descriptor);
|
||||
if (descriptor.isSensitive()) {
|
||||
final String propertyName = StringUtils.substringAfter(descriptor.getName(), SENSITIVE_PROPERTY_PREFIX);
|
||||
properties.setProperty(propertyName, propertyValue.getValue());
|
||||
} else {
|
||||
properties.setProperty(descriptor.getName(), propertyValue.evaluateAttributeExpressions().getValue());
|
||||
}
|
||||
});
|
||||
dataSource.setDataSourceProperties(properties);
|
||||
dataSource.setPoolName(toString());
|
||||
}
|
||||
|
||||
private Connection getConnection(final HikariDataSource dataSource, final KerberosUser kerberosUser) {
|
||||
try {
|
||||
final Connection con;
|
||||
if (kerberosUser != null) {
|
||||
KerberosAction<Connection> kerberosAction = new KerberosAction<>(kerberosUser, dataSource::getConnection, getLogger());
|
||||
con = kerberosAction.execute();
|
||||
} else {
|
||||
con = dataSource.getConnection();
|
||||
}
|
||||
return con;
|
||||
} catch (final SQLException e) {
|
||||
// If using Kerberos, attempt to re-login
|
||||
if (kerberosUser != null) {
|
||||
try {
|
||||
getLogger().info("Error getting connection, performing Kerberos re-login", e);
|
||||
kerberosUser.login();
|
||||
} catch (KerberosLoginException le) {
|
||||
throw new ProcessException("Unable to authenticate Kerberos principal", le);
|
||||
}
|
||||
}
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void shutdown(final HikariDataSource dataSource, final KerberosUser kerberosUser) throws SQLException {
|
||||
try {
|
||||
if (kerberosUser != null) {
|
||||
kerberosUser.logout();
|
||||
}
|
||||
} finally {
|
||||
if (dataSource != null) {
|
||||
dataSource.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("%s[id=%s]", getClass().getSimpleName(), getIdentifier());
|
||||
}
|
||||
|
|
|
@ -16,7 +16,11 @@
|
|||
*/
|
||||
package org.apache.nifi.dbcp;
|
||||
|
||||
import org.apache.nifi.components.ConfigVerificationResult;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.util.ControllerServiceConfiguration;
|
||||
import org.apache.nifi.util.MockConfigurationContext;
|
||||
import org.apache.nifi.util.MockProcessContext;
|
||||
import org.apache.nifi.util.NoOpProcessor;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
|
@ -26,8 +30,14 @@ import org.junit.jupiter.api.Test;
|
|||
|
||||
import java.sql.Connection;
|
||||
import java.sql.SQLException;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class HikariCPConnectionPoolTest {
|
||||
|
@ -35,6 +45,10 @@ public class HikariCPConnectionPoolTest {
|
|||
|
||||
private static final String INVALID_CONNECTION_URL = "jdbc:h2";
|
||||
|
||||
private static final String DB_DRIVERNAME_VALUE = "jdbc:mock";
|
||||
|
||||
private static final String MAX_WAIT_TIME_VALUE = "5 s";
|
||||
|
||||
private TestRunner runner;
|
||||
|
||||
@BeforeEach
|
||||
|
@ -134,11 +148,44 @@ public class HikariCPConnectionPoolTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testVerifySuccessful() throws Exception {
|
||||
final HikariCPConnectionPool service = new HikariCPConnectionPool();
|
||||
runner.addControllerService(SERVICE_ID, service);
|
||||
final Connection mockConnection = mock(Connection.class);
|
||||
MockDriver.setConnection(mockConnection);
|
||||
setDatabaseProperties(service);
|
||||
runner.setProperty(service, HikariCPConnectionPool.MAX_TOTAL_CONNECTIONS, "2");
|
||||
runner.enableControllerService(service);
|
||||
runner.assertValid(service);
|
||||
MockProcessContext processContext = (MockProcessContext) runner.getProcessContext();
|
||||
final ControllerServiceConfiguration configuration = processContext.getConfiguration(service.getIdentifier());
|
||||
final MockConfigurationContext configContext = new MockConfigurationContext(service, configuration.getProperties(), processContext, Collections.emptyMap());
|
||||
final List<ConfigVerificationResult> results = service.verify(configContext, runner.getLogger(), configContext.getAllProperties());
|
||||
|
||||
assertOutcomeSuccessful(results);
|
||||
}
|
||||
|
||||
private void setDatabaseProperties(final HikariCPConnectionPool service) {
|
||||
runner.setProperty(service, HikariCPConnectionPool.DATABASE_URL, "jdbc:mock");
|
||||
runner.setProperty(service, HikariCPConnectionPool.DATABASE_URL, DB_DRIVERNAME_VALUE);
|
||||
runner.setProperty(service, HikariCPConnectionPool.DB_DRIVERNAME, MockDriver.class.getName());
|
||||
runner.setProperty(service, HikariCPConnectionPool.MAX_WAIT_TIME, "5 s");
|
||||
runner.setProperty(service, HikariCPConnectionPool.MAX_WAIT_TIME, MAX_WAIT_TIME_VALUE);
|
||||
runner.setProperty(service, HikariCPConnectionPool.DB_USER, String.class.getSimpleName());
|
||||
runner.setProperty(service, HikariCPConnectionPool.DB_PASSWORD, String.class.getName());
|
||||
}
|
||||
|
||||
private void assertOutcomeSuccessful(final List<ConfigVerificationResult> results) {
|
||||
assertNotNull(results);
|
||||
final Iterator<ConfigVerificationResult> resultsFound = results.iterator();
|
||||
|
||||
assertTrue(resultsFound.hasNext());
|
||||
final ConfigVerificationResult firstResult = resultsFound.next();
|
||||
assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, firstResult.getOutcome(), firstResult.getExplanation());
|
||||
|
||||
assertTrue(resultsFound.hasNext());
|
||||
final ConfigVerificationResult secondResult = resultsFound.next();
|
||||
assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, secondResult.getOutcome(), secondResult.getExplanation());
|
||||
|
||||
assertFalse(resultsFound.hasNext());
|
||||
}
|
||||
}
|
|
@ -55,7 +55,7 @@ public abstract class MockControllerServiceLookup implements ControllerServiceLo
|
|||
this.controllerServiceMap.putAll(other.controllerServiceMap);
|
||||
}
|
||||
|
||||
protected ControllerServiceConfiguration getConfiguration(final String identifier) {
|
||||
public ControllerServiceConfiguration getConfiguration(final String identifier) {
|
||||
return controllerServiceMap.get(identifier);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue