diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java index 078a3e8a22..60eb0d5ff1 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java @@ -30,6 +30,8 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; @@ -38,11 +40,13 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; import org.bson.Document; import org.bson.types.ObjectId; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -85,20 +89,29 @@ public class PutMongo extends AbstractMongoProcessor { static final PropertyDescriptor UPDATE_QUERY_KEY = new PropertyDescriptor.Builder() .name("Update Query Key") .description("Key name used to build the update query criteria; this property is valid only when using update mode, " - + "otherwise it is ignored") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .defaultValue("_id") + + "otherwise it is ignored. Example: _id") + .required(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(true) .build(); + static final PropertyDescriptor UPDATE_QUERY = new PropertyDescriptor.Builder() + .name("putmongo-update-query") + .displayName("Update Query") + .description("Specify a full MongoDB query to be used for the lookup query to do an update/upsert.") + .required(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + static final PropertyDescriptor UPDATE_MODE = new PropertyDescriptor.Builder() - .displayName("Update Mode") - .name("put-mongo-update-mode") - .required(true) - .allowableValues(UPDATE_WITH_DOC, UPDATE_WITH_OPERATORS) - .defaultValue(UPDATE_WITH_DOC.getValue()) - .description("Choose an update mode. You can either supply a JSON document to use as a direct replacement " + - "or specify a document that contains update operators like $set and $unset") - .build(); + .displayName("Update Mode") + .name("put-mongo-update-mode") + .required(true) + .allowableValues(UPDATE_WITH_DOC, UPDATE_WITH_OPERATORS) + .defaultValue(UPDATE_WITH_DOC.getValue()) + .description("Choose an update mode. You can either supply a JSON document to use as a direct replacement " + + "or specify a document that contains update operators like $set and $unset") + .build(); static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder() .name("Character Set") .description("The Character Set in which the data is encoded") @@ -116,6 +129,7 @@ public class PutMongo extends AbstractMongoProcessor { _propertyDescriptors.add(MODE); _propertyDescriptors.add(UPSERT); _propertyDescriptors.add(UPDATE_QUERY_KEY); + _propertyDescriptors.add(UPDATE_QUERY); _propertyDescriptors.add(UPDATE_MODE); _propertyDescriptors.add(WRITE_CONCERN); _propertyDescriptors.add(CHARACTER_SET); @@ -137,6 +151,30 @@ public class PutMongo extends AbstractMongoProcessor { return propertyDescriptors; } + @Override + protected Collection customValidate(final ValidationContext validationContext) { + List problems = new ArrayList<>(); + + final boolean queryKey = validationContext.getProperty(UPDATE_QUERY_KEY).isSet(); + final boolean query = validationContext.getProperty(UPDATE_QUERY).isSet(); + + if (queryKey && query) { + problems.add(new ValidationResult.Builder() + .valid(false) + .explanation("Both update query key and update query cannot be set at the same time.") + .build() + ); + } else if (!queryKey && !query) { + problems.add(new ValidationResult.Builder() + .valid(false) + .explanation("Either the update query key or the update query field must be set.") + .build() + ); + } + + return problems; + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final FlowFile flowFile = session.get(); @@ -168,15 +206,17 @@ public class PutMongo extends AbstractMongoProcessor { } else { // update final boolean upsert = context.getProperty(UPSERT).asBoolean(); - final String updateKey = context.getProperty(UPDATE_QUERY_KEY).getValue(); + final String updateKey = context.getProperty(UPDATE_QUERY_KEY).evaluateAttributeExpressions(flowFile).getValue(); + final String filterQuery = context.getProperty(UPDATE_QUERY).evaluateAttributeExpressions(flowFile).getValue(); + final Document query; - Object keyVal = ((Map)doc).get(updateKey); - if (updateKey.equals("_id") && ObjectId.isValid(((String)keyVal))) { - keyVal = new ObjectId((String) keyVal); + if (!StringUtils.isBlank(updateKey)) { + query = parseUpdateKey(updateKey, (Map)doc); + removeUpdateKeys(updateKey, (Map)doc); + } else { + query = Document.parse(filterQuery); } - final Document query = new Document(updateKey, keyVal); - if (updateMode.equals(UPDATE_WITH_DOC.getValue())) { collection.replaceOne(query, (Document)doc, new UpdateOptions().upsert(upsert)); } else { @@ -196,6 +236,33 @@ public class PutMongo extends AbstractMongoProcessor { } } + private void removeUpdateKeys(String updateKeyParam, Map doc) { + String[] parts = updateKeyParam.split(",[\\s]*"); + for (String part : parts) { + if (part.contains(".")) { + doc.remove(part); + } + } + } + + private Document parseUpdateKey(String updateKey, Map doc) { + Document retVal; + if (updateKey.equals("_id") && ObjectId.isValid(((String) doc.get(updateKey)))) { + retVal = new Document("_id", new ObjectId((String) doc.get(updateKey))); + } else if (updateKey.contains(",")) { + String[] parts = updateKey.split(",[\\s]*"); + retVal = new Document(); + for (String part : parts) { + retVal.append(part, doc.get(part)); + } + } else { + retVal = new Document(updateKey, doc.get(updateKey)); + } + + return retVal; + } + + protected WriteConcern getWriteConcern(final ProcessContext context) { final String writeConcernProperty = context.getProperty(WRITE_CONCERN).getValue(); WriteConcern writeConcern = null; diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/DeleteMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/DeleteMongoIT.java index 42880d3b51..d10dfc7b27 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/DeleteMongoIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/DeleteMongoIT.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.mongodb; +import org.apache.nifi.util.TestRunner; import org.bson.Document; import org.junit.After; import org.junit.Assert; @@ -40,7 +41,7 @@ public class DeleteMongoIT extends MongoWriteTestBase { super.teardown(); } - private void testOne(String query, Map attrs) { + private void testOne(TestRunner runner, String query, Map attrs) { runner.enqueue(query, attrs); runner.run(1, true); runner.assertTransferCount(DeleteMongo.REL_FAILURE, 0); @@ -52,18 +53,19 @@ public class DeleteMongoIT extends MongoWriteTestBase { @Test public void testDeleteOne() { + TestRunner runner = init(DeleteMongo.class); String query = "{ \"_id\": \"doc_1\" }"; runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_ONE); - testOne(query, new HashMap<>()); + testOne(runner, query, new HashMap<>()); Map attrs = new HashMap<>(); attrs.put("mongodb.delete.mode", "one"); runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_ATTR); query = "{ \"_id\": \"doc_2\" }"; runner.clearTransferState(); - testOne(query, attrs); + testOne(runner, query, attrs); } - private void manyTest(String query, Map attrs) { + private void manyTest(TestRunner runner, String query, Map attrs) { runner.enqueue(query, attrs); runner.run(1, true); runner.assertTransferCount(DeleteMongo.REL_FAILURE, 0); @@ -77,13 +79,14 @@ public class DeleteMongoIT extends MongoWriteTestBase { @Test public void testDeleteMany() { + TestRunner runner = init(DeleteMongo.class); String query = "{\n" + "\t\"_id\": {\n" + "\t\t\"$in\": [\"doc_1\", \"doc_2\"]\n" + "\t}\n" + "}"; runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_MANY); - manyTest(query, new HashMap<>()); + manyTest(runner, query, new HashMap<>()); runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_ATTR); Map attrs = new HashMap<>(); @@ -91,11 +94,12 @@ public class DeleteMongoIT extends MongoWriteTestBase { collection.drop(); collection.insertMany(DOCUMENTS); runner.clearTransferState(); - manyTest(query, attrs); + manyTest(runner, query, attrs); } @Test public void testFailOnNoDeleteOptions() { + TestRunner runner = init(DeleteMongo.class); String query = "{ \"_id\": \"doc_4\"} "; runner.enqueue(query); runner.run(1, true); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/MongoWriteTestBase.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/MongoWriteTestBase.java index 76da51eb80..7294adeba6 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/MongoWriteTestBase.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/MongoWriteTestBase.java @@ -38,28 +38,27 @@ public class MongoWriteTestBase { new Document("_id", "doc_3").append("a", 1).append("b", 3) ); - protected TestRunner runner; protected MongoClient mongoClient; protected MongoCollection collection; public void setup(Class processor) { DATABASE_NAME = processor.getSimpleName().toLowerCase(); - runner = TestRunners.newTestRunner(processor); + mongoClient = new MongoClient(new MongoClientURI(MONGO_URI)); + collection = mongoClient.getDatabase(DATABASE_NAME).getCollection(COLLECTION_NAME); + } + + public TestRunner init(Class processor) { + TestRunner runner = TestRunners.newTestRunner(processor); runner.setVariable("uri", MONGO_URI); runner.setVariable("db", DATABASE_NAME); runner.setVariable("collection", COLLECTION_NAME); runner.setProperty(AbstractMongoProcessor.URI, "${uri}"); runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}"); runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}"); - - mongoClient = new MongoClient(new MongoClientURI(MONGO_URI)); - - collection = mongoClient.getDatabase(DATABASE_NAME).getCollection(COLLECTION_NAME); + return runner; } public void teardown() { - runner = null; - mongoClient.getDatabase(DATABASE_NAME).drop(); } } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java index 7e7f33086b..6c14c0607a 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.mongodb; +import com.mongodb.client.MongoCursor; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.MockFlowFile; @@ -31,9 +32,11 @@ import org.junit.Test; import java.nio.charset.StandardCharsets; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import static org.junit.Assert.assertEquals; @@ -43,6 +46,7 @@ public class PutMongoIT extends MongoWriteTestBase { super.setup(PutMongo.class); } + @Override @After public void teardown() { super.teardown(); @@ -76,6 +80,7 @@ public class PutMongoIT extends MongoWriteTestBase { runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DATABASE_NAME); runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME); runner.setProperty(PutMongo.WRITE_CONCERN, "xyz"); + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id"); runner.enqueue(new byte[0]); pc = runner.getProcessContext(); results = new HashSet<>(); @@ -96,8 +101,184 @@ public class PutMongoIT extends MongoWriteTestBase { Assert.assertEquals(0, results.size()); } + @Test + public void testQueryAndUpdateKey() { + TestRunner runner = init(PutMongo.class); + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id"); + runner.setProperty(PutMongo.UPDATE_QUERY, "{}"); + runner.assertNotValid(); + } + + @Test + public void testNoQueryAndNoUpdateKey() { + TestRunner runner = init(PutMongo.class); + runner.removeProperty(PutMongo.UPDATE_QUERY); + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, ""); + runner.assertNotValid(); + } + + @Test + public void testBlankUpdateKey() { + TestRunner runner = init(PutMongo.class); + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, " "); + runner.assertNotValid(); + } + + @Test + public void testUpdateQuery() { + TestRunner runner = init(PutMongo.class); + Document document = new Document() + .append("name", "John Smith") + .append("department", "Engineering"); + collection.insertOne(document); + String updateBody = "{\n" + + "\t\"$set\": {\n" + + "\t\t\"email\": \"john.smith@test.com\",\n" + + "\t\t\"grade\": \"Sr. Principle Eng.\"\n" + + "\t},\n" + + "\t\"$inc\": {\n" + + "\t\t\"writes\": 1\n" + + "\t}\n" + + "}"; + Map attr = new HashMap<>(); + attr.put("mongo.update.query", document.toJson()); + runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS); + runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE); + runner.setProperty(PutMongo.UPDATE_QUERY, "${mongo.update.query}"); + runner.setValidateExpressionUsage(true); + runner.enqueue(updateBody, attr); + updateTests(runner, document); + } + + @Test + public void testUpdateBySimpleKey() { + TestRunner runner = init(PutMongo.class); + Document document = new Document() + .append("name", "John Smith") + .append("department", "Engineering"); + collection.insertOne(document); + + String updateBody = "{\n" + + "\t\"name\": \"John Smith\",\n" + + "\t\"$set\": {\n" + + "\t\t\"email\": \"john.smith@test.com\",\n" + + "\t\t\"grade\": \"Sr. Principle Eng.\"\n" + + "\t},\n" + + "\t\"$inc\": {\n" + + "\t\t\"writes\": 1\n" + + "\t}\n" + + "}"; + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "name"); + runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS); + runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE); + runner.setValidateExpressionUsage(true); + runner.enqueue(updateBody); + updateTests(runner, document); + } + + @Test + public void testUpdateWithFullDocByKeys() { + TestRunner runner = init(PutMongo.class); + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "name,department"); + testUpdateFullDocument(runner); + } + + @Test + public void testUpdateWithFullDocByQuery() { + TestRunner runner = init(PutMongo.class); + String query = "{ \"name\": \"John Smith\"}"; + runner.setProperty(PutMongo.UPDATE_QUERY, query); + testUpdateFullDocument(runner); + } + + private void testUpdateFullDocument(TestRunner runner) { + Document document = new Document() + .append("name", "John Smith") + .append("department", "Engineering"); + collection.insertOne(document); + String updateBody = "{\n" + + "\t\"name\": \"John Smith\",\n" + + "\t\"department\": \"Engineering\",\n" + + "\t\"contacts\": {\n" + + "\t\t\"phone\": \"555-555-5555\",\n" + + "\t\t\"email\": \"john.smith@test.com\",\n" + + "\t\t\"twitter\": \"@JohnSmith\"\n" + + "\t}\n" + + "}"; + runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_DOC); + runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE); + runner.setValidateExpressionUsage(true); + runner.enqueue(updateBody); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 0); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 1); + + MongoCursor cursor = collection.find(document).iterator(); + Document found = cursor.next(); + Assert.assertEquals(found.get("name"), document.get("name")); + Assert.assertEquals(found.get("department"), document.get("department")); + Document contacts = (Document)found.get("contacts"); + Assert.assertNotNull(contacts); + Assert.assertEquals(contacts.get("twitter"), "@JohnSmith"); + Assert.assertEquals(contacts.get("email"), "john.smith@test.com"); + Assert.assertEquals(contacts.get("phone"), "555-555-5555"); + Assert.assertEquals(collection.count(document), 1); + } + + @Test + public void testUpdateByComplexKey() { + TestRunner runner = init(PutMongo.class); + Document document = new Document() + .append("name", "John Smith") + .append("department", "Engineering") + .append("contacts", new Document().append("email", "john.smith@test.com") + .append("phone", "555-555-5555")); + collection.insertOne(document); + String updateBody = "{\n" + + "\t\"contacts.phone\": \"555-555-5555\",\n" + + "\t\"contacts.email\": \"john.smith@test.com\",\n" + + "\t\"$set\": {\n" + + "\t\t\"contacts.twitter\": \"@JohnSmith\"\n" + + "\t},\n" + + "\t\"$inc\": {\n" + + "\t\t\"writes\": 1\n" + + "\t}\n" + + "}"; + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "contacts.phone,contacts.email"); + runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS); + runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE); + runner.setValidateExpressionUsage(true); + runner.enqueue(updateBody); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 0); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 1); + + MongoCursor iterator = collection.find(new Document("name", "John Smith")).iterator(); + Assert.assertTrue("Document did not come back.", iterator.hasNext()); + Document val = iterator.next(); + Map contacts = (Map)val.get("contacts"); + Assert.assertNotNull(contacts); + Assert.assertTrue(contacts.containsKey("twitter") && contacts.get("twitter").equals("@JohnSmith")); + Assert.assertTrue(val.containsKey("writes") && val.get("writes").equals(1)); + } + + private void updateTests(TestRunner runner, Document document) { + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 0); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 1); + + MongoCursor iterator = collection.find(document).iterator(); + Assert.assertTrue("Document did not come back.", iterator.hasNext()); + Document val = iterator.next(); + Assert.assertTrue(val.containsKey("email") && val.get("email").equals("john.smith@test.com")); + Assert.assertTrue(val.containsKey("grade") && val.get("grade").equals("Sr. Principle Eng.")); + Assert.assertTrue(val.containsKey("writes") && val.get("writes").equals(1)); + } + @Test public void testInsertOne() throws Exception { + TestRunner runner = init(PutMongo.class); + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id"); Document doc = DOCUMENTS.get(0); byte[] bytes = documentToByteArray(doc); @@ -115,6 +296,8 @@ public class PutMongoIT extends MongoWriteTestBase { @Test public void testInsertMany() throws Exception { + TestRunner runner = init(PutMongo.class); + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id"); for (Document doc : DOCUMENTS) { runner.enqueue(documentToByteArray(doc)); } @@ -132,6 +315,8 @@ public class PutMongoIT extends MongoWriteTestBase { @Test public void testInsertWithDuplicateKey() throws Exception { + TestRunner runner = init(PutMongo.class); + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id"); // pre-insert one document collection.insertOne(DOCUMENTS.get(0)); @@ -161,6 +346,8 @@ public class PutMongoIT extends MongoWriteTestBase { */ @Test public void testUpdateDoesNotInsert() throws Exception { + TestRunner runner = init(PutMongo.class); + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id"); Document doc = DOCUMENTS.get(0); byte[] bytes = documentToByteArray(doc); @@ -182,6 +369,8 @@ public class PutMongoIT extends MongoWriteTestBase { */ @Test public void testUpsert() throws Exception { + TestRunner runner = init(PutMongo.class); + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id"); Document doc = DOCUMENTS.get(0); byte[] bytes = documentToByteArray(doc); @@ -201,6 +390,8 @@ public class PutMongoIT extends MongoWriteTestBase { @Test public void testUpdate() throws Exception { + TestRunner runner = init(PutMongo.class); + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id"); Document doc = DOCUMENTS.get(0); // pre-insert document @@ -227,6 +418,7 @@ public class PutMongoIT extends MongoWriteTestBase { @Test public void testUpsertWithOperators() throws Exception { + TestRunner runner = init(PutMongo.class); String upsert = "{\n" + " \"_id\": \"Test\",\n" + " \"$push\": {\n" + @@ -234,6 +426,7 @@ public class PutMongoIT extends MongoWriteTestBase { " }\n" + "}"; runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS); + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id"); runner.setProperty(PutMongo.MODE, "update"); runner.setProperty(PutMongo.UPSERT, "true"); for (int x = 0; x < 3; x++) { @@ -273,6 +466,7 @@ public class PutMongoIT extends MongoWriteTestBase { */ @Test public void testNiFi_4759_Regressions() { + TestRunner runner = init(PutMongo.class); String[] upserts = new String[]{ "{ \"_id\": \"12345\", \"$set\": { \"msg\": \"Hello, world\" } }", "{ \"_id\": \"5a5617b9c1f5de6d8276e87d\", \"$set\": { \"msg\": \"Hello, world\" } }", diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java index db4be57039..5332695cdd 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.mongodb; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MockRecordParser; @@ -53,9 +54,6 @@ public class PutMongoRecordIT extends MongoWriteTestBase { public void setup() throws Exception { super.setup(PutMongoRecord.class); recordReader = new MockRecordParser(); - runner.addControllerService("reader", recordReader); - runner.enableControllerService(recordReader); - runner.setProperty(PutMongoRecord.RECORD_READER_FACTORY, "reader"); } @After @@ -63,6 +61,14 @@ public class PutMongoRecordIT extends MongoWriteTestBase { super.teardown(); } + private TestRunner init() throws InitializationException { + TestRunner runner = init(PutMongoRecord.class); + runner.addControllerService("reader", recordReader); + runner.enableControllerService(recordReader); + runner.setProperty(PutMongoRecord.RECORD_READER_FACTORY, "reader"); + return runner; + } + private byte[] documentToByteArray(Document doc) { return doc.toJson().getBytes(StandardCharsets.UTF_8); } @@ -117,6 +123,7 @@ public class PutMongoRecordIT extends MongoWriteTestBase { @Test public void testInsertFlatRecords() throws Exception { + TestRunner runner = init(); recordReader.addSchemaField("name", RecordFieldType.STRING); recordReader.addSchemaField("age", RecordFieldType.INT); recordReader.addSchemaField("sport", RecordFieldType.STRING); @@ -141,6 +148,7 @@ public class PutMongoRecordIT extends MongoWriteTestBase { @Test public void testInsertNestedRecords() throws Exception { + TestRunner runner = init(); recordReader.addSchemaField("id", RecordFieldType.INT); final List personFields = new ArrayList<>(); final RecordField nameField = new RecordField("name", RecordFieldType.STRING.getDataType()); @@ -183,4 +191,4 @@ public class PutMongoRecordIT extends MongoWriteTestBase { assertEquals(4, collection.count()); //assertEquals(doc, collection.find().first()); } -} \ No newline at end of file +}