NIFI-570: Added MongoDB put and get processors

This commit is contained in:
Tim Reardon 2015-05-19 08:09:02 -04:00
parent b7ddf89450
commit add03e3893
13 changed files with 1159 additions and 8 deletions

View File

@ -491,6 +491,10 @@ The following binary components are provided under the Apache Software License v
This product includes software developed by
Saxonica (http://www.saxonica.com/).
(ASLv2) MongoDB Java Driver
The following NOTICE information applies:
Copyright (C) 2008-2013 10gen, Inc.
(ASLv2) Parquet MR
The following NOTICE information applies:
Parquet MR

View File

@ -162,6 +162,11 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-kite-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-solr-nar</artifactId>

View File

@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- NAR (NiFi Archive) module: repackages the nifi-mongodb-processors jar,
together with its dependencies, into the archive format that NiFi loads
at runtime. This pom intentionally contains no source of its own. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-bundle</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-mongodb-nar</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
<!-- "nar" packaging is provided by the nifi-nar-maven-plugin configured in a parent pom -->
<packaging>nar</packaging>
<dependencies>
<!-- The processors jar that this NAR bundles for deployment -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-processors</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,98 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- Processors module: contains the GetMongo / PutMongo processor implementations
and their unit tests. Tests run against an embedded MongoDB instance managed
by the embedmongo-maven-plugin configured below. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-bundle</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-mongodb-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<!-- MongoDB 3.x Java driver; versions of other dependencies are managed by a parent pom -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Starts an embedded MongoDB before unit tests execute (bound to the
test-compile phase) and stops it after tests (prepare-package phase),
so the processor tests can connect to a real database on localhost. -->
<plugin>
<groupId>com.github.joelittlejohn.embedmongo</groupId>
<artifactId>embedmongo-maven-plugin</artifactId>
<version>0.1.12</version>
<executions>
<execution>
<id>start</id>
<goals>
<goal>start</goal>
</goals>
<phase>test-compile</phase>
<configuration>
<databaseDirectory>${project.build.directory}/embedmongo/db</databaseDirectory>
<!-- mongod output is written to a file rather than the build console -->
<logging>file</logging>
<logFile>${project.build.directory}/embedmongo.log</logFile>
</configuration>
</execution>
<execution>
<id>stop</id>
<goals>
<goal>stop</goal>
</goals>
<phase>prepare-package</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb;
import java.io.IOException;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.bson.Document;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
public abstract class AbstractMongoProcessor extends AbstractProcessor {
protected static final PropertyDescriptor URI = new PropertyDescriptor.Builder()
.name("Mongo URI")
.description("MongoURI, typically of the form: mongodb://host1[:port1][,host2[:port2],...]")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
.name("Mongo Database Name")
.description("The name of the database to use")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder()
.name("Mongo Collection Name")
.description("The name of the collection to use")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected MongoClient mongoClient;
@OnScheduled
public final void createClient(ProcessContext context) throws IOException {
if (mongoClient != null) {
closeClient();
}
getLogger().info("Creating MongoClient");
try {
final String uri = context.getProperty(URI).getValue();
mongoClient = new MongoClient(new MongoClientURI(uri));
} catch (Exception e) {
getLogger().error("Failed to schedule PutMongo due to {}", new Object[] { e }, e);
throw e;
}
}
@OnStopped
public final void closeClient() {
if (mongoClient != null) {
getLogger().info("Closing MongoClient");
mongoClient.close();
mongoClient = null;
}
}
protected MongoDatabase getDatabase(final ProcessContext context) {
final String databaseName = context.getProperty(DATABASE_NAME).getValue();
return mongoClient.getDatabase(databaseName);
}
protected MongoCollection<Document> getCollection(final ProcessContext context) {
final String collectionName = context.getProperty(COLLECTION_NAME).getValue();
return getDatabase(context).getCollection(collectionName);
}
}

View File

@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.bson.Document;

import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
@Tags({ "mongodb", "read", "get" })
@CapabilityDescription("Creates FlowFiles from documents in MongoDB")
public class GetMongo extends AbstractMongoProcessor {

    /**
     * Validates that a property value parses as a BSON document; a null
     * reason means the value parsed cleanly.
     */
    public static final Validator DOCUMENT_VALIDATOR = new Validator() {
        @Override
        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
            String reason = null;
            try {
                Document.parse(value);
            } catch (final RuntimeException e) {
                reason = e.getClass().getName();
            }
            return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
        }
    };

    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();

    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
        .name("Query")
        .description("The selection criteria; must be a valid BSON document; if omitted the entire collection will be queried")
        .required(false)
        .addValidator(DOCUMENT_VALIDATOR)
        .build();

    static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder()
        .name("Projection")
        .description("The fields to be returned from the documents in the result set; must be a valid BSON document")
        .required(false)
        .addValidator(DOCUMENT_VALIDATOR)
        .build();

    static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
        .name("Sort")
        .description("The fields by which to sort; must be a valid BSON document")
        .required(false)
        .addValidator(DOCUMENT_VALIDATOR)
        .build();

    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
        .name("Limit")
        .description("The maximum number of elements to return")
        .required(false)
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build();

    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("Batch Size")
        .description("The number of elements returned from the server in one batch")
        .required(false)
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build();

    private final List<PropertyDescriptor> descriptors;
    private final Set<Relationship> relationships;

    public GetMongo() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(URI);
        descriptors.add(DATABASE_NAME);
        descriptors.add(COLLECTION_NAME);
        descriptors.add(QUERY);
        descriptors.add(PROJECTION);
        descriptors.add(SORT);
        descriptors.add(LIMIT);
        descriptors.add(BATCH_SIZE);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    /**
     * Runs the configured query against the collection and emits one FlowFile
     * per matching document, each containing the document's JSON form.
     * On any runtime failure the session is rolled back and the processor yields.
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final ProcessorLog logger = getLogger();

        // Optional BSON documents controlling the find(); each was already
        // checked by DOCUMENT_VALIDATOR, so parsing here should not fail.
        final Document query = context.getProperty(QUERY).isSet() ? Document.parse(context.getProperty(QUERY).getValue()) : null;
        final Document projection = context.getProperty(PROJECTION).isSet() ? Document.parse(context.getProperty(PROJECTION).getValue()) : null;
        final Document sort = context.getProperty(SORT).isSet() ? Document.parse(context.getProperty(SORT).getValue()) : null;

        final MongoCollection<Document> collection = getCollection(context);

        try {
            final FindIterable<Document> it = query != null ? collection.find(query) : collection.find();
            if (projection != null) {
                it.projection(projection);
            }
            if (sort != null) {
                it.sort(sort);
            }
            if (context.getProperty(LIMIT).isSet()) {
                it.limit(context.getProperty(LIMIT).asInteger());
            }
            if (context.getProperty(BATCH_SIZE).isSet()) {
                it.batchSize(context.getProperty(BATCH_SIZE).asInteger());
            }

            final MongoCursor<Document> cursor = it.iterator();
            try {
                FlowFile flowFile = null;
                while (cursor.hasNext()) {
                    flowFile = session.create();
                    flowFile = session.write(flowFile, new OutputStreamCallback() {
                        @Override
                        public void process(OutputStream out) throws IOException {
                            // Write the JSON with an explicit UTF-8 charset rather than
                            // the platform default, so content is stable across JVMs/locales.
                            out.write(cursor.next().toJson().getBytes(StandardCharsets.UTF_8));
                        }
                    });

                    session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue());
                    session.transfer(flowFile, REL_SUCCESS);
                }
                session.commit();
            } finally {
                cursor.close();
            }
        } catch (final RuntimeException e) {
            context.yield();
            session.rollback();
            logger.error("Failed to execute query {} due to {}", new Object[] { query, e }, e);
        }
    }
}

View File

@ -0,0 +1,217 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mongodb;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.bson.Document;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.UpdateOptions;
@EventDriven
@Tags({ "mongodb", "insert", "update", "write", "put" })
@CapabilityDescription("Writes the contents of a FlowFile to MongoDB")
public class PutMongo extends AbstractMongoProcessor {

    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
        .description("All FlowFiles that are written to MongoDB are routed to this relationship").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
        .description("All FlowFiles that cannot be written to MongoDB are routed to this relationship").build();

    static final String MODE_INSERT = "insert";
    static final String MODE_UPDATE = "update";

    // Allowable values for the Write Concern property; mapped to driver
    // constants in getWriteConcern().
    static final String WRITE_CONCERN_ACKNOWLEDGED = "ACKNOWLEDGED";
    static final String WRITE_CONCERN_UNACKNOWLEDGED = "UNACKNOWLEDGED";
    static final String WRITE_CONCERN_FSYNCED = "FSYNCED";
    static final String WRITE_CONCERN_JOURNALED = "JOURNALED";
    static final String WRITE_CONCERN_REPLICA_ACKNOWLEDGED = "REPLICA_ACKNOWLEDGED";
    static final String WRITE_CONCERN_MAJORITY = "MAJORITY";

    static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
        .name("Mode")
        .description("Indicates whether the processor should insert or update content")
        .required(true)
        .allowableValues(MODE_INSERT, MODE_UPDATE)
        .defaultValue(MODE_INSERT)
        .build();

    static final PropertyDescriptor UPSERT = new PropertyDescriptor.Builder()
        .name("Upsert")
        .description("When true, inserts a document if no document matches the update query criteria; this property is valid only when using update mode, "
                + "otherwise it is ignored")
        .required(true)
        .allowableValues("true", "false")
        .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
        .defaultValue("false")
        .build();

    static final PropertyDescriptor UPDATE_QUERY_KEY = new PropertyDescriptor.Builder()
        .name("Update Query Key")
        .description("Key name used to build the update query criteria; this property is valid only when using update mode, "
                + "otherwise it is ignored")
        .required(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .defaultValue("_id")
        .build();

    static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
        .name("Write Concern")
        .description("The write concern to use")
        .required(true)
        .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
            WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY)
        .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
        .build();

    static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
        .name("Character Set")
        .description("The Character Set in which the data is encoded")
        .required(true)
        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
        .defaultValue("UTF-8")
        .build();

    private final List<PropertyDescriptor> descriptors;
    private final Set<Relationship> relationships;

    public PutMongo() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(URI);
        descriptors.add(DATABASE_NAME);
        descriptors.add(COLLECTION_NAME);
        descriptors.add(MODE);
        descriptors.add(UPSERT);
        descriptors.add(UPDATE_QUERY_KEY);
        descriptors.add(WRITE_CONCERN);
        descriptors.add(CHARACTER_SET);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    /**
     * Parses the FlowFile content as a BSON document and either inserts it or
     * replaces an existing document matched on the configured Update Query Key.
     * Successful writes route to success; any failure routes to failure and
     * yields the processor.
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final ProcessorLog logger = getLogger();

        final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
        final String mode = context.getProperty(MODE).getValue();
        final WriteConcern writeConcern = getWriteConcern(context);

        final MongoCollection<Document> collection = getCollection(context).withWriteConcern(writeConcern);

        try {
            // Read the contents of the FlowFile into a byte array
            final byte[] content = new byte[(int) flowFile.getSize()];
            session.read(flowFile, new InputStreamCallback() {
                @Override
                public void process(final InputStream in) throws IOException {
                    StreamUtils.fillBuffer(in, content, true);
                }
            });

            // parse
            final Document doc = Document.parse(new String(content, charset));

            if (MODE_INSERT.equalsIgnoreCase(mode)) {
                collection.insertOne(doc);
                logger.info("inserted {} into MongoDB", new Object[] { flowFile });
            } else {
                // update: replace the document matching the configured key's value
                final boolean upsert = context.getProperty(UPSERT).asBoolean();
                final String updateKey = context.getProperty(UPDATE_QUERY_KEY).getValue();
                final Document query = new Document(updateKey, doc.get(updateKey));

                collection.replaceOne(query, doc, new UpdateOptions().upsert(upsert));
                logger.info("updated {} into MongoDB", new Object[] { flowFile });
            }

            session.getProvenanceReporter().send(flowFile, context.getProperty(URI).getValue());
            session.transfer(flowFile, REL_SUCCESS);
        } catch (Exception e) {
            // Removed leftover e.printStackTrace() debug call: the logger call
            // below already records the full stack trace via the framework.
            logger.error("Failed to insert {} into MongoDB due to {}", new Object[] { flowFile, e }, e);
            session.transfer(flowFile, REL_FAILURE);
            context.yield();
        }
    }

    /**
     * Maps the Write Concern property value to the corresponding driver
     * constant, defaulting to ACKNOWLEDGED for any unrecognized value.
     */
    protected WriteConcern getWriteConcern(final ProcessContext context) {
        final String writeConcernProperty = context.getProperty(WRITE_CONCERN).getValue();
        WriteConcern writeConcern = null;
        switch (writeConcernProperty) {
        case WRITE_CONCERN_ACKNOWLEDGED:
            writeConcern = WriteConcern.ACKNOWLEDGED;
            break;
        case WRITE_CONCERN_UNACKNOWLEDGED:
            writeConcern = WriteConcern.UNACKNOWLEDGED;
            break;
        case WRITE_CONCERN_FSYNCED:
            writeConcern = WriteConcern.FSYNCED;
            break;
        case WRITE_CONCERN_JOURNALED:
            writeConcern = WriteConcern.JOURNALED;
            break;
        case WRITE_CONCERN_REPLICA_ACKNOWLEDGED:
            writeConcern = WriteConcern.REPLICA_ACKNOWLEDGED;
            break;
        case WRITE_CONCERN_MAJORITY:
            writeConcern = WriteConcern.MAJORITY;
            break;
        default:
            writeConcern = WriteConcern.ACKNOWLEDGED;
        }
        return writeConcern;
    }
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.mongodb.GetMongo
org.apache.nifi.processors.mongodb.PutMongo

View File

@ -0,0 +1,201 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.bson.Document;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Lists;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
/**
 * Tests for {@link GetMongo}. Requires a MongoDB instance reachable at
 * mongodb://localhost (provided by the embedmongo-maven-plugin during the
 * build); each test seeds a fixed set of three documents.
 */
public class GetMongoTest {

    private static final String MONGO_URI = "mongodb://localhost";
    private static final String DB_NAME = GetMongoTest.class.getSimpleName().toLowerCase();
    private static final String COLLECTION_NAME = "test";

    // Fixture documents inserted before each test; assertions index into this list.
    private static final List<Document> DOCUMENTS = Lists.newArrayList(
        new Document("_id", "doc_1").append("a", 1).append("b", 2).append("c", 3),
        new Document("_id", "doc_2").append("a", 1).append("b", 2).append("c", 4),
        new Document("_id", "doc_3").append("a", 1).append("b", 3)
    );

    private TestRunner runner;
    private MongoClient mongoClient;

    @Before
    public void setup() {
        runner = TestRunners.newTestRunner(GetMongo.class);
        runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
        runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DB_NAME);
        runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);

        mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));

        MongoCollection<Document> collection = mongoClient.getDatabase(DB_NAME).getCollection(COLLECTION_NAME);
        collection.insertMany(DOCUMENTS);
    }

    @After
    public void teardown() {
        runner = null;
        mongoClient.getDatabase(DB_NAME).drop();
    }

    @Test
    public void testValidators() {
        TestRunner runner = TestRunners.newTestRunner(GetMongo.class);
        Collection<ValidationResult> results;
        ProcessContext pc;

        // missing uri, db, collection
        runner.enqueue(new byte[0]);
        pc = runner.getProcessContext();
        results = new HashSet<>();
        if (pc instanceof MockProcessContext) {
            results = ((MockProcessContext) pc).validate();
        }
        Assert.assertEquals(3, results.size());
        // HashSet iteration order is unspecified, so check that each expected
        // message is present rather than asserting a particular order.
        final StringBuilder sb = new StringBuilder();
        for (final ValidationResult result : results) {
            sb.append(result.toString()).append('\n');
        }
        final String messages = sb.toString();
        Assert.assertTrue(messages.contains("is invalid because Mongo URI is required"));
        Assert.assertTrue(messages.contains("is invalid because Mongo Database Name is required"));
        Assert.assertTrue(messages.contains("is invalid because Mongo Collection Name is required"));

        // missing query - is ok
        runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
        runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DB_NAME);
        runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);
        runner.enqueue(new byte[0]);
        pc = runner.getProcessContext();
        results = new HashSet<>();
        if (pc instanceof MockProcessContext) {
            results = ((MockProcessContext) pc).validate();
        }
        Assert.assertEquals(0, results.size());

        // invalid query
        runner.setProperty(GetMongo.QUERY, "{a: x,y,z}");
        runner.enqueue(new byte[0]);
        pc = runner.getProcessContext();
        results = new HashSet<>();
        if (pc instanceof MockProcessContext) {
            results = ((MockProcessContext) pc).validate();
        }
        Assert.assertEquals(1, results.size());
        Assert.assertTrue(results.iterator().next().toString().matches("'Query' .* is invalid because org.bson.json.JsonParseException"));

        // invalid projection
        runner.setProperty(GetMongo.QUERY, "{a: 1}");
        runner.setProperty(GetMongo.PROJECTION, "{a: x,y,z}");
        runner.enqueue(new byte[0]);
        pc = runner.getProcessContext();
        results = new HashSet<>();
        if (pc instanceof MockProcessContext) {
            results = ((MockProcessContext) pc).validate();
        }
        Assert.assertEquals(1, results.size());
        Assert.assertTrue(results.iterator().next().toString().matches("'Projection' .* is invalid because org.bson.json.JsonParseException"));

        // invalid sort
        runner.removeProperty(GetMongo.PROJECTION);
        runner.setProperty(GetMongo.SORT, "{a: x,y,z}");
        runner.enqueue(new byte[0]);
        pc = runner.getProcessContext();
        results = new HashSet<>();
        if (pc instanceof MockProcessContext) {
            results = ((MockProcessContext) pc).validate();
        }
        Assert.assertEquals(1, results.size());
        Assert.assertTrue(results.iterator().next().toString().matches("'Sort' .* is invalid because org.bson.json.JsonParseException"));
    }

    @Test
    public void testReadOneDocument() throws Exception {
        runner.setProperty(GetMongo.QUERY, "{a: 1, b: 3}");
        runner.run();

        runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1);
        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
        flowFiles.get(0).assertContentEquals(DOCUMENTS.get(2).toJson());
    }

    @Test
    public void testReadMultipleDocuments() throws Exception {
        runner.setProperty(GetMongo.QUERY, "{a: {$exists: true}}");
        runner.run();

        runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 3);
        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
        for (int i = 0; i < flowFiles.size(); i++) {
            flowFiles.get(i).assertContentEquals(DOCUMENTS.get(i).toJson());
        }
    }

    @Test
    public void testProjection() throws Exception {
        runner.setProperty(GetMongo.QUERY, "{a: 1, b: 3}");
        runner.setProperty(GetMongo.PROJECTION, "{_id: 0, a: 1}");
        runner.run();

        runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1);
        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
        Document expected = new Document("a", 1);
        flowFiles.get(0).assertContentEquals(expected.toJson());
    }

    @Test
    public void testSort() throws Exception {
        runner.setProperty(GetMongo.QUERY, "{a: {$exists: true}}");
        runner.setProperty(GetMongo.SORT, "{a: -1, b: -1, c: 1}");
        runner.run();

        runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 3);
        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
        flowFiles.get(0).assertContentEquals(DOCUMENTS.get(2).toJson());
        flowFiles.get(1).assertContentEquals(DOCUMENTS.get(0).toJson());
        flowFiles.get(2).assertContentEquals(DOCUMENTS.get(1).toJson());
    }

    @Test
    public void testLimit() throws Exception {
        runner.setProperty(GetMongo.QUERY, "{a: {$exists: true}}");
        runner.setProperty(GetMongo.LIMIT, "1");
        runner.run();

        runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1);
        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
        flowFiles.get(0).assertContentEquals(DOCUMENTS.get(0).toJson());
    }
}

