NIFI-8982 Add KerberosUserService to Accumulo processors

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5405.
This commit is contained in:
Bryan Bende 2021-09-21 10:48:56 -04:00 committed by Pierre Villard
parent 30cf49db7e
commit bf7d1ffa1b
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
3 changed files with 90 additions and 12 deletions

View File

@ -70,6 +70,11 @@
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
<version>1.15.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-user-service-api</artifactId>
<version>1.15.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-kerberos</artifactId>

View File

@ -38,6 +38,7 @@ import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.krb.KerberosKeytabUser;
@ -113,6 +114,14 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
.dependsOn(AUTHENTICATION_TYPE, AuthenticationType.PASSWORD.toString())
.build();
protected static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-user-service")
.displayName("Kerberos User Service")
.description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
.identifiesControllerService(KerberosUserService.class)
.required(false)
.build();
protected static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
@ -172,6 +181,7 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
props.add(AUTHENTICATION_TYPE);
props.add(ACCUMULO_USER);
props.add(ACCUMULO_PASSWORD);
props.add(KERBEROS_USER_SERVICE);
props.add(KERBEROS_CREDENTIALS_SERVICE);
props.add(KERBEROS_PRINCIPAL);
props.add(KERBEROS_PASSWORD);
@ -212,9 +222,10 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
}
break;
case KERBEROS:
if (!validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && !validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
if (!validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && !validationContext.getProperty(KERBEROS_USER_SERVICE).isSet()
&& !validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
.explanation("Either Kerberos Password or Kerberos Credential Service must be set").build());
.explanation("Either Kerberos Password, Kerberos Credential Service, or Kerberos User Service must be set").build());
} else if (validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
.explanation("Kerberos Password and Kerberos Credential Service should not be filled out at the same time").build());
@ -224,6 +235,15 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
} else if (validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()){
problems.add(new ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
.explanation("Kerberos Principal (for password) should not be filled out when principal + keytab Kerberos authentication is used").build());
} else if (validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && validationContext.getProperty(KERBEROS_USER_SERVICE).isSet()) {
problems.add(new ValidationResult.Builder().valid(false).subject(KERBEROS_USER_SERVICE.getName())
.explanation("Kerberos User Service cannot be specified while also specifying a Kerberos Credential Service").build());
} else if (validationContext.getProperty(KERBEROS_USER_SERVICE).isSet() && validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
.explanation("Kerberos Password and Kerberos User Service should not be filled out at the same time").build());
} else if (validationContext.getProperty(KERBEROS_USER_SERVICE).isSet() && validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()){
problems.add(new ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
.explanation("Kerberos Principal (for password) should not be filled out when Kerberos User Service is used").build());
}
break;
default:
@ -239,7 +259,8 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
throw new InitializationException("Instance name and Zookeeper Quorum must be specified");
}
final KerberosCredentialsService kerberosService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String instanceName = context.getProperty(INSTANCE_NAME).evaluateAttributeExpressions().getValue();
final String zookeepers = context.getProperty(ZOOKEEPER_QUORUM).evaluateAttributeExpressions().getValue();
this.authType = AuthenticationType.valueOf( context.getProperty(AUTHENTICATION_TYPE).getValue());
@ -257,14 +278,12 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
this.client = Accumulo.newClient().from(clientConf).as(accumuloUser, token).build();
break;
case KERBEROS:
final String principal;
if (kerberosService == null) {
principal = context.getProperty(KERBEROS_PRINCIPAL).getValue();
this.kerberosUser = new KerberosPasswordUser(principal, context.getProperty(KERBEROS_PASSWORD).getValue());
if (kerberosUserService != null) {
this.kerberosUser = kerberosUserService.createKerberosUser();
} else if (kerberosCredentialsService != null) {
this.kerberosUser = new KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), kerberosCredentialsService.getKeytab());
} else {
principal = kerberosService.getPrincipal();
this.kerberosUser = new KerberosKeytabUser(principal, kerberosService.getKeytab());
this.kerberosUser = new KerberosPasswordUser(context.getProperty(KERBEROS_PRINCIPAL).getValue(), context.getProperty(KERBEROS_PASSWORD).getValue());
}
clientConf.setProperty("sasl.enabled", "true");
@ -277,7 +296,7 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
final UserGroupInformation clientUgi = SecurityUtil.getUgiForKerberosUser(conf, kerberosUser);
this.client = clientUgi.doAs((PrivilegedExceptionAction<AccumuloClient>) () ->
Accumulo.newClient().from(clientConf).as(principal, new KerberosToken()).build());
Accumulo.newClient().from(clientConf).as(kerberosUser.getPrincipal(), new KerberosToken()).build());
break;
default:
throw new InitializationException("Not supported authentication type.");

