diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java index cd4263594b..03d3e0c814 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java @@ -16,20 +16,17 @@ */ package org.apache.nifi.processors.mongodb; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - +import com.mongodb.BasicDBObject; +import com.mongodb.WriteConcern; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.model.UpdateOptions; +import com.mongodb.util.JSON; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; @@ -42,9 +39,15 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.StreamUtils; import org.bson.Document; -import com.mongodb.WriteConcern; -import com.mongodb.client.MongoCollection; -import com.mongodb.client.model.UpdateOptions; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; @EventDriven @Tags({ "mongodb", "insert", "update", "write", "put" }) @@ -59,6 +62,9 @@ public class PutMongo extends AbstractMongoProcessor { static final String MODE_INSERT = "insert"; static final String MODE_UPDATE = "update"; + static final AllowableValue UPDATE_WITH_DOC = new AllowableValue("doc", "With whole document"); + static final AllowableValue UPDATE_WITH_OPERATORS = new AllowableValue("operators", "With operators enabled"); + static final PropertyDescriptor MODE = new PropertyDescriptor.Builder() .name("Mode") .description("Indicates whether the processor should insert or update content") @@ -83,6 +89,15 @@ public class PutMongo extends AbstractMongoProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .defaultValue("_id") .build(); + static final PropertyDescriptor UPDATE_MODE = new PropertyDescriptor.Builder() + .displayName("Update Mode") + .name("put-mongo-update-mode") + .required(true) + .allowableValues(UPDATE_WITH_DOC, UPDATE_WITH_OPERATORS) + .defaultValue(UPDATE_WITH_DOC.getValue()) + .description("Choose an update mode. You can either supply a JSON document to use as a direct replacement " + + "or specify a document that contains update operators like $set and $unset") + .build(); static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder() .name("Character Set") .description("The Character Set in which the data is encoded") @@ -100,6 +115,7 @@ public class PutMongo extends AbstractMongoProcessor { _propertyDescriptors.add(MODE); _propertyDescriptors.add(UPSERT); _propertyDescriptors.add(UPDATE_QUERY_KEY); + _propertyDescriptors.add(UPDATE_MODE); _propertyDescriptors.add(WRITE_CONCERN); _propertyDescriptors.add(CHARACTER_SET); propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); @@ -131,6 +147,7 @@ public class PutMongo extends AbstractMongoProcessor { final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue()); final String mode = context.getProperty(MODE).getValue(); + final String updateMode = context.getProperty(UPDATE_MODE).getValue(); final WriteConcern writeConcern = getWriteConcern(context); final MongoCollection collection = getCollection(context, flowFile).withWriteConcern(writeConcern); @@ -146,18 +163,25 @@ public class PutMongo extends AbstractMongoProcessor { }); // parse - final Document doc = Document.parse(new String(content, charset)); + final Object doc = (mode.equals(MODE_INSERT) || (mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue()))) + ? Document.parse(new String(content, charset)) : JSON.parse(new String(content, charset)); if (MODE_INSERT.equalsIgnoreCase(mode)) { - collection.insertOne(doc); + collection.insertOne((Document)doc); logger.info("inserted {} into MongoDB", new Object[] { flowFile }); } else { // update final boolean upsert = context.getProperty(UPSERT).asBoolean(); final String updateKey = context.getProperty(UPDATE_QUERY_KEY).getValue(); - final Document query = new Document(updateKey, doc.get(updateKey)); + final Document query = new Document(updateKey, ((Map)doc).get(updateKey)); - collection.replaceOne(query, doc, new UpdateOptions().upsert(upsert)); + if (updateMode.equals(UPDATE_WITH_DOC.getValue())) { + collection.replaceOne(query, (Document)doc, new UpdateOptions().upsert(upsert)); + } else { + BasicDBObject update = (BasicDBObject)doc; + update.remove("_id"); + collection.updateOne(query, update, new UpdateOptions().upsert(upsert)); + } logger.info("updated {} into MongoDB", new Object[] { flowFile }); } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java index 10f81d1e03..8828333a86 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java @@ -256,4 +256,35 @@ public class PutMongoTest { assertEquals(1, collection.count()); assertEquals(doc, collection.find().first()); } + + @Test + public void testUpsertWithOperators() throws Exception { + String upsert = "{\n" + + " \"_id\": \"Test\",\n" + + " \"$push\": {\n" + + " \"testArr\": { \"msg\": \"Hi\" }\n" + + " }\n" + + "}"; + runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS); + runner.setProperty(PutMongo.MODE, "update"); + runner.setProperty(PutMongo.UPSERT, "true"); + for (int x = 0; x < 3; x++) { + runner.enqueue(upsert.getBytes()); + } + runner.run(3, true, true); + runner.assertTransferCount(PutMongo.REL_FAILURE, 0); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 3); + + Document query = new Document("_id", "Test"); + Document result = collection.find(query).first(); + List array = (List)result.get("testArr"); + Assert.assertNotNull("Array was empty", array); + Assert.assertEquals("Wrong size", array.size(), 3); + for (int index = 0; index < array.size(); index++) { + Document doc = (Document)array.get(index); + String msg = doc.getString("msg"); + Assert.assertNotNull("Msg was null", msg); + Assert.assertEquals("Msg had wrong value", msg, "Hi"); + } + } }