mirror of https://github.com/apache/nifi.git
NIFI-9762: Adding DBCPConnectionPool config verification
Relaxing MockPropertyValue validation to allow for variables to be passed to config verification Fixing underlying framework issue with config verification: wrong variable registry was being used Signed-off-by: Matthew Burgess <mattyb149@apache.org> This closes #5843
This commit is contained in:
parent
fc5c810de7
commit
743020eeb4
|
@ -410,7 +410,7 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
|
||||||
|
|
||||||
final ParameterLookup parameterLookup = serviceNode.getProcessGroup() == null ? ParameterLookup.EMPTY : serviceNode.getProcessGroup().getParameterContext();
|
final ParameterLookup parameterLookup = serviceNode.getProcessGroup() == null ? ParameterLookup.EMPTY : serviceNode.getProcessGroup().getParameterContext();
|
||||||
final ConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, properties, serviceNode.getAnnotationData(),
|
final ConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, properties, serviceNode.getAnnotationData(),
|
||||||
parameterLookup, flowController.getControllerServiceProvider(), null, flowController.getVariableRegistry());
|
parameterLookup, flowController.getControllerServiceProvider(), null, serviceNode.getProcessGroup().getVariableRegistry());
|
||||||
|
|
||||||
final List<ConfigVerificationResult> verificationResults = serviceNode.verifyConfiguration(configurationContext, configVerificationLog, variables, extensionManager);
|
final List<ConfigVerificationResult> verificationResults = serviceNode.verifyConfiguration(configurationContext, configVerificationLog, variables, extensionManager);
|
||||||
final List<ConfigVerificationResultDTO> resultsDtos = verificationResults.stream()
|
final List<ConfigVerificationResultDTO> resultsDtos = verificationResults.stream()
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||||
|
import org.apache.nifi.components.ConfigVerificationResult;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.components.PropertyValue;
|
import org.apache.nifi.components.PropertyValue;
|
||||||
import org.apache.nifi.components.ValidationContext;
|
import org.apache.nifi.components.ValidationContext;
|
||||||
|
@ -34,10 +35,12 @@ import org.apache.nifi.components.resource.ResourceCardinality;
|
||||||
import org.apache.nifi.components.resource.ResourceType;
|
import org.apache.nifi.components.resource.ResourceType;
|
||||||
import org.apache.nifi.controller.AbstractControllerService;
|
import org.apache.nifi.controller.AbstractControllerService;
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
|
import org.apache.nifi.controller.VerifiableControllerService;
|
||||||
import org.apache.nifi.expression.AttributeExpression;
|
import org.apache.nifi.expression.AttributeExpression;
|
||||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||||
import org.apache.nifi.kerberos.KerberosCredentialsService;
|
import org.apache.nifi.kerberos.KerberosCredentialsService;
|
||||||
import org.apache.nifi.kerberos.KerberosUserService;
|
import org.apache.nifi.kerberos.KerberosUserService;
|
||||||
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
|
@ -56,9 +59,13 @@ import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED;
|
||||||
|
import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implementation of for Database Connection Pooling Service. Apache DBCP is used for connection pooling functionality.
|
* Implementation of for Database Connection Pooling Service. Apache DBCP is used for connection pooling functionality.
|
||||||
*
|
*
|
||||||
|
@ -76,7 +83,7 @@ import java.util.stream.Collectors;
|
||||||
description = "JDBC driver property name prefixed with 'SENSITIVE.' handled as a sensitive property.")
|
description = "JDBC driver property name prefixed with 'SENSITIVE.' handled as a sensitive property.")
|
||||||
})
|
})
|
||||||
@RequiresInstanceClassLoading
|
@RequiresInstanceClassLoading
|
||||||
public class DBCPConnectionPool extends AbstractControllerService implements DBCPService {
|
public class DBCPConnectionPool extends AbstractControllerService implements DBCPService, VerifiableControllerService {
|
||||||
/** Property Name Prefix for Sensitive Dynamic Properties */
|
/** Property Name Prefix for Sensitive Dynamic Properties */
|
||||||
protected static final String SENSITIVE_PROPERTY_PREFIX = "SENSITIVE.";
|
protected static final String SENSITIVE_PROPERTY_PREFIX = "SENSITIVE.";
|
||||||
|
|
||||||
|
@ -398,6 +405,75 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
|
||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<ConfigVerificationResult> verify(final ConfigurationContext context, final ComponentLog verificationLogger, final Map<String, String> variables) {
|
||||||
|
List<ConfigVerificationResult> results = new ArrayList<>();
|
||||||
|
|
||||||
|
KerberosUser kerberosUser = null;
|
||||||
|
try {
|
||||||
|
kerberosUser = getKerberosUser(context);
|
||||||
|
if (kerberosUser != null) {
|
||||||
|
results.add(new ConfigVerificationResult.Builder()
|
||||||
|
.verificationStepName("Configure Kerberos User")
|
||||||
|
.outcome(SUCCESSFUL)
|
||||||
|
.explanation("Successfully configured Kerberos user")
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
} catch (final Exception e) {
|
||||||
|
verificationLogger.error("Failed to configure Kerberos user", e);
|
||||||
|
results.add(new ConfigVerificationResult.Builder()
|
||||||
|
.verificationStepName("Configure Kerberos User")
|
||||||
|
.outcome(FAILED)
|
||||||
|
.explanation("Failed to configure Kerberos user: " + e.getMessage())
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
final BasicDataSource dataSource = new BasicDataSource();
|
||||||
|
try {
|
||||||
|
configureDataSource(dataSource, kerberosUser, context);
|
||||||
|
results.add(new ConfigVerificationResult.Builder()
|
||||||
|
.verificationStepName("Configure Data Source")
|
||||||
|
.outcome(SUCCESSFUL)
|
||||||
|
.explanation("Successfully configured data source")
|
||||||
|
.build());
|
||||||
|
|
||||||
|
try (final Connection conn = getConnection(dataSource, kerberosUser)) {
|
||||||
|
results.add(new ConfigVerificationResult.Builder()
|
||||||
|
.verificationStepName("Establish Connection")
|
||||||
|
.outcome(SUCCESSFUL)
|
||||||
|
.explanation("Successfully established Database Connection")
|
||||||
|
.build());
|
||||||
|
} catch (final Exception e) {
|
||||||
|
verificationLogger.error("Failed to establish Database Connection", e);
|
||||||
|
results.add(new ConfigVerificationResult.Builder()
|
||||||
|
.verificationStepName("Establish Connection")
|
||||||
|
.outcome(FAILED)
|
||||||
|
.explanation("Failed to establish Database Connection: " + e.getMessage())
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
} catch (final Exception e) {
|
||||||
|
String message = "Failed to configure Data Source.";
|
||||||
|
if (e.getCause() instanceof ClassNotFoundException) {
|
||||||
|
message += String.format(" Ensure changes to the '%s' property are applied before verifying",
|
||||||
|
DB_DRIVER_LOCATION.getDisplayName());
|
||||||
|
}
|
||||||
|
verificationLogger.error(message, e);
|
||||||
|
results.add(new ConfigVerificationResult.Builder()
|
||||||
|
.verificationStepName("Configure Data Source")
|
||||||
|
.outcome(FAILED)
|
||||||
|
.explanation(message + ": " + e.getMessage())
|
||||||
|
.build());
|
||||||
|
} finally {
|
||||||
|
try {
|
||||||
|
shutdown(dataSource, kerberosUser);
|
||||||
|
} catch (final SQLException e) {
|
||||||
|
verificationLogger.error("Failed to shut down data source", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configures connection pool by creating an instance of the
|
* Configures connection pool by creating an instance of the
|
||||||
* {@link BasicDataSource} based on configuration provided with
|
* {@link BasicDataSource} based on configuration provided with
|
||||||
|
@ -414,6 +490,13 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
|
||||||
*/
|
*/
|
||||||
@OnEnabled
|
@OnEnabled
|
||||||
public void onConfigured(final ConfigurationContext context) throws InitializationException {
|
public void onConfigured(final ConfigurationContext context) throws InitializationException {
|
||||||
|
kerberosUser = getKerberosUser(context);
|
||||||
|
dataSource = new BasicDataSource();
|
||||||
|
configureDataSource(dataSource, kerberosUser, context);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void configureDataSource(final BasicDataSource dataSource, final KerberosUser kerberosUser,
|
||||||
|
final ConfigurationContext context) throws InitializationException {
|
||||||
final String dburl = getUrl(context);
|
final String dburl = getUrl(context);
|
||||||
|
|
||||||
final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue();
|
final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue();
|
||||||
|
@ -428,18 +511,6 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
|
||||||
final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD).evaluateAttributeExpressions());
|
final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD).evaluateAttributeExpressions());
|
||||||
final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
|
final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
|
||||||
final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
|
final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
|
||||||
final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
|
||||||
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
|
|
||||||
final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
|
|
||||||
final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
|
|
||||||
|
|
||||||
if (kerberosUserService != null) {
|
|
||||||
kerberosUser = kerberosUserService.createKerberosUser();
|
|
||||||
} else if (kerberosCredentialsService != null) {
|
|
||||||
kerberosUser = new KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), kerberosCredentialsService.getKeytab());
|
|
||||||
} else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
|
|
||||||
kerberosUser = new KerberosPasswordUser(kerberosPrincipal, kerberosPassword);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (kerberosUser != null) {
|
if (kerberosUser != null) {
|
||||||
try {
|
try {
|
||||||
|
@ -449,7 +520,6 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
dataSource = new BasicDataSource();
|
|
||||||
dataSource.setDriver(getDriver(driverName, dburl));
|
dataSource.setDriver(getDriver(driverName, dburl));
|
||||||
dataSource.setMaxWaitMillis(maxWaitMillis);
|
dataSource.setMaxWaitMillis(maxWaitMillis);
|
||||||
dataSource.setMaxTotal(maxTotal);
|
dataSource.setMaxTotal(maxTotal);
|
||||||
|
@ -460,7 +530,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
|
||||||
dataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
|
dataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
|
||||||
dataSource.setSoftMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis);
|
dataSource.setSoftMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis);
|
||||||
|
|
||||||
if (validationQuery!=null && !validationQuery.isEmpty()) {
|
if (validationQuery != null && !validationQuery.isEmpty()) {
|
||||||
dataSource.setValidationQuery(validationQuery);
|
dataSource.setValidationQuery(validationQuery);
|
||||||
dataSource.setTestOnBorrow(true);
|
dataSource.setTestOnBorrow(true);
|
||||||
}
|
}
|
||||||
|
@ -486,6 +556,23 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private KerberosUser getKerberosUser(final ConfigurationContext context) {
|
||||||
|
KerberosUser kerberosUser = null;
|
||||||
|
final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||||
|
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
|
||||||
|
final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
|
||||||
|
final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
|
||||||
|
|
||||||
|
if (kerberosUserService != null) {
|
||||||
|
kerberosUser = kerberosUserService.createKerberosUser();
|
||||||
|
} else if (kerberosCredentialsService != null) {
|
||||||
|
kerberosUser = new KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), kerberosCredentialsService.getKeytab());
|
||||||
|
} else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
|
||||||
|
kerberosUser = new KerberosPasswordUser(kerberosPrincipal, kerberosPassword);
|
||||||
|
}
|
||||||
|
return kerberosUser;
|
||||||
|
}
|
||||||
|
|
||||||
protected String getUrl(ConfigurationContext context) {
|
protected String getUrl(ConfigurationContext context) {
|
||||||
return context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
|
return context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
|
||||||
}
|
}
|
||||||
|
@ -532,24 +619,32 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
|
||||||
*/
|
*/
|
||||||
@OnDisabled
|
@OnDisabled
|
||||||
public void shutdown() throws SQLException {
|
public void shutdown() throws SQLException {
|
||||||
|
try {
|
||||||
|
this.shutdown(dataSource, kerberosUser);
|
||||||
|
} finally {
|
||||||
|
kerberosUser = null;
|
||||||
|
dataSource = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void shutdown(final BasicDataSource dataSource, final KerberosUser kerberosUser) throws SQLException {
|
||||||
try {
|
try {
|
||||||
if (kerberosUser != null) {
|
if (kerberosUser != null) {
|
||||||
kerberosUser.logout();
|
kerberosUser.logout();
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
kerberosUser = null;
|
if (dataSource != null) {
|
||||||
try {
|
dataSource.close();
|
||||||
if (dataSource != null) {
|
|
||||||
dataSource.close();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
dataSource = null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Connection getConnection() throws ProcessException {
|
public Connection getConnection() throws ProcessException {
|
||||||
|
return getConnection(dataSource, kerberosUser);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Connection getConnection(final BasicDataSource dataSource, final KerberosUser kerberosUser) {
|
||||||
try {
|
try {
|
||||||
final Connection con;
|
final Connection con;
|
||||||
if (kerberosUser != null) {
|
if (kerberosUser != null) {
|
||||||
|
|
Loading…
Reference in New Issue