diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml
index a17b05fa59..e714da387a 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml
@@ -23,7 +23,6 @@
nifi-mongodb-nar
- 1.0.0-SNAPSHOT
nar
true
@@ -31,10 +30,14 @@
+
+ org.apache.nifi
+ nifi-standard-services-api-nar
+ nar
+
org.apache.nifi
nifi-mongodb-processors
- 1.0.0-SNAPSHOT
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
index ff67f79b3e..0a26cd6c4d 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
@@ -38,6 +38,10 @@
org.apache.nifi
nifi-processor-utils
+
+ org.apache.nifi
+ nifi-ssl-context-service-api
+
commons-io
commons-io
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
index fae007f152..2ffaa16d68 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
@@ -19,16 +19,26 @@
package org.apache.nifi.processors.mongodb;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.net.ssl.SSLContext;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.authentication.exception.ProviderCreationException;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.ssl.SSLContextService;
import org.bson.Document;
import com.mongodb.MongoClient;
+import com.mongodb.MongoClientOptions;
+import com.mongodb.MongoClientOptions.Builder;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
@@ -52,6 +62,32 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+ .name("SSL Context Service")
+ .description("The SSL Context Service used to provide client certificate information for TLS/SSL "
+ + "connections.")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
+ public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
+ .name("Client Auth")
+ .description("Client authentication policy when connecting to secure (TLS/SSL) cluster. "
+ + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
+ + "has been defined and enabled.")
+ .required(false)
+ .allowableValues(SSLContextService.ClientAuth.values())
+ .defaultValue("REQUIRED")
+ .build();
+
+ static List descriptors = new ArrayList<>();
+
+ static {
+ descriptors.add(URI);
+ descriptors.add(DATABASE_NAME);
+ descriptors.add(COLLECTION_NAME);
+ descriptors.add(SSL_CONTEXT_SERVICE);
+ descriptors.add(CLIENT_AUTH);
+ }
protected MongoClient mongoClient;
@@ -63,15 +99,48 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
getLogger().info("Creating MongoClient");
+ // Set up the client for secure (SSL/TLS communications) if configured to do so
+ final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue();
+ final SSLContext sslContext;
+
+ if (sslService != null) {
+ final SSLContextService.ClientAuth clientAuth;
+ if (StringUtils.isBlank(rawClientAuth)) {
+ clientAuth = SSLContextService.ClientAuth.REQUIRED;
+ } else {
+ try {
+ clientAuth = SSLContextService.ClientAuth.valueOf(rawClientAuth);
+ } catch (final IllegalArgumentException iae) {
+ throw new ProviderCreationException(String.format("Unrecognized client auth '%s'. Possible values are [%s]",
+ rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
+ }
+ }
+ sslContext = sslService.createSSLContext(clientAuth);
+ } else {
+ sslContext = null;
+ }
+
try {
final String uri = context.getProperty(URI).getValue();
- mongoClient = new MongoClient(new MongoClientURI(uri));
+ if(sslContext == null) {
+ mongoClient = new MongoClient(new MongoClientURI(uri));
+ } else {
+ mongoClient = new MongoClient(new MongoClientURI(uri, getClientOptions(sslContext)));
+ }
} catch (Exception e) {
getLogger().error("Failed to schedule PutMongo due to {}", new Object[] { e }, e);
throw e;
}
}
+ protected Builder getClientOptions(final SSLContext sslContext) {
+ MongoClientOptions.Builder builder = MongoClientOptions.builder();
+ builder.sslEnabled(true);
+ builder.socketFactory(sslContext.getSocketFactory());
+ return builder;
+ }
+
@OnStopped
public final void closeClient() {
if (mongoClient != null) {
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
index c2b49d9689..ebe7a24f29 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
@@ -100,35 +100,34 @@ public class GetMongo extends AbstractMongoProcessor {
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
- private final List descriptors;
+ private final static Set relationships;
+ private final static List propertyDescriptors;
- private final Set relationships;
+ static {
+ List _propertyDescriptors = new ArrayList<>();
+ _propertyDescriptors.addAll(descriptors);
+ _propertyDescriptors.add(QUERY);
+ _propertyDescriptors.add(PROJECTION);
+ _propertyDescriptors.add(SORT);
+ _propertyDescriptors.add(LIMIT);
+ _propertyDescriptors.add(BATCH_SIZE);
+ _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
+ _propertyDescriptors.add(CLIENT_AUTH);
+ propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
- public GetMongo() {
- final List descriptors = new ArrayList<>();
- descriptors.add(URI);
- descriptors.add(DATABASE_NAME);
- descriptors.add(COLLECTION_NAME);
- descriptors.add(QUERY);
- descriptors.add(PROJECTION);
- descriptors.add(SORT);
- descriptors.add(LIMIT);
- descriptors.add(BATCH_SIZE);
- this.descriptors = Collections.unmodifiableList(descriptors);
-
- final Set relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- this.relationships = Collections.unmodifiableSet(relationships);
+ final Set _relationships = new HashSet<>();
+ _relationships.add(REL_SUCCESS);
+ relationships = Collections.unmodifiableSet(_relationships);
}
@Override
public Set getRelationships() {
- return this.relationships;
+ return relationships;
}
@Override
public final List getSupportedPropertyDescriptors() {
- return descriptors;
+ return propertyDescriptors;
}
@Override
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
index ae4009ce8a..5f6d875678 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
@@ -106,36 +106,33 @@ public class PutMongo extends AbstractMongoProcessor {
.defaultValue("UTF-8")
.build();
- private final List descriptors;
+ private final static Set relationships;
+ private final static List propertyDescriptors;
- private final Set relationships;
+ static {
+ List _propertyDescriptors = new ArrayList<>();
+ _propertyDescriptors.addAll(descriptors);
+ _propertyDescriptors.add(MODE);
+ _propertyDescriptors.add(UPSERT);
+ _propertyDescriptors.add(UPDATE_QUERY_KEY);
+ _propertyDescriptors.add(WRITE_CONCERN);
+ _propertyDescriptors.add(CHARACTER_SET);
+ propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
- public PutMongo() {
- final List descriptors = new ArrayList<>();
- descriptors.add(URI);
- descriptors.add(DATABASE_NAME);
- descriptors.add(COLLECTION_NAME);
- descriptors.add(MODE);
- descriptors.add(UPSERT);
- descriptors.add(UPDATE_QUERY_KEY);
- descriptors.add(WRITE_CONCERN);
- descriptors.add(CHARACTER_SET);
- this.descriptors = Collections.unmodifiableList(descriptors);
-
- final Set relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- relationships.add(REL_FAILURE);
- this.relationships = Collections.unmodifiableSet(relationships);
+ final Set _relationships = new HashSet<>();
+ _relationships.add(REL_SUCCESS);
+ _relationships.add(REL_FAILURE);
+ relationships = Collections.unmodifiableSet(_relationships);
}
@Override
public Set getRelationships() {
- return this.relationships;
+ return relationships;
}
@Override
public final List getSupportedPropertyDescriptors() {
- return descriptors;
+ return propertyDescriptors;
}
@Override
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessorTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessorTest.java
new file mode 100644
index 0000000000..1750cc2b25
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessorTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.any;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.authentication.exception.ProviderCreationException;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.mongodb.MongoClientOptions;
+import com.mongodb.MongoClientOptions.Builder;
+
+public class AbstractMongoProcessorTest {
+
+ MockAbstractMongoProcessor processor;
+ private TestRunner testRunner;
+
+ @Before
+ public void setUp() throws Exception {
+ processor = new MockAbstractMongoProcessor();
+ testRunner = TestRunners.newTestRunner(processor);
+ }
+
+ @Test
+ public void testcreateClientWithSSL() throws Exception {
+ SSLContextService sslService = mock(SSLContextService.class);
+ SSLContext sslContext = mock(SSLContext.class);
+ when(sslService.getIdentifier()).thenReturn("ssl-context");
+ when(sslService.createSSLContext(any(ClientAuth.class))).thenReturn(sslContext);
+ testRunner.addControllerService("ssl-context", sslService);
+ testRunner.enableControllerService(sslService);
+ testRunner.setProperty(AbstractMongoProcessor.URI, "mongodb://localhost:27017");
+ testRunner.setProperty(AbstractMongoProcessor.SSL_CONTEXT_SERVICE, "ssl-context");
+ testRunner.assertValid(sslService);
+ processor.createClient(testRunner.getProcessContext());
+ assertNotNull(processor.mongoClient);
+ processor.mongoClient = null;
+ testRunner.setProperty(AbstractMongoProcessor.CLIENT_AUTH, "WANT");
+ processor.createClient(testRunner.getProcessContext());
+ assertNotNull(processor.mongoClient);
+ }
+
+ @Test(expected = ProviderCreationException.class)
+ public void testcreateClientWithSSLBadClientAuth() throws Exception {
+ SSLContextService sslService = mock(SSLContextService.class);
+ SSLContext sslContext = mock(SSLContext.class);
+ when(sslService.getIdentifier()).thenReturn("ssl-context");
+ when(sslService.createSSLContext(any(ClientAuth.class))).thenReturn(sslContext);
+ testRunner.addControllerService("ssl-context", sslService);
+ testRunner.enableControllerService(sslService);
+ testRunner.setProperty(AbstractMongoProcessor.URI, "mongodb://localhost:27017");
+ testRunner.setProperty(AbstractMongoProcessor.SSL_CONTEXT_SERVICE, "ssl-context");
+ testRunner.assertValid(sslService);
+ processor.createClient(testRunner.getProcessContext());
+ assertNotNull(processor.mongoClient);
+ processor.mongoClient = null;
+ testRunner.setProperty(AbstractMongoProcessor.CLIENT_AUTH, "BAD");
+ processor.createClient(testRunner.getProcessContext());
+ }
+
+
+ /**
+ * Provides a stubbed processor instance for testing
+ */
+ public static class MockAbstractMongoProcessor extends AbstractMongoProcessor {
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ // nothing to do
+ }
+
+ @Override
+ protected Builder getClientOptions(SSLContext sslContext) {
+ return MongoClientOptions.builder();
+ }
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml
index db9e87decb..25c6f6916d 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml
@@ -22,9 +22,7 @@
1.0.0-SNAPSHOT
- org.apache.nifi
nifi-mongodb-bundle
- 1.0.0-SNAPSHOT
pom
@@ -32,4 +30,14 @@
nifi-mongodb-nar
+
+
+
+ org.apache.nifi
+ nifi-mongodb-processors
+ 1.0.0-SNAPSHOT
+
+
+
+
diff --git a/pom.xml b/pom.xml
index e7027855eb..29b936b193 100644
--- a/pom.xml
+++ b/pom.xml
@@ -219,7 +219,7 @@ language governing permissions and limitations under the License. -->
org.mongodb
mongo-java-driver
- 3.0.2
+ 3.2.2
com.wordnik