View File

@ -0,0 +1,254 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mongodb;
import static com.google.common.base.Charsets.UTF_8;
import static org.junit.Assert.assertEquals;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.bson.Document;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Lists;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
/**
 * Integration tests for {@link PutMongo}. Requires a MongoDB instance
 * reachable at {@code mongodb://localhost}; the test database is dropped
 * after every test.
 */
public class PutMongoTest {
    private static final String MONGO_URI = "mongodb://localhost";
    private static final String DATABASE_NAME = PutMongoTest.class.getSimpleName().toLowerCase();
    private static final String COLLECTION_NAME = "test";

    // Fixture documents with fixed _id values so duplicate-key behavior is deterministic.
    private static final List<Document> DOCUMENTS = Lists.newArrayList(
        new Document("_id", "doc_1").append("a", 1).append("b", 2).append("c", 3),
        new Document("_id", "doc_2").append("a", 1).append("b", 2).append("c", 4),
        new Document("_id", "doc_3").append("a", 1).append("b", 3)
    );

    private TestRunner runner;
    private MongoClient mongoClient;
    private MongoCollection<Document> collection;

    @Before
    public void setup() {
        runner = TestRunners.newTestRunner(PutMongo.class);
        runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
        runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DATABASE_NAME);
        runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);

        mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));
        collection = mongoClient.getDatabase(DATABASE_NAME).getCollection(COLLECTION_NAME);
    }

    @After
    public void teardown() {
        runner = null;
        // Drop the scratch database, then release the client's connection pool
        // so repeated test runs do not leak connections.
        mongoClient.getDatabase(DATABASE_NAME).drop();
        mongoClient.close();
    }

    /** Serializes a document to the UTF-8 JSON bytes PutMongo expects as FlowFile content. */
    private byte[] documentToByteArray(Document doc) {
        return doc.toJson().getBytes(UTF_8);
    }

    /**
     * Asserts that at least one validation result message contains the given fragment.
     * The iteration order of the validation results is not a documented contract,
     * so membership is checked instead of positional order.
     */
    private static void assertHasValidationError(Collection<ValidationResult> results, String expectedFragment) {
        for (ValidationResult result : results) {
            if (result.toString().contains(expectedFragment)) {
                return;
            }
        }
        Assert.fail("No validation result containing: " + expectedFragment);
    }

    @Test
    public void testValidators() {
        TestRunner runner = TestRunners.newTestRunner(PutMongo.class);
        Collection<ValidationResult> results;
        ProcessContext pc;

        // missing uri, db, collection
        runner.enqueue(new byte[0]);
        pc = runner.getProcessContext();
        results = new HashSet<>();
        if (pc instanceof MockProcessContext) {
            results = ((MockProcessContext) pc).validate();
        }
        Assert.assertEquals(3, results.size());
        // Check membership rather than iterator order: the collection's iteration
        // order is not guaranteed to match property-declaration order.
        assertHasValidationError(results, "is invalid because Mongo URI is required");
        assertHasValidationError(results, "is invalid because Mongo Database Name is required");
        assertHasValidationError(results, "is invalid because Mongo Collection Name is required");

        // invalid write concern
        runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
        runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DATABASE_NAME);
        runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);
        runner.setProperty(PutMongo.WRITE_CONCERN, "xyz");
        runner.enqueue(new byte[0]);
        pc = runner.getProcessContext();
        results = new HashSet<>();
        if (pc instanceof MockProcessContext) {
            results = ((MockProcessContext) pc).validate();
        }
        Assert.assertEquals(1, results.size());
        Assert.assertTrue(results.iterator().next().toString().matches("'Write Concern' .* is invalid because Given value not found in allowed set .*"));

        // valid write concern
        runner.setProperty(PutMongo.WRITE_CONCERN, PutMongo.WRITE_CONCERN_UNACKNOWLEDGED);
        runner.enqueue(new byte[0]);
        pc = runner.getProcessContext();
        results = new HashSet<>();
        if (pc instanceof MockProcessContext) {
            results = ((MockProcessContext) pc).validate();
        }
        Assert.assertEquals(0, results.size());
    }

    @Test
    public void testInsertOne() throws Exception {
        Document doc = DOCUMENTS.get(0);
        byte[] bytes = documentToByteArray(doc);
        runner.enqueue(bytes);
        runner.run();

        runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
        MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
        // content passes through unchanged
        out.assertContentEquals(bytes);

        // verify 1 doc inserted into the collection
        assertEquals(1, collection.count());
        assertEquals(doc, collection.find().first());
    }

    @Test
    public void testInsertMany() throws Exception {
        for (Document doc : DOCUMENTS) {
            runner.enqueue(documentToByteArray(doc));
        }
        runner.run(3);

        runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 3);
        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS);
        for (int i=0; i < flowFiles.size(); i++) {
            flowFiles.get(i).assertContentEquals(DOCUMENTS.get(i).toJson());
        }

        // verify 3 docs inserted into the collection
        assertEquals(3, collection.count());
    }

    @Test
    public void testInsertWithDuplicateKey() throws Exception {
        // pre-insert one document so the first enqueued FlowFile hits a duplicate _id
        collection.insertOne(DOCUMENTS.get(0));

        for (Document doc : DOCUMENTS) {
            runner.enqueue(documentToByteArray(doc));
        }
        runner.run(3);

        // first doc failed, other 2 succeeded
        runner.assertTransferCount(PutMongo.REL_FAILURE, 1);
        MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_FAILURE).get(0);
        out.assertContentEquals(documentToByteArray(DOCUMENTS.get(0)));

        runner.assertTransferCount(PutMongo.REL_SUCCESS, 2);
        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS);
        for (int i=0; i < flowFiles.size(); i++) {
            flowFiles.get(i).assertContentEquals(DOCUMENTS.get(i+1).toJson());
        }

        // verify 2 docs inserted into the collection for a total of 3
        assertEquals(3, collection.count());
    }

    /**
     * Verifies that 'update' does not insert if 'upsert' is false.
     * @see #testUpsert()
     */
    @Test
    public void testUpdateDoesNotInsert() throws Exception {
        Document doc = DOCUMENTS.get(0);
        byte[] bytes = documentToByteArray(doc);
        runner.setProperty(PutMongo.MODE, "update");
        runner.enqueue(bytes);
        runner.run();

        runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
        MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
        out.assertContentEquals(bytes);

        // nothing was in collection, so nothing to update since upsert defaults to false
        assertEquals(0, collection.count());
    }

    /**
     * Verifies that 'update' does insert if 'upsert' is true.
     * @see #testUpdateDoesNotInsert()
     */
    @Test
    public void testUpsert() throws Exception {
        Document doc = DOCUMENTS.get(0);
        byte[] bytes = documentToByteArray(doc);
        runner.setProperty(PutMongo.MODE, "update");
        runner.setProperty(PutMongo.UPSERT, "true");
        runner.enqueue(bytes);
        runner.run();

        runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
        MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
        out.assertContentEquals(bytes);

        // verify 1 doc inserted into the collection via upsert
        assertEquals(1, collection.count());
        assertEquals(doc, collection.find().first());
    }

    @Test
    public void testUpdate() throws Exception {
        Document doc = DOCUMENTS.get(0);

        // pre-insert document
        collection.insertOne(doc);

        // modify the object, then update the stored copy by matching _id
        doc.put("abc", "123");
        doc.put("xyz", "456");
        doc.remove("c");

        byte[] bytes = documentToByteArray(doc);
        runner.setProperty(PutMongo.MODE, "update");
        runner.enqueue(bytes);
        runner.run();

        runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
        MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
        out.assertContentEquals(bytes);

        assertEquals(1, collection.count());
        assertEquals(doc, collection.find().first());
    }
}

