From ca54186b608682d028719ae836bd8d8f83fd24d7 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Wed, 10 Jan 2018 08:35:09 -0500 Subject: [PATCH] NIFI-4759 - Fixed a bug that left a hard-coded reference to _id in as the update key for MongoDB upserts. Signed-off-by: Pierre Villard This closes #2401. --- .../nifi/processors/mongodb/PutMongo.java | 21 ++++---- .../nifi/processors/mongodb/PutMongoTest.java | 52 +++++++++++++++++++ 2 files changed, 62 insertions(+), 11 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java index 03d3e0c814..2df59b6426 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java @@ -34,13 +34,11 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.StreamUtils; import org.bson.Document; +import org.bson.types.ObjectId; -import java.io.IOException; -import java.io.InputStream; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collections; @@ -155,12 +153,7 @@ public class PutMongo extends AbstractMongoProcessor { try { // Read the contents of the FlowFile into a byte array final byte[] content = new byte[(int) flowFile.getSize()]; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.fillBuffer(in, content, true); - } - }); + session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, true)); // parse final Object doc = (mode.equals(MODE_INSERT) || (mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue()))) @@ -173,13 +166,19 @@ public class PutMongo extends AbstractMongoProcessor { // update final boolean upsert = context.getProperty(UPSERT).asBoolean(); final String updateKey = context.getProperty(UPDATE_QUERY_KEY).getValue(); - final Document query = new Document(updateKey, ((Map)doc).get(updateKey)); + + Object keyVal = ((Map)doc).get(updateKey); + if (updateKey.equals("_id") && ObjectId.isValid(((String)keyVal))) { + keyVal = new ObjectId((String) keyVal); + } + + final Document query = new Document(updateKey, keyVal); if (updateMode.equals(UPDATE_WITH_DOC.getValue())) { collection.replaceOne(query, (Document)doc, new UpdateOptions().upsert(upsert)); } else { BasicDBObject update = (BasicDBObject)doc; - update.remove("_id"); + update.remove(updateKey); collection.updateOne(query, update, new UpdateOptions().upsert(upsert)); } logger.info("updated {} into MongoDB", new Object[] { flowFile }); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java index f019704069..d0b1a9dc32 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java @@ -23,6 +23,7 @@ import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.bson.Document; +import org.bson.types.ObjectId; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -256,4 +257,55 @@ public class PutMongoTest extends MongoWriteTestBase { Assert.assertEquals("Msg had wrong value", msg, "Hi"); } } + + /* + * Start NIFI-4759 Regression Tests + * + * 2 issues with ID field: + * + * * Assumed _id is the update key, causing failures when the user configured a different one in the UI. + * * Treated _id as a string even when it is an ObjectID sent from another processor as a string value. + * + * Expected behavior: + * + * * update key field should work no matter what (legal) value it is set to be. + * * _ids that are ObjectID should become real ObjectIDs when added to Mongo. + * * _ids that are arbitrary strings should be still go in as strings. + * + */ + @Test + public void testNiFi_4759_Regressions() { + String[] upserts = new String[]{ + "{ \"_id\": \"12345\", \"$set\": { \"msg\": \"Hello, world\" } }", + "{ \"_id\": \"5a5617b9c1f5de6d8276e87d\", \"$set\": { \"msg\": \"Hello, world\" } }", + "{ \"updateKey\": \"12345\", \"$set\": { \"msg\": \"Hello, world\" } }" + }; + + String[] updateKeyProps = new String[] { "_id", "_id", "updateKey" }; + Object[] updateKeys = new Object[] { "12345", new ObjectId("5a5617b9c1f5de6d8276e87d"), "12345" }; + int index = 0; + + runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS); + runner.setProperty(PutMongo.MODE, "update"); + runner.setProperty(PutMongo.UPSERT, "true"); + + final int LIMIT = 2; + + for (String upsert : upserts) { + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, updateKeyProps[index]); + for (int x = 0; x < LIMIT; x++) { + runner.enqueue(upsert); + } + runner.run(LIMIT, true, true); + runner.assertTransferCount(PutMongo.REL_FAILURE, 0); + runner.assertTransferCount(PutMongo.REL_SUCCESS, LIMIT); + + Document query = new Document(updateKeyProps[index], updateKeys[index]); + Document result = collection.find(query).first(); + Assert.assertNotNull("Result was null", result); + Assert.assertEquals("Count was wrong", 1, collection.count(query)); + runner.clearTransferState(); + index++; + } + } }