NIFI-4759 - Fixed a bug that left a hard-coded reference to _id in place as the update key for MongoDB upserts.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2401.
This commit is contained in:
Mike Thomsen 2018-01-10 08:35:09 -05:00 committed by Pierre Villard
parent ea2519e3ea
commit ca54186b60
2 changed files with 62 additions and 11 deletions

View File

@ -34,13 +34,11 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.bson.Document;
import org.bson.types.ObjectId;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
@ -155,12 +153,7 @@ public class PutMongo extends AbstractMongoProcessor {
try {
// Read the contents of the FlowFile into a byte array
final byte[] content = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, content, true);
}
});
session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, true));
// parse
final Object doc = (mode.equals(MODE_INSERT) || (mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
@ -173,13 +166,19 @@ public class PutMongo extends AbstractMongoProcessor {
// update
final boolean upsert = context.getProperty(UPSERT).asBoolean();
final String updateKey = context.getProperty(UPDATE_QUERY_KEY).getValue();
final Document query = new Document(updateKey, ((Map)doc).get(updateKey));
Object keyVal = ((Map)doc).get(updateKey);
if (updateKey.equals("_id") && ObjectId.isValid(((String)keyVal))) {
keyVal = new ObjectId((String) keyVal);
}
final Document query = new Document(updateKey, keyVal);
if (updateMode.equals(UPDATE_WITH_DOC.getValue())) {
collection.replaceOne(query, (Document)doc, new UpdateOptions().upsert(upsert));
} else {
BasicDBObject update = (BasicDBObject)doc;
update.remove("_id");
update.remove(updateKey);
collection.updateOne(query, update, new UpdateOptions().upsert(upsert));
}
logger.info("updated {} into MongoDB", new Object[] { flowFile });

View File

@ -23,6 +23,7 @@ import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -256,4 +257,55 @@ public class PutMongoTest extends MongoWriteTestBase {
Assert.assertEquals("Msg had wrong value", msg, "Hi");
}
}
/*
* Start NIFI-4759 Regression Tests
*
* 2 issues with ID field:
*
* * Assumed _id is the update key, causing failures when the user configured a different one in the UI.
* * Treated _id as a string even when it is an ObjectID sent from another processor as a string value.
*
* Expected behavior:
*
* * update key field should work no matter what (legal) value it is set to be.
* * _ids that are ObjectID should become real ObjectIDs when added to Mongo.
* _ids that are arbitrary strings should still go in as strings.
*
*/
@Test
public void testNiFi_4759_Regressions() {
    // Each scenario is described by the same position in the three arrays below:
    // the flowfile payload, the configured update query key, and the key value
    // used afterwards to look the document up in the collection.
    final String[] payloads = {
        "{ \"_id\": \"12345\", \"$set\": { \"msg\": \"Hello, world\" } }",
        "{ \"_id\": \"5a5617b9c1f5de6d8276e87d\", \"$set\": { \"msg\": \"Hello, world\" } }",
        "{ \"updateKey\": \"12345\", \"$set\": { \"msg\": \"Hello, world\" } }"
    };
    final String[] keyProperties = { "_id", "_id", "updateKey" };
    final Object[] expectedKeyValues = { "12345", new ObjectId("5a5617b9c1f5de6d8276e87d"), "12345" };

    // Number of identical flowfiles sent per scenario.
    final int copies = 2;

    runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS);
    runner.setProperty(PutMongo.MODE, "update");
    runner.setProperty(PutMongo.UPSERT, "true");

    for (int scenario = 0; scenario < payloads.length; scenario++) {
        final String keyProperty = keyProperties[scenario];
        runner.setProperty(PutMongo.UPDATE_QUERY_KEY, keyProperty);

        // Queue the same payload several times, then process all of them in one run.
        int enqueued = 0;
        while (enqueued < copies) {
            runner.enqueue(payloads[scenario]);
            enqueued++;
        }
        runner.run(copies, true, true);

        runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
        runner.assertTransferCount(PutMongo.REL_SUCCESS, copies);

        // Repeated upserts on the same key must leave exactly one matching document.
        final Document lookup = new Document(keyProperty, expectedKeyValues[scenario]);
        final Document stored = collection.find(lookup).first();
        Assert.assertNotNull("Result was null", stored);
        Assert.assertEquals("Count was wrong", 1, collection.count(lookup));

        runner.clearTransferState();
    }
}
}