NIFI-11129 Added PutMongoBulkOperations Processor

This closes #6918

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Sebastian Rothbucher 2023-01-29 18:13:09 +01:00 committed by exceptionfactory
parent edca4cd347
commit 685700ad7c
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
6 changed files with 526 additions and 2 deletions

View File

@ -0,0 +1,264 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mongodb;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.CollationAlternate;
import com.mongodb.client.model.CollationCaseFirst;
import com.mongodb.client.model.CollationMaxVariable;
import com.mongodb.client.model.CollationStrength;
import com.mongodb.client.model.DeleteManyModel;
import com.mongodb.client.model.DeleteOneModel;
import com.mongodb.client.model.DeleteOptions;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.UpdateManyModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.codecs.BsonArrayCodec;
import org.bson.codecs.DecoderContext;
import org.bson.conversions.Bson;
import org.bson.json.JsonReader;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;
@Tags({ "mongodb", "insert", "update", "write", "put", "bulk" })
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Writes the contents of a FlowFile to MongoDB as bulk-update")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
public class PutMongoBulkOperations extends AbstractMongoProcessor {
static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("All FlowFiles that are written to MongoDB are routed to this relationship").build();
static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("All FlowFiles that cannot be written to MongoDB are routed to this relationship").build();
static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder()
.name("Ordered")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.description("Ordered execution of bulk-writes and break on error - otherwise arbitrary order and continue on error")
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("true")
.build();
static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("The Character Set in which the data is encoded")
.required(true)
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.defaultValue("UTF-8")
.build();
private final static Set<Relationship> relationships = Set.of(REL_SUCCESS, REL_FAILURE);
private final static List<PropertyDescriptor> propertyDescriptors = Stream.concat(descriptors.stream(), Stream.of(ORDERED, CHARACTER_SET)).toList();
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
if (null == flowFile) {
return;
}
final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
final WriteConcern writeConcern = clientService.getWriteConcern();
try {
final MongoCollection<Document> collection = getCollection(context, flowFile).withWriteConcern(writeConcern);
final BsonArrayCodec arrayCodec = new BsonArrayCodec();
final DecoderContext decoderContext = DecoderContext.builder().build();
final BsonArray updateItems;
try (final Reader reader = new InputStreamReader(session.read(flowFile), charset)) {
updateItems = arrayCodec.decode(new JsonReader(reader), decoderContext);
}
List<WriteModel<Document>> updateModels = new ArrayList<>();
for (Object item : updateItems) {
final BsonDocument updateItem = (BsonDocument) item;
if (updateItem.keySet().size() != 1) {
getLogger().error("Invalid bulk-update in {}: more than one type given {}", flowFile, String.join(", ", updateItem.keySet()));
session.transfer(flowFile, REL_FAILURE);
context.yield();
return;
}
final WriteModel<Document> writeModel = getWriteModel(updateItem);
if (null == writeModel) {
getLogger().error("Invalid bulk-update in {}: invalid update type {}", flowFile, getUpdateType(updateItem));
session.transfer(flowFile, REL_FAILURE);
context.yield();
return;
}
updateModels.add(writeModel);
}
collection.bulkWrite(updateModels, (new BulkWriteOptions().ordered(context.getProperty(ORDERED).asBoolean())));
getLogger().info("bulk-updated {} into MongoDB", flowFile);
session.getProvenanceReporter().send(flowFile, getURI(context));
session.transfer(flowFile, REL_SUCCESS);
} catch (Exception e) {
getLogger().error("Failed to bulk-update {} into MongoDB", flowFile, e);
session.transfer(flowFile, REL_FAILURE);
context.yield();
}
}
private WriteModel<Document> getWriteModel(final BsonDocument updateItem) {
final String updateType = getUpdateType(updateItem);
final BsonDocument updateSpec = (BsonDocument) updateItem.get(updateType);
final WriteModel<Document> writeModel;
if ("insertOne".equals(updateType)) {
writeModel = new InsertOneModel<>(toBsonDocument((BsonDocument) updateSpec.get("document")));
} else if ("updateOne".equals(updateType)) {
final UpdateOptions options = parseUpdateOptions(updateSpec);
writeModel = new UpdateOneModel<>((BsonDocument) updateSpec.get("filter"), (BsonDocument) updateSpec.get("update"), options);
} else if ("updateMany".equals(updateType)) {
final UpdateOptions options = parseUpdateOptions(updateSpec);
writeModel = new UpdateManyModel<>((BsonDocument) updateSpec.get("filter"), (BsonDocument) updateSpec.get("update"), options);
} else if ("replaceOne".equals(updateType)) {
final ReplaceOptions options = parseReplaceOptions(updateSpec);
writeModel = new ReplaceOneModel<>((BsonDocument) updateSpec.get("filter"),
toBsonDocument((BsonDocument) updateSpec.get("replacement")), options);
} else if ("deleteOne".equals(updateType)) {
final DeleteOptions options = parseDeleteOptions(updateSpec);
writeModel = new DeleteOneModel<>((BsonDocument) updateSpec.get("filter"), options);
} else if ("deleteMany".equals(updateType)) {
final DeleteOptions options = parseDeleteOptions(updateSpec);
writeModel = new DeleteManyModel<>((BsonDocument) updateSpec.get("filter"), options);
} else {
return null;
}
return writeModel;
}
private static String getUpdateType(BsonDocument updateItem) {
return updateItem.keySet().iterator().next();
}
private static Document toBsonDocument(BsonDocument doc) {
if (null == doc) {
return null;
}
return new Document(doc);
}
protected UpdateOptions parseUpdateOptions(BsonDocument updateSpec) {
final UpdateOptions options = new UpdateOptions();
if (updateSpec.containsKey("upsert")) {
options.upsert(updateSpec.getBoolean("upsert").getValue());
}
if (updateSpec.containsKey("arrayFilters")) {
options.arrayFilters((List<? extends Bson>) updateSpec.get("arrayFilters"));
}
if (updateSpec.containsKey("collation")) {
options.collation(parseCollation((BsonDocument) updateSpec.get("collation")));
}
return options;
}
protected ReplaceOptions parseReplaceOptions(BsonDocument updateSpec) {
final ReplaceOptions options = new ReplaceOptions();
if (updateSpec.containsKey("upsert")) {
options.upsert(updateSpec.getBoolean("upsert").getValue());
}
if (updateSpec.containsKey("collation")) {
options.collation(parseCollation((BsonDocument) updateSpec.get("collation")));
}
return options;
}
protected DeleteOptions parseDeleteOptions(BsonDocument updateSpec) {
final DeleteOptions options = new DeleteOptions();
if (updateSpec.containsKey("collation")) {
options.collation(parseCollation((BsonDocument) updateSpec.get("collation")));
}
return options;
}
protected Collation parseCollation(BsonDocument collationSpec) {
final Collation.Builder builder = Collation.builder();
if (collationSpec.containsKey("locale")) {
builder.locale(collationSpec.getString("locale").getValue());
}
if (collationSpec.containsKey("caseLevel")) {
builder.caseLevel(collationSpec.getBoolean("caseLevel").getValue());
}
if (collationSpec.containsKey("caseFirst")) {
builder.collationCaseFirst(CollationCaseFirst.fromString(collationSpec.getString("caseFirst").getValue()));
}
if (collationSpec.containsKey("strength")) {
builder.collationStrength(CollationStrength.fromInt(collationSpec.getInt32("strength").getValue()));
}
if (collationSpec.containsKey("numericOrdering")) {
builder.numericOrdering(collationSpec.getBoolean("numericOrdering").getValue());
}
if (collationSpec.containsKey("alternate")) {
builder.collationAlternate(CollationAlternate.fromString(collationSpec.getString("alternate").getValue()));
}
if (collationSpec.containsKey("maxVariable")) {
builder.collationMaxVariable(CollationMaxVariable.fromString(collationSpec.getString("maxVariable").getValue()));
}
if (collationSpec.containsKey("normalization")) {
builder.normalization(collationSpec.getBoolean("normalization").getValue());
}
if (collationSpec.containsKey("backwards")) {
builder.backwards(collationSpec.getBoolean("backwards").getValue());
}
return builder.build();
}
}