View File

@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-bundle</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>nifi-mongodb-processors</module>
<module>nifi-mongodb-nar</module>
</modules>
</project>

View File

@ -33,13 +33,14 @@
<module>nifi-standard-services</module>
<module>nifi-update-attribute-bundle</module>
<module>nifi-kafka-bundle</module>
<module>nifi-kite-bundle</module>
<module>nifi-kite-bundle</module>
<module>nifi-solr-bundle</module>
<module>nifi-aws-bundle</module>
<module>nifi-social-media-bundle</module>
<module>nifi-geo-bundle</module>
<module>nifi-hl7-bundle</module>
<module>nifi-language-translation-bundle</module>
<module>nifi-aws-bundle</module>
<module>nifi-social-media-bundle</module>
<module>nifi-geo-bundle</module>
<module>nifi-hl7-bundle</module>
<module>nifi-language-translation-bundle</module>
<module>nifi-mongodb-bundle</module>
</modules>
<dependencyManagement>
<dependencies>
@ -66,7 +67,7 @@
<version>0.1.1-incubating-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-http-context-map-api</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
@ -87,7 +88,7 @@
<artifactId>nifi-ssl-context-service</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-http-context-map</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>

View File

@ -748,6 +748,12 @@
<version>0.1.1-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-nar</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-solr-nar</artifactId>