NIFI-4588 Added the ability to use update operators like $push and $set to PutMongo.

Removed commented out code

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2259
This commit is contained in:
Mike Thomsen 2017-11-09 08:54:41 -05:00 committed by Matthew Burgess
parent 387dce5ad0
commit fe3f288944
2 changed files with 71 additions and 16 deletions

View File

@ -16,20 +16,17 @@
*/
package org.apache.nifi.processors.mongodb;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import com.mongodb.BasicDBObject;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.util.JSON;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@ -42,9 +39,15 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.bson.Document;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.UpdateOptions;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@EventDriven
@Tags({ "mongodb", "insert", "update", "write", "put" })
@ -59,6 +62,9 @@ public class PutMongo extends AbstractMongoProcessor {
static final String MODE_INSERT = "insert";
static final String MODE_UPDATE = "update";
static final AllowableValue UPDATE_WITH_DOC = new AllowableValue("doc", "With whole document");
static final AllowableValue UPDATE_WITH_OPERATORS = new AllowableValue("operators", "With operators enabled");
static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
.name("Mode")
.description("Indicates whether the processor should insert or update content")
@ -83,6 +89,15 @@ public class PutMongo extends AbstractMongoProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("_id")
.build();
static final PropertyDescriptor UPDATE_MODE = new PropertyDescriptor.Builder()
.displayName("Update Mode")
.name("put-mongo-update-mode")
.required(true)
.allowableValues(UPDATE_WITH_DOC, UPDATE_WITH_OPERATORS)
.defaultValue(UPDATE_WITH_DOC.getValue())
.description("Choose an update mode. You can either supply a JSON document to use as a direct replacement " +
"or specify a document that contains update operators like $set and $unset")
.build();
static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("The Character Set in which the data is encoded")
@ -100,6 +115,7 @@ public class PutMongo extends AbstractMongoProcessor {
_propertyDescriptors.add(MODE);
_propertyDescriptors.add(UPSERT);
_propertyDescriptors.add(UPDATE_QUERY_KEY);
_propertyDescriptors.add(UPDATE_MODE);
_propertyDescriptors.add(WRITE_CONCERN);
_propertyDescriptors.add(CHARACTER_SET);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
@ -131,6 +147,7 @@ public class PutMongo extends AbstractMongoProcessor {
final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
final String mode = context.getProperty(MODE).getValue();
final String updateMode = context.getProperty(UPDATE_MODE).getValue();
final WriteConcern writeConcern = getWriteConcern(context);
final MongoCollection<Document> collection = getCollection(context, flowFile).withWriteConcern(writeConcern);
@ -146,18 +163,25 @@ public class PutMongo extends AbstractMongoProcessor {
});
// parse
final Document doc = Document.parse(new String(content, charset));
final Object doc = (mode.equals(MODE_INSERT) || (mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
? Document.parse(new String(content, charset)) : JSON.parse(new String(content, charset));
if (MODE_INSERT.equalsIgnoreCase(mode)) {
collection.insertOne(doc);
collection.insertOne((Document)doc);
logger.info("inserted {} into MongoDB", new Object[] { flowFile });
} else {
// update
final boolean upsert = context.getProperty(UPSERT).asBoolean();
final String updateKey = context.getProperty(UPDATE_QUERY_KEY).getValue();
final Document query = new Document(updateKey, doc.get(updateKey));
final Document query = new Document(updateKey, ((Map)doc).get(updateKey));
collection.replaceOne(query, doc, new UpdateOptions().upsert(upsert));
if (updateMode.equals(UPDATE_WITH_DOC.getValue())) {
collection.replaceOne(query, (Document)doc, new UpdateOptions().upsert(upsert));
} else {
BasicDBObject update = (BasicDBObject)doc;
update.remove("_id");
collection.updateOne(query, update, new UpdateOptions().upsert(upsert));
}
logger.info("updated {} into MongoDB", new Object[] { flowFile });
}

View File

@ -256,4 +256,35 @@ public class PutMongoTest {
assertEquals(1, collection.count());
assertEquals(doc, collection.find().first());
}
@Test
public void testUpsertWithOperators() throws Exception {
String upsert = "{\n" +
" \"_id\": \"Test\",\n" +
" \"$push\": {\n" +
" \"testArr\": { \"msg\": \"Hi\" }\n" +
" }\n" +
"}";
runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS);
runner.setProperty(PutMongo.MODE, "update");
runner.setProperty(PutMongo.UPSERT, "true");
for (int x = 0; x < 3; x++) {
runner.enqueue(upsert.getBytes());
}
runner.run(3, true, true);
runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 3);
Document query = new Document("_id", "Test");
Document result = collection.find(query).first();
List array = (List)result.get("testArr");
Assert.assertNotNull("Array was empty", array);
Assert.assertEquals("Wrong size", array.size(), 3);
for (int index = 0; index < array.size(); index++) {
Document doc = (Document)array.get(index);
String msg = doc.getString("msg");
Assert.assertNotNull("Msg was null", msg);
Assert.assertEquals("Msg had wrong value", msg, "Hi");
}
}
}