View File

@ -22,3 +22,4 @@ org.apache.nifi.processors.mongodb.PutMongoRecord
org.apache.nifi.processors.mongodb.gridfs.DeleteGridFS org.apache.nifi.processors.mongodb.gridfs.DeleteGridFS
org.apache.nifi.processors.mongodb.gridfs.FetchGridFS org.apache.nifi.processors.mongodb.gridfs.FetchGridFS
org.apache.nifi.processors.mongodb.gridfs.PutGridFS org.apache.nifi.processors.mongodb.gridfs.PutGridFS
org.apache.nifi.processors.mongodb.PutMongoBulkOperations

View File

@ -0,0 +1,45 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>PutMongoBulkOperation</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h2>Description:</h2>
<p>
This processor runs bulk updates against MongoDB collections. The flowfile content is expected to be a JSON array with bulk write operations as described in the <a href="https://www.mongodb.com/docs/manual/reference/method/db.collection.bulkWrite/" target="_blank">manual for db.collection.bulkWrite</a>.
</p>
<p>
You can use all (currently 6) operators described there. The flowfile content is returned as-is. You can merge many operations into one - and get massive performance improvements.
</p>
<h2>Example:</h2>
<p>
The following is an example flowfile content that does two things: insert a new document, and update all documents where value of <em>hey</em> is greater than zero.
</p>
<code>
<pre>
[
{"insertOne": {"document": {"ho": 42}}},
{"updateMany": {"filter": {"hey": {"$gt": 0}}, "update": {"$inc": {"hey": 2}}}}
]
</pre>
</code>
</body>
</html>

