NIFI-4989 Made PutMongo able to use nested lookup keys, a query param and multiple lookup keys.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2560.
This commit is contained in:
Mike Thomsen 2018-03-16 21:51:19 -04:00 committed by Pierre Villard
parent 95dd1ebffc
commit 66590b78f5
5 changed files with 308 additions and 36 deletions

View File

@ -30,6 +30,8 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
@ -38,11 +40,13 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StringUtils;
import org.bson.Document; import org.bson.Document;
import org.bson.types.ObjectId; import org.bson.types.ObjectId;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -85,20 +89,29 @@ public class PutMongo extends AbstractMongoProcessor {
static final PropertyDescriptor UPDATE_QUERY_KEY = new PropertyDescriptor.Builder() static final PropertyDescriptor UPDATE_QUERY_KEY = new PropertyDescriptor.Builder()
.name("Update Query Key") .name("Update Query Key")
.description("Key name used to build the update query criteria; this property is valid only when using update mode, " .description("Key name used to build the update query criteria; this property is valid only when using update mode, "
+ "otherwise it is ignored") + "otherwise it is ignored. Example: _id")
.required(true) .required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.defaultValue("_id") .expressionLanguageSupported(true)
.build(); .build();
static final PropertyDescriptor UPDATE_QUERY = new PropertyDescriptor.Builder()
.name("putmongo-update-query")
.displayName("Update Query")
.description("Specify a full MongoDB query to be used for the lookup query to do an update/upsert.")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(true)
.build();
static final PropertyDescriptor UPDATE_MODE = new PropertyDescriptor.Builder() static final PropertyDescriptor UPDATE_MODE = new PropertyDescriptor.Builder()
.displayName("Update Mode") .displayName("Update Mode")
.name("put-mongo-update-mode") .name("put-mongo-update-mode")
.required(true) .required(true)
.allowableValues(UPDATE_WITH_DOC, UPDATE_WITH_OPERATORS) .allowableValues(UPDATE_WITH_DOC, UPDATE_WITH_OPERATORS)
.defaultValue(UPDATE_WITH_DOC.getValue()) .defaultValue(UPDATE_WITH_DOC.getValue())
.description("Choose an update mode. You can either supply a JSON document to use as a direct replacement " + .description("Choose an update mode. You can either supply a JSON document to use as a direct replacement " +
"or specify a document that contains update operators like $set and $unset") "or specify a document that contains update operators like $set and $unset")
.build(); .build();
static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder() static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
.name("Character Set") .name("Character Set")
.description("The Character Set in which the data is encoded") .description("The Character Set in which the data is encoded")
@ -116,6 +129,7 @@ public class PutMongo extends AbstractMongoProcessor {
_propertyDescriptors.add(MODE); _propertyDescriptors.add(MODE);
_propertyDescriptors.add(UPSERT); _propertyDescriptors.add(UPSERT);
_propertyDescriptors.add(UPDATE_QUERY_KEY); _propertyDescriptors.add(UPDATE_QUERY_KEY);
_propertyDescriptors.add(UPDATE_QUERY);
_propertyDescriptors.add(UPDATE_MODE); _propertyDescriptors.add(UPDATE_MODE);
_propertyDescriptors.add(WRITE_CONCERN); _propertyDescriptors.add(WRITE_CONCERN);
_propertyDescriptors.add(CHARACTER_SET); _propertyDescriptors.add(CHARACTER_SET);
@ -137,6 +151,30 @@ public class PutMongo extends AbstractMongoProcessor {
return propertyDescriptors; return propertyDescriptors;
} }
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
List<ValidationResult> problems = new ArrayList<>();
final boolean queryKey = validationContext.getProperty(UPDATE_QUERY_KEY).isSet();
final boolean query = validationContext.getProperty(UPDATE_QUERY).isSet();
if (queryKey && query) {
problems.add(new ValidationResult.Builder()
.valid(false)
.explanation("Both update query key and update query cannot be set at the same time.")
.build()
);
} else if (!queryKey && !query) {
problems.add(new ValidationResult.Builder()
.valid(false)
.explanation("Either the update query key or the update query field must be set.")
.build()
);
}
return problems;
}
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get(); final FlowFile flowFile = session.get();
@ -168,15 +206,17 @@ public class PutMongo extends AbstractMongoProcessor {
} else { } else {
// update // update
final boolean upsert = context.getProperty(UPSERT).asBoolean(); final boolean upsert = context.getProperty(UPSERT).asBoolean();
final String updateKey = context.getProperty(UPDATE_QUERY_KEY).getValue(); final String updateKey = context.getProperty(UPDATE_QUERY_KEY).evaluateAttributeExpressions(flowFile).getValue();
final String filterQuery = context.getProperty(UPDATE_QUERY).evaluateAttributeExpressions(flowFile).getValue();
final Document query;
Object keyVal = ((Map)doc).get(updateKey); if (!StringUtils.isBlank(updateKey)) {
if (updateKey.equals("_id") && ObjectId.isValid(((String)keyVal))) { query = parseUpdateKey(updateKey, (Map)doc);
keyVal = new ObjectId((String) keyVal); removeUpdateKeys(updateKey, (Map)doc);
} else {
query = Document.parse(filterQuery);
} }
final Document query = new Document(updateKey, keyVal);
if (updateMode.equals(UPDATE_WITH_DOC.getValue())) { if (updateMode.equals(UPDATE_WITH_DOC.getValue())) {
collection.replaceOne(query, (Document)doc, new UpdateOptions().upsert(upsert)); collection.replaceOne(query, (Document)doc, new UpdateOptions().upsert(upsert));
} else { } else {
@ -196,6 +236,33 @@ public class PutMongo extends AbstractMongoProcessor {
} }
} }
private void removeUpdateKeys(String updateKeyParam, Map doc) {
String[] parts = updateKeyParam.split(",[\\s]*");
for (String part : parts) {
if (part.contains(".")) {
doc.remove(part);
}
}
}
private Document parseUpdateKey(String updateKey, Map doc) {
Document retVal;
if (updateKey.equals("_id") && ObjectId.isValid(((String) doc.get(updateKey)))) {
retVal = new Document("_id", new ObjectId((String) doc.get(updateKey)));
} else if (updateKey.contains(",")) {
String[] parts = updateKey.split(",[\\s]*");
retVal = new Document();
for (String part : parts) {
retVal.append(part, doc.get(part));
}
} else {
retVal = new Document(updateKey, doc.get(updateKey));
}
return retVal;
}
protected WriteConcern getWriteConcern(final ProcessContext context) { protected WriteConcern getWriteConcern(final ProcessContext context) {
final String writeConcernProperty = context.getProperty(WRITE_CONCERN).getValue(); final String writeConcernProperty = context.getProperty(WRITE_CONCERN).getValue();
WriteConcern writeConcern = null; WriteConcern writeConcern = null;

View File

@ -19,6 +19,7 @@
package org.apache.nifi.processors.mongodb; package org.apache.nifi.processors.mongodb;
import org.apache.nifi.util.TestRunner;
import org.bson.Document; import org.bson.Document;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -40,7 +41,7 @@ public class DeleteMongoIT extends MongoWriteTestBase {
super.teardown(); super.teardown();
} }
private void testOne(String query, Map<String, String> attrs) { private void testOne(TestRunner runner, String query, Map<String, String> attrs) {
runner.enqueue(query, attrs); runner.enqueue(query, attrs);
runner.run(1, true); runner.run(1, true);
runner.assertTransferCount(DeleteMongo.REL_FAILURE, 0); runner.assertTransferCount(DeleteMongo.REL_FAILURE, 0);
@ -52,18 +53,19 @@ public class DeleteMongoIT extends MongoWriteTestBase {
@Test @Test
public void testDeleteOne() { public void testDeleteOne() {
TestRunner runner = init(DeleteMongo.class);
String query = "{ \"_id\": \"doc_1\" }"; String query = "{ \"_id\": \"doc_1\" }";
runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_ONE); runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_ONE);
testOne(query, new HashMap<>()); testOne(runner, query, new HashMap<>());
Map<String, String> attrs = new HashMap<>(); Map<String, String> attrs = new HashMap<>();
attrs.put("mongodb.delete.mode", "one"); attrs.put("mongodb.delete.mode", "one");
runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_ATTR); runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_ATTR);
query = "{ \"_id\": \"doc_2\" }"; query = "{ \"_id\": \"doc_2\" }";
runner.clearTransferState(); runner.clearTransferState();
testOne(query, attrs); testOne(runner, query, attrs);
} }
private void manyTest(String query, Map<String, String> attrs) { private void manyTest(TestRunner runner, String query, Map<String, String> attrs) {
runner.enqueue(query, attrs); runner.enqueue(query, attrs);
runner.run(1, true); runner.run(1, true);
runner.assertTransferCount(DeleteMongo.REL_FAILURE, 0); runner.assertTransferCount(DeleteMongo.REL_FAILURE, 0);
@ -77,13 +79,14 @@ public class DeleteMongoIT extends MongoWriteTestBase {
@Test @Test
public void testDeleteMany() { public void testDeleteMany() {
TestRunner runner = init(DeleteMongo.class);
String query = "{\n" + String query = "{\n" +
"\t\"_id\": {\n" + "\t\"_id\": {\n" +
"\t\t\"$in\": [\"doc_1\", \"doc_2\"]\n" + "\t\t\"$in\": [\"doc_1\", \"doc_2\"]\n" +
"\t}\n" + "\t}\n" +
"}"; "}";
runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_MANY); runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_MANY);
manyTest(query, new HashMap<>()); manyTest(runner, query, new HashMap<>());
runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_ATTR); runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_ATTR);
Map<String, String> attrs = new HashMap<>(); Map<String, String> attrs = new HashMap<>();
@ -91,11 +94,12 @@ public class DeleteMongoIT extends MongoWriteTestBase {
collection.drop(); collection.drop();
collection.insertMany(DOCUMENTS); collection.insertMany(DOCUMENTS);
runner.clearTransferState(); runner.clearTransferState();
manyTest(query, attrs); manyTest(runner, query, attrs);
} }
@Test @Test
public void testFailOnNoDeleteOptions() { public void testFailOnNoDeleteOptions() {
TestRunner runner = init(DeleteMongo.class);
String query = "{ \"_id\": \"doc_4\"} "; String query = "{ \"_id\": \"doc_4\"} ";
runner.enqueue(query); runner.enqueue(query);
runner.run(1, true); runner.run(1, true);

View File

@ -38,28 +38,27 @@ public class MongoWriteTestBase {
new Document("_id", "doc_3").append("a", 1).append("b", 3) new Document("_id", "doc_3").append("a", 1).append("b", 3)
); );
protected TestRunner runner;
protected MongoClient mongoClient; protected MongoClient mongoClient;
protected MongoCollection<Document> collection; protected MongoCollection<Document> collection;
public void setup(Class processor) { public void setup(Class processor) {
DATABASE_NAME = processor.getSimpleName().toLowerCase(); DATABASE_NAME = processor.getSimpleName().toLowerCase();
runner = TestRunners.newTestRunner(processor); mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));
collection = mongoClient.getDatabase(DATABASE_NAME).getCollection(COLLECTION_NAME);
}
public TestRunner init(Class processor) {
TestRunner runner = TestRunners.newTestRunner(processor);
runner.setVariable("uri", MONGO_URI); runner.setVariable("uri", MONGO_URI);
runner.setVariable("db", DATABASE_NAME); runner.setVariable("db", DATABASE_NAME);
runner.setVariable("collection", COLLECTION_NAME); runner.setVariable("collection", COLLECTION_NAME);
runner.setProperty(AbstractMongoProcessor.URI, "${uri}"); runner.setProperty(AbstractMongoProcessor.URI, "${uri}");
runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}"); runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}");
runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}"); runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}");
return runner;
mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));
collection = mongoClient.getDatabase(DATABASE_NAME).getCollection(COLLECTION_NAME);
} }
public void teardown() { public void teardown() {
runner = null;
mongoClient.getDatabase(DATABASE_NAME).drop(); mongoClient.getDatabase(DATABASE_NAME).drop();
} }
} }

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.nifi.processors.mongodb; package org.apache.nifi.processors.mongodb;
import com.mongodb.client.MongoCursor;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
@ -31,9 +32,11 @@ import org.junit.Test;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -43,6 +46,7 @@ public class PutMongoIT extends MongoWriteTestBase {
super.setup(PutMongo.class); super.setup(PutMongo.class);
} }
@Override
@After @After
public void teardown() { public void teardown() {
super.teardown(); super.teardown();
@ -76,6 +80,7 @@ public class PutMongoIT extends MongoWriteTestBase {
runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DATABASE_NAME); runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DATABASE_NAME);
runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME); runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);
runner.setProperty(PutMongo.WRITE_CONCERN, "xyz"); runner.setProperty(PutMongo.WRITE_CONCERN, "xyz");
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
runner.enqueue(new byte[0]); runner.enqueue(new byte[0]);
pc = runner.getProcessContext(); pc = runner.getProcessContext();
results = new HashSet<>(); results = new HashSet<>();
@ -96,8 +101,184 @@ public class PutMongoIT extends MongoWriteTestBase {
Assert.assertEquals(0, results.size()); Assert.assertEquals(0, results.size());
} }
@Test
public void testQueryAndUpdateKey() {
TestRunner runner = init(PutMongo.class);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
runner.setProperty(PutMongo.UPDATE_QUERY, "{}");
runner.assertNotValid();
}
@Test
public void testNoQueryAndNoUpdateKey() {
TestRunner runner = init(PutMongo.class);
runner.removeProperty(PutMongo.UPDATE_QUERY);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "");
runner.assertNotValid();
}
@Test
public void testBlankUpdateKey() {
TestRunner runner = init(PutMongo.class);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, " ");
runner.assertNotValid();
}
@Test
public void testUpdateQuery() {
TestRunner runner = init(PutMongo.class);
Document document = new Document()
.append("name", "John Smith")
.append("department", "Engineering");
collection.insertOne(document);
String updateBody = "{\n" +
"\t\"$set\": {\n" +
"\t\t\"email\": \"john.smith@test.com\",\n" +
"\t\t\"grade\": \"Sr. Principle Eng.\"\n" +
"\t},\n" +
"\t\"$inc\": {\n" +
"\t\t\"writes\": 1\n" +
"\t}\n" +
"}";
Map<String, String> attr = new HashMap<>();
attr.put("mongo.update.query", document.toJson());
runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS);
runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
runner.setProperty(PutMongo.UPDATE_QUERY, "${mongo.update.query}");
runner.setValidateExpressionUsage(true);
runner.enqueue(updateBody, attr);
updateTests(runner, document);
}
@Test
public void testUpdateBySimpleKey() {
TestRunner runner = init(PutMongo.class);
Document document = new Document()
.append("name", "John Smith")
.append("department", "Engineering");
collection.insertOne(document);
String updateBody = "{\n" +
"\t\"name\": \"John Smith\",\n" +
"\t\"$set\": {\n" +
"\t\t\"email\": \"john.smith@test.com\",\n" +
"\t\t\"grade\": \"Sr. Principle Eng.\"\n" +
"\t},\n" +
"\t\"$inc\": {\n" +
"\t\t\"writes\": 1\n" +
"\t}\n" +
"}";
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "name");
runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS);
runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
runner.setValidateExpressionUsage(true);
runner.enqueue(updateBody);
updateTests(runner, document);
}
@Test
public void testUpdateWithFullDocByKeys() {
TestRunner runner = init(PutMongo.class);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "name,department");
testUpdateFullDocument(runner);
}
@Test
public void testUpdateWithFullDocByQuery() {
TestRunner runner = init(PutMongo.class);
String query = "{ \"name\": \"John Smith\"}";
runner.setProperty(PutMongo.UPDATE_QUERY, query);
testUpdateFullDocument(runner);
}
private void testUpdateFullDocument(TestRunner runner) {
Document document = new Document()
.append("name", "John Smith")
.append("department", "Engineering");
collection.insertOne(document);
String updateBody = "{\n" +
"\t\"name\": \"John Smith\",\n" +
"\t\"department\": \"Engineering\",\n" +
"\t\"contacts\": {\n" +
"\t\t\"phone\": \"555-555-5555\",\n" +
"\t\t\"email\": \"john.smith@test.com\",\n" +
"\t\t\"twitter\": \"@JohnSmith\"\n" +
"\t}\n" +
"}";
runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_DOC);
runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
runner.setValidateExpressionUsage(true);
runner.enqueue(updateBody);
runner.run();
runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
MongoCursor<Document> cursor = collection.find(document).iterator();
Document found = cursor.next();
Assert.assertEquals(found.get("name"), document.get("name"));
Assert.assertEquals(found.get("department"), document.get("department"));
Document contacts = (Document)found.get("contacts");
Assert.assertNotNull(contacts);
Assert.assertEquals(contacts.get("twitter"), "@JohnSmith");
Assert.assertEquals(contacts.get("email"), "john.smith@test.com");
Assert.assertEquals(contacts.get("phone"), "555-555-5555");
Assert.assertEquals(collection.count(document), 1);
}
@Test
public void testUpdateByComplexKey() {
TestRunner runner = init(PutMongo.class);
Document document = new Document()
.append("name", "John Smith")
.append("department", "Engineering")
.append("contacts", new Document().append("email", "john.smith@test.com")
.append("phone", "555-555-5555"));
collection.insertOne(document);
String updateBody = "{\n" +
"\t\"contacts.phone\": \"555-555-5555\",\n" +
"\t\"contacts.email\": \"john.smith@test.com\",\n" +
"\t\"$set\": {\n" +
"\t\t\"contacts.twitter\": \"@JohnSmith\"\n" +
"\t},\n" +
"\t\"$inc\": {\n" +
"\t\t\"writes\": 1\n" +
"\t}\n" +
"}";
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "contacts.phone,contacts.email");
runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS);
runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
runner.setValidateExpressionUsage(true);
runner.enqueue(updateBody);
runner.run();
runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
MongoCursor<Document> iterator = collection.find(new Document("name", "John Smith")).iterator();
Assert.assertTrue("Document did not come back.", iterator.hasNext());
Document val = iterator.next();
Map contacts = (Map)val.get("contacts");
Assert.assertNotNull(contacts);
Assert.assertTrue(contacts.containsKey("twitter") && contacts.get("twitter").equals("@JohnSmith"));
Assert.assertTrue(val.containsKey("writes") && val.get("writes").equals(1));
}
private void updateTests(TestRunner runner, Document document) {
runner.run();
runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
MongoCursor<Document> iterator = collection.find(document).iterator();
Assert.assertTrue("Document did not come back.", iterator.hasNext());
Document val = iterator.next();
Assert.assertTrue(val.containsKey("email") && val.get("email").equals("john.smith@test.com"));
Assert.assertTrue(val.containsKey("grade") && val.get("grade").equals("Sr. Principle Eng."));
Assert.assertTrue(val.containsKey("writes") && val.get("writes").equals(1));
}
@Test @Test
public void testInsertOne() throws Exception { public void testInsertOne() throws Exception {
TestRunner runner = init(PutMongo.class);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
Document doc = DOCUMENTS.get(0); Document doc = DOCUMENTS.get(0);
byte[] bytes = documentToByteArray(doc); byte[] bytes = documentToByteArray(doc);
@ -115,6 +296,8 @@ public class PutMongoIT extends MongoWriteTestBase {
@Test @Test
public void testInsertMany() throws Exception { public void testInsertMany() throws Exception {
TestRunner runner = init(PutMongo.class);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
for (Document doc : DOCUMENTS) { for (Document doc : DOCUMENTS) {
runner.enqueue(documentToByteArray(doc)); runner.enqueue(documentToByteArray(doc));
} }
@ -132,6 +315,8 @@ public class PutMongoIT extends MongoWriteTestBase {
@Test @Test
public void testInsertWithDuplicateKey() throws Exception { public void testInsertWithDuplicateKey() throws Exception {
TestRunner runner = init(PutMongo.class);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
// pre-insert one document // pre-insert one document
collection.insertOne(DOCUMENTS.get(0)); collection.insertOne(DOCUMENTS.get(0));
@ -161,6 +346,8 @@ public class PutMongoIT extends MongoWriteTestBase {
*/ */
@Test @Test
public void testUpdateDoesNotInsert() throws Exception { public void testUpdateDoesNotInsert() throws Exception {
TestRunner runner = init(PutMongo.class);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
Document doc = DOCUMENTS.get(0); Document doc = DOCUMENTS.get(0);
byte[] bytes = documentToByteArray(doc); byte[] bytes = documentToByteArray(doc);
@ -182,6 +369,8 @@ public class PutMongoIT extends MongoWriteTestBase {
*/ */
@Test @Test
public void testUpsert() throws Exception { public void testUpsert() throws Exception {
TestRunner runner = init(PutMongo.class);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
Document doc = DOCUMENTS.get(0); Document doc = DOCUMENTS.get(0);
byte[] bytes = documentToByteArray(doc); byte[] bytes = documentToByteArray(doc);
@ -201,6 +390,8 @@ public class PutMongoIT extends MongoWriteTestBase {
@Test @Test
public void testUpdate() throws Exception { public void testUpdate() throws Exception {
TestRunner runner = init(PutMongo.class);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
Document doc = DOCUMENTS.get(0); Document doc = DOCUMENTS.get(0);
// pre-insert document // pre-insert document
@ -227,6 +418,7 @@ public class PutMongoIT extends MongoWriteTestBase {
@Test @Test
public void testUpsertWithOperators() throws Exception { public void testUpsertWithOperators() throws Exception {
TestRunner runner = init(PutMongo.class);
String upsert = "{\n" + String upsert = "{\n" +
" \"_id\": \"Test\",\n" + " \"_id\": \"Test\",\n" +
" \"$push\": {\n" + " \"$push\": {\n" +
@ -234,6 +426,7 @@ public class PutMongoIT extends MongoWriteTestBase {
" }\n" + " }\n" +
"}"; "}";
runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS); runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
runner.setProperty(PutMongo.MODE, "update"); runner.setProperty(PutMongo.MODE, "update");
runner.setProperty(PutMongo.UPSERT, "true"); runner.setProperty(PutMongo.UPSERT, "true");
for (int x = 0; x < 3; x++) { for (int x = 0; x < 3; x++) {
@ -273,6 +466,7 @@ public class PutMongoIT extends MongoWriteTestBase {
*/ */
@Test @Test
public void testNiFi_4759_Regressions() { public void testNiFi_4759_Regressions() {
TestRunner runner = init(PutMongo.class);
String[] upserts = new String[]{ String[] upserts = new String[]{
"{ \"_id\": \"12345\", \"$set\": { \"msg\": \"Hello, world\" } }", "{ \"_id\": \"12345\", \"$set\": { \"msg\": \"Hello, world\" } }",
"{ \"_id\": \"5a5617b9c1f5de6d8276e87d\", \"$set\": { \"msg\": \"Hello, world\" } }", "{ \"_id\": \"5a5617b9c1f5de6d8276e87d\", \"$set\": { \"msg\": \"Hello, world\" } }",

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processors.mongodb;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.MockRecordParser; import org.apache.nifi.serialization.record.MockRecordParser;
@ -53,9 +54,6 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
public void setup() throws Exception { public void setup() throws Exception {
super.setup(PutMongoRecord.class); super.setup(PutMongoRecord.class);
recordReader = new MockRecordParser(); recordReader = new MockRecordParser();
runner.addControllerService("reader", recordReader);
runner.enableControllerService(recordReader);
runner.setProperty(PutMongoRecord.RECORD_READER_FACTORY, "reader");
} }
@After @After
@ -63,6 +61,14 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
super.teardown(); super.teardown();
} }
private TestRunner init() throws InitializationException {
TestRunner runner = init(PutMongoRecord.class);
runner.addControllerService("reader", recordReader);
runner.enableControllerService(recordReader);
runner.setProperty(PutMongoRecord.RECORD_READER_FACTORY, "reader");
return runner;
}
private byte[] documentToByteArray(Document doc) { private byte[] documentToByteArray(Document doc) {
return doc.toJson().getBytes(StandardCharsets.UTF_8); return doc.toJson().getBytes(StandardCharsets.UTF_8);
} }
@ -117,6 +123,7 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
@Test @Test
public void testInsertFlatRecords() throws Exception { public void testInsertFlatRecords() throws Exception {
TestRunner runner = init();
recordReader.addSchemaField("name", RecordFieldType.STRING); recordReader.addSchemaField("name", RecordFieldType.STRING);
recordReader.addSchemaField("age", RecordFieldType.INT); recordReader.addSchemaField("age", RecordFieldType.INT);
recordReader.addSchemaField("sport", RecordFieldType.STRING); recordReader.addSchemaField("sport", RecordFieldType.STRING);
@ -141,6 +148,7 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
@Test @Test
public void testInsertNestedRecords() throws Exception { public void testInsertNestedRecords() throws Exception {
TestRunner runner = init();
recordReader.addSchemaField("id", RecordFieldType.INT); recordReader.addSchemaField("id", RecordFieldType.INT);
final List<RecordField> personFields = new ArrayList<>(); final List<RecordField> personFields = new ArrayList<>();
final RecordField nameField = new RecordField("name", RecordFieldType.STRING.getDataType()); final RecordField nameField = new RecordField("name", RecordFieldType.STRING.getDataType());
@ -183,4 +191,4 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
assertEquals(4, collection.count()); assertEquals(4, collection.count());
//assertEquals(doc, collection.find().first()); //assertEquals(doc, collection.find().first());
} }
} }