diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
index a8538bda1b..2fdfff0f1c 100644
--- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
@@ -70,6 +70,11 @@
nifi-kerberos-credentials-service-api
1.15.0-SNAPSHOT
+
+ org.apache.nifi
+ nifi-kerberos-user-service-api
+ 1.15.0-SNAPSHOT
+
org.apache.nifi
nifi-security-kerberos
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
index 38eee6962a..bd9f7856e4 100644
--- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
@@ -38,6 +38,7 @@ import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.krb.KerberosKeytabUser;
@@ -113,6 +114,14 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
.dependsOn(AUTHENTICATION_TYPE, AuthenticationType.PASSWORD.toString())
.build();
+ protected static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
+ .name("kerberos-user-service")
+ .displayName("Kerberos User Service")
+ .description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
+ .identifiesControllerService(KerberosUserService.class)
+ .required(false)
+ .build();
+
protected static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
@@ -172,6 +181,7 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
props.add(AUTHENTICATION_TYPE);
props.add(ACCUMULO_USER);
props.add(ACCUMULO_PASSWORD);
+ props.add(KERBEROS_USER_SERVICE);
props.add(KERBEROS_CREDENTIALS_SERVICE);
props.add(KERBEROS_PRINCIPAL);
props.add(KERBEROS_PASSWORD);
@@ -212,9 +222,10 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
}
break;
case KERBEROS:
- if (!validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && !validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
+ if (!validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && !validationContext.getProperty(KERBEROS_USER_SERVICE).isSet()
+ && !validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
- .explanation("Either Kerberos Password or Kerberos Credential Service must be set").build());
+ .explanation("Either Kerberos Password, Kerberos Credential Service, or Kerberos User Service must be set").build());
} else if (validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
.explanation("Kerberos Password and Kerberos Credential Service should not be filled out at the same time").build());
@@ -224,6 +235,15 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
} else if (validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()){
problems.add(new ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
.explanation("Kerberos Principal (for password) should not be filled out when principal + keytab Kerberos authentication is used").build());
+ } else if (validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && validationContext.getProperty(KERBEROS_USER_SERVICE).isSet()) {
+ problems.add(new ValidationResult.Builder().valid(false).subject(KERBEROS_USER_SERVICE.getName())
+ .explanation("Kerberos User Service cannot be specified while also specifying a Kerberos Credential Service").build());
+ } else if (validationContext.getProperty(KERBEROS_USER_SERVICE).isSet() && validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
+ problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
+ .explanation("Kerberos Password and Kerberos User Service should not be filled out at the same time").build());
+ } else if (validationContext.getProperty(KERBEROS_USER_SERVICE).isSet() && validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()){
+ problems.add(new ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
+ .explanation("Kerberos Principal (for password) should not be filled out when Kerberos User Service is used").build());
}
break;
default:
@@ -239,7 +259,8 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
throw new InitializationException("Instance name and Zookeeper Quorum must be specified");
}
- final KerberosCredentialsService kerberosService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+ final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+ final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String instanceName = context.getProperty(INSTANCE_NAME).evaluateAttributeExpressions().getValue();
final String zookeepers = context.getProperty(ZOOKEEPER_QUORUM).evaluateAttributeExpressions().getValue();
this.authType = AuthenticationType.valueOf( context.getProperty(AUTHENTICATION_TYPE).getValue());
@@ -257,14 +278,12 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
this.client = Accumulo.newClient().from(clientConf).as(accumuloUser, token).build();
break;
case KERBEROS:
- final String principal;
-
- if (kerberosService == null) {
- principal = context.getProperty(KERBEROS_PRINCIPAL).getValue();
- this.kerberosUser = new KerberosPasswordUser(principal, context.getProperty(KERBEROS_PASSWORD).getValue());
+ if (kerberosUserService != null) {
+ this.kerberosUser = kerberosUserService.createKerberosUser();
+ } else if (kerberosCredentialsService != null) {
+ this.kerberosUser = new KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), kerberosCredentialsService.getKeytab());
} else {
- principal = kerberosService.getPrincipal();
- this.kerberosUser = new KerberosKeytabUser(principal, kerberosService.getKeytab());
+ this.kerberosUser = new KerberosPasswordUser(context.getProperty(KERBEROS_PRINCIPAL).getValue(), context.getProperty(KERBEROS_PASSWORD).getValue());
}
clientConf.setProperty("sasl.enabled", "true");
@@ -277,7 +296,7 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
final UserGroupInformation clientUgi = SecurityUtil.getUgiForKerberosUser(conf, kerberosUser);
this.client = clientUgi.doAs((PrivilegedExceptionAction) () ->
- Accumulo.newClient().from(clientConf).as(principal, new KerberosToken()).build());
+ Accumulo.newClient().from(clientConf).as(kerberosUser.getPrincipal(), new KerberosToken()).build());
break;
default:
throw new InitializationException("Not supported authentication type.");
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java
index 51332bbf38..0feb3f26dc 100644
--- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java
@@ -18,6 +18,7 @@
package org.apache.nifi.accumulo.controllerservices;
import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.reporting.InitializationException;
import org.junit.Before;
@@ -49,6 +50,8 @@ public class TestAccumuloService {
@Mock
private KerberosCredentialsService credentialService;
@Mock
+ private KerberosUserService kerberosUserService;
+ @Mock
private Processor dummyProcessor;
@Before
@@ -59,6 +62,7 @@ public class TestAccumuloService {
accumuloService = new AccumuloService();
when(credentialService.getIdentifier()).thenReturn("1");
+ when(kerberosUserService.getIdentifier()).thenReturn("kerberosUserService1");
}
@Test
@@ -142,7 +146,7 @@ public class TestAccumuloService {
runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
//when
//then
- assertServiceIsInvalidWithErrorMessage("Either Kerberos Password or Kerberos Credential Service must be set");
+ assertServiceIsInvalidWithErrorMessage("Either Kerberos Password, Kerberos Credential Service, or Kerberos User Service must be set");
}
@Test
@@ -188,6 +192,56 @@ public class TestAccumuloService {
assertServiceIsInvalidWithErrorMessage("Kerberos Principal (for password) should not be filled out");
}
+ @Test
+ public void testServiceNotValidWithAuthTypeKerberosAndKerberosPasswordAndUserServiceSet() throws InitializationException {
+ //given
+ runner.addControllerService("accumulo-connector-service", accumuloService);
+ runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
+ runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
+ runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
+ runner.setProperty(accumuloService, AccumuloService.KERBEROS_PRINCIPAL, PRINCIPAL);
+ runner.setProperty(accumuloService, AccumuloService.KERBEROS_PASSWORD, KERBEROS_PASSWORD);
+ runner.addControllerService("kerberos-user-service", kerberosUserService);
+ runner.setProperty(accumuloService, AccumuloService.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
+ //when
+ //then
+ assertServiceIsInvalidWithErrorMessage("should not be filled out at the same time");
+ }
+
+ @Test
+ public void testServiceNotValidWithAuthTypeKerberosAndCredentialServiceAndUserServiceSet() throws InitializationException {
+ //given
+ runner.addControllerService("accumulo-connector-service", accumuloService);
+ runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
+ runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
+ runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
+
+ runner.addControllerService("kerberos-credentials-service", credentialService);
+ runner.setProperty(accumuloService, AccumuloService.KERBEROS_CREDENTIALS_SERVICE, credentialService.getIdentifier());
+
+ runner.addControllerService("kerberos-user-service", kerberosUserService);
+ runner.setProperty(accumuloService, AccumuloService.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
+
+ //when
+ //then
+ assertServiceIsInvalidWithErrorMessage("Kerberos User Service cannot be specified while also specifying a Kerberos Credential Service");
+ }
+
+ @Test
+ public void testServiceIsValidWithAuthTypeKerberosAndKerberosUserServiceSet() throws InitializationException {
+ //given
+ runner.addControllerService("accumulo-connector-service", accumuloService);
+ runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
+ runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
+ runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
+ runner.addControllerService("kerberos-user-service", kerberosUserService);
+ runner.enableControllerService(kerberosUserService);
+ runner.setProperty(accumuloService, AccumuloService.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
+ //when
+ //then
+ runner.assertValid(accumuloService);
+ }
+
private void assertServiceIsInvalidWithErrorMessage(String errorMessage) {
Exception exception = assertThrows(IllegalStateException.class, () -> runner.enableControllerService(accumuloService));
assertThat(exception.getMessage(), containsString(errorMessage));