View File

@ -26,7 +26,7 @@ import org.testcontainers.utility.DockerImageName;
@Testcontainers @Testcontainers
public class AbstractMongoIT { public class AbstractMongoIT {
private static final String DOCKER_IMAGE = System.getProperty("mongo.docker.image"); private static final String DOCKER_IMAGE = System.getProperty("mongo.docker.image", "mongo:5");
@Container @Container
protected static final MongoDBContainer MONGO_CONTAINER = new MongoDBContainer(DockerImageName.parse(DOCKER_IMAGE)); protected static final MongoDBContainer MONGO_CONTAINER = new MongoDBContainer(DockerImageName.parse(DOCKER_IMAGE));
} }

View File

@ -24,6 +24,7 @@ import org.apache.nifi.mongodb.MongoDBClientService;
import org.apache.nifi.mongodb.MongoDBControllerService; import org.apache.nifi.mongodb.MongoDBControllerService;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.bson.BsonDocument;
import org.bson.Document; import org.bson.Document;
import java.util.Arrays; import java.util.Arrays;
@ -68,6 +69,8 @@ public class MongoWriteTestBase extends AbstractMongoIT {
} }
public void teardown() { public void teardown() {
mongoClient.getDatabase(DATABASE_NAME).getCollection(COLLECTION_NAME).deleteMany(BsonDocument.parse("{}"));
mongoClient.getDatabase(DATABASE_NAME).drop(); mongoClient.getDatabase(DATABASE_NAME).drop();
mongoClient.close();
} }
} }

View File