View File

@ -18,6 +18,7 @@
package org.apache.nifi.accumulo.controllerservices;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.reporting.InitializationException;
import org.junit.Before;
@ -49,6 +50,8 @@ public class TestAccumuloService {
@Mock
private KerberosCredentialsService credentialService;
@Mock
private KerberosUserService kerberosUserService;
@Mock
private Processor dummyProcessor;
@Before
@ -59,6 +62,7 @@ public class TestAccumuloService {
accumuloService = new AccumuloService();
when(credentialService.getIdentifier()).thenReturn("1");
when(kerberosUserService.getIdentifier()).thenReturn("kerberosUserService1");
}
@Test
@ -142,7 +146,7 @@ public class TestAccumuloService {
runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
//when
//then
assertServiceIsInvalidWithErrorMessage("Either Kerberos Password or Kerberos Credential Service must be set");
assertServiceIsInvalidWithErrorMessage("Either Kerberos Password, Kerberos Credential Service, or Kerberos User Service must be set");
}
@Test
@ -188,6 +192,56 @@ public class TestAccumuloService {
assertServiceIsInvalidWithErrorMessage("Kerberos Principal (for password) should not be filled out");
}
@Test
public void testServiceNotValidWithAuthTypeKerberosAndKerberosPasswordAndUserServiceSet() throws InitializationException {
//given
runner.addControllerService("accumulo-connector-service", accumuloService);
runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
runner.setProperty(accumuloService, AccumuloService.KERBEROS_PRINCIPAL, PRINCIPAL);
runner.setProperty(accumuloService, AccumuloService.KERBEROS_PASSWORD, KERBEROS_PASSWORD);
runner.addControllerService("kerberos-user-service", kerberosUserService);
runner.setProperty(accumuloService, AccumuloService.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
//when
//then
assertServiceIsInvalidWithErrorMessage("should not be filled out at the same time");
}
@Test
public void testServiceNotValidWithAuthTypeKerberosAndCredentialServiceAndUserServiceSet() throws InitializationException {
//given
runner.addControllerService("accumulo-connector-service", accumuloService);
runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
runner.addControllerService("kerberos-credentials-service", credentialService);
runner.setProperty(accumuloService, AccumuloService.KERBEROS_CREDENTIALS_SERVICE, credentialService.getIdentifier());
runner.addControllerService("kerberos-user-service", kerberosUserService);
runner.setProperty(accumuloService, AccumuloService.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
//when
//then
assertServiceIsInvalidWithErrorMessage("Kerberos User Service cannot be specified while also specifying a Kerberos Credential Service");
}
@Test
public void testServiceIsValidWithAuthTypeKerberosAndKerberosUserServiceSet() throws InitializationException {
//given
runner.addControllerService("accumulo-connector-service", accumuloService);
runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
runner.addControllerService("kerberos-user-service", kerberosUserService);
runner.enableControllerService(kerberosUserService);
runner.setProperty(accumuloService, AccumuloService.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
//when
//then
runner.assertValid(accumuloService);
}
private void assertServiceIsInvalidWithErrorMessage(String errorMessage) {
Exception exception = assertThrows(IllegalStateException.class, () -> runner.enableControllerService(accumuloService));
assertThat(exception.getMessage(), containsString(errorMessage));