NIFI-8373 Add Kerberos support to Accumulo processors

This closes #4973.

Signed-off-by: Tamas Palfy <tamas.bertalan.palfy@gmail.com>
This commit is contained in:
Timea Barna 2021-04-06 07:14:12 +02:00 committed by Tamas Palfy
parent 4888eaf62d
commit b6f84413c4
8 changed files with 392 additions and 57 deletions

View File

@ -97,6 +97,12 @@
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
<version>1.14.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-accumulo-services</artifactId> <artifactId>nifi-accumulo-services</artifactId>

View File

@ -65,12 +65,21 @@ public abstract class BaseAccumuloProcessor extends AbstractProcessor {
.defaultValue("10") .defaultValue("10")
.build(); .build();
protected static final PropertyDescriptor ACCUMULO_TIMEOUT = new PropertyDescriptor.Builder()
.name("accumulo-timeout")
.displayName("Accumulo Timeout")
.description("Max amount of time to wait for an unresponsive server. Set to 0 sec for no timeout. Entered value less than 1 second may be converted to 0 sec.")
.required(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("30 sec")
.build();
/** /**
* Implementations can decide to include all base properties or individually include them. List is immutable * Implementations can decide to include all base properties or individually include them. List is immutable
* so that implementations must constructor their own lists knowingly * so that implementations must constructor their own lists knowingly
*/ */
protected static final ImmutableList<PropertyDescriptor> baseProperties = ImmutableList.of(ACCUMULO_CONNECTOR_SERVICE,TABLE_NAME,CREATE_TABLE,THREADS); protected static final ImmutableList<PropertyDescriptor> baseProperties = ImmutableList.of(ACCUMULO_CONNECTOR_SERVICE,TABLE_NAME,CREATE_TABLE,THREADS,ACCUMULO_TIMEOUT);
} }

View File

