diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperations.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperations.java new file mode 100644 index 0000000000..c9e09a4731 --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperations.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.mongodb; + +import com.mongodb.WriteConcern; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.model.BulkWriteOptions; +import com.mongodb.client.model.Collation; +import com.mongodb.client.model.CollationAlternate; +import com.mongodb.client.model.CollationCaseFirst; +import com.mongodb.client.model.CollationMaxVariable; +import com.mongodb.client.model.CollationStrength; +import com.mongodb.client.model.DeleteManyModel; +import com.mongodb.client.model.DeleteOneModel; +import com.mongodb.client.model.DeleteOptions; +import com.mongodb.client.model.InsertOneModel; +import com.mongodb.client.model.ReplaceOneModel; +import com.mongodb.client.model.ReplaceOptions; +import com.mongodb.client.model.UpdateManyModel; +import com.mongodb.client.model.UpdateOneModel; +import com.mongodb.client.model.UpdateOptions; +import com.mongodb.client.model.WriteModel; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SystemResource; +import org.apache.nifi.annotation.behavior.SystemResourceConsideration; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.bson.BsonArray; +import org.bson.BsonDocument; +import org.bson.Document; +import org.bson.codecs.BsonArrayCodec; +import org.bson.codecs.DecoderContext; +import org.bson.conversions.Bson; +import org.bson.json.JsonReader; + +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Stream; + +@Tags({ "mongodb", "insert", "update", "write", "put", "bulk" }) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Writes the contents of a FlowFile to MongoDB as bulk-update") +@SystemResourceConsideration(resource = SystemResource.MEMORY) +public class PutMongoBulkOperations extends AbstractMongoProcessor { + static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("All FlowFiles that are written to MongoDB are routed to this relationship").build(); + static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("All FlowFiles that cannot be written to MongoDB are routed to this relationship").build(); + + static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder() + .name("Ordered") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .description("Ordered execution of bulk-writes and break on error - otherwise arbitrary order and continue on error") + .required(true) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder() + .name("Character Set") + .description("The Character Set in which the data is encoded") + .required(true) + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue("UTF-8") + .build(); + + private final static Set relationships = Set.of(REL_SUCCESS, REL_FAILURE); + + private final static List propertyDescriptors = Stream.concat(descriptors.stream(), Stream.of(ORDERED, CHARACTER_SET)).toList(); + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public List getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final FlowFile flowFile = session.get(); + if (null == flowFile) { + return; + } + + final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue()); + final WriteConcern writeConcern = clientService.getWriteConcern(); + + try { + final MongoCollection collection = getCollection(context, flowFile).withWriteConcern(writeConcern); + + final BsonArrayCodec arrayCodec = new BsonArrayCodec(); + final DecoderContext decoderContext = DecoderContext.builder().build(); + final BsonArray updateItems; + try (final Reader reader = new InputStreamReader(session.read(flowFile), charset)) { + updateItems = arrayCodec.decode(new JsonReader(reader), decoderContext); + } + + List> updateModels = new ArrayList<>(); + for (Object item : updateItems) { + final BsonDocument updateItem = (BsonDocument) item; + if (updateItem.keySet().size() != 1) { + getLogger().error("Invalid bulk-update in {}: more than one type given {}", flowFile, String.join(", ", updateItem.keySet())); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + return; + } + final WriteModel writeModel = getWriteModel(updateItem); + if (null == writeModel) { + getLogger().error("Invalid bulk-update in {}: invalid update type {}", flowFile, getUpdateType(updateItem)); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + return; + } + updateModels.add(writeModel); + } + + collection.bulkWrite(updateModels, (new BulkWriteOptions().ordered(context.getProperty(ORDERED).asBoolean()))); + getLogger().info("bulk-updated {} into MongoDB", flowFile); + + session.getProvenanceReporter().send(flowFile, getURI(context)); + session.transfer(flowFile, REL_SUCCESS); + } catch (Exception e) { + getLogger().error("Failed to bulk-update {} into MongoDB", flowFile, e); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + } + } + + private WriteModel getWriteModel(final BsonDocument updateItem) { + final String updateType = getUpdateType(updateItem); + final BsonDocument updateSpec = (BsonDocument) updateItem.get(updateType); + final WriteModel writeModel; + if ("insertOne".equals(updateType)) { + writeModel = new InsertOneModel<>(toBsonDocument((BsonDocument) updateSpec.get("document"))); + } else if ("updateOne".equals(updateType)) { + final UpdateOptions options = parseUpdateOptions(updateSpec); + writeModel = new UpdateOneModel<>((BsonDocument) updateSpec.get("filter"), (BsonDocument) updateSpec.get("update"), options); + } else if ("updateMany".equals(updateType)) { + final UpdateOptions options = parseUpdateOptions(updateSpec); + writeModel = new UpdateManyModel<>((BsonDocument) updateSpec.get("filter"), (BsonDocument) updateSpec.get("update"), options); + } else if ("replaceOne".equals(updateType)) { + final ReplaceOptions options = parseReplaceOptions(updateSpec); + writeModel = new ReplaceOneModel<>((BsonDocument) updateSpec.get("filter"), + toBsonDocument((BsonDocument) updateSpec.get("replacement")), options); + } else if ("deleteOne".equals(updateType)) { + final DeleteOptions options = parseDeleteOptions(updateSpec); + writeModel = new DeleteOneModel<>((BsonDocument) updateSpec.get("filter"), options); + } else if ("deleteMany".equals(updateType)) { + final DeleteOptions options = parseDeleteOptions(updateSpec); + writeModel = new DeleteManyModel<>((BsonDocument) updateSpec.get("filter"), options); + } else { + return null; + } + return writeModel; + } + + private static String getUpdateType(BsonDocument updateItem) { + return updateItem.keySet().iterator().next(); + } + + private static Document toBsonDocument(BsonDocument doc) { + if (null == doc) { + return null; + } + return new Document(doc); + } + + protected UpdateOptions parseUpdateOptions(BsonDocument updateSpec) { + final UpdateOptions options = new UpdateOptions(); + if (updateSpec.containsKey("upsert")) { + options.upsert(updateSpec.getBoolean("upsert").getValue()); + } + if (updateSpec.containsKey("arrayFilters")) { + options.arrayFilters((List) updateSpec.get("arrayFilters")); + } + if (updateSpec.containsKey("collation")) { + options.collation(parseCollation((BsonDocument) updateSpec.get("collation"))); + } + return options; + } + + protected ReplaceOptions parseReplaceOptions(BsonDocument updateSpec) { + final ReplaceOptions options = new ReplaceOptions(); + if (updateSpec.containsKey("upsert")) { + options.upsert(updateSpec.getBoolean("upsert").getValue()); + } + if (updateSpec.containsKey("collation")) { + options.collation(parseCollation((BsonDocument) updateSpec.get("collation"))); + } + return options; + } + + protected DeleteOptions parseDeleteOptions(BsonDocument updateSpec) { + final DeleteOptions options = new DeleteOptions(); + if (updateSpec.containsKey("collation")) { + options.collation(parseCollation((BsonDocument) updateSpec.get("collation"))); + } + return options; + } + + protected Collation parseCollation(BsonDocument collationSpec) { + final Collation.Builder builder = Collation.builder(); + if (collationSpec.containsKey("locale")) { + builder.locale(collationSpec.getString("locale").getValue()); + } + if (collationSpec.containsKey("caseLevel")) { + builder.caseLevel(collationSpec.getBoolean("caseLevel").getValue()); + } + if (collationSpec.containsKey("caseFirst")) { + builder.collationCaseFirst(CollationCaseFirst.fromString(collationSpec.getString("caseFirst").getValue())); + } + if (collationSpec.containsKey("strength")) { + builder.collationStrength(CollationStrength.fromInt(collationSpec.getInt32("strength").getValue())); + } + if (collationSpec.containsKey("numericOrdering")) { + builder.numericOrdering(collationSpec.getBoolean("numericOrdering").getValue()); + } + if (collationSpec.containsKey("alternate")) { + builder.collationAlternate(CollationAlternate.fromString(collationSpec.getString("alternate").getValue())); + } + if (collationSpec.containsKey("maxVariable")) { + builder.collationMaxVariable(CollationMaxVariable.fromString(collationSpec.getString("maxVariable").getValue())); + } + if (collationSpec.containsKey("normalization")) { + builder.normalization(collationSpec.getBoolean("normalization").getValue()); + } + if (collationSpec.containsKey("backwards")) { + builder.backwards(collationSpec.getBoolean("backwards").getValue()); + } + return builder.build(); + } + +} diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 3797ca0621..5a040eab7b 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -21,4 +21,5 @@ org.apache.nifi.processors.mongodb.PutMongo org.apache.nifi.processors.mongodb.PutMongoRecord org.apache.nifi.processors.mongodb.gridfs.DeleteGridFS org.apache.nifi.processors.mongodb.gridfs.FetchGridFS -org.apache.nifi.processors.mongodb.gridfs.PutGridFS \ No newline at end of file +org.apache.nifi.processors.mongodb.gridfs.PutGridFS +org.apache.nifi.processors.mongodb.PutMongoBulkOperations \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.PutMongoBulkOperation/additionalDetails.html b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.PutMongoBulkOperation/additionalDetails.html new file mode 100644 index 0000000000..8d78f68f14 --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.PutMongoBulkOperation/additionalDetails.html @@ -0,0 +1,45 @@ + + + + + + PutMongoBulkOperation + + + + + +

