NIFI-5054: Add Couchbase user authentication

This commit is contained in:
Koji Kawamura 2018-06-01 16:55:26 +09:00 committed by Matt Gilman
parent 2277b9411e
commit f05c5e6ea0
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
2 changed files with 83 additions and 8 deletions

View File

@ -17,6 +17,7 @@
package org.apache.nifi.couchbase; package org.apache.nifi.couchbase;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -28,14 +29,18 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import com.couchbase.client.core.CouchbaseException; import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.java.Bucket; import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.CouchbaseCluster; import com.couchbase.client.java.CouchbaseCluster;
import org.apache.nifi.util.StringUtils;
/** /**
* Provides a centralized Couchbase connection and bucket passwords management. * Provides a centralized Couchbase connection and bucket passwords management.
@ -43,14 +48,43 @@ import com.couchbase.client.java.CouchbaseCluster;
@CapabilityDescription("Provides a centralized Couchbase connection and bucket passwords management." @CapabilityDescription("Provides a centralized Couchbase connection and bucket passwords management."
+ " Bucket passwords can be specified via dynamic properties.") + " Bucket passwords can be specified via dynamic properties.")
@Tags({ "nosql", "couchbase", "database", "connection" }) @Tags({ "nosql", "couchbase", "database", "connection" })
@DynamicProperty(name = "Bucket Password for BUCKET_NAME", value = "bucket password", description = "Specify bucket password if necessary.") @DynamicProperty(name = "Bucket Password for BUCKET_NAME", value = "bucket password",
description = "Specify bucket password if necessary." +
" Couchbase Server 5.0 or later should use 'User Name' and 'User Password' instead.")
public class CouchbaseClusterService extends AbstractControllerService implements CouchbaseClusterControllerService { public class CouchbaseClusterService extends AbstractControllerService implements CouchbaseClusterControllerService {
public static final PropertyDescriptor CONNECTION_STRING = new PropertyDescriptor public static final PropertyDescriptor CONNECTION_STRING = new PropertyDescriptor
.Builder().name("Connection String") .Builder()
.name("Connection String")
.description("The hostnames or ip addresses of the bootstraping nodes and optional parameters." .description("The hostnames or ip addresses of the bootstraping nodes and optional parameters."
+ " Syntax) couchbase://node1,node2,nodeN?param1=value1&param2=value2&paramN=valueN") + " Syntax) couchbase://node1,node2,nodeN?param1=value1&param2=value2&paramN=valueN")
.required(true) .required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor USER_NAME = new PropertyDescriptor
.Builder()
.name("user-name")
.displayName("User Name")
.description("The user name to authenticate NiFi as a Couchbase client." +
" This configuration can be used against Couchbase Server 5.0 or later" +
" supporting Roll-Based Access Control.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor USER_PASSWORD = new PropertyDescriptor
.Builder()
.name("user-password")
.displayName("User Password")
.description("The user password to authenticate NiFi as a Couchbase client." +
" This configuration can be used against Couchbase Server 5.0 or later" +
" supporting Roll-Based Access Control.")
.required(false)
.sensitive(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
@ -59,13 +93,15 @@ public class CouchbaseClusterService extends AbstractControllerService implement
static { static {
final List<PropertyDescriptor> props = new ArrayList<>(); final List<PropertyDescriptor> props = new ArrayList<>();
props.add(CONNECTION_STRING); props.add(CONNECTION_STRING);
props.add(USER_NAME);
props.add(USER_PASSWORD);
properties = Collections.unmodifiableList(props); properties = Collections.unmodifiableList(props);
} }
private static final String DYNAMIC_PROP_BUCKET_PASSWORD = "Bucket Password for "; private static final String DYNAMIC_PROP_BUCKET_PASSWORD = "Bucket Password for ";
private static final Map<String, String> bucketPasswords = new HashMap<>();
private Map<String, String> bucketPasswords;
private volatile CouchbaseCluster cluster; private volatile CouchbaseCluster cluster;
@Override @Override
@ -83,11 +119,38 @@ public class CouchbaseClusterService extends AbstractControllerService implement
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dynamic(true) .dynamic(true)
.sensitive(true) .sensitive(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build(); .build();
} }
return null; return null;
} }
@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
final Collection<ValidationResult> results = new ArrayList<>();
final boolean isUserNameSet = context.getProperty(USER_NAME).isSet();
final boolean isUserPasswordSet = context.getProperty(USER_PASSWORD).isSet();
if ((isUserNameSet && !isUserPasswordSet) || (!isUserNameSet && isUserPasswordSet)) {
results.add(new ValidationResult.Builder()
.subject("User Name and Password")
.explanation("Both User Name and Password are required to use.")
.build());
}
final boolean isBucketPasswordSet = context.getProperties().keySet().stream()
.anyMatch(p -> p.isDynamic() && p.getName().startsWith(DYNAMIC_PROP_BUCKET_PASSWORD));
if (isUserNameSet && isUserPasswordSet && isBucketPasswordSet) {
results.add(new ValidationResult.Builder()
.subject("Authentication methods")
.explanation("Different authentication methods can not be used at the same time," +
" Use either one of User Name and Password, or Bucket Password.")
.build());
}
return results;
}
/** /**
* Establish a connection to a Couchbase cluster. * Establish a connection to a Couchbase cluster.
@ -97,15 +160,23 @@ public class CouchbaseClusterService extends AbstractControllerService implement
@OnEnabled @OnEnabled
public void onConfigured(final ConfigurationContext context) throws InitializationException { public void onConfigured(final ConfigurationContext context) throws InitializationException {
bucketPasswords = new HashMap<>();
for(PropertyDescriptor p : context.getProperties().keySet()){ for(PropertyDescriptor p : context.getProperties().keySet()){
if(p.isDynamic() && p.getName().startsWith(DYNAMIC_PROP_BUCKET_PASSWORD)){ if(p.isDynamic() && p.getName().startsWith(DYNAMIC_PROP_BUCKET_PASSWORD)){
String bucketName = p.getName().substring(DYNAMIC_PROP_BUCKET_PASSWORD.length()); String bucketName = p.getName().substring(DYNAMIC_PROP_BUCKET_PASSWORD.length());
String password = context.getProperty(p).getValue(); String password = context.getProperty(p).evaluateAttributeExpressions().getValue();
bucketPasswords.put(bucketName, password); bucketPasswords.put(bucketName, password);
} }
} }
final String userName = context.getProperty(USER_NAME).evaluateAttributeExpressions().getValue();
final String userPassword = context.getProperty(USER_PASSWORD).evaluateAttributeExpressions().getValue();
try { try {
cluster = CouchbaseCluster.fromConnectionString(context.getProperty(CONNECTION_STRING).getValue()); cluster = CouchbaseCluster.fromConnectionString(context.getProperty(CONNECTION_STRING).evaluateAttributeExpressions().getValue());
if (!StringUtils.isEmpty(userName) && !StringUtils.isEmpty(userPassword)) {
cluster.authenticate(userName, userPassword);
}
} catch(CouchbaseException e) { } catch(CouchbaseException e) {
throw new InitializationException(e); throw new InitializationException(e);
} }
@ -113,9 +184,13 @@ public class CouchbaseClusterService extends AbstractControllerService implement
@Override @Override
public Bucket openBucket(String bucketName){ public Bucket openBucket(String bucketName){
if (bucketPasswords.containsKey(bucketName)) {
return cluster.openBucket(bucketName, bucketPasswords.get(bucketName)); return cluster.openBucket(bucketName, bucketPasswords.get(bucketName));
} }
return cluster.openBucket(bucketName);
}
/** /**
* Disconnect from the Couchbase cluster. * Disconnect from the Couchbase cluster.
*/ */

View File

@ -33,7 +33,7 @@
<dependency> <dependency>
<groupId>com.couchbase.client</groupId> <groupId>com.couchbase.client</groupId>
<artifactId>java-client</artifactId> <artifactId>java-client</artifactId>
<version>2.2.0</version> <version>2.5.8</version>
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>