@ -279,6 +279,7 @@ public class PutAccumuloRecord extends BaseAccumuloProcessor {
BatchWriterConfig writerConfig = new BatchWriterConfig(); BatchWriterConfig writerConfig = new BatchWriterConfig();
writerConfig.setMaxWriteThreads(context.getProperty(THREADS).asInteger()); writerConfig.setMaxWriteThreads(context.getProperty(THREADS).asInteger());
writerConfig.setMaxMemory(maxBytes.longValue()); writerConfig.setMaxMemory(maxBytes.longValue());
writerConfig.setTimeout(context.getProperty(ACCUMULO_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).longValue(), TimeUnit.SECONDS);
tableWriter = client.createMultiTableBatchWriter(writerConfig); tableWriter = client.createMultiTableBatchWriter(writerConfig);
flushOnEveryFlow = context.getProperty(FLUSH_ON_FLOWFILE).asBoolean(); flushOnEveryFlow = context.getProperty(FLUSH_ON_FLOWFILE).asBoolean();
if (!flushOnEveryFlow){ if (!flushOnEveryFlow){
@ -355,6 +356,8 @@ public class PutAccumuloRecord extends BaseAccumuloProcessor {
final String tableName = processContext.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); final String tableName = processContext.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
accumuloConnectorService.renewTgtIfNecessary();
// create the table if EL is present, create table is true and the table does not exist. // create the table if EL is present, create table is true and the table does not exist.
if (processContext.getProperty(TABLE_NAME).isExpressionLanguagePresent() && processContext.getProperty(CREATE_TABLE).asBoolean()) { if (processContext.getProperty(TABLE_NAME).isExpressionLanguagePresent() && processContext.getProperty(CREATE_TABLE).asBoolean()) {
final TableOperations tableOps = this.client.tableOperations(); final TableOperations tableOps = this.client.tableOperations();

View File

@ -71,6 +71,7 @@ import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.atomic.LongAdder;
@EventDriven @EventDriven
@ -85,6 +86,7 @@ import java.util.concurrent.atomic.LongAdder;
* *
*/ */
public class ScanAccumulo extends BaseAccumuloProcessor { public class ScanAccumulo extends BaseAccumuloProcessor {
static final PropertyDescriptor START_KEY = new PropertyDescriptor.Builder() static final PropertyDescriptor START_KEY = new PropertyDescriptor.Builder()
.displayName("Start key") .displayName("Start key")
.name("start-key") .name("start-key")
@ -243,10 +245,13 @@ public class ScanAccumulo extends BaseAccumuloProcessor {
boolean cloneFlowFile = incomingFlowFile.isPresent(); boolean cloneFlowFile = incomingFlowFile.isPresent();
accumuloConnectorService.renewTgtIfNecessary();
try (BatchScanner scanner = client.createBatchScanner(table,auths,threads)) { try (BatchScanner scanner = client.createBatchScanner(table,auths,threads)) {
if (!StringUtils.isBlank(startKeyCf) && StringUtils.isBlank(endKeyCf)) if (!StringUtils.isBlank(startKeyCf) && StringUtils.isBlank(endKeyCf))
scanner.fetchColumnFamily(new Text(startKeyCf)); scanner.fetchColumnFamily(new Text(startKeyCf));
scanner.setRanges(Collections.singleton(lookupRange)); scanner.setRanges(Collections.singleton(lookupRange));
scanner.setTimeout(processContext.getProperty(ACCUMULO_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).longValue(), TimeUnit.SECONDS);
final Iterator<Map.Entry<Key,Value>> kvIter = scanner.iterator(); final Iterator<Map.Entry<Key,Value>> kvIter = scanner.iterator();
if (!kvIter.hasNext()){ if (!kvIter.hasNext()){

View File

@ -28,5 +28,6 @@ public interface BaseAccumuloService extends ControllerService {
AccumuloClient getClient(); AccumuloClient getClient();
void renewTgtIfNecessary();
} }

View File

@ -66,5 +66,26 @@
<artifactId>nifi-accumulo-services-api</artifactId> <artifactId>nifi-accumulo-services-api</artifactId>
<version>1.14.0-SNAPSHOT</version> <version>1.14.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
<version>1.14.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-kerberos</artifactId>
<version>1.14.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-utils</artifactId>
<version>1.14.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.14.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -20,7 +20,10 @@ package org.apache.nifi.accumulo.controllerservices;
import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
@ -33,14 +36,21 @@ import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext; import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;
import java.io.IOException; import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Properties;
/** /**
* Purpose: Controller service that provides us a configured connector. Note that we don't need to close this * Purpose: Controller service that provides us a configured connector. Note that we don't need to close this
@ -50,16 +60,18 @@ import java.util.List;
*/ */
@RequiresInstanceClassLoading @RequiresInstanceClassLoading
@Tags({"accumulo", "client", "service"}) @Tags({"accumulo", "client", "service"})
@CapabilityDescription("A controller service for accessing an HBase client.") @CapabilityDescription("A controller service for accessing an Accumulo Client.")
public class AccumuloService extends AbstractControllerService implements BaseAccumuloService { public class AccumuloService extends AbstractControllerService implements BaseAccumuloService {
private enum AuthenticationType { private enum AuthenticationType {
PASSWORD, PASSWORD,
KERBEROS,
NONE NONE
} }
protected static final PropertyDescriptor ZOOKEEPER_QUORUM = new PropertyDescriptor.Builder() protected static final PropertyDescriptor ZOOKEEPER_QUORUM = new PropertyDescriptor.Builder()
.name("ZooKeeper Quorum") .name("ZooKeeper Quorum")
.displayName("ZooKeeper Quorum")
.description("Comma-separated list of ZooKeeper hosts for Accumulo.") .description("Comma-separated list of ZooKeeper hosts for Accumulo.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
@ -67,35 +79,76 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
protected static final PropertyDescriptor INSTANCE_NAME = new PropertyDescriptor.Builder() protected static final PropertyDescriptor INSTANCE_NAME = new PropertyDescriptor.Builder()
.name("Instance Name") .name("Instance Name")
.displayName("Instance Name")
.description("Instance name of the Accumulo cluster") .description("Instance name of the Accumulo cluster")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build(); .build();
protected static final PropertyDescriptor ACCUMULO_USER = new PropertyDescriptor.Builder()
.name("Accumulo User")
.description("Connecting user for Accumulo")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
protected static final PropertyDescriptor ACCUMULO_PASSWORD = new PropertyDescriptor.Builder()
.name("Accumulo Password")
.description("Connecting user's password when using the PASSWORD Authentication type")
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
protected static final PropertyDescriptor AUTHENTICATION_TYPE = new PropertyDescriptor.Builder() protected static final PropertyDescriptor AUTHENTICATION_TYPE = new PropertyDescriptor.Builder()
.name("Authentication Type") .name("accumulo-authentication-type")
.displayName("Authentication Type")
.description("Authentication Type") .description("Authentication Type")
.allowableValues(AuthenticationType.values()) .allowableValues(AuthenticationType.values())
.defaultValue(AuthenticationType.PASSWORD.toString()) .defaultValue(AuthenticationType.PASSWORD.toString())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
protected static final PropertyDescriptor ACCUMULO_USER = new PropertyDescriptor.Builder()
.name("Accumulo User")
.displayName("Accumulo User")
.description("Connecting user for Accumulo")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dependsOn(AUTHENTICATION_TYPE, AuthenticationType.PASSWORD.toString())
.build();
protected static final PropertyDescriptor ACCUMULO_PASSWORD = new PropertyDescriptor.Builder()
.name("Accumulo Password")
.displayName("Accumulo Password")
.description("Connecting user's password")
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dependsOn(AUTHENTICATION_TYPE, AuthenticationType.PASSWORD.toString())
.build();
protected static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
.description("Specifies the Kerberos Credentials Controller Service that should be used for principal + keytab Kerberos authentication")
.identifiesControllerService(KerberosCredentialsService.class)
.dependsOn(AUTHENTICATION_TYPE, AuthenticationType.KERBEROS.toString())
.build();
protected static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
.name("kerberos-principal")
.displayName("Kerberos Principal")
.description("Kerberos Principal")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dependsOn(AUTHENTICATION_TYPE, AuthenticationType.KERBEROS.toString())
.build();
protected static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder()
.name("kerberos-password")
.displayName("Kerberos Password")
.description("Kerberos Password")
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dependsOn(AUTHENTICATION_TYPE, AuthenticationType.KERBEROS.toString())
.build();
protected static final PropertyDescriptor ACCUMULO_SASL_QOP = new PropertyDescriptor.Builder()
.name("accumulo-sasl-qop")
.displayName("Accumulo SASL quality of protection")
.description("Accumulo SASL quality of protection for KERBEROS Authentication type")
.allowableValues("auth", "auth-int", "auth-conf")
.defaultValue("auth-conf")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dependsOn(AUTHENTICATION_TYPE, AuthenticationType.KERBEROS.toString())
.build();
/** /**
* Reference to the accumulo client. * Reference to the accumulo client.
@ -107,34 +160,27 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
*/ */
private List<PropertyDescriptor> properties; private List<PropertyDescriptor> properties;
private KerberosUser kerberosUser;
private AuthenticationType authType;
@Override @Override
protected void init(ControllerServiceInitializationContext config) throws InitializationException { protected void init(ControllerServiceInitializationContext config) {
List<PropertyDescriptor> props = new ArrayList<>(); List<PropertyDescriptor> props = new ArrayList<>();
props.add(ZOOKEEPER_QUORUM); props.add(ZOOKEEPER_QUORUM);
props.add(INSTANCE_NAME); props.add(INSTANCE_NAME);
props.add(ACCUMULO_USER);
props.add(AUTHENTICATION_TYPE); props.add(AUTHENTICATION_TYPE);
props.add(ACCUMULO_USER);
props.add(ACCUMULO_PASSWORD); props.add(ACCUMULO_PASSWORD);
props.add(KERBEROS_CREDENTIALS_SERVICE);
props.add(KERBEROS_PRINCIPAL);
props.add(KERBEROS_PASSWORD);
props.add(ACCUMULO_SASL_QOP);
properties = Collections.unmodifiableList(props); properties = Collections.unmodifiableList(props);
} }
private AuthenticationToken getToken(final AuthenticationType type, final ConfigurationContext context){
switch(type){
case PASSWORD:
return new PasswordToken(context.getProperty(ACCUMULO_PASSWORD).getValue());
default:
return null;
}
}
@Override @Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(INSTANCE_NAME);
properties.add(ZOOKEEPER_QUORUM);
properties.add(ACCUMULO_USER);
properties.add(AUTHENTICATION_TYPE);
properties.add(ACCUMULO_PASSWORD);
return properties; return properties;
} }
@ -150,22 +196,38 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
problems.add(new ValidationResult.Builder().valid(false).subject(ZOOKEEPER_QUORUM.getName()).explanation("Zookeepers must be supplied").build()); problems.add(new ValidationResult.Builder().valid(false).subject(ZOOKEEPER_QUORUM.getName()).explanation("Zookeepers must be supplied").build());
} }
if (!validationContext.getProperty(ACCUMULO_USER).isSet()){
problems.add(new ValidationResult.Builder().valid(false).subject(ACCUMULO_USER.getName()).explanation("Accumulo user must be supplied").build());
}
final AuthenticationType type = validationContext.getProperty( final AuthenticationType type = validationContext.getProperty(
AUTHENTICATION_TYPE).isSet() ? AuthenticationType.valueOf( validationContext.getProperty(AUTHENTICATION_TYPE).getValue() ) : AuthenticationType.PASSWORD; AUTHENTICATION_TYPE).isSet() ? AuthenticationType.valueOf( validationContext.getProperty(AUTHENTICATION_TYPE).getValue() ) : AuthenticationType.NONE;
switch(type){ switch(type){
case PASSWORD: case PASSWORD:
if (!validationContext.getProperty(ACCUMULO_USER).isSet()){
problems.add(
new ValidationResult.Builder().valid(false).subject(ACCUMULO_USER.getName()).explanation("Accumulo user must be supplied for the Password Authentication type").build());
}
if (!validationContext.getProperty(ACCUMULO_PASSWORD).isSet()){ if (!validationContext.getProperty(ACCUMULO_PASSWORD).isSet()){
problems.add( problems.add(
new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()).explanation("Password must be supplied for the Password Authentication type").build()); new ValidationResult.Builder().valid(false).subject(ACCUMULO_PASSWORD.getName())
.explanation("Password must be supplied for the Password Authentication type").build());
}
break;
case KERBEROS:
if (!validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && !validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
.explanation("Either Kerberos Password or Kerberos Credential Service must be set").build());
} else if (validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
.explanation("Kerberos Password and Kerberos Credential Service should not be filled out at the same time").build());
} else if (validationContext.getProperty(KERBEROS_PASSWORD).isSet() && !validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()) {
problems.add(new ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
.explanation("Kerberos Principal must be supplied when principal + password Kerberos authentication is used").build());
} else if (validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()){
problems.add(new ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
.explanation("Kerberos Principal (for password) should not be filled out when principal + keytab Kerberos authentication is used").build());
} }
break; break;
default: default:
problems.add(new ValidationResult.Builder().valid(false).subject(ACCUMULO_PASSWORD.getName()).explanation("Non supported Authentication type").build()); problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()).explanation("Non supported Authentication type").build());
} }
return problems; return problems;
@ -173,28 +235,53 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
@OnEnabled @OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException { public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
if (!context.getProperty(INSTANCE_NAME).isSet() || !context.getProperty(ZOOKEEPER_QUORUM).isSet() || !context.getProperty(ACCUMULO_USER).isSet()){ if (!context.getProperty(INSTANCE_NAME).isSet() || !context.getProperty(ZOOKEEPER_QUORUM).isSet()) {
throw new InitializationException("Instance name and Zookeeper Quorum must be specified"); throw new InitializationException("Instance name and Zookeeper Quorum must be specified");
} }
final KerberosCredentialsService kerberosService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String instanceName = context.getProperty(INSTANCE_NAME).evaluateAttributeExpressions().getValue(); final String instanceName = context.getProperty(INSTANCE_NAME).evaluateAttributeExpressions().getValue();
final String zookeepers = context.getProperty(ZOOKEEPER_QUORUM).evaluateAttributeExpressions().getValue(); final String zookeepers = context.getProperty(ZOOKEEPER_QUORUM).evaluateAttributeExpressions().getValue();
this.authType = AuthenticationType.valueOf( context.getProperty(AUTHENTICATION_TYPE).getValue());
final Properties clientConf = new Properties();
clientConf.setProperty("instance.zookeepers", zookeepers);
clientConf.setProperty("instance.name", instanceName);
switch(authType){
case PASSWORD:
final String accumuloUser = context.getProperty(ACCUMULO_USER).evaluateAttributeExpressions().getValue(); final String accumuloUser = context.getProperty(ACCUMULO_USER).evaluateAttributeExpressions().getValue();
final AuthenticationType type = AuthenticationType.valueOf( context.getProperty(AUTHENTICATION_TYPE).getValue() ); final AuthenticationToken token = new PasswordToken(context.getProperty(ACCUMULO_PASSWORD).getValue());
this.client = Accumulo.newClient().from(clientConf).as(accumuloUser, token).build();
break;
case KERBEROS:
final String principal;
if (kerberosService == null) {
final AuthenticationToken token = getToken(type,context); principal = context.getProperty(KERBEROS_PRINCIPAL).getValue();
this.kerberosUser = new KerberosPasswordUser(principal, context.getProperty(KERBEROS_PASSWORD).getValue());
this.client = Accumulo.newClient().to(instanceName,zookeepers).as(accumuloUser,token).build(); } else {
principal = kerberosService.getPrincipal();
if (null == token){ this.kerberosUser = new KerberosKeytabUser(principal, kerberosService.getKeytab());
throw new InitializationException("Feature not implemented");
} }
clientConf.setProperty("sasl.enabled", "true");
clientConf.setProperty("sasl.qop", context.getProperty(ACCUMULO_SASL_QOP).getValue());
//Client uses the currently logged in user's security context, so need to login first.
Configuration conf = new Configuration();
conf.set("hadoop.security.authentication", "kerberos");
UserGroupInformation.setConfiguration(conf);
final UserGroupInformation clientUgi = SecurityUtil.getUgiForKerberosUser(conf, kerberosUser);
this.client = clientUgi.doAs((PrivilegedExceptionAction<AccumuloClient>) () ->
Accumulo.newClient().from(clientConf).as(principal, new KerberosToken()).build());
break;
default:
throw new InitializationException("Not supported authentication type.");
}
} }
@Override @Override
@ -202,9 +289,17 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
return client; return client;
} }
@OnDisabled @Override
public void shutdown() { public void renewTgtIfNecessary() {
client.close(); if (authType.equals(AuthenticationType.KERBEROS)) {
SecurityUtil.checkTGTAndRelogin(getLogger(), kerberosUser);
}
} }
@OnDisabled
public void shutdown() {
if (client != null) {
client.close();
}
}
} }

View File

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.accumulo.controllerservices;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.reporting.InitializationException;
import org.junit.Before;
import org.junit.Test;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.when;
public class TestAccumuloService {
private static final String INSTANCE = "instance";
private static final String ZOOKEEPER = "zookeeper";
private static final String PASSWORD = "PASSWORD";
private static final String USER = "USER";
private static final String KERBEROS = "KERBEROS";
private static final String PRINCIPAL = "principal";
private static final String KERBEROS_PASSWORD = "kerberos_password";
private static final String NONE = "NONE";
private TestRunner runner;
private AccumuloService accumuloService;
@Mock
private KerberosCredentialsService credentialService;
@Mock
private Processor dummyProcessor;
@Before
public void init() {
MockitoAnnotations.initMocks(this);
runner = TestRunners.newTestRunner(dummyProcessor);
accumuloService = new AccumuloService();
when(credentialService.getIdentifier()).thenReturn("1");
}
@Test
public void testServiceValidWithAuthTypePasswordAndInstanceZookeeperUserPasswordAreSet() throws InitializationException {
//given
runner.addControllerService("accumulo-connector-service", accumuloService);
runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, PASSWORD);
runner.setProperty(accumuloService, AccumuloService.ACCUMULO_USER, USER);
runner.setProperty(accumuloService, AccumuloService.ACCUMULO_PASSWORD, PASSWORD);
//when
//then
runner.assertValid(accumuloService);
}
@Test
public void testServiceNotValidWithInstanceMissing() throws InitializationException {
//given
runner.addControllerService("accumulo-connector-service", accumuloService);
runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
//when
//then
assertServiceIsInvalidWithErrorMessage("Instance name must be supplied");
}
@Test
public void testServiceNotValidWithZookeeperMissing() throws InitializationException {
//given
runner.addControllerService("accumulo-connector-service", accumuloService);
runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
//when
//then
assertServiceIsInvalidWithErrorMessage("Zookeepers must be supplied");
}
@Test
public void testServiceNotValidWithAuthTypeNone() throws InitializationException {
//given
runner.addControllerService("accumulo-connector-service", accumuloService);
runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, NONE);
//when
//then
assertServiceIsInvalidWithErrorMessage("Non supported Authentication type");
}
@Test
public void testServiceNotValidWithAuthTypePasswordAndUserMissing() throws InitializationException {
//given
runner.addControllerService("accumulo-connector-service", accumuloService);
runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, PASSWORD);
runner.setProperty(accumuloService, AccumuloService.ACCUMULO_PASSWORD, PASSWORD);
//when
//then
assertServiceIsInvalidWithErrorMessage("Accumulo user must be supplied");
}
@Test
public void testServiceNotValidWithAuthTypePasswordAndPasswordMissing() throws InitializationException {
//given
runner.addControllerService("accumulo-connector-service", accumuloService);
runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, PASSWORD);
runner.setProperty(accumuloService, AccumuloService.ACCUMULO_USER, USER);
//when
//then
assertServiceIsInvalidWithErrorMessage("Password must be supplied");
}
@Test
public void testServiceNotValidWithAuthTypeKerberosAndKerberosPasswordAndCredentialServiceMissing() throws InitializationException {
//given
runner.addControllerService("accumulo-connector-service", accumuloService);
runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
//when
//then
assertServiceIsInvalidWithErrorMessage("Either Kerberos Password or Kerberos Credential Service must be set");
}
@Test
public void testServiceNotValidWithAuthTypeKerberosAndKerberosPrincipalMissing() throws InitializationException {
//given
runner.addControllerService("accumulo-connector-service", accumuloService);
runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
runner.setProperty(accumuloService, AccumuloService.KERBEROS_PASSWORD, KERBEROS_PASSWORD);
//when
//then
assertServiceIsInvalidWithErrorMessage("Kerberos Principal must be supplied");
}
@Test
public void testServiceNotValidWithAuthTypeKerberosAndKerberosPasswordAndCredentialServiceSet() throws InitializationException {
//given
runner.addControllerService("accumulo-connector-service", accumuloService);
runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
runner.setProperty(accumuloService, AccumuloService.KERBEROS_PASSWORD, KERBEROS_PASSWORD);
runner.addControllerService("kerberos-credentials-service", credentialService);
runner.setProperty(accumuloService, AccumuloService.KERBEROS_CREDENTIALS_SERVICE, credentialService.getIdentifier());
//when
//then
assertServiceIsInvalidWithErrorMessage("should not be filled out at the same time");
}
@Test
public void testServiceNotValidWithAuthTypeKerberosAndPrincipalAndCredentialServiceSet() throws InitializationException {
//given
runner.addControllerService("accumulo-connector-service", accumuloService);
runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
runner.setProperty(accumuloService, AccumuloService.KERBEROS_PRINCIPAL, PRINCIPAL);
runner.addControllerService("kerberos-credentials-service", credentialService);
runner.setProperty(accumuloService, AccumuloService.KERBEROS_CREDENTIALS_SERVICE, credentialService.getIdentifier());
//when
//then
assertServiceIsInvalidWithErrorMessage("Kerberos Principal (for password) should not be filled out");
}
private void assertServiceIsInvalidWithErrorMessage(String errorMessage) {
Exception exception = assertThrows(IllegalStateException.class, () -> runner.enableControllerService(accumuloService));
assertThat(exception.getMessage(), containsString(errorMessage));
}
}