From 2730a9000e7996a0f05735854e28507e499166ac Mon Sep 17 00:00:00 2001
From: Tamas Palfy
Date: Fri, 15 Oct 2021 12:46:11 +0200
Subject: [PATCH] NIFI-9334 Add support for upsert in 'PutMongoRecord'. Use
 'bulkWrite' for both insert and upsert.

This closes #5482.

Signed-off-by: Peter Turcsanyi
---
 .../processors/mongodb/PutMongoRecord.java    | 205 +++++++-
 .../processors/mongodb/PutMongoRecordIT.java  | 457 ++++++++++++++++++
 2 files changed, 639 insertions(+), 23 deletions(-)

diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
index 50686719fc..9c3df044f6 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
@@ -19,10 +19,19 @@ package org.apache.nifi.processors.mongodb;
 import com.mongodb.MongoException;
 import com.mongodb.WriteConcern;
 import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
@@ -39,30 +48,45 @@ import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.bson.Document;
+import org.bson.conversions.Bson;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 @EventDriven
-@Tags({"mongodb", "insert", "record", "put"})
+@Tags({"mongodb", "insert", "update", "upsert", "record", "put"})
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@CapabilityDescription("This processor is a record-aware processor for inserting data into MongoDB. It uses a configured record reader and " +
-        "schema to read an incoming record set from the body of a flowfile and then inserts batches of those records into " +
-        "a configured MongoDB collection. This processor does not support updates, deletes or upserts. The number of documents to insert at a time is controlled " +
-        "by the \"Insert Batch Size\" configuration property. This value should be set to a reasonable size to ensure " +
-        "that MongoDB is not overloaded with too many inserts at once.")
+@CapabilityDescription("This processor is a record-aware processor for inserting/upserting data into MongoDB. It uses a configured record reader and " +
+        "schema to read an incoming record set from the body of a flowfile and then inserts/upserts batches of those records into " +
+        "a configured MongoDB collection. This processor does not support deletes. The number of documents to insert/upsert at a time is controlled " +
+        "by the \"Batch Size\" configuration property. This value should be set to a reasonable size to ensure " +
+        "that MongoDB is not overloaded with too many operations at once.")
+@ReadsAttribute(
+    attribute = PutMongoRecord.MONGODB_UPDATE_MODE,
+    description = "Configurable parameter for controlling update mode on a per-flowfile basis." +
+        " Acceptable values are 'one' and 'many'; the attribute controls whether a single incoming record updates one Mongo document or many."
+)
 public class PutMongoRecord extends AbstractMongoProcessor {
+    static final String MONGODB_UPDATE_MODE = "mongodb.update.mode";
+
     static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
             .description("All FlowFiles that are written to MongoDB are routed to this relationship").build();
     static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
             .description("All FlowFiles that cannot be written to MongoDB are routed to this relationship").build();
 
+    static final AllowableValue UPDATE_ONE = new AllowableValue("one", "Update One", "Updates only the first document that matches the query.");
+    static final AllowableValue UPDATE_MANY = new AllowableValue("many", "Update Many", "Updates every document that matches the query.");
+    static final AllowableValue UPDATE_FF_ATTRIBUTE = new AllowableValue("flowfile-attribute", "Use '" + MONGODB_UPDATE_MODE + "' flowfile attribute.",
+            "Use the value of the '" + MONGODB_UPDATE_MODE + "' attribute of the incoming flowfile. Acceptable values are 'one' and 'many'.");
+
     static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
             .name("record-reader")
             .displayName("Record Reader")
@@ -70,15 +94,55 @@ public class PutMongoRecord extends AbstractMongoProcessor {
             .identifiesControllerService(RecordReaderFactory.class)
             .required(true)
             .build();
+
     static final PropertyDescriptor INSERT_COUNT = new PropertyDescriptor.Builder()
             .name("insert_count")
-            .displayName("Insert Batch Size")
-            .description("The number of records to group together for one single insert operation against MongoDB.")
+            .displayName("Batch Size")
+            .description("The number of records to group together for one single insert/upsert operation against MongoDB.")
             .defaultValue("100")
             .required(true)
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder()
+            .name("ordered")
+            .displayName("Ordered")
+            .description("Perform ordered or unordered operations")
+            .allowableValues("True", "False")
+            .defaultValue("False")
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor BYPASS_VALIDATION = new PropertyDescriptor.Builder()
+            .name("bypass-validation")
+            .displayName("Bypass Validation")
+            .description("Bypass schema validation during insert/upsert")
+            .allowableValues("True", "False")
+            .defaultValue("True")
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor UPDATE_KEY_FIELDS = new PropertyDescriptor.Builder()
+            .name("update-key-fields")
+            .displayName("Update Key Fields")
+            .description("Comma-separated list of fields used to identify documents that need to be updated. " +
+                    "If this property is set, NiFi will attempt an upsert operation on all documents. " +
+                    "If this property is not set, all documents will be inserted.")
+            .required(false)
+            .addValidator(StandardValidators.createListValidator(true, false, StandardValidators.NON_EMPTY_VALIDATOR))
+            .build();
+
+    static final PropertyDescriptor UPDATE_MODE = new PropertyDescriptor.Builder()
+            .name("update-mode")
+            .displayName("Update Mode")
+            .dependsOn(UPDATE_KEY_FIELDS)
+            .description("Choose between updating a single document or multiple documents per incoming record.")
+            .allowableValues(UPDATE_ONE, UPDATE_MANY, UPDATE_FF_ATTRIBUTE)
+            .defaultValue(UPDATE_ONE.getValue())
+            .build();
+
     private final static Set<Relationship> relationships;
     private final static List<PropertyDescriptor> propertyDescriptors;
 
@@ -88,6 +152,10 @@ public class PutMongoRecord extends AbstractMongoProcessor {
         _propertyDescriptors.add(WRITE_CONCERN);
         _propertyDescriptors.add(RECORD_READER_FACTORY);
         _propertyDescriptors.add(INSERT_COUNT);
+        _propertyDescriptors.add(ORDERED);
+        _propertyDescriptors.add(BYPASS_VALIDATION);
+        _propertyDescriptors.add(UPDATE_KEY_FIELDS);
+        _propertyDescriptors.add(UPDATE_MODE);
         propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
 
         final Set<Relationship> _relationships = new HashSet<>();
@@ -118,15 +186,36 @@ public class PutMongoRecord extends AbstractMongoProcessor {
 
         final WriteConcern writeConcern = getWriteConcern(context);
 
-        List<Document> inserts = new ArrayList<>();
         int ceiling = context.getProperty(INSERT_COUNT).asInteger();
-        int added = 0;
+        int written = 0;
         boolean error = false;
 
-        try (final InputStream inStream = session.read(flowFile);
-             final RecordReader reader = recordParserFactory.createRecordReader(flowFile, inStream, getLogger())) {
-            final MongoCollection<Document> collection = getCollection(context, flowFile).withWriteConcern(writeConcern);
+        boolean ordered = context.getProperty(ORDERED).asBoolean();
+        boolean bypass = context.getProperty(BYPASS_VALIDATION).asBoolean();
+
+        Map<String, List<String>> updateKeyFieldPathToFieldChain = new LinkedHashMap<>();
+        if (context.getProperty(UPDATE_KEY_FIELDS).isSet()) {
+            Arrays.stream(context.getProperty(UPDATE_KEY_FIELDS).getValue().split("\\s*,\\s*"))
+                .forEach(updateKeyField -> updateKeyFieldPathToFieldChain.put(
+                    updateKeyField,
+                    Arrays.asList(updateKeyField.split("\\."))
+                ));
+        }
+
+        BulkWriteOptions bulkWriteOptions = new BulkWriteOptions();
+        bulkWriteOptions.ordered(ordered);
+        bulkWriteOptions.bypassDocumentValidation(bypass);
+
+        try (
+            final InputStream inStream = session.read(flowFile);
+            final RecordReader reader = recordParserFactory.createRecordReader(flowFile, inStream, getLogger());
+        ) {
             RecordSchema schema = reader.getSchema();
+
+            final MongoCollection<Document> collection = getCollection(context, flowFile).withWriteConcern(writeConcern);
+
+            List<WriteModel<Document>> writeModels = new ArrayList<>();
+
             Record record;
             while ((record = reader.nextRecord()) != null) {
                 // Convert each Record to HashMap and put into the Mongo document
@@ -135,17 +224,43 @@ public class PutMongoRecord extends AbstractMongoProcessor {
                 for (String name : schema.getFieldNames()) {
                     document.put(name, contentMap.get(name));
                 }
-                inserts.add(convertArrays(document));
-                if (inserts.size() == ceiling) {
-                    collection.insertMany(inserts);
-                    added += inserts.size();
-                    inserts = new ArrayList<>();
+                Document readyToUpsert = convertArrays(document);
+
+                WriteModel<Document> writeModel;
+                if (context.getProperty(UPDATE_KEY_FIELDS).isSet()) {
+                    Bson[] filters = buildFilters(updateKeyFieldPathToFieldChain, readyToUpsert);
+
+                    if (updateModeMatches(UPDATE_ONE.getValue(), context, flowFile)) {
+                        writeModel = new UpdateOneModel<>(
+                            Filters.and(filters),
+                            new Document("$set", readyToUpsert),
+                            new UpdateOptions().upsert(true)
+                        );
+                    } else if (updateModeMatches(UPDATE_MANY.getValue(), context, flowFile)) {
+                        writeModel = new UpdateManyModel<>(
+                            Filters.and(filters),
+                            new Document("$set", readyToUpsert),
+                            new UpdateOptions().upsert(true)
+                        );
+                    } else {
+                        String flowfileUpdateMode = flowFile.getAttribute(MONGODB_UPDATE_MODE);
+                        throw new ProcessException("Unrecognized '" + MONGODB_UPDATE_MODE + "' value '" + flowfileUpdateMode + "'");
+                    }
+                } else {
+                    writeModel = new InsertOneModel<>(readyToUpsert);
+                }
+
+                writeModels.add(writeModel);
+                if (writeModels.size() == ceiling) {
+                    collection.bulkWrite(writeModels, bulkWriteOptions);
+                    written += writeModels.size();
+                    writeModels = new ArrayList<>();
                 }
             }
-            if (inserts.size() > 0) {
-                collection.insertMany(inserts);
+            if (writeModels.size() > 0) {
+                collection.bulkWrite(writeModels, bulkWriteOptions);
             }
-        } catch (SchemaNotFoundException | IOException | MalformedRecordException | MongoException e) {
+        } catch (ProcessException | SchemaNotFoundException | IOException | MalformedRecordException | MongoException e) {
             getLogger().error("PutMongoRecord failed with error:", e);
             session.transfer(flowFile, REL_FAILURE);
             error = true;
@@ -154,9 +269,9 @@ public class PutMongoRecord extends AbstractMongoProcessor {
             String url = clientService != null
                     ? clientService.getURI()
                    : context.getProperty(URI).evaluateAttributeExpressions().getValue();
-            session.getProvenanceReporter().send(flowFile, url, String.format("Added %d documents to MongoDB.", added));
+            session.getProvenanceReporter().send(flowFile, url, String.format("Wrote %d documents to MongoDB.", written));
             session.transfer(flowFile, REL_SUCCESS);
-            getLogger().info("Inserted {} records into MongoDB", new Object[]{ added });
+            getLogger().info("Wrote {} records into MongoDB", new Object[]{ written });
         }
     }
 
@@ -190,4 +305,48 @@ public class PutMongoRecord extends AbstractMongoProcessor {
 
         return retVal;
     }
+
+    private Bson[] buildFilters(Map<String, List<String>> updateKeyFieldPathToFieldChain, Document readyToUpsert) {
+        Bson[] filters = updateKeyFieldPathToFieldChain.entrySet()
+            .stream()
+            .map(updateKeyFieldPath__fieldChain -> {
+                String fieldPath = updateKeyFieldPath__fieldChain.getKey();
+                List<String> fieldChain = updateKeyFieldPath__fieldChain.getValue();
+
+                Object value = readyToUpsert;
+                String previousField = null;
+                for (String field : fieldChain) {
+                    if (!(value instanceof Map)) {
+                        throw new ProcessException("field '" + previousField + "' (from field expression '" + fieldPath + "') is not an embedded document");
+                    }
+
+                    value = ((Map) value).get(field);
+
+                    if (value == null) {
+                        throw new ProcessException("field '" + field + "' (from field expression '" + fieldPath + "') has no value");
+                    }
+
+                    previousField = field;
+                }
+
+                Bson filter = Filters.eq(fieldPath, value);
+                return filter;
+            })
+            .toArray(Bson[]::new);
+
+        return filters;
+    }
+
+    private boolean updateModeMatches(String updateModeToMatch, ProcessContext context, FlowFile flowFile) {
+        String updateMode = context.getProperty(UPDATE_MODE).getValue();
+
+        boolean updateModeMatches = updateMode.equals(updateModeToMatch)
+            ||
+            (
+                updateMode.equals(UPDATE_FF_ATTRIBUTE.getValue())
+                && updateModeToMatch.equalsIgnoreCase(flowFile.getAttribute(MONGODB_UPDATE_MODE))
+            );
+
+        return updateModeMatches;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
index a30a76c53a..e57769ba17 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
@@ -45,12 +45,16 @@ import org.junit.jupiter.api.Test;
 
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -243,4 +247,457 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
         runner.assertTransferCount(PutMongoRecord.REL_FAILURE, 0);
         runner.assertTransferCount(PutMongoRecord.REL_SUCCESS, 1);
     }
+
+    @Test
+    void testUpsertAsInsert() throws Exception {
+        // GIVEN
+        TestRunner runner = init();
+
+        runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id");
+
+        recordReader.addSchemaField("id", RecordFieldType.INT);
+        recordReader.addSchemaField("person", RecordFieldType.RECORD);
+
+        final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("name", RecordFieldType.STRING.getDataType()),
+            new RecordField("age", RecordFieldType.INT.getDataType())
+        ));
+
+        List<List<Object[]>> inputs = Arrays.asList(
+            Arrays.asList(
+                new Object[]{1, new MapRecord(personSchema, new HashMap<String, Object>() {{
+                    put("name", "name1");
+                    put("age", 21);
+                }})},
+                new Object[]{2, new MapRecord(personSchema, new HashMap<String, Object>() {{
+                    put("name", "name2");
+                    put("age", 22);
+                }})}
+            )
+        );
+
+        Set<Map<String, Object>> expected = new HashSet<>(Arrays.asList(
+            new HashMap<String, Object>() {{
+                put("id", 1);
+                put("person", new Document(new HashMap<String, Object>() {{
+                    put("name", "name1");
+                    put("age", 21);
+                }}));
+            }},
+            new HashMap<String, Object>() {{
+                put("id", 2);
+                put("person", new Document(new HashMap<String, Object>() {{
+                    put("name", "name2");
+                    put("age", 22);
+                }}));
+            }}
+        ));
+
+        // WHEN
+        // THEN
+        testUpsertSuccess(runner, inputs, expected);
+    }
+
+    @Test
+    void testUpsertAsUpdate() throws Exception {
+        // GIVEN
+        TestRunner runner = init();
+
+        runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id");
+
+        recordReader.addSchemaField("id", RecordFieldType.INT);
+        recordReader.addSchemaField("person", RecordFieldType.RECORD);
+
+        final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("name", RecordFieldType.STRING.getDataType()),
+            new RecordField("age", RecordFieldType.INT.getDataType())
+        ));
+
+        List<List<Object[]>> inputs = Arrays.asList(
+            Arrays.asList(
+                new Object[]{1, new MapRecord(personSchema, new HashMap<String, Object>() {{
+                    put("name", "updating_name1");
+                    put("age", "age1".length());
+                }})},
+                new Object[]{2, new MapRecord(personSchema, new HashMap<String, Object>() {{
+                    put("name", "name2");
+                    put("age", "updating_age2".length());
+                }})}
+            ),
+            Arrays.asList(
+                new Object[]{1, new MapRecord(personSchema, new HashMap<String, Object>() {{
+                    put("name", "updated_name1");
+                    put("age", "age1".length());
+                }})},
+                new Object[]{2, new MapRecord(personSchema, new HashMap<String, Object>() {{
+                    put("name", "name2");
+                    put("age", "updated_age2".length());
+                }})}
+            )
+        );
+
+        Set<Map<String, Object>> expected = new HashSet<>(Arrays.asList(
+            new HashMap<String, Object>() {{
+                put("id", 1);
+                put("person", new Document(new HashMap<String, Object>() {{
+                    put("name", "updated_name1");
+                    put("age", "age1".length());
+                }}));
+            }},
+            new HashMap<String, Object>() {{
+                put("id", 2);
+                put("person", new Document(new HashMap<String, Object>() {{
+                    put("name", "name2");
+                    put("age", "updated_age2".length());
+                }}));
+            }}
+        ));
+
+        // WHEN
+        testUpsertSuccess(runner, inputs, expected);
+    }
+
+    @Test
+    void testUpsertAsInsertAndUpdate() throws Exception {
+        // GIVEN
+        TestRunner runner = init();
+
+        runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id");
+
+        recordReader.addSchemaField("id", RecordFieldType.INT);
+        recordReader.addSchemaField("person", RecordFieldType.RECORD);
+
+        final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("name", RecordFieldType.STRING.getDataType()),
+            new RecordField("age", RecordFieldType.INT.getDataType())
+        ));
+
+        List<List<Object[]>> inputs = Arrays.asList(
+            Collections.singletonList(
+                new Object[]{1, new MapRecord(personSchema, new HashMap<String, Object>() {{
+                    put("name", "updating_name1");
+                    put("age", "updating_age1".length());
+                }})}
+            ),
+            Arrays.asList(
+                new Object[]{1, new MapRecord(personSchema, new HashMap<String, Object>() {{
+                    put("name", "updated_name1");
+                    put("age", "updated_age1".length());
+                }})},
+                new Object[]{2, new MapRecord(personSchema, new HashMap<String, Object>() {{
+                    put("name", "inserted_name2");
+                    put("age", "inserted_age2".length());
+                }})}
+            )
+        );
+
+        Set<Map<String, Object>> expected = new HashSet<>(Arrays.asList(
+            new HashMap<String, Object>() {{
+                put("id", 1);
+                put("person", new Document(new HashMap<String, Object>() {{
+                    put("name", "updated_name1");
+                    put("age", "updated_age1".length());
+                }}));
+            }},
+            new HashMap<String, Object>() {{
+                put("id", 2);
+                put("person", new Document(new HashMap<String, Object>() {{
+                    put("name", "inserted_name2");
+                    put("age", "inserted_age2".length());
+                }}));
+            }}
+        ));
+
+        // WHEN
+        testUpsertSuccess(runner, inputs, expected);
+    }
+
+    @Test
+    void testRouteToFailureWhenKeyFieldDoesNotExist() throws Exception {
+        // GIVEN
+        TestRunner runner = init();
+
+        runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id,non_existent_field");
+
+        recordReader.addSchemaField("id", RecordFieldType.INT);
+        recordReader.addSchemaField("person", RecordFieldType.RECORD);
+
+        final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("name", RecordFieldType.STRING.getDataType()),
+            new RecordField("age", RecordFieldType.INT.getDataType())
+        ));
+
+        List<List<Object[]>> inputs = Arrays.asList(
+            Collections.singletonList(
+                new Object[]{1, new MapRecord(personSchema, new HashMap<String, Object>() {{
+                    put("name", "unimportant");
+                    put("age", "unimportant".length());
+                }})}
+            )
+        );
+
+        // WHEN
+        // THEN
+        testUpsertFailure(runner, inputs);
+    }
+
+    @Test
+    void testUpdateMany() throws Exception {
+        // GIVEN
+        TestRunner initRunner = init();
+
+        // Init Mongo data
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("team", RecordFieldType.STRING);
+        recordReader.addSchemaField("color", RecordFieldType.STRING);
+
+        List<Object[]> init = Arrays.asList(
+            new Object[]{"Joe", "A", "green"},
+            new Object[]{"Jane", "A", "green"},
+            new Object[]{"Jeff", "B", "blue"},
+            new Object[]{"Janet", "B", "blue"}
+        );
+
+        init.forEach(recordReader::addRecord);
+
+        initRunner.enqueue("");
+        initRunner.run();
+
+        // Update Mongo data
+        setup();
+        TestRunner updateRunner = init();
+
+        updateRunner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "team");
+        updateRunner.setProperty(PutMongoRecord.UPDATE_MODE, PutMongoRecord.UPDATE_MANY.getValue());
+
+        recordReader.addSchemaField("team", RecordFieldType.STRING);
+        recordReader.addSchemaField("color", RecordFieldType.STRING);
+
+        List<List<Object[]>> inputs = Arrays.asList(
+            Arrays.asList(
+                new Object[]{"A", "yellow"},
+                new Object[]{"B", "red"}
+            )
+        );
+
+        Set<Map<String, Object>> expected = new HashSet<>(Arrays.asList(
+            new HashMap<String, Object>() {{
+                put("name", "Joe");
+                put("team", "A");
+                put("color", "yellow");
+            }},
+            new HashMap<String, Object>() {{
+                put("name", "Jane");
+                put("team", "A");
+                put("color", "yellow");
+            }},
+            new HashMap<String, Object>() {{
+                put("name", "Jeff");
+                put("team", "B");
+                put("color", "red");
+            }},
+            new HashMap<String, Object>() {{
+                put("name", "Janet");
+                put("team", "B");
+                put("color", "red");
+            }}
+        ));
+
+        // WHEN
+        // THEN
+        testUpsertSuccess(updateRunner, inputs, expected);
+    }
+
+    @Test
+    void testUpdateModeFFAttributeSetToMany() throws Exception {
+        // GIVEN
+        TestRunner initRunner = init();
+
+        // Init Mongo data
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("team", RecordFieldType.STRING);
+        recordReader.addSchemaField("color", RecordFieldType.STRING);
+
+        List<Object[]> init = Arrays.asList(
+            new Object[]{"Joe", "A", "green"},
+            new Object[]{"Jane", "A", "green"},
+            new Object[]{"Jeff", "B", "blue"},
+            new Object[]{"Janet", "B", "blue"}
+        );
+
+        init.forEach(recordReader::addRecord);
+
+        initRunner.enqueue("");
+        initRunner.run();
+
+        // Update Mongo data
+        setup();
+        TestRunner updateRunner = init();
+
+        updateRunner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "team");
+        updateRunner.setProperty(PutMongoRecord.UPDATE_MODE, PutMongoRecord.UPDATE_FF_ATTRIBUTE.getValue());
+
+        recordReader.addSchemaField("team", RecordFieldType.STRING);
+        recordReader.addSchemaField("color", RecordFieldType.STRING);
+
+        List<List<Object[]>> inputs = Arrays.asList(
+            Arrays.asList(
+                new Object[]{"A", "yellow"},
+                new Object[]{"B", "red"}
+            )
+        );
+
+        Set<Map<String, Object>> expected = new HashSet<>(Arrays.asList(
+            new HashMap<String, Object>() {{
+                put("name", "Joe");
+                put("team", "A");
+                put("color", "yellow");
+            }},
+            new HashMap<String, Object>() {{
+                put("name", "Jane");
+                put("team", "A");
+                put("color", "yellow");
+            }},
+            new HashMap<String, Object>() {{
+                put("name", "Jeff");
+                put("team", "B");
+                put("color", "red");
+            }},
+            new HashMap<String, Object>() {{
+                put("name", "Janet");
+                put("team", "B");
+                put("color", "red");
+            }}
+        ));
+
+        // WHEN
+        inputs.forEach(input -> {
+            input.forEach(recordReader::addRecord);
+
+            MockFlowFile flowFile = new MockFlowFile(1);
+            flowFile.putAttributes(new HashMap<String, String>() {{
+                put(PutMongoRecord.MONGODB_UPDATE_MODE, "many");
+            }});
+            updateRunner.enqueue(flowFile);
+            updateRunner.run();
+        });
+
+        // THEN
+        assertEquals(0, updateRunner.getQueueSize().getObjectCount());
+
+        updateRunner.assertAllFlowFilesTransferred(PutMongoRecord.REL_SUCCESS, inputs.size());
+
+        Set<Map<String, Object>> actual = new HashSet<>();
+        for (Document document : collection.find()) {
+            actual.add(document.entrySet().stream()
+                .filter(key__value -> !key__value.getKey().equals("_id"))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+        }
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    void testRouteToFailureWhenUpdateModeFFAttributeSetToInvalid() throws Exception {
+        TestRunner runner = init();
+
+        runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "team");
+        runner.setProperty(PutMongoRecord.UPDATE_MODE, PutMongoRecord.UPDATE_FF_ATTRIBUTE.getValue());
+
+        recordReader.addSchemaField("team", RecordFieldType.STRING);
+        recordReader.addSchemaField("color", RecordFieldType.STRING);
+
+        List<List<Object[]>> inputs = Arrays.asList(
+            Arrays.asList(
+                new Object[]{"A", "yellow"},
+                new Object[]{"B", "red"}
+            )
+        );
+
+        // WHEN
+        // THEN
+        testUpsertFailure(runner, inputs);
+    }
+
+    @Test
+    void testRouteToFailureWhenKeyFieldReferencesNonEmbeddedDocument() throws Exception {
+        // GIVEN
+        TestRunner runner = init();
+
+        runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id,id.is_not_an_embedded_document");
+
+        recordReader.addSchemaField("id", RecordFieldType.INT);
+        recordReader.addSchemaField("person", RecordFieldType.RECORD);
+
+        final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("name", RecordFieldType.STRING.getDataType()),
+            new RecordField("age", RecordFieldType.INT.getDataType())
+        ));
+
+        List<List<Object[]>> inputs = Arrays.asList(
+            Collections.singletonList(
+                new Object[]{1, new MapRecord(personSchema, new HashMap<String, Object>() {{
+                    put("name", "unimportant");
+                    put("age", "unimportant".length());
+                }})}
+            )
+        );
+
+        // WHEN
+        // THEN
+        testUpsertFailure(runner, inputs);
+    }
+
+    private void testUpsertSuccess(TestRunner runner, List<List<Object[]>> inputs, Set<Map<String, Object>> expected) {
+        // GIVEN
+
+        // WHEN
+        inputs.forEach(input -> {
+            input.forEach(recordReader::addRecord);
+
+            runner.enqueue("");
+            runner.run();
+        });
+
+        // THEN
+        assertEquals(0, runner.getQueueSize().getObjectCount());
+
+        runner.assertAllFlowFilesTransferred(PutMongoRecord.REL_SUCCESS, inputs.size());
+
+        Set<Map<String, Object>> actual = new HashSet<>();
+        for (Document document : collection.find()) {
+            actual.add(document.entrySet().stream()
+                .filter(key__value -> !key__value.getKey().equals("_id"))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+        }
+
+        assertEquals(expected, actual);
+    }
+
+    private void testUpsertFailure(TestRunner runner, List<List<Object[]>> inputs) {
+        // GIVEN
+        Set<Map<String, Object>> expected = Collections.emptySet();
+
+        // WHEN
+        inputs.forEach(input -> {
+            input.forEach(recordReader::addRecord);
+
+            runner.enqueue("");
+            runner.run();
+        });
+
+        // THEN
+        assertEquals(0, runner.getQueueSize().getObjectCount());
+
+        runner.assertAllFlowFilesTransferred(PutMongoRecord.REL_FAILURE, inputs.size());
+
+        Set<Map<String, Object>> actual = new HashSet<>();
+        for (Document document : collection.find()) {
+            actual.add(document.entrySet().stream()
+                .filter(key__value -> !key__value.getKey().equals("_id"))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+        }
+
+        assertEquals(expected, actual);
+    }
 }
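
Usage sketch (not part of the patch): the following minimal, standalone example illustrates the bulkWrite-based upsert pattern the processor now builds per record, i.e. an UpdateOneModel whose filter is derived from the update key field, with $set as the update and upsert(true). It is a sketch under assumptions: it presumes the MongoDB synchronous Java driver (org.mongodb:mongodb-driver-sync) on the classpath, a local mongod reachable at mongodb://localhost:27017, and a hypothetical demo.people collection keyed by an 'id' field.

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import org.bson.Document;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UpsertSketch {
    public static void main(String[] args) {
        // Assumption: a local mongod; connection string and namespace are illustrative only.
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> collection = client.getDatabase("demo").getCollection("people");

            // Two "records", standing in for what PutMongoRecord reads via its record reader.
            List<Document> records = Arrays.asList(
                    new Document("id", 1).append("name", "name1").append("age", 21),
                    new Document("id", 2).append("name", "name2").append("age", 22)
            );

            // One UpdateOneModel per record: the filter comes from the update key field
            // ("id" here), $set carries the whole document, and upsert(true) turns the
            // update into an insert when no document matches the filter.
            List<WriteModel<Document>> writeModels = new ArrayList<>();
            for (Document record : records) {
                writeModels.add(new UpdateOneModel<>(
                        Filters.and(Filters.eq("id", record.get("id"))),
                        new Document("$set", record),
                        new UpdateOptions().upsert(true)
                ));
            }

            // Unordered bulk write, mirroring the processor's Ordered=False default.
            collection.bulkWrite(writeModels, new BulkWriteOptions().ordered(false));
        }
    }
}

Running the sketch twice leaves one document per id: the first run inserts, the second updates in place, which is the behavior PutMongoRecord exposes when 'Update Key Fields' is set.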