@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mongodb;
import org.apache.nifi.util.TestRunner;
import org.bson.Document;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
public class PutMongoBulkOperationsIT extends MongoWriteTestBase {
@BeforeEach
public void setup() {
super.setup(PutMongoBulkOperations.class);
}
@Override
@AfterEach
public void teardown() {
super.teardown();
}
@Test
public void testBulkWriteInsert() throws Exception {
final TestRunner runner = init(PutMongoBulkOperations.class);
StringBuffer doc = new StringBuffer();
doc.append("[");
for (int i = 0; i < DOCUMENTS.size(); i++) {
if (i > 0) {
doc.append(", ");
}
doc.append("{\"insertOne\": {\"document\": ");
doc.append(DOCUMENTS.get(i).toJson());
doc.append("}}");
}
doc.append("]");
runner.enqueue(doc.toString());
runner.run();
runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
assertEquals(3, collection.countDocuments());
Document doc1 = collection.find(new Document().append("_id", "doc_2")).first();
assertNotNull(doc1);
assertEquals(4, doc1.getInteger("c", 0));
}
@Test
public void testBulkWriteUpdateOne() throws Exception {
final TestRunner runner = init(PutMongoBulkOperations.class);
collection.insertMany(DOCUMENTS);
runner.enqueue("[{\"updateOne\": {\"filter\": {\"_id\": {\"$in\": [\"doc_1\", \"doc_2\"]}}, \"update\": {\"$set\": {\"z\": 42}}}}]");
runner.run();
runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
assertEquals(1, collection.countDocuments(new Document().append("z", 42)));
}
@Test
public void testBulkWriteUpdateMany() throws Exception {
final TestRunner runner = init(PutMongoBulkOperations.class);
collection.insertMany(DOCUMENTS);
runner.enqueue("[{\"updateMany\": {\"filter\": {\"_id\": {\"$in\": [\"doc_1\", \"doc_2\"]}}, \"update\": {\"$set\": {\"z\": 42}}}}]");
runner.run();
runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
assertEquals(2, collection.countDocuments(new Document().append("z", 42)));
}
@Test
public void testBulkWriteReplaceOne() throws Exception {
final TestRunner runner = init(PutMongoBulkOperations.class);
collection.insertMany(DOCUMENTS);
runner.enqueue("[{\"replaceOne\": {\"filter\": {\"_id\": \"doc_1\"}, \"replacement\": {\"_id\": \"doc_1\", \"z\": 42}}}]");
runner.run();
runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
assertEquals(1, collection.countDocuments(new Document().append("z", 42)));
Document doc1 = collection.find(new Document().append("_id", "doc_1")).first();
assertNotNull(doc1);
assertEquals(42, doc1.getInteger("z", 0));
assertNull(doc1.get("a"));
}
@Test
public void testBulkWriteDeleteOne() throws Exception {
final TestRunner runner = init(PutMongoBulkOperations.class);
collection.insertMany(DOCUMENTS);
runner.enqueue("[{\"deleteOne\": {\"filter\": {\"_id\": {\"$in\": [\"doc_1\", \"doc_2\"]}}}}]");
runner.run();
runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
assertEquals(2, collection.countDocuments());
assertEquals(0, collection.countDocuments(new Document().append("z", 42)));
}
@Test
public void testBulkWriteDeleteMany() throws Exception {
final TestRunner runner = init(PutMongoBulkOperations.class);
collection.insertMany(DOCUMENTS);
runner.enqueue("[{\"deleteMany\": {\"filter\": {\"_id\": {\"$in\": [\"doc_1\", \"doc_2\"]}}}}]");
runner.run();
runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
assertEquals(1, collection.countDocuments());
assertEquals(0, collection.countDocuments(new Document().append("z", 42)));
}
@Test
public void testInvalid() throws Exception {
final TestRunner runner = init(PutMongoBulkOperations.class);
runner.enqueue("[{\"whatever\": {\"filter\": {\"_id\": {\"$in\": [\"doc_1\", \"doc_2\"]}}}}]");
runner.run();
runner.assertTransferCount(PutMongo.REL_FAILURE, 1);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 0);
}
@Test
public void testBulkWriteOrderedAsIs() throws Exception {
final TestRunner runner = init(PutMongoBulkOperations.class);
runner.setProperty(PutMongoBulkOperations.ORDERED, "true"); // default, still
StringBuffer doc = new StringBuffer();
doc.append("[");
// inserting same ID twice fails w/in mongo, not before, so we can really test transactions and ordering
doc.append("{\"insertOne\": {\"document\": {\"_id\": \"doc_1\"}}},{\"insertOne\": {\"document\": ");
doc.append(DOCUMENTS.get(0).toJson());
doc.append("}}]");
runner.enqueue(doc.toString());
runner.run();
runner.assertTransferCount(PutMongo.REL_FAILURE, 1);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 0);
assertEquals(1, collection.countDocuments());
}
@Test
public void testBulkWriteOrderedNoTransaction() throws Exception {
final TestRunner runner = init(PutMongoBulkOperations.class);
runner.setProperty(PutMongoBulkOperations.ORDERED, "true"); // default, still
StringBuffer doc = new StringBuffer();
doc.append("[");
doc.append("{\"insertOne\": {\"document\": ");
doc.append(DOCUMENTS.get(0).toJson());
// inserting same ID twice fails w/in mongo, not before, so we can really test transactions and ordering
doc.append("}}, {\"insertOne\": {\"document\": {\"_id\": \"doc_1\"}}}]");
runner.enqueue(doc.toString());
runner.run();
runner.assertTransferCount(PutMongo.REL_FAILURE, 1);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 0);
assertEquals(1, collection.countDocuments());
}
@Test
public void testBulkWriteUnordered() throws Exception {
final TestRunner runner = init(PutMongoBulkOperations.class);
runner.setProperty(PutMongoBulkOperations.ORDERED, "false");
StringBuffer doc = new StringBuffer();
doc.append("[");
// inserting same ID twice fails w/in mongo, not before, so we can really test transactions and ordering
doc.append("{\"insertOne\": {\"document\": {\"_id\": \"doc_1\"}}},{\"insertOne\": {\"document\": ");
doc.append(DOCUMENTS.get(0).toJson());
doc.append("}}]");
runner.enqueue(doc.toString());
runner.run();
runner.assertTransferCount(PutMongo.REL_FAILURE, 1);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 0);
assertEquals(1, collection.countDocuments());
}
}