mirror of https://github.com/apache/nifi.git
NIFI-5985: Added capability for DBCPConnectionPool to use KerberosCredentialsService.
Refactored KerberosAction to return a result from execute() Removed usage of ProcessContext.yield() from KerberosAction, which should instead be handled the component using the KerberosCredentialsService. Updated SolrProcessor to yield a flowfile on error, rather than the KerberosAction invoking the yield. NIFI-5985: Updated TestPutSolrContentStream.testUpdateWithKerberosAuth test case to match on PrivilegedExceptionAction instead of PrivilegedAction doAs arguments. NIFI-5985: Moved kerberosUser logout after closing the datasource in the shutdown method. NIFI-5985: Removed catching exceptions in DBCPConnectionPool.shutdown Exception when closing the datasource is prioritized over an exception when logging out the kerberos principal Added GroovyDBCPServiceTest tests to verify prioritizing datasource.close() exception over kerberosUser.logout() exception This closes #3288. Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
bc0e68ff2a
commit
8c8a9b4d53
|
@ -18,45 +18,40 @@ package org.apache.nifi.security.krb;
|
|||
|
||||
import org.apache.commons.lang3.Validate;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
|
||||
import javax.security.auth.login.LoginException;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.security.PrivilegedActionException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
/**
|
||||
* Helper class for processors to perform an action as a KerberosUser.
|
||||
*/
|
||||
public class KerberosAction {
|
||||
public class KerberosAction<T> {
|
||||
|
||||
private final KerberosUser kerberosUser;
|
||||
private final PrivilegedAction action;
|
||||
private final ProcessContext context;
|
||||
private final PrivilegedExceptionAction<T> action;
|
||||
private final ComponentLog logger;
|
||||
|
||||
public KerberosAction(final KerberosUser kerberosUser,
|
||||
final PrivilegedAction action,
|
||||
final ProcessContext context,
|
||||
final PrivilegedExceptionAction<T> action,
|
||||
final ComponentLog logger) {
|
||||
this.kerberosUser = kerberosUser;
|
||||
this.action = action;
|
||||
this.context = context;
|
||||
this.logger = logger;
|
||||
Validate.notNull(this.kerberosUser);
|
||||
Validate.notNull(this.action);
|
||||
Validate.notNull(this.context);
|
||||
Validate.notNull(this.logger);
|
||||
}
|
||||
|
||||
public void execute() {
|
||||
public T execute() {
|
||||
T result;
|
||||
// lazily login the first time the processor executes
|
||||
if (!kerberosUser.isLoggedIn()) {
|
||||
try {
|
||||
kerberosUser.login();
|
||||
logger.info("Successful login for {}", new Object[]{kerberosUser.getPrincipal()});
|
||||
} catch (LoginException e) {
|
||||
// make sure to yield so the processor doesn't keep retrying the rolled back flow files immediately
|
||||
context.yield();
|
||||
throw new ProcessException("Login failed due to: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
@ -65,14 +60,12 @@ public class KerberosAction {
|
|||
try {
|
||||
kerberosUser.checkTGTAndRelogin();
|
||||
} catch (LoginException e) {
|
||||
// make sure to yield so the processor doesn't keep retrying the rolled back flow files immediately
|
||||
context.yield();
|
||||
throw new ProcessException("Relogin check failed due to: " + e.getMessage(), e);
|
||||
}
|
||||
|
||||
// attempt to execute the action, if an exception is caught attempt to logout/login and retry
|
||||
try {
|
||||
kerberosUser.doAs(action);
|
||||
result = kerberosUser.doAs(action);
|
||||
} catch (SecurityException se) {
|
||||
logger.info("Privileged action failed, attempting relogin and retrying...");
|
||||
logger.debug("", se);
|
||||
|
@ -80,13 +73,15 @@ public class KerberosAction {
|
|||
try {
|
||||
kerberosUser.logout();
|
||||
kerberosUser.login();
|
||||
kerberosUser.doAs(action);
|
||||
result = kerberosUser.doAs(action);
|
||||
} catch (Exception e) {
|
||||
// make sure to yield so the processor doesn't keep retrying the rolled back flow files immediately
|
||||
context.yield();
|
||||
throw new ProcessException("Retrying privileged action failed due to: " + e.getMessage(), e);
|
||||
}
|
||||
} catch (PrivilegedActionException e) {
|
||||
throw new ProcessException("Privileged action failed due to: " + e.getMessage(), e.getException());
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -29,7 +29,7 @@ import javax.security.auth.kerberos.KerberosPrincipal;
|
|||
import javax.security.auth.login.LoginException;
|
||||
import java.io.File;
|
||||
import java.security.Principal;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
|
@ -174,7 +174,7 @@ public class KerberosUserIT {
|
|||
final KerberosUser user1 = new KerberosKeytabUser(principal1.getName(), principal1KeytabFile.getAbsolutePath());
|
||||
|
||||
final AtomicReference<String> resultHolder = new AtomicReference<>(null);
|
||||
final PrivilegedAction privilegedAction = () -> {
|
||||
final PrivilegedExceptionAction<Void> privilegedAction = () -> {
|
||||
resultHolder.set("SUCCESS");
|
||||
return null;
|
||||
};
|
||||
|
@ -183,7 +183,7 @@ public class KerberosUserIT {
|
|||
final ComponentLog logger = Mockito.mock(ComponentLog.class);
|
||||
|
||||
// create the action to test and execute it
|
||||
final KerberosAction kerberosAction = new KerberosAction(user1, privilegedAction, context, logger);
|
||||
final KerberosAction kerberosAction = new KerberosAction<>(user1, privilegedAction, logger);
|
||||
kerberosAction.execute();
|
||||
|
||||
// if the result holder has the string success then we know the action executed
|
||||
|
|
|
@ -36,7 +36,7 @@ import org.apache.solr.client.solrj.SolrClient;
|
|||
|
||||
import javax.security.auth.login.LoginException;
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
@ -113,14 +113,19 @@ public abstract class SolrProcessor extends AbstractProcessor {
|
|||
doOnTrigger(context, session);
|
||||
} else {
|
||||
// wrap doOnTrigger in a privileged action
|
||||
final PrivilegedAction action = () -> {
|
||||
final PrivilegedExceptionAction<Void> action = () -> {
|
||||
doOnTrigger(context, session);
|
||||
return null;
|
||||
};
|
||||
|
||||
// execute the privileged action as the given keytab user
|
||||
final KerberosAction kerberosAction = new KerberosAction(kerberosUser, action, context, getLogger());
|
||||
kerberosAction.execute();
|
||||
final KerberosAction kerberosAction = new KerberosAction<>(kerberosUser, action, getLogger());
|
||||
try {
|
||||
kerberosAction.execute();
|
||||
} catch (ProcessException e) {
|
||||
context.yield();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -44,7 +44,8 @@ import javax.net.ssl.SSLContext;
|
|||
import javax.security.auth.login.LoginException;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.security.PrivilegedActionException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
|
@ -459,7 +460,7 @@ public class TestPutSolrContentStream {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateWithKerberosAuth() throws IOException, InitializationException, LoginException {
|
||||
public void testUpdateWithKerberosAuth() throws IOException, InitializationException, LoginException, PrivilegedActionException {
|
||||
final String principal = "nifi@FOO.COM";
|
||||
final String keytab = "src/test/resources/foo.keytab";
|
||||
|
||||
|
@ -467,8 +468,8 @@ public class TestPutSolrContentStream {
|
|||
final KerberosKeytabUser kerberosUser = Mockito.mock(KerberosKeytabUser.class);
|
||||
when(kerberosUser.getPrincipal()).thenReturn(principal);
|
||||
when(kerberosUser.getKeytabFile()).thenReturn(keytab);
|
||||
when(kerberosUser.doAs(any(PrivilegedAction.class))).thenAnswer((invocation -> {
|
||||
final PrivilegedAction action = (PrivilegedAction) invocation.getArguments()[0];
|
||||
when(kerberosUser.doAs(any(PrivilegedExceptionAction.class))).thenAnswer((invocation -> {
|
||||
final PrivilegedExceptionAction action = (PrivilegedExceptionAction) invocation.getArguments()[0];
|
||||
action.run();
|
||||
return null;
|
||||
})
|
||||
|
@ -502,7 +503,7 @@ public class TestPutSolrContentStream {
|
|||
// Verify that during the update the user was logged in, TGT was checked, and the action was executed
|
||||
verify(kerberosUser, times(1)).login();
|
||||
verify(kerberosUser, times(1)).checkTGTAndRelogin();
|
||||
verify(kerberosUser, times(1)).doAs(any(PrivilegedAction.class));
|
||||
verify(kerberosUser, times(1)).doAs(any(PrivilegedExceptionAction.class));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -42,6 +42,12 @@
|
|||
<artifactId>nifi-security-utils</artifactId>
|
||||
<version>1.9.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
|
||||
<version>1.9.0-SNAPSHOT</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mock</artifactId>
|
||||
|
@ -57,7 +63,7 @@
|
|||
<groupId>org.apache.derby</groupId>
|
||||
<artifactId>derby</artifactId>
|
||||
<version>10.11.1.1</version>
|
||||
</dependency>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.derby</groupId>
|
||||
<artifactId>derbynet</artifactId>
|
||||
|
|
|
@ -32,12 +32,16 @@ import org.apache.nifi.controller.AbstractControllerService;
|
|||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.expression.AttributeExpression;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.kerberos.KerberosCredentialsService;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.security.krb.KerberosAction;
|
||||
import org.apache.nifi.security.krb.KerberosKeytabUser;
|
||||
import org.apache.nifi.util.FormatUtils;
|
||||
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
|
||||
|
||||
import javax.security.auth.login.LoginException;
|
||||
import java.net.MalformedURLException;
|
||||
import java.sql.Connection;
|
||||
import java.sql.Driver;
|
||||
|
@ -263,6 +267,14 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
|
|||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("kerberos-credentials-service")
|
||||
.displayName("Kerberos Credentials Service")
|
||||
.description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
|
||||
.identifiesControllerService(KerberosCredentialsService.class)
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
private static final List<PropertyDescriptor> properties;
|
||||
|
||||
static {
|
||||
|
@ -270,6 +282,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
|
|||
props.add(DATABASE_URL);
|
||||
props.add(DB_DRIVERNAME);
|
||||
props.add(DB_DRIVER_LOCATION);
|
||||
props.add(KERBEROS_CREDENTIALS_SERVICE);
|
||||
props.add(DB_USER);
|
||||
props.add(DB_PASSWORD);
|
||||
props.add(MAX_WAIT_TIME);
|
||||
|
@ -286,6 +299,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
|
|||
}
|
||||
|
||||
private volatile BasicDataSource dataSource;
|
||||
private volatile KerberosKeytabUser kerberosUser;
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
|
@ -333,6 +347,16 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
|
|||
final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD));
|
||||
final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME));
|
||||
final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME));
|
||||
final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||
|
||||
if (kerberosCredentialsService != null) {
|
||||
kerberosUser = new KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), kerberosCredentialsService.getKeytab());
|
||||
try {
|
||||
kerberosUser.login();
|
||||
} catch (LoginException e) {
|
||||
throw new InitializationException("Unable to authenticate Kerberos principal", e);
|
||||
}
|
||||
}
|
||||
|
||||
dataSource = new BasicDataSource();
|
||||
dataSource.setDriverClassName(drv);
|
||||
|
@ -410,20 +434,41 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
|
|||
|
||||
/**
|
||||
* Shutdown pool, close all open connections.
|
||||
* If a principal is authenticated with a KDC, that principal is logged out.
|
||||
*
|
||||
* If a @{@link LoginException} occurs while attempting to log out the @{@link org.apache.nifi.security.krb.KerberosUser},
|
||||
* an attempt will still be made to shut down the pool and close open connections.
|
||||
*
|
||||
* @throws SQLException if there is an error while closing open connections
|
||||
* @throws LoginException if there is an error during the principal log out, and will only be thrown if there was
|
||||
* no exception while closing open connections
|
||||
*/
|
||||
@OnDisabled
|
||||
public void shutdown() {
|
||||
public void shutdown() throws SQLException, LoginException {
|
||||
try {
|
||||
dataSource.close();
|
||||
} catch (final SQLException e) {
|
||||
throw new ProcessException(e);
|
||||
if (kerberosUser != null) {
|
||||
kerberosUser.logout();
|
||||
}
|
||||
} finally {
|
||||
kerberosUser = null;
|
||||
try {
|
||||
dataSource.close();
|
||||
} finally {
|
||||
dataSource = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection getConnection() throws ProcessException {
|
||||
try {
|
||||
final Connection con = dataSource.getConnection();
|
||||
final Connection con;
|
||||
if (kerberosUser != null) {
|
||||
KerberosAction<Connection> kerberosAction = new KerberosAction<>(kerberosUser, () -> dataSource.getConnection(), getLogger());
|
||||
con = kerberosAction.execute();
|
||||
} else {
|
||||
con = dataSource.getConnection();
|
||||
}
|
||||
return con;
|
||||
} catch (final SQLException e) {
|
||||
throw new ProcessException(e);
|
||||
|
|
|
@ -16,13 +16,16 @@
|
|||
*/
|
||||
package org.apache.nifi.dbcp
|
||||
|
||||
import org.apache.commons.dbcp2.BasicDataSource
|
||||
import org.apache.nifi.reporting.InitializationException
|
||||
import org.apache.nifi.security.krb.KerberosKeytabUser
|
||||
import org.apache.nifi.util.TestRunner
|
||||
import org.apache.nifi.util.TestRunners
|
||||
import org.junit.Assert
|
||||
import org.junit.BeforeClass
|
||||
import org.junit.Test
|
||||
|
||||
import javax.security.auth.login.LoginException
|
||||
import java.sql.Connection
|
||||
import java.sql.SQLException
|
||||
|
||||
|
@ -72,4 +75,59 @@ class GroovyDBCPServiceTest {
|
|||
connection.close() // will return connection to pool
|
||||
}
|
||||
}
|
||||
|
||||
@Test(expected = LoginException)
|
||||
void testDatasourceCloseSuccessWithKerberosUserLogoutException() {
|
||||
final DBCPConnectionPool dbcpConnectionPoolService = new DBCPConnectionPool()
|
||||
|
||||
def basicDataSource = [close: { -> }] as BasicDataSource
|
||||
dbcpConnectionPoolService.dataSource = basicDataSource
|
||||
def kerberosKeytabUser = new KerberosKeytabUser("bad@PRINCIPAL.COM", "fake.keytab") {
|
||||
@Override
|
||||
void logout() throws LoginException {
|
||||
throw new LoginException("fake logout exception")
|
||||
}
|
||||
}
|
||||
dbcpConnectionPoolService.kerberosUser = kerberosKeytabUser
|
||||
|
||||
dbcpConnectionPoolService.shutdown()
|
||||
|
||||
}
|
||||
|
||||
@Test(expected = SQLException)
|
||||
void testDatasourceCloseExceptionWithKerberosUserLogoutSuccess() {
|
||||
final DBCPConnectionPool dbcpConnectionPoolService = new DBCPConnectionPool()
|
||||
|
||||
def basicDataSource = [
|
||||
close: { -> throw new SQLException("fake sql exception")
|
||||
}] as BasicDataSource
|
||||
dbcpConnectionPoolService.dataSource = basicDataSource
|
||||
def kerberosKeytabUser = new KerberosKeytabUser("bad@PRINCIPAL.COM", "fake.keytab") {
|
||||
@Override
|
||||
void logout() throws LoginException {
|
||||
}
|
||||
}
|
||||
dbcpConnectionPoolService.kerberosUser = kerberosKeytabUser
|
||||
|
||||
dbcpConnectionPoolService.shutdown()
|
||||
}
|
||||
|
||||
@Test(expected = SQLException)
|
||||
void testDatasourceCloseExceptionWithKerberosUserLogoutException() {
|
||||
final DBCPConnectionPool dbcpConnectionPoolService = new DBCPConnectionPool()
|
||||
|
||||
def basicDataSource = [
|
||||
close: { -> throw new SQLException("fake sql exception")
|
||||
}] as BasicDataSource
|
||||
dbcpConnectionPoolService.dataSource = basicDataSource
|
||||
def kerberosKeytabUser = new KerberosKeytabUser("bad@PRINCIPAL.COM", "fake.keytab") {
|
||||
@Override
|
||||
void logout() throws LoginException {
|
||||
throw new LoginException("fake logout exception")
|
||||
}
|
||||
}
|
||||
dbcpConnectionPoolService.kerberosUser = kerberosKeytabUser
|
||||
|
||||
dbcpConnectionPoolService.shutdown()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue