NIFI-1197 Added SSL support for MongoDB processors

This commit is contained in:
Pierre Villard 2016-04-17 17:24:59 +02:00
parent 10986553aa
commit d39f82b2c7
8 changed files with 229 additions and 45 deletions

View File

@ -23,7 +23,6 @@
</parent> </parent>
<artifactId>nifi-mongodb-nar</artifactId> <artifactId>nifi-mongodb-nar</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>nar</packaging> <packaging>nar</packaging>
<properties> <properties>
<maven.javadoc.skip>true</maven.javadoc.skip> <maven.javadoc.skip>true</maven.javadoc.skip>
@ -31,10 +30,14 @@
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-processors</artifactId> <artifactId>nifi-mongodb-processors</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -38,6 +38,10 @@
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId> <artifactId>nifi-processor-utils</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency> <dependency>
<groupId>commons-io</groupId> <groupId>commons-io</groupId>
<artifactId>commons-io</artifactId> <artifactId>commons-io</artifactId>

View File

@ -19,16 +19,26 @@
package org.apache.nifi.processors.mongodb; package org.apache.nifi.processors.mongodb;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.authentication.exception.ProviderCreationException;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.ssl.SSLContextService;
import org.bson.Document; import org.bson.Document;
import com.mongodb.MongoClient; import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientOptions.Builder;
import com.mongodb.MongoClientURI; import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase; import com.mongodb.client.MongoDatabase;
@ -52,6 +62,32 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
.required(true) .required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The SSL Context Service used to provide client certificate information for TLS/SSL "
+ "connections.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("Client Auth")
.description("Client authentication policy when connecting to secure (TLS/SSL) cluster. "
+ "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
+ "has been defined and enabled.")
.required(false)
.allowableValues(SSLContextService.ClientAuth.values())
.defaultValue("REQUIRED")
.build();
static List<PropertyDescriptor> descriptors = new ArrayList<>();
static {
descriptors.add(URI);
descriptors.add(DATABASE_NAME);
descriptors.add(COLLECTION_NAME);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(CLIENT_AUTH);
}
protected MongoClient mongoClient; protected MongoClient mongoClient;
@ -63,15 +99,48 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
getLogger().info("Creating MongoClient"); getLogger().info("Creating MongoClient");
// Set up the client for secure (SSL/TLS communications) if configured to do so
final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue();
final SSLContext sslContext;
if (sslService != null) {
final SSLContextService.ClientAuth clientAuth;
if (StringUtils.isBlank(rawClientAuth)) {
clientAuth = SSLContextService.ClientAuth.REQUIRED;
} else {
try {
clientAuth = SSLContextService.ClientAuth.valueOf(rawClientAuth);
} catch (final IllegalArgumentException iae) {
throw new ProviderCreationException(String.format("Unrecognized client auth '%s'. Possible values are [%s]",
rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
}
}
sslContext = sslService.createSSLContext(clientAuth);
} else {
sslContext = null;
}
try { try {
final String uri = context.getProperty(URI).getValue(); final String uri = context.getProperty(URI).getValue();
if(sslContext == null) {
mongoClient = new MongoClient(new MongoClientURI(uri)); mongoClient = new MongoClient(new MongoClientURI(uri));
} else {
mongoClient = new MongoClient(new MongoClientURI(uri, getClientOptions(sslContext)));
}
} catch (Exception e) { } catch (Exception e) {
getLogger().error("Failed to schedule PutMongo due to {}", new Object[] { e }, e); getLogger().error("Failed to schedule PutMongo due to {}", new Object[] { e }, e);
throw e; throw e;
} }
} }
protected Builder getClientOptions(final SSLContext sslContext) {
MongoClientOptions.Builder builder = MongoClientOptions.builder();
builder.sslEnabled(true);
builder.socketFactory(sslContext.getSocketFactory());
return builder;
}
@OnStopped @OnStopped
public final void closeClient() { public final void closeClient() {
if (mongoClient != null) { if (mongoClient != null) {

View File

@ -100,35 +100,34 @@ public class GetMongo extends AbstractMongoProcessor {
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build(); .build();
private final List<PropertyDescriptor> descriptors; private final static Set<Relationship> relationships;
private final static List<PropertyDescriptor> propertyDescriptors;
private final Set<Relationship> relationships; static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.addAll(descriptors);
_propertyDescriptors.add(QUERY);
_propertyDescriptors.add(PROJECTION);
_propertyDescriptors.add(SORT);
_propertyDescriptors.add(LIMIT);
_propertyDescriptors.add(BATCH_SIZE);
_propertyDescriptors.add(SSL_CONTEXT_SERVICE);
_propertyDescriptors.add(CLIENT_AUTH);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
public GetMongo() { final Set<Relationship> _relationships = new HashSet<>();
final List<PropertyDescriptor> descriptors = new ArrayList<>(); _relationships.add(REL_SUCCESS);
descriptors.add(URI); relationships = Collections.unmodifiableSet(_relationships);
descriptors.add(DATABASE_NAME);
descriptors.add(COLLECTION_NAME);
descriptors.add(QUERY);
descriptors.add(PROJECTION);
descriptors.add(SORT);
descriptors.add(LIMIT);
descriptors.add(BATCH_SIZE);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
} }
@Override @Override
public Set<Relationship> getRelationships() { public Set<Relationship> getRelationships() {
return this.relationships; return relationships;
} }
@Override @Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors; return propertyDescriptors;
} }
@Override @Override

View File

@ -106,36 +106,33 @@ public class PutMongo extends AbstractMongoProcessor {
.defaultValue("UTF-8") .defaultValue("UTF-8")
.build(); .build();
private final List<PropertyDescriptor> descriptors; private final static Set<Relationship> relationships;
private final static List<PropertyDescriptor> propertyDescriptors;
private final Set<Relationship> relationships; static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.addAll(descriptors);
_propertyDescriptors.add(MODE);
_propertyDescriptors.add(UPSERT);
_propertyDescriptors.add(UPDATE_QUERY_KEY);
_propertyDescriptors.add(WRITE_CONCERN);
_propertyDescriptors.add(CHARACTER_SET);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
public PutMongo() { final Set<Relationship> _relationships = new HashSet<>();
final List<PropertyDescriptor> descriptors = new ArrayList<>(); _relationships.add(REL_SUCCESS);
descriptors.add(URI); _relationships.add(REL_FAILURE);
descriptors.add(DATABASE_NAME); relationships = Collections.unmodifiableSet(_relationships);
descriptors.add(COLLECTION_NAME);
descriptors.add(MODE);
descriptors.add(UPSERT);
descriptors.add(UPDATE_QUERY_KEY);
descriptors.add(WRITE_CONCERN);
descriptors.add(CHARACTER_SET);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
} }
@Override @Override
public Set<Relationship> getRelationships() { public Set<Relationship> getRelationships() {
return this.relationships; return relationships;
} }
@Override @Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors; return propertyDescriptors;
} }
@Override @Override

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mongodb;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.any;
import javax.net.ssl.SSLContext;
import org.apache.nifi.authentication.exception.ProviderCreationException;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.SSLContextService.ClientAuth;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientOptions.Builder;
public class AbstractMongoProcessorTest {
MockAbstractMongoProcessor processor;
private TestRunner testRunner;
@Before
public void setUp() throws Exception {
processor = new MockAbstractMongoProcessor();
testRunner = TestRunners.newTestRunner(processor);
}
@Test
public void testcreateClientWithSSL() throws Exception {
SSLContextService sslService = mock(SSLContextService.class);
SSLContext sslContext = mock(SSLContext.class);
when(sslService.getIdentifier()).thenReturn("ssl-context");
when(sslService.createSSLContext(any(ClientAuth.class))).thenReturn(sslContext);
testRunner.addControllerService("ssl-context", sslService);
testRunner.enableControllerService(sslService);
testRunner.setProperty(AbstractMongoProcessor.URI, "mongodb://localhost:27017");
testRunner.setProperty(AbstractMongoProcessor.SSL_CONTEXT_SERVICE, "ssl-context");
testRunner.assertValid(sslService);
processor.createClient(testRunner.getProcessContext());
assertNotNull(processor.mongoClient);
processor.mongoClient = null;
testRunner.setProperty(AbstractMongoProcessor.CLIENT_AUTH, "WANT");
processor.createClient(testRunner.getProcessContext());
assertNotNull(processor.mongoClient);
}
@Test(expected = ProviderCreationException.class)
public void testcreateClientWithSSLBadClientAuth() throws Exception {
SSLContextService sslService = mock(SSLContextService.class);
SSLContext sslContext = mock(SSLContext.class);
when(sslService.getIdentifier()).thenReturn("ssl-context");
when(sslService.createSSLContext(any(ClientAuth.class))).thenReturn(sslContext);
testRunner.addControllerService("ssl-context", sslService);
testRunner.enableControllerService(sslService);
testRunner.setProperty(AbstractMongoProcessor.URI, "mongodb://localhost:27017");
testRunner.setProperty(AbstractMongoProcessor.SSL_CONTEXT_SERVICE, "ssl-context");
testRunner.assertValid(sslService);
processor.createClient(testRunner.getProcessContext());
assertNotNull(processor.mongoClient);
processor.mongoClient = null;
testRunner.setProperty(AbstractMongoProcessor.CLIENT_AUTH, "BAD");
processor.createClient(testRunner.getProcessContext());
}
/**
* Provides a stubbed processor instance for testing
*/
public static class MockAbstractMongoProcessor extends AbstractMongoProcessor {
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
// nothing to do
}
@Override
protected Builder getClientOptions(SSLContext sslContext) {
return MongoClientOptions.builder();
}
}
}

View File

@ -22,9 +22,7 @@
<version>1.0.0-SNAPSHOT</version> <version>1.0.0-SNAPSHOT</version>
</parent> </parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-bundle</artifactId> <artifactId>nifi-mongodb-bundle</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>pom</packaging> <packaging>pom</packaging>
<modules> <modules>
@ -32,4 +30,14 @@
<module>nifi-mongodb-nar</module> <module>nifi-mongodb-nar</module>
</modules> </modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-processors</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>
</project> </project>

View File

@ -219,7 +219,7 @@ language governing permissions and limitations under the License. -->
<dependency> <dependency>
<groupId>org.mongodb</groupId> <groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId> <artifactId>mongo-java-driver</artifactId>
<version>3.0.2</version> <version>3.2.2</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.wordnik</groupId> <groupId>com.wordnik</groupId>