mirror of https://github.com/apache/nifi.git
NIFI-4345 Added a MongoDB controller service and a lookup service.
NIFI-4345: Changed Lookup Key to Lookup Value Field Signed-off-by: Matthew Burgess <mattyb149@apache.org> This closes #2123
This commit is contained in:
parent
05b5dd1488
commit
9a8e6b2eb1
|
@ -245,7 +245,12 @@
|
|||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mongodb-nar</artifactId>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mongodb-services-nar</artifactId>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-solr-nar</artifactId>
|
||||
|
|
|
@ -0,0 +1,43 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-standard-services</artifactId>
|
||||
<version>1.4.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>nifi-mongodb-client-service-api</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-utils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mongodb</groupId>
|
||||
<artifactId>mongo-java-driver</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.mongodb;
|
||||
|
||||
import org.apache.nifi.controller.ControllerService;
|
||||
import org.bson.Document;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface MongoDBClientService extends ControllerService {
|
||||
default Document convertJson(String query) {
|
||||
return Document.parse(query);
|
||||
}
|
||||
|
||||
long count(Document query);
|
||||
void delete(Document query);
|
||||
boolean exists(Document query);
|
||||
Document findOne(Document query);
|
||||
List<Document> findMany(Document query);
|
||||
List<Document> findMany(Document query, int limit);
|
||||
List<Document> findMany(Document query, Document sort, int limit);
|
||||
void insert(Document doc);
|
||||
void insert(List<Document> docs);
|
||||
void update(Document query, Document update);
|
||||
void update(Document query, Document update, boolean multiple);
|
||||
void updateOne(Document query, Document update);
|
||||
void upsert(Document query, Document update);
|
||||
void dropDatabase();
|
||||
void dropCollection();
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mongodb-services-bundle</artifactId>
|
||||
<version>1.4.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>nifi-mongodb-services-nar</artifactId>
|
||||
<packaging>nar</packaging>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-standard-services-api-nar</artifactId>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-lookup-services</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mongodb-services</artifactId>
|
||||
<version>1.4.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,131 @@
|
|||
nifi-mongodb-services-nar
|
||||
Copyright 2017 The Apache Software Foundation
|
||||
|
||||
This product includes software developed at
|
||||
The Apache Software Foundation (http://www.apache.org/).
|
||||
|
||||
******************
|
||||
Apache Software License v2
|
||||
******************
|
||||
|
||||
The following binary components are provided under the Apache Software License v2
|
||||
|
||||
(ASLv2) Apache Commons Configuration
|
||||
The following NOTICE information applies:
|
||||
Apache Commons Configuration
|
||||
Copyright 2001-2017 The Apache Software Foundation
|
||||
|
||||
This product includes software developed at
|
||||
The Apache Software Foundation (http://www.apache.org/).
|
||||
|
||||
(ASLv2) Apache Commons CSV
|
||||
The following NOTICE information applies:
|
||||
Apache Commons CSV
|
||||
Copyright 2005-2016 The Apache Software Foundation
|
||||
|
||||
This product includes software developed at
|
||||
The Apache Software Foundation (http://www.apache.org/).
|
||||
|
||||
(ASLv2) Apache Commons BeanUtils
|
||||
The following NOTICE information applies:
|
||||
Apache Commons BeanUtils
|
||||
Copyright 2000-2016 The Apache Software Foundation
|
||||
|
||||
This product includes software developed at
|
||||
The Apache Software Foundation (http://www.apache.org/).
|
||||
|
||||
(ASLv2) Apache Commons Lang
|
||||
The following NOTICE information applies:
|
||||
Apache Commons Lang
|
||||
Copyright 2001-2015 The Apache Software Foundation
|
||||
|
||||
This product includes software from the Spring Framework,
|
||||
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
|
||||
|
||||
(ASLv2) Apache HttpComponents
|
||||
The following NOTICE information applies:
|
||||
Apache HttpClient
|
||||
Copyright 1999-2014 The Apache Software Foundation
|
||||
|
||||
Apache HttpCore
|
||||
Copyright 2005-2014 The Apache Software Foundation
|
||||
|
||||
This project contains annotations derived from JCIP-ANNOTATIONS
|
||||
Copyright (c) 2005 Brian Goetz and Tim Peierls. See http://www.jcip.net
|
||||
|
||||
(ASLv2) Apache Commons Codec
|
||||
The following NOTICE information applies:
|
||||
Apache Commons Codec
|
||||
Copyright 2002-2014 The Apache Software Foundation
|
||||
|
||||
src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
|
||||
contains test data from http://aspell.net/test/orig/batch0.tab.
|
||||
Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
|
||||
|
||||
===============================================================================
|
||||
|
||||
The content of package org.apache.commons.codec.language.bm has been translated
|
||||
from the original php source code available at http://stevemorse.org/phoneticinfo.htm
|
||||
with permission from the original authors.
|
||||
Original source copyright:
|
||||
Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
|
||||
|
||||
(ASLv2) Apache Commons Logging
|
||||
The following NOTICE information applies:
|
||||
Apache Commons Logging
|
||||
Copyright 2003-2013 The Apache Software Foundation
|
||||
|
||||
(ASLv2) Apache Commons Net
|
||||
The following NOTICE information applies:
|
||||
Apache Commons Net
|
||||
Copyright 2001-2016 The Apache Software Foundation
|
||||
|
||||
(ASLv2) Apache Commons Collections
|
||||
The following NOTICE information applies:
|
||||
Apache Commons Collections
|
||||
Copyright 2001-2016 The Apache Software Foundation
|
||||
|
||||
(ASLv2) Apache Commons IO
|
||||
The following NOTICE information applies:
|
||||
Apache Commons IO
|
||||
Copyright 2002-2016 The Apache Software Foundation
|
||||
|
||||
(ASLv2) GeoIP2 Java API
|
||||
The following NOTICE information applies:
|
||||
GeoIP2 Java API
|
||||
This software is Copyright (c) 2013 by MaxMind, Inc.
|
||||
|
||||
(ASLv2) Google HTTP Client Library for Java
|
||||
The following NOTICE information applies:
|
||||
Copyright 2011 Google Inc.
|
||||
|
||||
(ASLv2) Jackson JSON processor
|
||||
The following NOTICE information applies:
|
||||
# Jackson JSON processor
|
||||
|
||||
Jackson is a high-performance, Free/Open Source JSON processing library.
|
||||
It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
|
||||
been in development since 2007.
|
||||
It is currently developed by a community of developers, as well as supported
|
||||
commercially by FasterXML.com.
|
||||
|
||||
## Licensing
|
||||
|
||||
Jackson core and extension components may licensed under different licenses.
|
||||
To find the details that apply to this artifact see the accompanying LICENSE file.
|
||||
For more information, including possible other licensing options, contact
|
||||
FasterXML.com (http://fasterxml.com).
|
||||
|
||||
## Credits
|
||||
|
||||
A list of contributors may be found from CREDITS file, which is included
|
||||
in some artifacts (usually source distributions); but is always available
|
||||
from the source code management (SCM) system project uses.
|
||||
|
||||
************************
|
||||
Creative Commons Attribution-ShareAlike 3.0
|
||||
************************
|
||||
|
||||
The following binary components are provided under the Creative Commons Attribution-ShareAlike 3.0. See project link for details.
|
||||
|
||||
(CCAS 3.0) MaxMind DB (https://github.com/maxmind/MaxMind-DB)
|
|
@ -0,0 +1,89 @@
|
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<!-- Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with this
|
||||
work for additional information regarding copyright ownership. The ASF licenses
|
||||
this file to You under the Apache License, Version 2.0 (the "License"); you
|
||||
may not use this file except in compliance with the License. You may obtain
|
||||
a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
|
||||
required by applicable law or agreed to in writing, software distributed
|
||||
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
|
||||
OR CONDITIONS OF ANY KIND, either express or implied. See the License for
|
||||
the specific language governing permissions and limitations under the License. -->
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mongodb-services-bundle</artifactId>
|
||||
<version>1.4.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>nifi-mongodb-services</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-processor-utils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-lookup-service-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-utils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mongodb-client-service-api</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mongodb</groupId>
|
||||
<artifactId>mongo-java-driver</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mock</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-simple</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-ssl-context-service-api</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.rat</groupId>
|
||||
<artifactId>apache-rat-plugin</artifactId>
|
||||
<configuration>
|
||||
<excludes combine.children="append">
|
||||
<exclude>src/test/resources/test.csv</exclude>
|
||||
<exclude>src/test/resources/test.properties</exclude>
|
||||
<exclude>src/test/resources/test.xml</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
|
@ -0,0 +1,227 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.mongodb;
|
||||
|
||||
import com.mongodb.MongoClient;
|
||||
import com.mongodb.MongoClientOptions;
|
||||
import com.mongodb.MongoClientOptions.Builder;
|
||||
import com.mongodb.MongoClientURI;
|
||||
import com.mongodb.WriteConcern;
|
||||
import com.mongodb.client.MongoCollection;
|
||||
import com.mongodb.client.MongoDatabase;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.authentication.exception.ProviderCreationException;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.security.util.SslContextFactory;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
import org.bson.Document;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class AbstractMongoDBControllerService extends AbstractControllerService {
|
||||
static final String WRITE_CONCERN_ACKNOWLEDGED = "ACKNOWLEDGED";
|
||||
static final String WRITE_CONCERN_UNACKNOWLEDGED = "UNACKNOWLEDGED";
|
||||
static final String WRITE_CONCERN_FSYNCED = "FSYNCED";
|
||||
static final String WRITE_CONCERN_JOURNALED = "JOURNALED";
|
||||
static final String WRITE_CONCERN_REPLICA_ACKNOWLEDGED = "REPLICA_ACKNOWLEDGED";
|
||||
static final String WRITE_CONCERN_MAJORITY = "MAJORITY";
|
||||
|
||||
protected static final PropertyDescriptor URI = new PropertyDescriptor.Builder()
|
||||
.name("mongo-uri")
|
||||
.displayName("Mongo URI")
|
||||
.description("MongoURI, typically of the form: mongodb://host1[:port1][,host2[:port2],...]")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(Validation.DOCUMENT_VALIDATOR)
|
||||
.build();
|
||||
protected static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
|
||||
.name("mongo-db-name")
|
||||
.displayName("Mongo Database Name")
|
||||
.description("The name of the database to use")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
protected static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder()
|
||||
.name("mongo-collection-name")
|
||||
.displayName("Mongo Collection Name")
|
||||
.description("The name of the collection to use")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("ssl-context-service")
|
||||
.displayName("SSL Context Service")
|
||||
.description("The SSL Context Service used to provide client certificate information for TLS/SSL "
|
||||
+ "connections.")
|
||||
.required(false)
|
||||
.identifiesControllerService(SSLContextService.class)
|
||||
.build();
|
||||
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
|
||||
.name("ssl-client-auth")
|
||||
.displayName("Client Auth")
|
||||
.description("Client authentication policy when connecting to secure (TLS/SSL) cluster. "
|
||||
+ "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
|
||||
+ "has been defined and enabled.")
|
||||
.required(false)
|
||||
.allowableValues(SSLContextService.ClientAuth.values())
|
||||
.defaultValue("REQUIRED")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
|
||||
.name("mongo-write-concern")
|
||||
.displayName("Write Concern")
|
||||
.description("The write concern to use")
|
||||
.required(true)
|
||||
.allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
|
||||
WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY)
|
||||
.defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
|
||||
.build();
|
||||
|
||||
static List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
|
||||
static {
|
||||
descriptors.add(URI);
|
||||
descriptors.add(DATABASE_NAME);
|
||||
descriptors.add(COLLECTION_NAME);
|
||||
descriptors.add(SSL_CONTEXT_SERVICE);
|
||||
descriptors.add(CLIENT_AUTH);
|
||||
}
|
||||
|
||||
protected MongoClient mongoClient;
|
||||
|
||||
protected final void createClient(ConfigurationContext context) throws IOException {
|
||||
if (mongoClient != null) {
|
||||
closeClient();
|
||||
}
|
||||
|
||||
getLogger().info("Creating MongoClient");
|
||||
|
||||
// Set up the client for secure (SSL/TLS communications) if configured to do so
|
||||
final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||
final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue();
|
||||
final SSLContext sslContext;
|
||||
|
||||
if (sslService != null) {
|
||||
final SSLContextService.ClientAuth clientAuth;
|
||||
if (StringUtils.isBlank(rawClientAuth)) {
|
||||
clientAuth = SSLContextService.ClientAuth.REQUIRED;
|
||||
} else {
|
||||
try {
|
||||
clientAuth = SSLContextService.ClientAuth.valueOf(rawClientAuth);
|
||||
} catch (final IllegalArgumentException iae) {
|
||||
throw new ProviderCreationException(String.format("Unrecognized client auth '%s'. Possible values are [%s]",
|
||||
rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
|
||||
}
|
||||
}
|
||||
sslContext = sslService.createSSLContext(clientAuth);
|
||||
} else {
|
||||
sslContext = null;
|
||||
}
|
||||
|
||||
try {
|
||||
if(sslContext == null) {
|
||||
mongoClient = new MongoClient(new MongoClientURI(getURI(context)));
|
||||
} else {
|
||||
mongoClient = new MongoClient(new MongoClientURI(getURI(context), getClientOptions(sslContext)));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Failed to schedule {} due to {}", new Object[] { this.getClass().getName(), e }, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
protected Builder getClientOptions(final SSLContext sslContext) {
|
||||
MongoClientOptions.Builder builder = MongoClientOptions.builder();
|
||||
builder.sslEnabled(true);
|
||||
builder.socketFactory(sslContext.getSocketFactory());
|
||||
return builder;
|
||||
}
|
||||
|
||||
@OnStopped
|
||||
public final void closeClient() {
|
||||
if (mongoClient != null) {
|
||||
mongoClient.close();
|
||||
mongoClient = null;
|
||||
}
|
||||
}
|
||||
|
||||
protected MongoDatabase getDatabase(final ConfigurationContext context) {
|
||||
return getDatabase(context, null);
|
||||
}
|
||||
|
||||
protected MongoDatabase getDatabase(final ConfigurationContext context, final FlowFile flowFile) {
|
||||
final String databaseName = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions(flowFile).getValue();
|
||||
return mongoClient.getDatabase(databaseName);
|
||||
}
|
||||
|
||||
protected MongoCollection<Document> getCollection(final ConfigurationContext context) {
|
||||
return getCollection(context, null);
|
||||
}
|
||||
|
||||
protected MongoCollection<Document> getCollection(final ConfigurationContext context, final FlowFile flowFile) {
|
||||
final String collectionName = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions(flowFile).getValue();
|
||||
return getDatabase(context, flowFile).getCollection(collectionName);
|
||||
}
|
||||
|
||||
protected String getURI(final ConfigurationContext context) {
|
||||
return context.getProperty(URI).evaluateAttributeExpressions().getValue();
|
||||
}
|
||||
|
||||
protected WriteConcern getWriteConcern(final ConfigurationContext context) {
|
||||
final String writeConcernProperty = context.getProperty(WRITE_CONCERN).getValue();
|
||||
WriteConcern writeConcern = null;
|
||||
switch (writeConcernProperty) {
|
||||
case WRITE_CONCERN_ACKNOWLEDGED:
|
||||
writeConcern = WriteConcern.ACKNOWLEDGED;
|
||||
break;
|
||||
case WRITE_CONCERN_UNACKNOWLEDGED:
|
||||
writeConcern = WriteConcern.UNACKNOWLEDGED;
|
||||
break;
|
||||
case WRITE_CONCERN_FSYNCED:
|
||||
writeConcern = WriteConcern.FSYNCED;
|
||||
break;
|
||||
case WRITE_CONCERN_JOURNALED:
|
||||
writeConcern = WriteConcern.JOURNALED;
|
||||
break;
|
||||
case WRITE_CONCERN_REPLICA_ACKNOWLEDGED:
|
||||
writeConcern = WriteConcern.REPLICA_ACKNOWLEDGED;
|
||||
break;
|
||||
case WRITE_CONCERN_MAJORITY:
|
||||
writeConcern = WriteConcern.MAJORITY;
|
||||
break;
|
||||
default:
|
||||
writeConcern = WriteConcern.ACKNOWLEDGED;
|
||||
}
|
||||
return writeConcern;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return descriptors;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,154 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.mongodb;
|
||||
|
||||
import com.mongodb.client.FindIterable;
|
||||
import com.mongodb.client.MongoCollection;
|
||||
import com.mongodb.client.MongoCursor;
|
||||
import com.mongodb.client.MongoDatabase;
|
||||
import com.mongodb.client.model.UpdateOptions;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.bson.Document;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@Tags({"mongo", "mongodb", "service"})
|
||||
@CapabilityDescription(
|
||||
"Provides a controller service that wraps most of the functionality of the MongoDB driver."
|
||||
)
|
||||
public class MongoDBControllerService extends AbstractMongoDBControllerService implements MongoDBClientService {
|
||||
private MongoDatabase db;
|
||||
private MongoCollection<Document> col;
|
||||
|
||||
@OnEnabled
|
||||
public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
|
||||
this.createClient(context);
|
||||
this.db = this.mongoClient.getDatabase(context.getProperty(MongoDBControllerService.DATABASE_NAME).getValue());
|
||||
this.col = this.db.getCollection(context.getProperty(MongoDBControllerService.COLLECTION_NAME).getValue());
|
||||
}
|
||||
|
||||
@OnDisabled
|
||||
public void onDisable() {
|
||||
this.mongoClient.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long count(Document query) {
|
||||
return this.col.count(query);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(Document query) {
|
||||
this.col.deleteMany(query);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean exists(Document query) {
|
||||
return this.col.count(query) > 0;
|
||||
}
|
||||
|
||||
public Document findOne(Document query) {
|
||||
MongoCursor<Document> cursor = this.col.find(query).limit(1).iterator();
|
||||
Document retVal = cursor.next();
|
||||
cursor.close();
|
||||
|
||||
return retVal;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Document> findMany(Document query) {
|
||||
return findMany(query, null, -1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Document> findMany(Document query, int limit) {
|
||||
return findMany(query, null, limit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Document> findMany(Document query, Document sort, int limit) {
|
||||
FindIterable<Document> fi = this.col.find(query);
|
||||
if (limit > 0) {
|
||||
fi = fi.limit(limit);
|
||||
}
|
||||
if (sort != null) {
|
||||
fi = fi.sort(sort);
|
||||
}
|
||||
MongoCursor<Document> cursor = fi.iterator();
|
||||
List<Document> retVal = new ArrayList<>();
|
||||
while (cursor.hasNext()) {
|
||||
retVal.add(cursor.next());
|
||||
}
|
||||
cursor.close();
|
||||
|
||||
return retVal;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void insert(Document doc) {
|
||||
this.col.insertOne(doc);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void insert(List<Document> docs) {
|
||||
this.col.insertMany(docs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(Document query, Document update, boolean multiple) {
|
||||
if (multiple) {
|
||||
this.col.updateMany(query, update);
|
||||
} else {
|
||||
this.col.updateOne(query, update);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(Document query, Document update) {
|
||||
update(query, update, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateOne(Document query, Document update) {
|
||||
this.update(query, update, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void upsert(Document query, Document update) {
|
||||
this.col.updateOne(query, update, new UpdateOptions().upsert(true));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dropDatabase() {
|
||||
this.db.drop();
|
||||
this.col = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dropCollection() {
|
||||
this.col.drop();
|
||||
this.col = null;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,131 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.mongodb;
|
||||
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.lookup.LookupFailureException;
|
||||
import org.apache.nifi.lookup.LookupService;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
import org.apache.nifi.serialization.record.MapRecord;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordField;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.bson.Document;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
|
||||
@Tags({"mongo", "mongodb", "lookup", "record"})
|
||||
@CapabilityDescription(
|
||||
"Provides a lookup service based around MongoDB. Each key that is specified \n" +
|
||||
"will be added to a query as-is. For example, if you specify the two keys, \n" +
|
||||
"user and email, the resulting query will be { \"user\": \"tester\", \"email\": \"tester@test.com\" }.\n" +
|
||||
"The query is limited to the first result (findOne in the Mongo documentation). If no \"Lookup Value Field\" is specified " +
|
||||
"then the entire MongoDB result document minus the _id field will be returned as a record."
|
||||
)
|
||||
public class MongoDBLookupService extends MongoDBControllerService implements LookupService<Object> {
|
||||
|
||||
public static final PropertyDescriptor LOOKUP_VALUE_FIELD = new PropertyDescriptor.Builder()
|
||||
.name("mongo-lookup-value-field")
|
||||
.displayName("Lookup Value Field")
|
||||
.description("The field whose value will be returned when the lookup key(s) match a record. If not specified then the entire " +
|
||||
"MongoDB result document minus the _id field will be returned as a record.")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
private String lookupValueField;
|
||||
|
||||
private static final List<PropertyDescriptor> lookupDescriptors;
|
||||
|
||||
static {
|
||||
lookupDescriptors = new ArrayList<>();
|
||||
lookupDescriptors.addAll(descriptors);
|
||||
lookupDescriptors.add(LOOKUP_VALUE_FIELD);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Object> lookup(Map<String, String> coordinates) throws LookupFailureException {
|
||||
Map<String, Object> clean = new HashMap<>();
|
||||
clean.putAll(coordinates);
|
||||
Document query = new Document(clean);
|
||||
|
||||
if (coordinates.size() == 0) {
|
||||
throw new LookupFailureException("No keys were configured. Mongo query would return random documents.");
|
||||
}
|
||||
|
||||
try {
|
||||
Document result = this.findOne(query);
|
||||
|
||||
if (lookupValueField != null && !lookupValueField.equals("")) {
|
||||
return Optional.ofNullable(result.get(lookupValueField));
|
||||
} else {
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
|
||||
for (String key : result.keySet()) {
|
||||
if (key.equals("_id")) {
|
||||
continue;
|
||||
}
|
||||
fields.add(new RecordField(key, RecordFieldType.STRING.getDataType()));
|
||||
}
|
||||
|
||||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
return Optional.ofNullable(new MapRecord(schema, result));
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
getLogger().error("Error during lookup {}", new Object[]{ query.toJson() }, ex);
|
||||
throw new LookupFailureException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@OnEnabled
|
||||
public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
|
||||
this.lookupValueField = context.getProperty(LOOKUP_VALUE_FIELD).getValue();
|
||||
super.onEnabled(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<?> getValueType() {
|
||||
return Record.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getRequiredKeys() {
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
|
||||
return lookupDescriptors;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.mongodb;
|
||||
|
||||
import com.mongodb.MongoClientURI;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.components.Validator;
|
||||
|
||||
public class Validation {
|
||||
public static final Validator DOCUMENT_VALIDATOR = new Validator() {
|
||||
|
||||
@Override
|
||||
public ValidationResult validate(String subject, String value, ValidationContext context) {
|
||||
final ValidationResult.Builder builder = new ValidationResult.Builder();
|
||||
builder.subject(subject).input(value);
|
||||
|
||||
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
|
||||
return builder.valid(true).explanation("Contains Expression Language").build();
|
||||
}
|
||||
|
||||
String reason = null;
|
||||
try {
|
||||
new MongoClientURI(value);
|
||||
} catch (final Exception e) {
|
||||
reason = e.getLocalizedMessage();
|
||||
}
|
||||
|
||||
return builder.explanation(reason).valid(reason == null).build();
|
||||
}
|
||||
};
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
org.apache.nifi.mongodb.MongoDBLookupService
|
||||
org.apache.nifi.mongodb.MongoDBControllerService
|
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.mongodb;
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class TestControllerServiceProcessor extends AbstractProcessor {
|
||||
|
||||
static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("Client Service")
|
||||
.description("MongoDBClientService")
|
||||
.identifiesControllerService(MongoDBClientService.class)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
@Override
|
||||
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
List<PropertyDescriptor> propDescs = new ArrayList<>();
|
||||
propDescs.add(CLIENT_SERVICE);
|
||||
return propDescs;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.mongodb;
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class TestLookupServiceProcessor extends AbstractProcessor {
|
||||
|
||||
static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("Client Service")
|
||||
.description("MongoDBLookupService")
|
||||
.identifiesControllerService(MongoDBLookupService.class)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
@Override
|
||||
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
List<PropertyDescriptor> propDescs = new ArrayList<>();
|
||||
propDescs.add(CLIENT_SERVICE);
|
||||
return propDescs;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,179 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.mongodb;
|
||||
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.bson.Document;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Calendar;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
|
||||
@Ignore("This is an integration test and requires a copy of MongoDB running on localhost")
|
||||
public class TestMongoDBControllerService {
|
||||
private static final String DB_NAME = String.format("nifi_test-%d", Calendar.getInstance().getTimeInMillis());
|
||||
private static final String COL_NAME = String.format("nifi_test-%d", Calendar.getInstance().getTimeInMillis());
|
||||
|
||||
private TestRunner runner;
|
||||
private MongoDBControllerService service;
|
||||
|
||||
@Before
|
||||
public void before() throws Exception {
|
||||
runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class);
|
||||
service = new MongoDBControllerService();
|
||||
runner.addControllerService("Client Service", service);
|
||||
runner.setProperty(service, MongoDBControllerService.DATABASE_NAME, DB_NAME);
|
||||
runner.setProperty(service, MongoDBControllerService.COLLECTION_NAME, COL_NAME);
|
||||
runner.setProperty(service, MongoDBControllerService.URI, "mongodb://localhost:27017");
|
||||
runner.enableControllerService(service);
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() throws Exception {
|
||||
service.dropDatabase();
|
||||
service.onDisable();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInit() throws Exception {
|
||||
runner.assertValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBasicCRUD() throws Exception {
|
||||
Document doc = service.convertJson("{\n" +
|
||||
"\t\"uuid\": \"x-y-z\",\n" +
|
||||
"\t\"message\": \"Testing!\"\n" +
|
||||
"}");
|
||||
Document lookup = service.convertJson("{ \"uuid\": \"x-y-z\" }");
|
||||
Document update = service.convertJson("{\n" +
|
||||
"\t\"$set\": {\n" +
|
||||
"\t\t\"updatedBy\": \"testUser\"\n" +
|
||||
"\t}\n" +
|
||||
"}");
|
||||
|
||||
service.insert(doc);
|
||||
Document result = service.findOne(lookup);
|
||||
|
||||
Assert.assertNotNull("The result was null", result);
|
||||
Assert.assertEquals("The UUID did not match", result.getString("uuid"), "x-y-z");
|
||||
Assert.assertNotNull("The message block was missing", result.getString("message"));
|
||||
Assert.assertEquals("The message block did not match", result.getString("message"), "Testing!");
|
||||
|
||||
service.update(lookup, update, false);
|
||||
|
||||
result = service.findOne(lookup);
|
||||
|
||||
Assert.assertNotNull("The result was null", result);
|
||||
Assert.assertEquals("The UUID did not match", result.getString("uuid"), "x-y-z");
|
||||
Assert.assertNotNull("The message block was missing", result.getString("message"));
|
||||
Assert.assertEquals("The message block did not match", result.getString("message"), "Testing!");
|
||||
Assert.assertNotNull("The updatedBy block was missing", result.getString("updatedBy"));
|
||||
Assert.assertEquals("The updatedBy block did not match", result.getString("updatedBy"), "testUser");
|
||||
|
||||
service.delete(lookup);
|
||||
|
||||
boolean exists = service.exists(lookup);
|
||||
|
||||
Assert.assertFalse("After the delete, the document still existed", exists);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleCRUD() throws Exception {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
List<Document> sampleDocuments = new ArrayList<>();
|
||||
List<String> uuids = new ArrayList<>();
|
||||
Map<String, Object> mappings = new HashMap<>();
|
||||
Random random = new Random();
|
||||
int count = random.nextInt(1000);
|
||||
for (int x = 0; x < count; x++) {
|
||||
Map<String, Object> doc = new HashMap<>();
|
||||
String uuid = UUID.randomUUID().toString();
|
||||
String ts = Calendar.getInstance().getTime().toString();
|
||||
uuids.add(uuid);
|
||||
mappings.put(uuid, ts);
|
||||
|
||||
doc.put("uuid", uuid);
|
||||
doc.put("timestamp", ts);
|
||||
doc.put("randomNumber", random.nextInt(10));
|
||||
|
||||
String json = mapper.writeValueAsString(doc);
|
||||
sampleDocuments.add(service.convertJson(json));
|
||||
}
|
||||
|
||||
service.insert(sampleDocuments);
|
||||
|
||||
long docCount = service.count(service.convertJson("{}"));
|
||||
|
||||
Assert.assertEquals("The counts did not match", docCount, count);
|
||||
for (String uuid : uuids) {
|
||||
Document lookup = service.convertJson(String.format("{ \"uuid\": \"%s\" }", uuid));
|
||||
Document result = service.findOne(lookup);
|
||||
Assert.assertNotNull("The document was not found", result);
|
||||
Assert.assertEquals("The uuid did not match", result.getString("uuid"), uuid);
|
||||
Assert.assertEquals("The timestamp did not match", result.getString("timestamp"), mappings.get(uuid));
|
||||
}
|
||||
|
||||
Document query = service.convertJson("{ \"randomNumber\": 5 }");
|
||||
docCount = service.count(query);
|
||||
List<Document> results = service.findMany(query);
|
||||
|
||||
Assert.assertTrue("Count should have been >= 1", docCount >= 1);
|
||||
Assert.assertNotNull("Result set was null", results);
|
||||
Assert.assertEquals("The counts did not match up", docCount, results.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpsert() throws Exception {
|
||||
Document query = service.convertJson(String.format("{ \"uuid\": \"%s\" }", UUID.randomUUID().toString()));
|
||||
Document update = service.convertJson("{ \"$set\": { \"message\": \"Hello, world\" } }");
|
||||
service.upsert(query, update);
|
||||
|
||||
Document result = service.findOne(query);
|
||||
Assert.assertNotNull("No result returned", result);
|
||||
Assert.assertEquals("UUID did not match", result.getString("uuid"), query.getString("uuid"));
|
||||
Assert.assertEquals("Message did not match", result.getString("message"), "Hello, world");
|
||||
|
||||
Map<String, String> mappings = new HashMap<>();
|
||||
for (int x = 0; x < 5; x++) {
|
||||
String fieldName = String.format("field_%d", x);
|
||||
String uuid = UUID.randomUUID().toString();
|
||||
mappings.put(fieldName, uuid);
|
||||
update = service.convertJson(String.format("{ \"$set\": { \"%s\": \"%s\" } }", fieldName, uuid));
|
||||
|
||||
service.upsert(query, update);
|
||||
}
|
||||
|
||||
result = service.findOne(query);
|
||||
|
||||
for (Map.Entry<String, String> entry : mappings.entrySet()) {
|
||||
Assert.assertEquals("Entry did not match.", entry.getValue(), result.getString(entry.getKey()));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,154 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.mongodb;
|
||||
|
||||
import org.apache.nifi.lookup.LookupFailureException;
|
||||
import org.apache.nifi.serialization.record.MapRecord;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.bson.Document;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Calendar;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
@Ignore("This is an integration test and requires a copy of MongoDB running on localhost")
|
||||
public class TestMongoDBLookupService {
|
||||
private static final String DB_NAME = String.format("nifi_test-%d", Calendar.getInstance().getTimeInMillis());
|
||||
private static final String COL_NAME = String.format("nifi_test-%d", Calendar.getInstance().getTimeInMillis());
|
||||
|
||||
private TestRunner runner;
|
||||
private MongoDBLookupService service;
|
||||
|
||||
@Before
|
||||
public void before() throws Exception {
|
||||
runner = TestRunners.newTestRunner(TestLookupServiceProcessor.class);
|
||||
service = new MongoDBLookupService();
|
||||
runner.addControllerService("Client Service", service);
|
||||
runner.setProperty(service, MongoDBLookupService.DATABASE_NAME, DB_NAME);
|
||||
runner.setProperty(service, MongoDBLookupService.COLLECTION_NAME, COL_NAME);
|
||||
runner.setProperty(service, MongoDBLookupService.URI, "mongodb://localhost:27017");
|
||||
runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "message");
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() throws Exception {
|
||||
service.dropDatabase();
|
||||
service.onDisable();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInit() throws Exception {
|
||||
runner.enableControllerService(service);
|
||||
runner.assertValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLookupSingle() throws Exception {
|
||||
runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "message");
|
||||
runner.enableControllerService(service);
|
||||
Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
|
||||
service.insert(document);
|
||||
|
||||
Map<String, String> criteria = new HashMap<>();
|
||||
criteria.put("uuid", "x-y-z");
|
||||
Optional result = service.lookup(criteria);
|
||||
|
||||
Assert.assertNotNull("The value was null.", result.get());
|
||||
Assert.assertEquals("The value was wrong.", "Hello, world", result.get());
|
||||
|
||||
Map<String, Object> clean = new HashMap<>();
|
||||
clean.putAll(criteria);
|
||||
service.delete(new Document(clean));
|
||||
|
||||
boolean error = false;
|
||||
try {
|
||||
service.lookup(criteria);
|
||||
} catch (LookupFailureException ex) {
|
||||
error = true;
|
||||
}
|
||||
|
||||
Assert.assertTrue("An error should have been thrown.", error);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLookupRecord() throws Exception {
|
||||
runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "");
|
||||
runner.enableControllerService(service);
|
||||
Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
|
||||
service.insert(document);
|
||||
|
||||
Map<String, String> criteria = new HashMap<>();
|
||||
criteria.put("uuid", "x-y-z");
|
||||
Optional result = service.lookup(criteria);
|
||||
|
||||
Assert.assertNotNull("The value was null.", result.get());
|
||||
Assert.assertTrue("The value was wrong.", result.get() instanceof MapRecord);
|
||||
MapRecord record = (MapRecord)result.get();
|
||||
Assert.assertEquals("The value was wrong.", "Hello, world", record.getAsString("message"));
|
||||
Assert.assertEquals("The value was wrong.", "x-y-z", record.getAsString("uuid"));
|
||||
|
||||
Map<String, Object> clean = new HashMap<>();
|
||||
clean.putAll(criteria);
|
||||
service.delete(new Document(clean));
|
||||
|
||||
boolean error = false;
|
||||
try {
|
||||
service.lookup(criteria);
|
||||
} catch (LookupFailureException ex) {
|
||||
error = true;
|
||||
}
|
||||
|
||||
Assert.assertTrue("An error should have been thrown.", error);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServiceParameters() throws Exception {
|
||||
runner.enableControllerService(service);
|
||||
Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
|
||||
service.insert(document);
|
||||
|
||||
Map<String, String> criteria = new HashMap<>();
|
||||
criteria.put("uuid", "x-y-z");
|
||||
|
||||
boolean error = false;
|
||||
try {
|
||||
service.lookup(criteria);
|
||||
} catch(Exception ex) {
|
||||
error = true;
|
||||
}
|
||||
|
||||
Assert.assertFalse("An error was thrown when no error should have been thrown.", error);
|
||||
error = false;
|
||||
|
||||
try {
|
||||
service.lookup(new HashMap());
|
||||
} catch (Exception ex) {
|
||||
error = true;
|
||||
Assert.assertTrue("The exception was the wrong type", ex instanceof LookupFailureException);
|
||||
}
|
||||
|
||||
Assert.assertTrue("An error was not thrown when the input was empty", error);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-standard-services</artifactId>
|
||||
<version>1.4.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>nifi-mongodb-services-bundle</artifactId>
|
||||
<packaging>pom</packaging>
|
||||
<modules>
|
||||
<module>nifi-mongodb-services</module>
|
||||
<module>nifi-mongodb-services-nar</module>
|
||||
</modules>
|
||||
</project>
|
|
@ -51,6 +51,11 @@
|
|||
<artifactId>nifi-dbcp-service-api</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mongodb-client-service-api</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-hbase-client-service-api</artifactId>
|
||||
|
|
|
@ -41,5 +41,7 @@
|
|||
<module>nifi-record-serialization-service-api</module>
|
||||
<module>nifi-record-serialization-services-bundle</module>
|
||||
<module>nifi-hwx-schema-registry-bundle</module>
|
||||
<module>nifi-mongodb-client-service-api</module>
|
||||
<module>nifi-mongodb-services-bundle</module>
|
||||
</modules>
|
||||
</project>
|
||||
|
|
7
pom.xml
7
pom.xml
|
@ -1109,7 +1109,7 @@
|
|||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-kudu-nar</artifactId>
|
||||
<artifactId>nifi-mongodb-services-nar</artifactId>
|
||||
<version>1.4.0-SNAPSHOT</version>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
|
@ -1597,6 +1597,11 @@
|
|||
<artifactId>nifi-hbase-client-service-api</artifactId>
|
||||
<version>1.4.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mongodb-client-service-api</artifactId>
|
||||
<version>1.4.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-websocket-services-api</artifactId>
|
||||
|
|
Loading…
Reference in New Issue