mirror of https://github.com/apache/nifi.git
NIFI-1197 Added SSL support for MongoDB processors
This commit is contained in:
parent
10986553aa
commit
d39f82b2c7
|
@ -23,7 +23,6 @@
|
|||
</parent>
|
||||
|
||||
<artifactId>nifi-mongodb-nar</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<packaging>nar</packaging>
|
||||
<properties>
|
||||
<maven.javadoc.skip>true</maven.javadoc.skip>
|
||||
|
@ -31,10 +30,14 @@
|
|||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-standard-services-api-nar</artifactId>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mongodb-processors</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
|
|
|
@ -38,6 +38,10 @@
|
|||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-processor-utils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-ssl-context-service-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-io</groupId>
|
||||
<artifactId>commons-io</artifactId>
|
||||
|
|
|
@ -19,16 +19,26 @@
|
|||
package org.apache.nifi.processors.mongodb;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.authentication.exception.ProviderCreationException;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.security.util.SslContextFactory;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
import org.bson.Document;
|
||||
|
||||
import com.mongodb.MongoClient;
|
||||
import com.mongodb.MongoClientOptions;
|
||||
import com.mongodb.MongoClientOptions.Builder;
|
||||
import com.mongodb.MongoClientURI;
|
||||
import com.mongodb.client.MongoCollection;
|
||||
import com.mongodb.client.MongoDatabase;
|
||||
|
@ -52,6 +62,32 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
|
|||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("SSL Context Service")
|
||||
.description("The SSL Context Service used to provide client certificate information for TLS/SSL "
|
||||
+ "connections.")
|
||||
.required(false)
|
||||
.identifiesControllerService(SSLContextService.class)
|
||||
.build();
|
||||
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
|
||||
.name("Client Auth")
|
||||
.description("Client authentication policy when connecting to secure (TLS/SSL) cluster. "
|
||||
+ "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
|
||||
+ "has been defined and enabled.")
|
||||
.required(false)
|
||||
.allowableValues(SSLContextService.ClientAuth.values())
|
||||
.defaultValue("REQUIRED")
|
||||
.build();
|
||||
|
||||
static List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
|
||||
static {
|
||||
descriptors.add(URI);
|
||||
descriptors.add(DATABASE_NAME);
|
||||
descriptors.add(COLLECTION_NAME);
|
||||
descriptors.add(SSL_CONTEXT_SERVICE);
|
||||
descriptors.add(CLIENT_AUTH);
|
||||
}
|
||||
|
||||
protected MongoClient mongoClient;
|
||||
|
||||
|
@ -63,15 +99,48 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
|
|||
|
||||
getLogger().info("Creating MongoClient");
|
||||
|
||||
// Set up the client for secure (SSL/TLS communications) if configured to do so
|
||||
final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||
final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue();
|
||||
final SSLContext sslContext;
|
||||
|
||||
if (sslService != null) {
|
||||
final SSLContextService.ClientAuth clientAuth;
|
||||
if (StringUtils.isBlank(rawClientAuth)) {
|
||||
clientAuth = SSLContextService.ClientAuth.REQUIRED;
|
||||
} else {
|
||||
try {
|
||||
clientAuth = SSLContextService.ClientAuth.valueOf(rawClientAuth);
|
||||
} catch (final IllegalArgumentException iae) {
|
||||
throw new ProviderCreationException(String.format("Unrecognized client auth '%s'. Possible values are [%s]",
|
||||
rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
|
||||
}
|
||||
}
|
||||
sslContext = sslService.createSSLContext(clientAuth);
|
||||
} else {
|
||||
sslContext = null;
|
||||
}
|
||||
|
||||
try {
|
||||
final String uri = context.getProperty(URI).getValue();
|
||||
if(sslContext == null) {
|
||||
mongoClient = new MongoClient(new MongoClientURI(uri));
|
||||
} else {
|
||||
mongoClient = new MongoClient(new MongoClientURI(uri, getClientOptions(sslContext)));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Failed to schedule PutMongo due to {}", new Object[] { e }, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
protected Builder getClientOptions(final SSLContext sslContext) {
|
||||
MongoClientOptions.Builder builder = MongoClientOptions.builder();
|
||||
builder.sslEnabled(true);
|
||||
builder.socketFactory(sslContext.getSocketFactory());
|
||||
return builder;
|
||||
}
|
||||
|
||||
@OnStopped
|
||||
public final void closeClient() {
|
||||
if (mongoClient != null) {
|
||||
|
|
|
@ -100,35 +100,34 @@ public class GetMongo extends AbstractMongoProcessor {
|
|||
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||
.build();
|
||||
|
||||
private final List<PropertyDescriptor> descriptors;
|
||||
private final static Set<Relationship> relationships;
|
||||
private final static List<PropertyDescriptor> propertyDescriptors;
|
||||
|
||||
private final Set<Relationship> relationships;
|
||||
static {
|
||||
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
|
||||
_propertyDescriptors.addAll(descriptors);
|
||||
_propertyDescriptors.add(QUERY);
|
||||
_propertyDescriptors.add(PROJECTION);
|
||||
_propertyDescriptors.add(SORT);
|
||||
_propertyDescriptors.add(LIMIT);
|
||||
_propertyDescriptors.add(BATCH_SIZE);
|
||||
_propertyDescriptors.add(SSL_CONTEXT_SERVICE);
|
||||
_propertyDescriptors.add(CLIENT_AUTH);
|
||||
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
|
||||
|
||||
public GetMongo() {
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
descriptors.add(URI);
|
||||
descriptors.add(DATABASE_NAME);
|
||||
descriptors.add(COLLECTION_NAME);
|
||||
descriptors.add(QUERY);
|
||||
descriptors.add(PROJECTION);
|
||||
descriptors.add(SORT);
|
||||
descriptors.add(LIMIT);
|
||||
descriptors.add(BATCH_SIZE);
|
||||
this.descriptors = Collections.unmodifiableList(descriptors);
|
||||
|
||||
final Set<Relationship> relationships = new HashSet<>();
|
||||
relationships.add(REL_SUCCESS);
|
||||
this.relationships = Collections.unmodifiableSet(relationships);
|
||||
final Set<Relationship> _relationships = new HashSet<>();
|
||||
_relationships.add(REL_SUCCESS);
|
||||
relationships = Collections.unmodifiableSet(_relationships);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return this.relationships;
|
||||
return relationships;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return descriptors;
|
||||
return propertyDescriptors;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -106,36 +106,33 @@ public class PutMongo extends AbstractMongoProcessor {
|
|||
.defaultValue("UTF-8")
|
||||
.build();
|
||||
|
||||
private final List<PropertyDescriptor> descriptors;
|
||||
private final static Set<Relationship> relationships;
|
||||
private final static List<PropertyDescriptor> propertyDescriptors;
|
||||
|
||||
private final Set<Relationship> relationships;
|
||||
static {
|
||||
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
|
||||
_propertyDescriptors.addAll(descriptors);
|
||||
_propertyDescriptors.add(MODE);
|
||||
_propertyDescriptors.add(UPSERT);
|
||||
_propertyDescriptors.add(UPDATE_QUERY_KEY);
|
||||
_propertyDescriptors.add(WRITE_CONCERN);
|
||||
_propertyDescriptors.add(CHARACTER_SET);
|
||||
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
|
||||
|
||||
public PutMongo() {
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
descriptors.add(URI);
|
||||
descriptors.add(DATABASE_NAME);
|
||||
descriptors.add(COLLECTION_NAME);
|
||||
descriptors.add(MODE);
|
||||
descriptors.add(UPSERT);
|
||||
descriptors.add(UPDATE_QUERY_KEY);
|
||||
descriptors.add(WRITE_CONCERN);
|
||||
descriptors.add(CHARACTER_SET);
|
||||
this.descriptors = Collections.unmodifiableList(descriptors);
|
||||
|
||||
final Set<Relationship> relationships = new HashSet<>();
|
||||
relationships.add(REL_SUCCESS);
|
||||
relationships.add(REL_FAILURE);
|
||||
this.relationships = Collections.unmodifiableSet(relationships);
|
||||
final Set<Relationship> _relationships = new HashSet<>();
|
||||
_relationships.add(REL_SUCCESS);
|
||||
_relationships.add(REL_FAILURE);
|
||||
relationships = Collections.unmodifiableSet(_relationships);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return this.relationships;
|
||||
return relationships;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return descriptors;
|
||||
return propertyDescriptors;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,104 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.mongodb;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.any;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
||||
import org.apache.nifi.authentication.exception.ProviderCreationException;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
import org.apache.nifi.ssl.SSLContextService.ClientAuth;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.mongodb.MongoClientOptions;
|
||||
import com.mongodb.MongoClientOptions.Builder;
|
||||
|
||||
public class AbstractMongoProcessorTest {
|
||||
|
||||
MockAbstractMongoProcessor processor;
|
||||
private TestRunner testRunner;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
processor = new MockAbstractMongoProcessor();
|
||||
testRunner = TestRunners.newTestRunner(processor);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testcreateClientWithSSL() throws Exception {
|
||||
SSLContextService sslService = mock(SSLContextService.class);
|
||||
SSLContext sslContext = mock(SSLContext.class);
|
||||
when(sslService.getIdentifier()).thenReturn("ssl-context");
|
||||
when(sslService.createSSLContext(any(ClientAuth.class))).thenReturn(sslContext);
|
||||
testRunner.addControllerService("ssl-context", sslService);
|
||||
testRunner.enableControllerService(sslService);
|
||||
testRunner.setProperty(AbstractMongoProcessor.URI, "mongodb://localhost:27017");
|
||||
testRunner.setProperty(AbstractMongoProcessor.SSL_CONTEXT_SERVICE, "ssl-context");
|
||||
testRunner.assertValid(sslService);
|
||||
processor.createClient(testRunner.getProcessContext());
|
||||
assertNotNull(processor.mongoClient);
|
||||
processor.mongoClient = null;
|
||||
testRunner.setProperty(AbstractMongoProcessor.CLIENT_AUTH, "WANT");
|
||||
processor.createClient(testRunner.getProcessContext());
|
||||
assertNotNull(processor.mongoClient);
|
||||
}
|
||||
|
||||
@Test(expected = ProviderCreationException.class)
|
||||
public void testcreateClientWithSSLBadClientAuth() throws Exception {
|
||||
SSLContextService sslService = mock(SSLContextService.class);
|
||||
SSLContext sslContext = mock(SSLContext.class);
|
||||
when(sslService.getIdentifier()).thenReturn("ssl-context");
|
||||
when(sslService.createSSLContext(any(ClientAuth.class))).thenReturn(sslContext);
|
||||
testRunner.addControllerService("ssl-context", sslService);
|
||||
testRunner.enableControllerService(sslService);
|
||||
testRunner.setProperty(AbstractMongoProcessor.URI, "mongodb://localhost:27017");
|
||||
testRunner.setProperty(AbstractMongoProcessor.SSL_CONTEXT_SERVICE, "ssl-context");
|
||||
testRunner.assertValid(sslService);
|
||||
processor.createClient(testRunner.getProcessContext());
|
||||
assertNotNull(processor.mongoClient);
|
||||
processor.mongoClient = null;
|
||||
testRunner.setProperty(AbstractMongoProcessor.CLIENT_AUTH, "BAD");
|
||||
processor.createClient(testRunner.getProcessContext());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Provides a stubbed processor instance for testing
|
||||
*/
|
||||
public static class MockAbstractMongoProcessor extends AbstractMongoProcessor {
|
||||
@Override
|
||||
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||
// nothing to do
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Builder getClientOptions(SSLContext sslContext) {
|
||||
return MongoClientOptions.builder();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -22,9 +22,7 @@
|
|||
<version>1.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mongodb-bundle</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<packaging>pom</packaging>
|
||||
|
||||
<modules>
|
||||
|
@ -32,4 +30,14 @@
|
|||
<module>nifi-mongodb-nar</module>
|
||||
</modules>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mongodb-processors</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
</project>
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -219,7 +219,7 @@ language governing permissions and limitations under the License. -->
|
|||
<dependency>
|
||||
<groupId>org.mongodb</groupId>
|
||||
<artifactId>mongo-java-driver</artifactId>
|
||||
<version>3.0.2</version>
|
||||
<version>3.2.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.wordnik</groupId>
|
||||
|
|
Loading…
Reference in New Issue