mirror of https://github.com/apache/nifi.git
NIFI-7019 Add kerberos principal and password properties to NiFi DBPCConnectionPool
This closes #4087.
This commit is contained in:
parent
5b93e537d5
commit
e0fc75a963
|
@ -77,8 +77,9 @@ public class KerberosAction<T> {
|
|||
} catch (Exception e) {
|
||||
throw new ProcessException("Retrying privileged action failed due to: " + e.getMessage(), e);
|
||||
}
|
||||
} catch (PrivilegedActionException e) {
|
||||
throw new ProcessException("Privileged action failed due to: " + e.getMessage(), e.getException());
|
||||
} catch (PrivilegedActionException pae) {
|
||||
final Exception cause = pae.getException();
|
||||
throw new ProcessException("Privileged action failed due to: " + cause.getMessage(), cause);
|
||||
}
|
||||
|
||||
return result;
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.nifi.dbcp;
|
||||
|
||||
import org.apache.commons.dbcp2.BasicDataSource;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
|
||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
|
@ -25,6 +26,8 @@ import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
|||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.expression.AttributeExpression;
|
||||
|
@ -35,6 +38,8 @@ import org.apache.nifi.processor.util.StandardValidators;
|
|||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.security.krb.KerberosAction;
|
||||
import org.apache.nifi.security.krb.KerberosKeytabUser;
|
||||
import org.apache.nifi.security.krb.KerberosPasswordUser;
|
||||
import org.apache.nifi.security.krb.KerberosUser;
|
||||
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
|
||||
|
||||
import javax.security.auth.login.LoginException;
|
||||
|
@ -44,6 +49,7 @@ import java.sql.Driver;
|
|||
import java.sql.DriverManager;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -245,6 +251,25 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
|
|||
.required(false)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
|
||||
.name("kerberos-principal")
|
||||
.displayName("Kerberos Principal")
|
||||
.description("The principal to use when specifying the principal and password directly in the processor for authenticating via Kerberos.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder()
|
||||
.name("kerberos-password")
|
||||
.displayName("Kerberos Password")
|
||||
.description("The password to use when specifying the principal and password directly in the processor for authenticating via Kerberos.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.sensitive(true)
|
||||
.build();
|
||||
|
||||
private static final List<PropertyDescriptor> properties;
|
||||
|
||||
static {
|
||||
|
@ -253,6 +278,8 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
|
|||
props.add(DB_DRIVERNAME);
|
||||
props.add(DB_DRIVER_LOCATION);
|
||||
props.add(KERBEROS_CREDENTIALS_SERVICE);
|
||||
props.add(KERBEROS_PRINCIPAL);
|
||||
props.add(KERBEROS_PASSWORD);
|
||||
props.add(DB_USER);
|
||||
props.add(DB_PASSWORD);
|
||||
props.add(MAX_WAIT_TIME);
|
||||
|
@ -269,7 +296,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
|
|||
}
|
||||
|
||||
private volatile BasicDataSource dataSource;
|
||||
private volatile KerberosKeytabUser kerberosUser;
|
||||
private volatile KerberosUser kerberosUser;
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
|
@ -288,6 +315,42 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
|
|||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(ValidationContext context) {
|
||||
final List<ValidationResult> results = new ArrayList<>();
|
||||
|
||||
final boolean kerberosPrincipalProvided = !StringUtils.isBlank(context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue());
|
||||
final boolean kerberosPasswordProvided = !StringUtils.isBlank(context.getProperty(KERBEROS_PASSWORD).getValue());
|
||||
|
||||
if (kerberosPrincipalProvided && !kerberosPasswordProvided) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(KERBEROS_PASSWORD.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation("a password must be provided for the given principal")
|
||||
.build());
|
||||
}
|
||||
|
||||
if (kerberosPasswordProvided && !kerberosPrincipalProvided) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(KERBEROS_PRINCIPAL.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation("a principal must be provided for the given password")
|
||||
.build());
|
||||
}
|
||||
|
||||
final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||
|
||||
if (kerberosCredentialsService != null && (kerberosPrincipalProvided || kerberosPasswordProvided)) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(KERBEROS_CREDENTIALS_SERVICE.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation("kerberos principal/password and kerberos credential service cannot be configured at the same time")
|
||||
.build());
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configures connection pool by creating an instance of the
|
||||
* {@link BasicDataSource} based on configuration provided with
|
||||
|
@ -318,9 +381,16 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
|
|||
final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
|
||||
final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
|
||||
final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||
final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
|
||||
final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
|
||||
|
||||
if (kerberosCredentialsService != null) {
|
||||
kerberosUser = new KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), kerberosCredentialsService.getKeytab());
|
||||
} else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
|
||||
kerberosUser = new KerberosPasswordUser(kerberosPrincipal, kerberosPassword);
|
||||
}
|
||||
|
||||
if (kerberosUser != null) {
|
||||
try {
|
||||
kerberosUser.login();
|
||||
} catch (LoginException e) {
|
||||
|
@ -422,7 +492,9 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
|
|||
} finally {
|
||||
kerberosUser = null;
|
||||
try {
|
||||
dataSource.close();
|
||||
if (dataSource != null) {
|
||||
dataSource.close();
|
||||
}
|
||||
} finally {
|
||||
dataSource = null;
|
||||
}
|
||||
|
|
|
@ -57,6 +57,8 @@ import static org.apache.nifi.dbcp.DBCPConnectionPool.DB_PASSWORD
|
|||
import static org.apache.nifi.dbcp.DBCPConnectionPool.DB_USER
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.EVICTION_RUN_PERIOD
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_PASSWORD
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_PRINCIPAL
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.MAX_CONN_LIFETIME
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.MAX_IDLE
|
||||
import static org.apache.nifi.dbcp.DBCPConnectionPool.MAX_TOTAL_CONNECTIONS
|
||||
|
@ -304,6 +306,8 @@ class DatabaseRecordSinkTest {
|
|||
when(dbContext.getProperty(MIN_EVICTABLE_IDLE_TIME)).thenReturn(new MockPropertyValue('5 sec'))
|
||||
when(dbContext.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME)).thenReturn(new MockPropertyValue('5 sec'))
|
||||
when(dbContext.getProperty(KERBEROS_CREDENTIALS_SERVICE)).thenReturn(new MockPropertyValue(null))
|
||||
when(dbContext.getProperty(KERBEROS_PRINCIPAL)).thenReturn(new MockPropertyValue(null))
|
||||
when(dbContext.getProperty(KERBEROS_PASSWORD)).thenReturn(new MockPropertyValue(null))
|
||||
|
||||
final ControllerServiceInitializationContext dbInitContext = new MockControllerServiceInitializationContext(dbcpService, UUID.randomUUID().toString(), logger, dbStateManager)
|
||||
dbcpService.initialize(dbInitContext)
|
||||
|
|
Loading…
Reference in New Issue