diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml
index 77d938d491..19db6c30bd 100644
--- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml
@@ -97,6 +97,12 @@
+
+ org.apache.nifi
+ nifi-kerberos-credentials-service-api
+ 1.14.0-SNAPSHOT
+ test
+
org.apache.nifi
nifi-accumulo-services
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java
index d0888acff9..a594d5e09e 100644
--- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java
@@ -65,12 +65,21 @@ public abstract class BaseAccumuloProcessor extends AbstractProcessor {
.defaultValue("10")
.build();
+ protected static final PropertyDescriptor ACCUMULO_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("accumulo-timeout")
+ .displayName("Accumulo Timeout")
+ .description("Max amount of time to wait for an unresponsive server. Set to 0 sec for no timeout. Entered value less than 1 second may be converted to 0 sec.")
+ .required(false)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("30 sec")
+ .build();
+
/**
* Implementations can decide to include all base properties or individually include them. List is immutable
* so that implementations must constructor their own lists knowingly
*/
- protected static final ImmutableList baseProperties = ImmutableList.of(ACCUMULO_CONNECTOR_SERVICE,TABLE_NAME,CREATE_TABLE,THREADS);
+ protected static final ImmutableList baseProperties = ImmutableList.of(ACCUMULO_CONNECTOR_SERVICE,TABLE_NAME,CREATE_TABLE,THREADS,ACCUMULO_TIMEOUT);
}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java
index 6a751d666b..7808b82156 100644
--- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java
@@ -279,6 +279,7 @@ public class PutAccumuloRecord extends BaseAccumuloProcessor {
BatchWriterConfig writerConfig = new BatchWriterConfig();
writerConfig.setMaxWriteThreads(context.getProperty(THREADS).asInteger());
writerConfig.setMaxMemory(maxBytes.longValue());
+ writerConfig.setTimeout(context.getProperty(ACCUMULO_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).longValue(), TimeUnit.SECONDS);
tableWriter = client.createMultiTableBatchWriter(writerConfig);
flushOnEveryFlow = context.getProperty(FLUSH_ON_FLOWFILE).asBoolean();
if (!flushOnEveryFlow){
@@ -355,6 +356,8 @@ public class PutAccumuloRecord extends BaseAccumuloProcessor {
final String tableName = processContext.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ accumuloConnectorService.renewTgtIfNecessary();
+
// create the table if EL is present, create table is true and the table does not exist.
if (processContext.getProperty(TABLE_NAME).isExpressionLanguagePresent() && processContext.getProperty(CREATE_TABLE).asBoolean()) {
final TableOperations tableOps = this.client.tableOperations();
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
index 62d024ac89..23aeefdd2a 100644
--- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
@@ -71,6 +71,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
@EventDriven
@@ -85,6 +86,7 @@ import java.util.concurrent.atomic.LongAdder;
*
*/
public class ScanAccumulo extends BaseAccumuloProcessor {
+
static final PropertyDescriptor START_KEY = new PropertyDescriptor.Builder()
.displayName("Start key")
.name("start-key")
@@ -243,10 +245,13 @@ public class ScanAccumulo extends BaseAccumuloProcessor {
boolean cloneFlowFile = incomingFlowFile.isPresent();
+ accumuloConnectorService.renewTgtIfNecessary();
+
try (BatchScanner scanner = client.createBatchScanner(table,auths,threads)) {
if (!StringUtils.isBlank(startKeyCf) && StringUtils.isBlank(endKeyCf))
scanner.fetchColumnFamily(new Text(startKeyCf));
scanner.setRanges(Collections.singleton(lookupRange));
+ scanner.setTimeout(processContext.getProperty(ACCUMULO_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).longValue(), TimeUnit.SECONDS);
final Iterator> kvIter = scanner.iterator();
if (!kvIter.hasNext()){
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java
index d92b152bca..3266ad54dd 100644
--- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java
@@ -28,5 +28,6 @@ public interface BaseAccumuloService extends ControllerService {
AccumuloClient getClient();
+ void renewTgtIfNecessary();
}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
index f466ddd2bc..30841d3448 100644
--- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
@@ -66,5 +66,26 @@
nifi-accumulo-services-api
1.14.0-SNAPSHOT
+
+ org.apache.nifi
+ nifi-kerberos-credentials-service-api
+ 1.14.0-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-security-kerberos
+ 1.14.0-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-hadoop-utils
+ 1.14.0-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-mock
+ 1.14.0-SNAPSHOT
+ test
+
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
index 91da7fed19..38eee6962a 100644
--- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
@@ -20,7 +20,10 @@ package org.apache.nifi.accumulo.controllerservices;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@@ -33,14 +36,21 @@ import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.security.krb.KerberosKeytabUser;
+import org.apache.nifi.security.krb.KerberosPasswordUser;
+import org.apache.nifi.security.krb.KerberosUser;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Properties;
/**
* Purpose: Controller service that provides us a configured connector. Note that we don't need to close this
@@ -50,16 +60,18 @@ import java.util.List;
*/
@RequiresInstanceClassLoading
@Tags({"accumulo", "client", "service"})
-@CapabilityDescription("A controller service for accessing an HBase client.")
+@CapabilityDescription("A controller service for accessing an Accumulo Client.")
public class AccumuloService extends AbstractControllerService implements BaseAccumuloService {
- private enum AuthenticationType{
+ private enum AuthenticationType {
PASSWORD,
+ KERBEROS,
NONE
}
protected static final PropertyDescriptor ZOOKEEPER_QUORUM = new PropertyDescriptor.Builder()
.name("ZooKeeper Quorum")
+ .displayName("ZooKeeper Quorum")
.description("Comma-separated list of ZooKeeper hosts for Accumulo.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
@@ -67,35 +79,76 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
protected static final PropertyDescriptor INSTANCE_NAME = new PropertyDescriptor.Builder()
.name("Instance Name")
+ .displayName("Instance Name")
.description("Instance name of the Accumulo cluster")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
-
- protected static final PropertyDescriptor ACCUMULO_USER = new PropertyDescriptor.Builder()
- .name("Accumulo User")
- .description("Connecting user for Accumulo")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
-
- protected static final PropertyDescriptor ACCUMULO_PASSWORD = new PropertyDescriptor.Builder()
- .name("Accumulo Password")
- .description("Connecting user's password when using the PASSWORD Authentication type")
- .sensitive(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
-
protected static final PropertyDescriptor AUTHENTICATION_TYPE = new PropertyDescriptor.Builder()
- .name("Authentication Type")
+ .name("accumulo-authentication-type")
+ .displayName("Authentication Type")
.description("Authentication Type")
.allowableValues(AuthenticationType.values())
.defaultValue(AuthenticationType.PASSWORD.toString())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ protected static final PropertyDescriptor ACCUMULO_USER = new PropertyDescriptor.Builder()
+ .name("Accumulo User")
+ .displayName("Accumulo User")
+ .description("Connecting user for Accumulo")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .dependsOn(AUTHENTICATION_TYPE, AuthenticationType.PASSWORD.toString())
+ .build();
+
+ protected static final PropertyDescriptor ACCUMULO_PASSWORD = new PropertyDescriptor.Builder()
+ .name("Accumulo Password")
+ .displayName("Accumulo Password")
+ .description("Connecting user's password")
+ .sensitive(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .dependsOn(AUTHENTICATION_TYPE, AuthenticationType.PASSWORD.toString())
+ .build();
+
+ protected static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
+ .name("kerberos-credentials-service")
+ .displayName("Kerberos Credentials Service")
+ .description("Specifies the Kerberos Credentials Controller Service that should be used for principal + keytab Kerberos authentication")
+ .identifiesControllerService(KerberosCredentialsService.class)
+ .dependsOn(AUTHENTICATION_TYPE, AuthenticationType.KERBEROS.toString())
+ .build();
+
+ protected static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
+ .name("kerberos-principal")
+ .displayName("Kerberos Principal")
+ .description("Kerberos Principal")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .dependsOn(AUTHENTICATION_TYPE, AuthenticationType.KERBEROS.toString())
+ .build();
+
+ protected static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder()
+ .name("kerberos-password")
+ .displayName("Kerberos Password")
+ .description("Kerberos Password")
+ .sensitive(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .dependsOn(AUTHENTICATION_TYPE, AuthenticationType.KERBEROS.toString())
+ .build();
+
+ protected static final PropertyDescriptor ACCUMULO_SASL_QOP = new PropertyDescriptor.Builder()
+ .name("accumulo-sasl-qop")
+ .displayName("Accumulo SASL quality of protection")
+ .description("Accumulo SASL quality of protection for KERBEROS Authentication type")
+ .allowableValues("auth", "auth-int", "auth-conf")
+ .defaultValue("auth-conf")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .dependsOn(AUTHENTICATION_TYPE, AuthenticationType.KERBEROS.toString())
+ .build();
/**
* Reference to the accumulo client.
@@ -107,34 +160,27 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
*/
private List properties;
+ private KerberosUser kerberosUser;
+
+ private AuthenticationType authType;
+
@Override
- protected void init(ControllerServiceInitializationContext config) throws InitializationException {
+ protected void init(ControllerServiceInitializationContext config) {
List props = new ArrayList<>();
props.add(ZOOKEEPER_QUORUM);
props.add(INSTANCE_NAME);
- props.add(ACCUMULO_USER);
props.add(AUTHENTICATION_TYPE);
+ props.add(ACCUMULO_USER);
props.add(ACCUMULO_PASSWORD);
+ props.add(KERBEROS_CREDENTIALS_SERVICE);
+ props.add(KERBEROS_PRINCIPAL);
+ props.add(KERBEROS_PASSWORD);
+ props.add(ACCUMULO_SASL_QOP);
properties = Collections.unmodifiableList(props);
}
- private AuthenticationToken getToken(final AuthenticationType type, final ConfigurationContext context){
- switch(type){
- case PASSWORD:
- return new PasswordToken(context.getProperty(ACCUMULO_PASSWORD).getValue());
- default:
- return null;
- }
- }
-
@Override
public final List getSupportedPropertyDescriptors() {
- final List properties = new ArrayList<>();
- properties.add(INSTANCE_NAME);
- properties.add(ZOOKEEPER_QUORUM);
- properties.add(ACCUMULO_USER);
- properties.add(AUTHENTICATION_TYPE);
- properties.add(ACCUMULO_PASSWORD);
return properties;
}
@@ -150,22 +196,38 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
problems.add(new ValidationResult.Builder().valid(false).subject(ZOOKEEPER_QUORUM.getName()).explanation("Zookeepers must be supplied").build());
}
- if (!validationContext.getProperty(ACCUMULO_USER).isSet()){
- problems.add(new ValidationResult.Builder().valid(false).subject(ACCUMULO_USER.getName()).explanation("Accumulo user must be supplied").build());
- }
-
final AuthenticationType type = validationContext.getProperty(
- AUTHENTICATION_TYPE).isSet() ? AuthenticationType.valueOf( validationContext.getProperty(AUTHENTICATION_TYPE).getValue() ) : AuthenticationType.PASSWORD;
+ AUTHENTICATION_TYPE).isSet() ? AuthenticationType.valueOf( validationContext.getProperty(AUTHENTICATION_TYPE).getValue() ) : AuthenticationType.NONE;
switch(type){
case PASSWORD:
+ if (!validationContext.getProperty(ACCUMULO_USER).isSet()){
+ problems.add(
+ new ValidationResult.Builder().valid(false).subject(ACCUMULO_USER.getName()).explanation("Accumulo user must be supplied for the Password Authentication type").build());
+ }
if (!validationContext.getProperty(ACCUMULO_PASSWORD).isSet()){
problems.add(
- new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()).explanation("Password must be supplied for the Password Authentication type").build());
+ new ValidationResult.Builder().valid(false).subject(ACCUMULO_PASSWORD.getName())
+ .explanation("Password must be supplied for the Password Authentication type").build());
+ }
+ break;
+ case KERBEROS:
+ if (!validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && !validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
+ problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
+ .explanation("Either Kerberos Password or Kerberos Credential Service must be set").build());
+ } else if (validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
+ problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
+ .explanation("Kerberos Password and Kerberos Credential Service should not be filled out at the same time").build());
+ } else if (validationContext.getProperty(KERBEROS_PASSWORD).isSet() && !validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()) {
+ problems.add(new ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
+ .explanation("Kerberos Principal must be supplied when principal + password Kerberos authentication is used").build());
+ } else if (validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()){
+ problems.add(new ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
+ .explanation("Kerberos Principal (for password) should not be filled out when principal + keytab Kerberos authentication is used").build());
}
break;
default:
- problems.add(new ValidationResult.Builder().valid(false).subject(ACCUMULO_PASSWORD.getName()).explanation("Non supported Authentication type").build());
+ problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()).explanation("Non supported Authentication type").build());
}
return problems;
@@ -173,38 +235,71 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
- if (!context.getProperty(INSTANCE_NAME).isSet() || !context.getProperty(ZOOKEEPER_QUORUM).isSet() || !context.getProperty(ACCUMULO_USER).isSet()){
+ if (!context.getProperty(INSTANCE_NAME).isSet() || !context.getProperty(ZOOKEEPER_QUORUM).isSet()) {
throw new InitializationException("Instance name and Zookeeper Quorum must be specified");
}
-
-
+ final KerberosCredentialsService kerberosService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String instanceName = context.getProperty(INSTANCE_NAME).evaluateAttributeExpressions().getValue();
final String zookeepers = context.getProperty(ZOOKEEPER_QUORUM).evaluateAttributeExpressions().getValue();
- final String accumuloUser = context.getProperty(ACCUMULO_USER).evaluateAttributeExpressions().getValue();
+ this.authType = AuthenticationType.valueOf( context.getProperty(AUTHENTICATION_TYPE).getValue());
- final AuthenticationType type = AuthenticationType.valueOf( context.getProperty(AUTHENTICATION_TYPE).getValue() );
+ final Properties clientConf = new Properties();
+ clientConf.setProperty("instance.zookeepers", zookeepers);
+ clientConf.setProperty("instance.name", instanceName);
+ switch(authType){
+ case PASSWORD:
+ final String accumuloUser = context.getProperty(ACCUMULO_USER).evaluateAttributeExpressions().getValue();
+ final AuthenticationToken token = new PasswordToken(context.getProperty(ACCUMULO_PASSWORD).getValue());
- final AuthenticationToken token = getToken(type,context);
+ this.client = Accumulo.newClient().from(clientConf).as(accumuloUser, token).build();
+ break;
+ case KERBEROS:
+ final String principal;
- this.client = Accumulo.newClient().to(instanceName,zookeepers).as(accumuloUser,token).build();
+ if (kerberosService == null) {
+ principal = context.getProperty(KERBEROS_PRINCIPAL).getValue();
+ this.kerberosUser = new KerberosPasswordUser(principal, context.getProperty(KERBEROS_PASSWORD).getValue());
+ } else {
+ principal = kerberosService.getPrincipal();
+ this.kerberosUser = new KerberosKeytabUser(principal, kerberosService.getKeytab());
+ }
- if (null == token){
- throw new InitializationException("Feature not implemented");
+ clientConf.setProperty("sasl.enabled", "true");
+ clientConf.setProperty("sasl.qop", context.getProperty(ACCUMULO_SASL_QOP).getValue());
+
+ //Client uses the currently logged in user's security context, so need to login first.
+ Configuration conf = new Configuration();
+ conf.set("hadoop.security.authentication", "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ final UserGroupInformation clientUgi = SecurityUtil.getUgiForKerberosUser(conf, kerberosUser);
+
+ this.client = clientUgi.doAs((PrivilegedExceptionAction) () ->
+ Accumulo.newClient().from(clientConf).as(principal, new KerberosToken()).build());
+ break;
+ default:
+ throw new InitializationException("Not supported authentication type.");
}
-
}
@Override
- public AccumuloClient getClient(){
+ public AccumuloClient getClient() {
return client;
}
+ @Override
+ public void renewTgtIfNecessary() {
+ if (authType.equals(AuthenticationType.KERBEROS)) {
+ SecurityUtil.checkTGTAndRelogin(getLogger(), kerberosUser);
+ }
+ }
+
@OnDisabled
public void shutdown() {
- client.close();
+ if (client != null) {
+ client.close();
+ }
}
-
}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java
new file mode 100644
index 0000000000..51332bbf38
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.controllerservices;
+
+import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.reporting.InitializationException;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.when;
+
+public class TestAccumuloService {
+
+ private static final String INSTANCE = "instance";
+ private static final String ZOOKEEPER = "zookeeper";
+ private static final String PASSWORD = "PASSWORD";
+ private static final String USER = "USER";
+ private static final String KERBEROS = "KERBEROS";
+ private static final String PRINCIPAL = "principal";
+ private static final String KERBEROS_PASSWORD = "kerberos_password";
+ private static final String NONE = "NONE";
+
+ private TestRunner runner;
+ private AccumuloService accumuloService;
+
+ @Mock
+ private KerberosCredentialsService credentialService;
+ @Mock
+ private Processor dummyProcessor;
+
+ @Before
+ public void init() {
+ MockitoAnnotations.initMocks(this);
+
+ runner = TestRunners.newTestRunner(dummyProcessor);
+ accumuloService = new AccumuloService();
+
+ when(credentialService.getIdentifier()).thenReturn("1");
+ }
+
+ @Test
+ public void testServiceValidWithAuthTypePasswordAndInstanceZookeeperUserPasswordAreSet() throws InitializationException {
+ //given
+ runner.addControllerService("accumulo-connector-service", accumuloService);
+ runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
+ runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
+ runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, PASSWORD);
+ runner.setProperty(accumuloService, AccumuloService.ACCUMULO_USER, USER);
+ runner.setProperty(accumuloService, AccumuloService.ACCUMULO_PASSWORD, PASSWORD);
+ //when
+ //then
+ runner.assertValid(accumuloService);
+ }
+
+ @Test
+ public void testServiceNotValidWithInstanceMissing() throws InitializationException {
+ //given
+ runner.addControllerService("accumulo-connector-service", accumuloService);
+ runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
+ //when
+ //then
+ assertServiceIsInvalidWithErrorMessage("Instance name must be supplied");
+ }
+
+ @Test
+ public void testServiceNotValidWithZookeeperMissing() throws InitializationException {
+ //given
+ runner.addControllerService("accumulo-connector-service", accumuloService);
+ runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
+ //when
+ //then
+ assertServiceIsInvalidWithErrorMessage("Zookeepers must be supplied");
+ }
+
+ @Test
+ public void testServiceNotValidWithAuthTypeNone() throws InitializationException {
+ //given
+ runner.addControllerService("accumulo-connector-service", accumuloService);
+ runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
+ runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
+ runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, NONE);
+ //when
+ //then
+ assertServiceIsInvalidWithErrorMessage("Non supported Authentication type");
+ }
+
+ @Test
+ public void testServiceNotValidWithAuthTypePasswordAndUserMissing() throws InitializationException {
+ //given
+ runner.addControllerService("accumulo-connector-service", accumuloService);
+ runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
+ runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
+ runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, PASSWORD);
+ runner.setProperty(accumuloService, AccumuloService.ACCUMULO_PASSWORD, PASSWORD);
+ //when
+ //then
+ assertServiceIsInvalidWithErrorMessage("Accumulo user must be supplied");
+ }
+
+ @Test
+ public void testServiceNotValidWithAuthTypePasswordAndPasswordMissing() throws InitializationException {
+ //given
+ runner.addControllerService("accumulo-connector-service", accumuloService);
+ runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
+ runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
+ runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, PASSWORD);
+ runner.setProperty(accumuloService, AccumuloService.ACCUMULO_USER, USER);
+ //when
+ //then
+ assertServiceIsInvalidWithErrorMessage("Password must be supplied");
+ }
+
+ @Test
+ public void testServiceNotValidWithAuthTypeKerberosAndKerberosPasswordAndCredentialServiceMissing() throws InitializationException {
+ //given
+ runner.addControllerService("accumulo-connector-service", accumuloService);
+ runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
+ runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
+ runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
+ //when
+ //then
+ assertServiceIsInvalidWithErrorMessage("Either Kerberos Password or Kerberos Credential Service must be set");
+ }
+
+ @Test
+ public void testServiceNotValidWithAuthTypeKerberosAndKerberosPrincipalMissing() throws InitializationException {
+ //given
+ runner.addControllerService("accumulo-connector-service", accumuloService);
+ runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
+ runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
+ runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
+ runner.setProperty(accumuloService, AccumuloService.KERBEROS_PASSWORD, KERBEROS_PASSWORD);
+ //when
+ //then
+ assertServiceIsInvalidWithErrorMessage("Kerberos Principal must be supplied");
+ }
+
+ @Test
+ public void testServiceNotValidWithAuthTypeKerberosAndKerberosPasswordAndCredentialServiceSet() throws InitializationException {
+ //given
+ runner.addControllerService("accumulo-connector-service", accumuloService);
+ runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
+ runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
+ runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
+ runner.setProperty(accumuloService, AccumuloService.KERBEROS_PASSWORD, KERBEROS_PASSWORD);
+ runner.addControllerService("kerberos-credentials-service", credentialService);
+ runner.setProperty(accumuloService, AccumuloService.KERBEROS_CREDENTIALS_SERVICE, credentialService.getIdentifier());
+ //when
+ //then
+ assertServiceIsInvalidWithErrorMessage("should not be filled out at the same time");
+ }
+
+ @Test
+ public void testServiceNotValidWithAuthTypeKerberosAndPrincipalAndCredentialServiceSet() throws InitializationException {
+ //given
+ runner.addControllerService("accumulo-connector-service", accumuloService);
+ runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
+ runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
+ runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
+ runner.setProperty(accumuloService, AccumuloService.KERBEROS_PRINCIPAL, PRINCIPAL);
+ runner.addControllerService("kerberos-credentials-service", credentialService);
+ runner.setProperty(accumuloService, AccumuloService.KERBEROS_CREDENTIALS_SERVICE, credentialService.getIdentifier());
+ //when
+ //then
+ assertServiceIsInvalidWithErrorMessage("Kerberos Principal (for password) should not be filled out");
+ }
+
+ private void assertServiceIsInvalidWithErrorMessage(String errorMessage) {
+ Exception exception = assertThrows(IllegalStateException.class, () -> runner.enableControllerService(accumuloService));
+ assertThat(exception.getMessage(), containsString(errorMessage));
+ }
+}
\ No newline at end of file