NIFI-9334 Add support for upsert in 'PutMongoRecord'. Use 'bulkWrite' for both insert and upsert.

This closes #5482.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
Author: Tamas Palfy, 2021-10-15 12:46:11 +02:00; committed by Peter Turcsanyi
parent 5aced2b4bc
commit 2730a9000e
2 changed files with 639 additions and 23 deletions
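
Below is a minimal, illustrative sketch (not the committed code) of the bulkWrite approach this commit adopts: a single bulkWrite() call can mix plain inserts with upsert-style updates, which is what lets the processor serve both modes through one write path. The collection, field names, and values here are hypothetical.

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import org.bson.Document;
import java.util.Arrays;

class BulkWriteSketch {
    static void write(MongoCollection<Document> collection) {
        // A plain insert and an upsert travel together in one batch.
        WriteModel<Document> insert = new InsertOneModel<>(new Document("id", 1));
        WriteModel<Document> upsert = new UpdateOneModel<>(
                Filters.eq("id", 2),                                 // match on the update key
                new Document("$set", new Document("name", "name2")), // fields to apply
                new UpdateOptions().upsert(true));                   // insert when no match exists
        collection.bulkWrite(Arrays.asList(insert, upsert),
                new BulkWriteOptions().ordered(false));              // unordered, as the processor allows
    }
}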

PutMongoRecord.java

@@ -19,10 +19,19 @@ package org.apache.nifi.processors.mongodb;
import com.mongodb.MongoException;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.UpdateManyModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
@@ -39,30 +48,45 @@ import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.bson.Document;
import org.bson.conversions.Bson;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@EventDriven
@Tags({"mongodb", "insert", "record", "put"})
@Tags({"mongodb", "insert", "update", "upsert", "record", "put"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("This processor is a record-aware processor for inserting data into MongoDB. It uses a configured record reader and " +
"schema to read an incoming record set from the body of a flowfile and then inserts batches of those records into " +
"a configured MongoDB collection. This processor does not support updates, deletes or upserts. The number of documents to insert at a time is controlled " +
"by the \"Insert Batch Size\" configuration property. This value should be set to a reasonable size to ensure " +
"that MongoDB is not overloaded with too many inserts at once.")
@CapabilityDescription("This processor is a record-aware processor for inserting/upserting data into MongoDB. It uses a configured record reader and " +
"schema to read an incoming record set from the body of a flowfile and then inserts/upserts batches of those records into " +
"a configured MongoDB collection. This processor does not support deletes. The number of documents to insert/upsert at a time is controlled " +
"by the \"Batch Size\" configuration property. This value should be set to a reasonable size to ensure " +
"that MongoDB is not overloaded with too many operations at once.")
@ReadsAttribute(
attribute = PutMongoRecord.MONGODB_UPDATE_MODE,
description = "Configurable parameter for controlling update mode on a per-flowfile basis." +
" Acceptable values are 'one' and 'many' and controls whether a single incoming record should update a single or multiple Mongo documents."
)
public class PutMongoRecord extends AbstractMongoProcessor {
static final String MONGODB_UPDATE_MODE = "mongodb.update.mode";
static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("All FlowFiles that are written to MongoDB are routed to this relationship").build();
static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("All FlowFiles that cannot be written to MongoDB are routed to this relationship").build();
static final AllowableValue UPDATE_ONE = new AllowableValue("one", "Update One", "Updates only the first document that matches the query.");
static final AllowableValue UPDATE_MANY = new AllowableValue("many", "Update Many", "Updates every document that matches the query.");
static final AllowableValue UPDATE_FF_ATTRIBUTE = new AllowableValue("flowfile-attribute", "Use '" + MONGODB_UPDATE_MODE + "' flowfile attribute.",
"Use the value of the '" + MONGODB_UPDATE_MODE + "' attribute of the incoming flowfile. Acceptable values are 'one' and 'many'.");
static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
@@ -70,15 +94,55 @@ public class PutMongoRecord extends AbstractMongoProcessor {
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build();
static final PropertyDescriptor INSERT_COUNT = new PropertyDescriptor.Builder()
.name("insert_count")
.displayName("Insert Batch Size")
.description("The number of records to group together for one single insert operation against MongoDB.")
.displayName("Batch Size")
.description("The number of records to group together for one single insert/upsert operation against MongoDB.")
.defaultValue("100")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder()
.name("ordered")
.displayName("Ordered")
.description("Perform ordered or unordered operations")
.allowableValues("True", "False")
.defaultValue("False")
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
static final PropertyDescriptor BYPASS_VALIDATION = new PropertyDescriptor.Builder()
.name("bypass-validation")
.displayName("Bypass Validation")
.description("Bypass schema validation during insert/upsert")
.allowableValues("True", "False")
.defaultValue("True")
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
static final PropertyDescriptor UPDATE_KEY_FIELDS = new PropertyDescriptor.Builder()
.name("update-key-fields")
.displayName("Update Key Fields")
.description("Comma separated list of fields based on which to identify documents that need to be updated. " +
"If this property is set NiFi will attempt an upsert operation on all documents. " +
"If this property is not set all documents will be inserted.")
.required(false)
.addValidator(StandardValidators.createListValidator(true, false, StandardValidators.NON_EMPTY_VALIDATOR))
.build();
static final PropertyDescriptor UPDATE_MODE = new PropertyDescriptor.Builder()
.name("update-mode")
.displayName("Update Mode")
.dependsOn(UPDATE_KEY_FIELDS)
.description("Choose between updating a single document or multiple documents per incoming record.")
.allowableValues(UPDATE_ONE, UPDATE_MANY, UPDATE_FF_ATTRIBUTE)
.defaultValue(UPDATE_ONE.getValue())
.build();
private final static Set<Relationship> relationships;
private final static List<PropertyDescriptor> propertyDescriptors;
@@ -88,6 +152,10 @@ public class PutMongoRecord extends AbstractMongoProcessor {
_propertyDescriptors.add(WRITE_CONCERN);
_propertyDescriptors.add(RECORD_READER_FACTORY);
_propertyDescriptors.add(INSERT_COUNT);
_propertyDescriptors.add(ORDERED);
_propertyDescriptors.add(BYPASS_VALIDATION);
_propertyDescriptors.add(UPDATE_KEY_FIELDS);
_propertyDescriptors.add(UPDATE_MODE);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
final Set<Relationship> _relationships = new HashSet<>();
@@ -118,15 +186,36 @@ public class PutMongoRecord extends AbstractMongoProcessor {
final WriteConcern writeConcern = getWriteConcern(context);
List<Document> inserts = new ArrayList<>();
int ceiling = context.getProperty(INSERT_COUNT).asInteger();
int added = 0;
int written = 0;
boolean error = false;
try (final InputStream inStream = session.read(flowFile);
final RecordReader reader = recordParserFactory.createRecordReader(flowFile, inStream, getLogger())) {
final MongoCollection<Document> collection = getCollection(context, flowFile).withWriteConcern(writeConcern);
boolean ordered = context.getProperty(ORDERED).asBoolean();
boolean bypass = context.getProperty(BYPASS_VALIDATION).asBoolean();
Map<String, List<String>> updateKeyFieldPathToFieldChain = new LinkedHashMap<>();
if (context.getProperty(UPDATE_KEY_FIELDS).isSet()) {
Arrays.stream(context.getProperty(UPDATE_KEY_FIELDS).getValue().split("\\s*,\\s*"))
.forEach(updateKeyField -> updateKeyFieldPathToFieldChain.put(
updateKeyField,
Arrays.asList(updateKeyField.split("\\."))
));
}
BulkWriteOptions bulkWriteOptions = new BulkWriteOptions();
bulkWriteOptions.ordered(ordered);
bulkWriteOptions.bypassDocumentValidation(bypass);
try (
final InputStream inStream = session.read(flowFile);
final RecordReader reader = recordParserFactory.createRecordReader(flowFile, inStream, getLogger());
) {
RecordSchema schema = reader.getSchema();
final MongoCollection<Document> collection = getCollection(context, flowFile).withWriteConcern(writeConcern);
List<WriteModel<Document>> writeModels = new ArrayList<>();
Record record;
while ((record = reader.nextRecord()) != null) {
// Convert each Record to HashMap and put into the Mongo document
@@ -135,17 +224,43 @@ public class PutMongoRecord extends AbstractMongoProcessor {
for (String name : schema.getFieldNames()) {
document.put(name, contentMap.get(name));
}
inserts.add(convertArrays(document));
if (inserts.size() == ceiling) {
collection.insertMany(inserts);
added += inserts.size();
inserts = new ArrayList<>();
Document readyToUpsert = convertArrays(document);
WriteModel<Document> writeModel;
if (context.getProperty(UPDATE_KEY_FIELDS).isSet()) {
Bson[] filters = buildFilters(updateKeyFieldPathToFieldChain, readyToUpsert);
if (updateModeMatches(UPDATE_ONE.getValue(), context, flowFile)) {
writeModel = new UpdateOneModel<>(
Filters.and(filters),
new Document("$set", readyToUpsert),
new UpdateOptions().upsert(true)
);
} else if (updateModeMatches(UPDATE_MANY.getValue(), context, flowFile)) {
writeModel = new UpdateManyModel<>(
Filters.and(filters),
new Document("$set", readyToUpsert),
new UpdateOptions().upsert(true)
);
} else {
String flowfileUpdateMode = flowFile.getAttribute(MONGODB_UPDATE_MODE);
throw new ProcessException("Unrecognized '" + MONGODB_UPDATE_MODE + "' value '" + flowfileUpdateMode + "'");
}
} else {
writeModel = new InsertOneModel<>(readyToUpsert);
}
writeModels.add(writeModel);
if (writeModels.size() == ceiling) {
collection.bulkWrite(writeModels, bulkWriteOptions);
written += writeModels.size();
writeModels = new ArrayList<>();
}
}
if (inserts.size() > 0) {
collection.insertMany(inserts);
if (writeModels.size() > 0) {
collection.bulkWrite(writeModels, bulkWriteOptions);
}
} catch (SchemaNotFoundException | IOException | MalformedRecordException | MongoException e) {
} catch (ProcessException | SchemaNotFoundException | IOException | MalformedRecordException | MongoException e) {
getLogger().error("PutMongoRecord failed with error:", e);
session.transfer(flowFile, REL_FAILURE);
error = true;
@@ -154,9 +269,9 @@ public class PutMongoRecord extends AbstractMongoProcessor {
String url = clientService != null
? clientService.getURI()
: context.getProperty(URI).evaluateAttributeExpressions().getValue();
session.getProvenanceReporter().send(flowFile, url, String.format("Added %d documents to MongoDB.", added));
session.getProvenanceReporter().send(flowFile, url, String.format("Wrote %d documents to MongoDB.", written));
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Inserted {} records into MongoDB", new Object[]{ added });
getLogger().info("Written {} records into MongoDB", new Object[]{ written });
}
}
}
@@ -190,4 +305,48 @@ public class PutMongoRecord extends AbstractMongoProcessor {
return retVal;
}
private Bson[] buildFilters(Map<String, List<String>> updateKeyFieldPathToFieldChain, Document readyToUpsert) {
Bson[] filters = updateKeyFieldPathToFieldChain.entrySet()
.stream()
.map(updateKeyFieldPath__fieldChain -> {
String fieldPath = updateKeyFieldPath__fieldChain.getKey();
List<String> fieldChain = updateKeyFieldPath__fieldChain.getValue();
Object value = readyToUpsert;
String previousField = null;
for (String field : fieldChain) {
if (!(value instanceof Map)) {
throw new ProcessException("field '" + previousField + "' (from field expression '" + fieldPath + "') is not an embedded document");
}
value = ((Map) value).get(field);
if (value == null) {
throw new ProcessException("field '" + field + "' (from field expression '" + fieldPath + "') has no value");
}
previousField = field;
}
Bson filter = Filters.eq(fieldPath, value);
return filter;
})
.toArray(Bson[]::new);
return filters;
}
private boolean updateModeMatches(String updateModeToMatch, ProcessContext context, FlowFile flowFile) {
String updateMode = context.getProperty(UPDATE_MODE).getValue();
boolean updateModeMatches = updateMode.equals(updateModeToMatch)
||
(
updateMode.equals(UPDATE_FF_ATTRIBUTE.getValue())
&& updateModeToMatch.equalsIgnoreCase(flowFile.getAttribute(MONGODB_UPDATE_MODE))
);
return updateModeMatches;
}
}
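
For readers following buildFilters() above, a hedged walk-through of how a dotted Update Key Fields entry resolves against a converted record (illustrative only; the field names and values are hypothetical): the method descends map-by-map to fetch each key's value, emits one equality filter per key addressed by the original dotted path, and the caller combines the filters with Filters.and() before attaching them to the UpdateOne/UpdateMany model.

import com.mongodb.client.model.Filters;
import org.bson.Document;
import org.bson.conversions.Bson;

class UpdateKeySketch {
    public static void main(String[] args) {
        // Document converted from an incoming record (hypothetical content).
        Document doc = new Document("id", 1)
                .append("person", new Document("name", "name1").append("age", 21));
        // Update Key Fields = "id,person.name": walk each dotted chain to its value...
        Object nested = ((Document) doc.get("person")).get("name");  // -> "name1"
        // ...then build one equality filter per key, keyed by the dotted path.
        Bson filter = Filters.and(
                Filters.eq("id", doc.get("id")),
                Filters.eq("person.name", nested));
        System.out.println(filter);  // the selector used by the upsert write model
    }
}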

PutMongoRecordIT.java

@@ -45,12 +45,16 @@ import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -243,4 +247,457 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
runner.assertTransferCount(PutMongoRecord.REL_FAILURE, 0);
runner.assertTransferCount(PutMongoRecord.REL_SUCCESS, 1);
}
@Test
void testUpsertAsInsert() throws Exception {
// GIVEN
TestRunner runner = init();
runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id");
recordReader.addSchemaField("id", RecordFieldType.INT);
recordReader.addSchemaField("person", RecordFieldType.RECORD);
final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("name", RecordFieldType.STRING.getDataType()),
new RecordField("age", RecordFieldType.INT.getDataType())
));
List<List<Object[]>> inputs = Arrays.asList(
Arrays.asList(
new Object[]{1, new MapRecord(personSchema, new HashMap<String, Object>() {{
put("name", "name1");
put("age", 21);
}})},
new Object[]{2, new MapRecord(personSchema, new HashMap<String, Object>() {{
put("name", "name2");
put("age", 22);
}})}
)
);
Set<Map<String, Object>> expected = new HashSet<>(Arrays.asList(
new HashMap<String, Object>() {{
put("id", 1);
put("person", new Document(new HashMap<String, Object>() {{
put("name", "name1");
put("age", 21);
}}));
}},
new HashMap<String, Object>() {{
put("id", 2);
put("person", new Document(new HashMap<String, Object>() {{
put("name", "name2");
put("age", 22);
}}));
}}
));
// WHEN
// THEN
testUpsertSuccess(runner, inputs, expected);
}
@Test
void testUpsertAsUpdate() throws Exception {
// GIVEN
TestRunner runner = init();
runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id");
recordReader.addSchemaField("id", RecordFieldType.INT);
recordReader.addSchemaField("person", RecordFieldType.RECORD);
final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("name", RecordFieldType.STRING.getDataType()),
new RecordField("age", RecordFieldType.INT.getDataType())
));
List<List<Object[]>> inputs = Arrays.asList(
Arrays.asList(
new Object[]{1, new MapRecord(personSchema, new HashMap<String, Object>() {{
put("name", "updating_name1");
put("age", "age1".length());
}})},
new Object[]{2, new MapRecord(personSchema, new HashMap<String, Object>() {{
put("name", "name2");
put("age", "updating_age2".length());
}})}
),
Arrays.asList(
new Object[]{1, new MapRecord(personSchema, new HashMap<String, Object>() {{
put("name", "updated_name1");
put("age", "age1".length());
}})},
new Object[]{2, new MapRecord(personSchema, new HashMap<String, Object>() {{
put("name", "name2");
put("age", "updated_age2".length());
}})}
)
);
Set<Map<String, Object>> expected = new HashSet<>(Arrays.asList(
new HashMap<String, Object>() {{
put("id", 1);
put("person", new Document(new HashMap<String, Object>() {{
put("name", "updated_name1");
put("age", "age1".length());
}}));
}},
new HashMap<String, Object>() {{
put("id", 2);
put("person", new Document(new HashMap<String, Object>() {{
put("name", "name2");
put("age", "updated_age2".length());
}}));
}}
));
// WHEN
testUpsertSuccess(runner, inputs, expected);
}
@Test
void testUpsertAsInsertAndUpdate() throws Exception {
// GIVEN
TestRunner runner = init();
runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id");
recordReader.addSchemaField("id", RecordFieldType.INT);
recordReader.addSchemaField("person", RecordFieldType.RECORD);
final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("name", RecordFieldType.STRING.getDataType()),
new RecordField("age", RecordFieldType.INT.getDataType())
));
List<List<Object[]>> inputs = Arrays.asList(
Collections.singletonList(
new Object[]{1, new MapRecord(personSchema, new HashMap<String, Object>() {{
put("name", "updating_name1");
put("age", "updating_age1".length());
}})}
),
Arrays.asList(
new Object[]{1, new MapRecord(personSchema, new HashMap<String, Object>() {{
put("name", "updated_name1");
put("age", "updated_age1".length());
}})},
new Object[]{2, new MapRecord(personSchema, new HashMap<String, Object>() {{
put("name", "inserted_name2");
put("age", "inserted_age2".length());
}})}
)
);
Set<Map<String, Object>> expected = new HashSet<>(Arrays.asList(
new HashMap<String, Object>() {{
put("id", 1);
put("person", new Document(new HashMap<String, Object>() {{
put("name", "updated_name1");
put("age", "updated_age1".length());
}}));
}},
new HashMap<String, Object>() {{
put("id", 2);
put("person", new Document(new HashMap<String, Object>() {{
put("name", "inserted_name2");
put("age", "inserted_age2".length());
}}));
}}
));
// WHEN
testUpsertSuccess(runner, inputs, expected);
}
@Test
void testRouteToFailureWhenKeyFieldDoesNotExist() throws Exception {
// GIVEN
TestRunner runner = init();
runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id,non_existent_field");
recordReader.addSchemaField("id", RecordFieldType.INT);
recordReader.addSchemaField("person", RecordFieldType.RECORD);
final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("name", RecordFieldType.STRING.getDataType()),
new RecordField("age", RecordFieldType.INT.getDataType())
));
List<List<Object[]>> inputs = Arrays.asList(
Collections.singletonList(
new Object[]{1, new MapRecord(personSchema, new HashMap<String, Object>() {{
put("name", "unimportant");
put("age", "unimportant".length());
}})}
)
);
// WHEN
// THEN
testUpsertFailure(runner, inputs);
}
@Test
void testUpdateMany() throws Exception {
// GIVEN
TestRunner initRunner = init();
// Init Mongo data
recordReader.addSchemaField("name", RecordFieldType.STRING);
recordReader.addSchemaField("team", RecordFieldType.STRING);
recordReader.addSchemaField("color", RecordFieldType.STRING);
List<Object[]> init = Arrays.asList(
new Object[]{"Joe", "A", "green"},
new Object[]{"Jane", "A", "green"},
new Object[]{"Jeff", "B", "blue"},
new Object[]{"Janet", "B", "blue"}
);
init.forEach(recordReader::addRecord);
initRunner.enqueue("");
initRunner.run();
// Update Mongo data
setup();
TestRunner updateRunner = init();
updateRunner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "team");
updateRunner.setProperty(PutMongoRecord.UPDATE_MODE, PutMongoRecord.UPDATE_MANY.getValue());
recordReader.addSchemaField("team", RecordFieldType.STRING);
recordReader.addSchemaField("color", RecordFieldType.STRING);
List<List<Object[]>> inputs = Arrays.asList(
Arrays.asList(
new Object[]{"A", "yellow"},
new Object[]{"B", "red"}
)
);
Set<Map<String, Object>> expected = new HashSet<>(Arrays.asList(
new HashMap<String, Object>() {{
put("name", "Joe");
put("team", "A");
put("color", "yellow");
}},
new HashMap<String, Object>() {{
put("name", "Jane");
put("team", "A");
put("color", "yellow");
}},
new HashMap<String, Object>() {{
put("name", "Jeff");
put("team", "B");
put("color", "red");
}},
new HashMap<String, Object>() {{
put("name", "Janet");
put("team", "B");
put("color", "red");
}}
));
// WHEN
// THEN
testUpsertSuccess(updateRunner, inputs, expected);
}
@Test
void testUpdateModeFFAttributeSetToMany() throws Exception {
// GIVEN
TestRunner initRunner = init();
// Init Mongo data
recordReader.addSchemaField("name", RecordFieldType.STRING);
recordReader.addSchemaField("team", RecordFieldType.STRING);
recordReader.addSchemaField("color", RecordFieldType.STRING);
List<Object[]> init = Arrays.asList(
new Object[]{"Joe", "A", "green"},
new Object[]{"Jane", "A", "green"},
new Object[]{"Jeff", "B", "blue"},
new Object[]{"Janet", "B", "blue"}
);
init.forEach(recordReader::addRecord);
initRunner.enqueue("");
initRunner.run();
// Update Mongo data
setup();
TestRunner updateRunner = init();
updateRunner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "team");
updateRunner.setProperty(PutMongoRecord.UPDATE_MODE, PutMongoRecord.UPDATE_FF_ATTRIBUTE.getValue());
recordReader.addSchemaField("team", RecordFieldType.STRING);
recordReader.addSchemaField("color", RecordFieldType.STRING);
List<List<Object[]>> inputs = Arrays.asList(
Arrays.asList(
new Object[]{"A", "yellow"},
new Object[]{"B", "red"}
)
);
Set<Map<String, Object>> expected = new HashSet<>(Arrays.asList(
new HashMap<String, Object>() {{
put("name", "Joe");
put("team", "A");
put("color", "yellow");
}},
new HashMap<String, Object>() {{
put("name", "Jane");
put("team", "A");
put("color", "yellow");
}},
new HashMap<String, Object>() {{
put("name", "Jeff");
put("team", "B");
put("color", "red");
}},
new HashMap<String, Object>() {{
put("name", "Janet");
put("team", "B");
put("color", "red");
}}
));
// WHEN
inputs.forEach(input -> {
input.forEach(recordReader::addRecord);
MockFlowFile flowFile = new MockFlowFile(1);
flowFile.putAttributes(new HashMap<String, String>(){{
put(PutMongoRecord.MONGODB_UPDATE_MODE, "many");
}});
updateRunner.enqueue(flowFile);
updateRunner.run();
});
// THEN
assertEquals(0, updateRunner.getQueueSize().getObjectCount());
updateRunner.assertAllFlowFilesTransferred(PutMongoRecord.REL_SUCCESS, inputs.size());
Set<Map<String, Object>> actual = new HashSet<>();
for (Document document : collection.find()) {
actual.add(document.entrySet().stream()
.filter(key__value -> !key__value.getKey().equals("_id"))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}
assertEquals(expected, actual);
}
@Test
void testRouteToFailureWhenUpdateModeFFAttributeSetToInvalid() throws Exception {
TestRunner runner = init();
runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "team");
runner.setProperty(PutMongoRecord.UPDATE_MODE, PutMongoRecord.UPDATE_FF_ATTRIBUTE.getValue());
recordReader.addSchemaField("team", RecordFieldType.STRING);
recordReader.addSchemaField("color", RecordFieldType.STRING);
List<List<Object[]>> inputs = Arrays.asList(
Arrays.asList(
new Object[]{"A", "yellow"},
new Object[]{"B", "red"}
)
);
// WHEN
// THEN
testUpsertFailure(runner, inputs);
}
@Test
void testRouteToFailureWhenKeyFieldReferencesNonEmbeddedDocument() throws Exception {
// GIVEN
TestRunner runner = init();
runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id,id.is_not_an_embedded_document");
recordReader.addSchemaField("id", RecordFieldType.INT);
recordReader.addSchemaField("person", RecordFieldType.RECORD);
final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("name", RecordFieldType.STRING.getDataType()),
new RecordField("age", RecordFieldType.INT.getDataType())
));
List<List<Object[]>> inputs = Arrays.asList(
Collections.singletonList(
new Object[]{1, new MapRecord(personSchema, new HashMap<String, Object>() {{
put("name", "unimportant");
put("age", "unimportant".length());
}})}
)
);
// WHEN
// THEN
testUpsertFailure(runner, inputs);
}
private void testUpsertSuccess(TestRunner runner, List<List<Object[]>> inputs, Set<Map<String, Object>> expected) {
// GIVEN
// WHEN
inputs.forEach(input -> {
input.forEach(recordReader::addRecord);
runner.enqueue("");
runner.run();
});
// THEN
assertEquals(0, runner.getQueueSize().getObjectCount());
runner.assertAllFlowFilesTransferred(PutMongoRecord.REL_SUCCESS, inputs.size());
Set<Map<String, Object>> actual = new HashSet<>();
for (Document document : collection.find()) {
actual.add(document.entrySet().stream()
.filter(key__value -> !key__value.getKey().equals("_id"))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}
assertEquals(expected, actual);
}
private void testUpsertFailure(TestRunner runner, List<List<Object[]>> inputs) {
// GIVEN
Set<Object> expected = Collections.emptySet();
// WHEN
inputs.forEach(input -> {
input.forEach(recordReader::addRecord);
runner.enqueue("");
runner.run();
});
// THEN
assertEquals(0, runner.getQueueSize().getObjectCount());
runner.assertAllFlowFilesTransferred(PutMongoRecord.REL_FAILURE, inputs.size());
Set<Map<String, Object>> actual = new HashSet<>();
for (Document document : collection.find()) {
actual.add(document.entrySet().stream()
.filter(key__value -> !key__value.getKey().equals("_id"))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}
assertEquals(expected, actual);
}
}