From d39f82b2c79d8ecba9fe6dd7ebf673b584d42f80 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Sun, 17 Apr 2016 17:24:59 +0200 Subject: [PATCH] NIFI-1197 Added SSL support for MongoDB processors --- .../nifi-mongodb-nar/pom.xml | 7 +- .../nifi-mongodb-processors/pom.xml | 4 + .../mongodb/AbstractMongoProcessor.java | 71 +++++++++++- .../nifi/processors/mongodb/GetMongo.java | 37 +++---- .../nifi/processors/mongodb/PutMongo.java | 37 +++---- .../mongodb/AbstractMongoProcessorTest.java | 104 ++++++++++++++++++ nifi-nar-bundles/nifi-mongodb-bundle/pom.xml | 12 +- pom.xml | 2 +- 8 files changed, 229 insertions(+), 45 deletions(-) create mode 100644 nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessorTest.java diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml index a17b05fa59..e714da387a 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml @@ -23,7 +23,6 @@ nifi-mongodb-nar - 1.0.0-SNAPSHOT nar true @@ -31,10 +30,14 @@ + + org.apache.nifi + nifi-standard-services-api-nar + nar + org.apache.nifi nifi-mongodb-processors - 1.0.0-SNAPSHOT diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml index ff67f79b3e..0a26cd6c4d 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml @@ -38,6 +38,10 @@ org.apache.nifi nifi-processor-utils + + org.apache.nifi + nifi-ssl-context-service-api + commons-io commons-io diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java index fae007f152..2ffaa16d68 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java @@ -19,16 +19,26 @@ package org.apache.nifi.processors.mongodb; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import javax.net.ssl.SSLContext; + +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.authentication.exception.ProviderCreationException; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.security.util.SslContextFactory; +import org.apache.nifi.ssl.SSLContextService; import org.bson.Document; import com.mongodb.MongoClient; +import com.mongodb.MongoClientOptions; +import com.mongodb.MongoClientOptions.Builder; import com.mongodb.MongoClientURI; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; @@ -52,6 +62,32 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("The SSL Context Service used to provide client certificate information for TLS/SSL " + + "connections.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() + .name("Client Auth") + .description("Client authentication policy when connecting to secure (TLS/SSL) cluster. " + + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context " + + "has been defined and enabled.") + .required(false) + .allowableValues(SSLContextService.ClientAuth.values()) + .defaultValue("REQUIRED") + .build(); + + static List descriptors = new ArrayList<>(); + + static { + descriptors.add(URI); + descriptors.add(DATABASE_NAME); + descriptors.add(COLLECTION_NAME); + descriptors.add(SSL_CONTEXT_SERVICE); + descriptors.add(CLIENT_AUTH); + } protected MongoClient mongoClient; @@ -63,15 +99,48 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { getLogger().info("Creating MongoClient"); + // Set up the client for secure (SSL/TLS communications) if configured to do so + final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue(); + final SSLContext sslContext; + + if (sslService != null) { + final SSLContextService.ClientAuth clientAuth; + if (StringUtils.isBlank(rawClientAuth)) { + clientAuth = SSLContextService.ClientAuth.REQUIRED; + } else { + try { + clientAuth = SSLContextService.ClientAuth.valueOf(rawClientAuth); + } catch (final IllegalArgumentException iae) { + throw new ProviderCreationException(String.format("Unrecognized client auth '%s'. Possible values are [%s]", + rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", "))); + } + } + sslContext = sslService.createSSLContext(clientAuth); + } else { + sslContext = null; + } + try { final String uri = context.getProperty(URI).getValue(); - mongoClient = new MongoClient(new MongoClientURI(uri)); + if(sslContext == null) { + mongoClient = new MongoClient(new MongoClientURI(uri)); + } else { + mongoClient = new MongoClient(new MongoClientURI(uri, getClientOptions(sslContext))); + } } catch (Exception e) { getLogger().error("Failed to schedule PutMongo due to {}", new Object[] { e }, e); throw e; } } + protected Builder getClientOptions(final SSLContext sslContext) { + MongoClientOptions.Builder builder = MongoClientOptions.builder(); + builder.sslEnabled(true); + builder.socketFactory(sslContext.getSocketFactory()); + return builder; + } + @OnStopped public final void closeClient() { if (mongoClient != null) { diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index c2b49d9689..ebe7a24f29 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -100,35 +100,34 @@ public class GetMongo extends AbstractMongoProcessor { .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); - private final List descriptors; + private final static Set relationships; + private final static List propertyDescriptors; - private final Set relationships; + static { + List _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.addAll(descriptors); + _propertyDescriptors.add(QUERY); + _propertyDescriptors.add(PROJECTION); + _propertyDescriptors.add(SORT); + _propertyDescriptors.add(LIMIT); + _propertyDescriptors.add(BATCH_SIZE); + _propertyDescriptors.add(SSL_CONTEXT_SERVICE); + _propertyDescriptors.add(CLIENT_AUTH); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); - public GetMongo() { - final List descriptors = new ArrayList<>(); - descriptors.add(URI); - descriptors.add(DATABASE_NAME); - descriptors.add(COLLECTION_NAME); - descriptors.add(QUERY); - descriptors.add(PROJECTION); - descriptors.add(SORT); - descriptors.add(LIMIT); - descriptors.add(BATCH_SIZE); - this.descriptors = Collections.unmodifiableList(descriptors); - - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - this.relationships = Collections.unmodifiableSet(relationships); + final Set _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + relationships = Collections.unmodifiableSet(_relationships); } @Override public Set getRelationships() { - return this.relationships; + return relationships; } @Override public final List getSupportedPropertyDescriptors() { - return descriptors; + return propertyDescriptors; } @Override diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java index ae4009ce8a..5f6d875678 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java @@ -106,36 +106,33 @@ public class PutMongo extends AbstractMongoProcessor { .defaultValue("UTF-8") .build(); - private final List descriptors; + private final static Set relationships; + private final static List propertyDescriptors; - private final Set relationships; + static { + List _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.addAll(descriptors); + _propertyDescriptors.add(MODE); + _propertyDescriptors.add(UPSERT); + _propertyDescriptors.add(UPDATE_QUERY_KEY); + _propertyDescriptors.add(WRITE_CONCERN); + _propertyDescriptors.add(CHARACTER_SET); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); - public PutMongo() { - final List descriptors = new ArrayList<>(); - descriptors.add(URI); - descriptors.add(DATABASE_NAME); - descriptors.add(COLLECTION_NAME); - descriptors.add(MODE); - descriptors.add(UPSERT); - descriptors.add(UPDATE_QUERY_KEY); - descriptors.add(WRITE_CONCERN); - descriptors.add(CHARACTER_SET); - this.descriptors = Collections.unmodifiableList(descriptors); - - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - this.relationships = Collections.unmodifiableSet(relationships); + final Set _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + _relationships.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(_relationships); } @Override public Set getRelationships() { - return this.relationships; + return relationships; } @Override public final List getSupportedPropertyDescriptors() { - return descriptors; + return propertyDescriptors; } @Override diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessorTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessorTest.java new file mode 100644 index 0000000000..1750cc2b25 --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessorTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.mongodb; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.any; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.authentication.exception.ProviderCreationException; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.ssl.SSLContextService.ClientAuth; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import com.mongodb.MongoClientOptions; +import com.mongodb.MongoClientOptions.Builder; + +public class AbstractMongoProcessorTest { + + MockAbstractMongoProcessor processor; + private TestRunner testRunner; + + @Before + public void setUp() throws Exception { + processor = new MockAbstractMongoProcessor(); + testRunner = TestRunners.newTestRunner(processor); + } + + @Test + public void testcreateClientWithSSL() throws Exception { + SSLContextService sslService = mock(SSLContextService.class); + SSLContext sslContext = mock(SSLContext.class); + when(sslService.getIdentifier()).thenReturn("ssl-context"); + when(sslService.createSSLContext(any(ClientAuth.class))).thenReturn(sslContext); + testRunner.addControllerService("ssl-context", sslService); + testRunner.enableControllerService(sslService); + testRunner.setProperty(AbstractMongoProcessor.URI, "mongodb://localhost:27017"); + testRunner.setProperty(AbstractMongoProcessor.SSL_CONTEXT_SERVICE, "ssl-context"); + testRunner.assertValid(sslService); + processor.createClient(testRunner.getProcessContext()); + assertNotNull(processor.mongoClient); + processor.mongoClient = null; + testRunner.setProperty(AbstractMongoProcessor.CLIENT_AUTH, "WANT"); + processor.createClient(testRunner.getProcessContext()); + assertNotNull(processor.mongoClient); + } + + @Test(expected = ProviderCreationException.class) + public void testcreateClientWithSSLBadClientAuth() throws Exception { + SSLContextService sslService = mock(SSLContextService.class); + SSLContext sslContext = mock(SSLContext.class); + when(sslService.getIdentifier()).thenReturn("ssl-context"); + when(sslService.createSSLContext(any(ClientAuth.class))).thenReturn(sslContext); + testRunner.addControllerService("ssl-context", sslService); + testRunner.enableControllerService(sslService); + testRunner.setProperty(AbstractMongoProcessor.URI, "mongodb://localhost:27017"); + testRunner.setProperty(AbstractMongoProcessor.SSL_CONTEXT_SERVICE, "ssl-context"); + testRunner.assertValid(sslService); + processor.createClient(testRunner.getProcessContext()); + assertNotNull(processor.mongoClient); + processor.mongoClient = null; + testRunner.setProperty(AbstractMongoProcessor.CLIENT_AUTH, "BAD"); + processor.createClient(testRunner.getProcessContext()); + } + + + /** + * Provides a stubbed processor instance for testing + */ + public static class MockAbstractMongoProcessor extends AbstractMongoProcessor { + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + // nothing to do + } + + @Override + protected Builder getClientOptions(SSLContext sslContext) { + return MongoClientOptions.builder(); + } + } + +} diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml index db9e87decb..25c6f6916d 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml @@ -22,9 +22,7 @@ 1.0.0-SNAPSHOT - org.apache.nifi nifi-mongodb-bundle - 1.0.0-SNAPSHOT pom @@ -32,4 +30,14 @@ nifi-mongodb-nar + + + + org.apache.nifi + nifi-mongodb-processors + 1.0.0-SNAPSHOT + + + + diff --git a/pom.xml b/pom.xml index e7027855eb..29b936b193 100644 --- a/pom.xml +++ b/pom.xml @@ -219,7 +219,7 @@ language governing permissions and limitations under the License. --> org.mongodb mongo-java-driver - 3.0.2 + 3.2.2 com.wordnik