Description:

+

+ This processor runs bulk updates against MongoDB collections. The flowfile content is expected to be a JSON array with bulk write operations as described in the manual for db.collection.bulkWrite. +

+

+ You can use all (currently 6) operators described there. The flowfile content is returned as-is. You can merge many operations into one - and get massive performance improvements. +

+

Example:

+

+ The following is an example flowfile content that does two things: insert a new document, and update all documents where value of hey is greater than zero. +

+ +
+                [
+                    {"insertOne": {"document": {"ho": 42}}},
+                    {"updateMany": {"filter": {"hey": {"$gt": 0}}, "update": {"$inc": {"hey": 2}}}}
+                ]
+            
+
+ + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoIT.java index 4eac6c3b0d..16d07e64e3 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoIT.java @@ -26,7 +26,7 @@ import org.testcontainers.utility.DockerImageName; @Testcontainers public class AbstractMongoIT { - private static final String DOCKER_IMAGE = System.getProperty("mongo.docker.image"); + private static final String DOCKER_IMAGE = System.getProperty("mongo.docker.image", "mongo:5"); @Container protected static final MongoDBContainer MONGO_CONTAINER = new MongoDBContainer(DockerImageName.parse(DOCKER_IMAGE)); } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/MongoWriteTestBase.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/MongoWriteTestBase.java index c595dc4346..d56477ef97 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/MongoWriteTestBase.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/MongoWriteTestBase.java @@ -24,6 +24,7 @@ import org.apache.nifi.mongodb.MongoDBClientService; import org.apache.nifi.mongodb.MongoDBControllerService; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.bson.BsonDocument; import org.bson.Document; import java.util.Arrays; @@ -68,6 +69,8 @@ public class MongoWriteTestBase extends AbstractMongoIT { } public void teardown() { + mongoClient.getDatabase(DATABASE_NAME).getCollection(COLLECTION_NAME).deleteMany(BsonDocument.parse("{}")); mongoClient.getDatabase(DATABASE_NAME).drop(); + mongoClient.close(); } } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperationsIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperationsIT.java new file mode 100644 index 0000000000..1655785118 --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperationsIT.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.mongodb; + +import org.apache.nifi.util.TestRunner; +import org.bson.Document; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class PutMongoBulkOperationsIT extends MongoWriteTestBase { + + @BeforeEach + public void setup() { + super.setup(PutMongoBulkOperations.class); + } + + @Override + @AfterEach + public void teardown() { + super.teardown(); + } + + @Test + public void testBulkWriteInsert() throws Exception { + final TestRunner runner = init(PutMongoBulkOperations.class); + + StringBuffer doc = new StringBuffer(); + doc.append("["); + for (int i = 0; i < DOCUMENTS.size(); i++) { + if (i > 0) { + doc.append(", "); + } + doc.append("{\"insertOne\": {\"document\": "); + doc.append(DOCUMENTS.get(i).toJson()); + doc.append("}}"); + } + doc.append("]"); + runner.enqueue(doc.toString()); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 0); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 1); + + assertEquals(3, collection.countDocuments()); + Document doc1 = collection.find(new Document().append("_id", "doc_2")).first(); + assertNotNull(doc1); + assertEquals(4, doc1.getInteger("c", 0)); + } + + @Test + public void testBulkWriteUpdateOne() throws Exception { + final TestRunner runner = init(PutMongoBulkOperations.class); + + collection.insertMany(DOCUMENTS); + + runner.enqueue("[{\"updateOne\": {\"filter\": {\"_id\": {\"$in\": [\"doc_1\", \"doc_2\"]}}, \"update\": {\"$set\": {\"z\": 42}}}}]"); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 0); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 1); + + assertEquals(1, collection.countDocuments(new Document().append("z", 42))); + } + + @Test + public void testBulkWriteUpdateMany() throws Exception { + final TestRunner runner = init(PutMongoBulkOperations.class); + + collection.insertMany(DOCUMENTS); + + runner.enqueue("[{\"updateMany\": {\"filter\": {\"_id\": {\"$in\": [\"doc_1\", \"doc_2\"]}}, \"update\": {\"$set\": {\"z\": 42}}}}]"); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 0); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 1); + + assertEquals(2, collection.countDocuments(new Document().append("z", 42))); + } + + @Test + public void testBulkWriteReplaceOne() throws Exception { + final TestRunner runner = init(PutMongoBulkOperations.class); + + collection.insertMany(DOCUMENTS); + + runner.enqueue("[{\"replaceOne\": {\"filter\": {\"_id\": \"doc_1\"}, \"replacement\": {\"_id\": \"doc_1\", \"z\": 42}}}]"); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 0); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 1); + + assertEquals(1, collection.countDocuments(new Document().append("z", 42))); + Document doc1 = collection.find(new Document().append("_id", "doc_1")).first(); + assertNotNull(doc1); + assertEquals(42, doc1.getInteger("z", 0)); + assertNull(doc1.get("a")); + } + + @Test + public void testBulkWriteDeleteOne() throws Exception { + final TestRunner runner = init(PutMongoBulkOperations.class); + + collection.insertMany(DOCUMENTS); + + runner.enqueue("[{\"deleteOne\": {\"filter\": {\"_id\": {\"$in\": [\"doc_1\", \"doc_2\"]}}}}]"); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 0); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 1); + + assertEquals(2, collection.countDocuments()); + assertEquals(0, collection.countDocuments(new Document().append("z", 42))); + } + + @Test + public void testBulkWriteDeleteMany() throws Exception { + final TestRunner runner = init(PutMongoBulkOperations.class); + + collection.insertMany(DOCUMENTS); + + runner.enqueue("[{\"deleteMany\": {\"filter\": {\"_id\": {\"$in\": [\"doc_1\", \"doc_2\"]}}}}]"); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 0); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 1); + + assertEquals(1, collection.countDocuments()); + assertEquals(0, collection.countDocuments(new Document().append("z", 42))); + } + + @Test + public void testInvalid() throws Exception { + final TestRunner runner = init(PutMongoBulkOperations.class); + + runner.enqueue("[{\"whatever\": {\"filter\": {\"_id\": {\"$in\": [\"doc_1\", \"doc_2\"]}}}}]"); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 1); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 0); + } + + @Test + public void testBulkWriteOrderedAsIs() throws Exception { + final TestRunner runner = init(PutMongoBulkOperations.class); + runner.setProperty(PutMongoBulkOperations.ORDERED, "true"); // default, still + + StringBuffer doc = new StringBuffer(); + doc.append("["); + // inserting same ID twice fails w/in mongo, not before, so we can really test transactions and ordering + doc.append("{\"insertOne\": {\"document\": {\"_id\": \"doc_1\"}}},{\"insertOne\": {\"document\": "); + doc.append(DOCUMENTS.get(0).toJson()); + doc.append("}}]"); + runner.enqueue(doc.toString()); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 1); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 0); + + assertEquals(1, collection.countDocuments()); + } + + @Test + public void testBulkWriteOrderedNoTransaction() throws Exception { + final TestRunner runner = init(PutMongoBulkOperations.class); + runner.setProperty(PutMongoBulkOperations.ORDERED, "true"); // default, still + + StringBuffer doc = new StringBuffer(); + doc.append("["); + doc.append("{\"insertOne\": {\"document\": "); + doc.append(DOCUMENTS.get(0).toJson()); + // inserting same ID twice fails w/in mongo, not before, so we can really test transactions and ordering + doc.append("}}, {\"insertOne\": {\"document\": {\"_id\": \"doc_1\"}}}]"); + runner.enqueue(doc.toString()); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 1); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 0); + + assertEquals(1, collection.countDocuments()); + } + + @Test + public void testBulkWriteUnordered() throws Exception { + final TestRunner runner = init(PutMongoBulkOperations.class); + runner.setProperty(PutMongoBulkOperations.ORDERED, "false"); + + StringBuffer doc = new StringBuffer(); + doc.append("["); + // inserting same ID twice fails w/in mongo, not before, so we can really test transactions and ordering + doc.append("{\"insertOne\": {\"document\": {\"_id\": \"doc_1\"}}},{\"insertOne\": {\"document\": "); + doc.append(DOCUMENTS.get(0).toJson()); + doc.append("}}]"); + runner.enqueue(doc.toString()); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 1); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 0); + + assertEquals(1, collection.countDocuments()); + } + +}