diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/DeleteMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/DeleteMongo.java
new file mode 100644
index 0000000000..d8179f9d0e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/DeleteMongo.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb;
+
+import com.mongodb.WriteConcern;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.result.DeleteResult;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+import org.bson.Document;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({ "delete", "mongo", "mongodb" })
+@CapabilityDescription(
+    "Executes a delete query against a MongoDB collection. The query is provided in the body of the flowfile " +
+    "and the user can select whether it will delete one or many documents that match it."
+)
+@ReadsAttribute(
+    attribute = "mongodb.delete.mode",
+    description = "Configurable parameter for controlling delete mode on a per-flowfile basis. The processor must be " +
+        "configured to use this option. " +
+        "Acceptable values are 'one' and 'many.'"
+)
+public class DeleteMongo extends AbstractMongoProcessor {
+
+    private final static Set<Relationship> relationships;
+    private final static List<PropertyDescriptor> propertyDescriptors;
+
+    static final AllowableValue DELETE_ONE = new AllowableValue("one", "Delete One", "Delete only the first document that matches the query.");
+    static final AllowableValue DELETE_MANY = new AllowableValue("many", "Delete Many", "Delete every document that matches the query.");
+    static final AllowableValue DELETE_ATTR = new AllowableValue("attr", "Use 'mongodb.delete.mode' attribute",
+            "Read the 'mongodb.delete.mode' attribute and use that mode. Acceptable values are 'many' and 'one.'");
+
+    static final AllowableValue YES_FAIL = new AllowableValue("true", "True", "Fail when no documents are deleted.");
+    static final AllowableValue NO_FAIL = new AllowableValue("false", "False", "Do not fail when nothing is deleted.");
+
+    static final PropertyDescriptor DELETE_MODE = new PropertyDescriptor.Builder()
+            .name("delete-mongo-delete-mode")
+            .displayName("Delete Mode")
+            .description("Choose between deleting one document by query or many documents by query.")
+            .allowableValues(DELETE_ONE, DELETE_MANY, DELETE_ATTR)
+            .defaultValue("one")
+            .addValidator(Validator.VALID)
+            .build();
+
+    static final PropertyDescriptor FAIL_ON_NO_DELETE = new PropertyDescriptor.Builder()
+            .name("delete-mongo-fail-on-no-delete")
+            .displayName("Fail When Nothing Is Deleted")
+            .description("Determines whether to send the flowfile to the success or failure relationship if nothing is successfully deleted.")
+            .allowableValues(YES_FAIL, NO_FAIL)
+            .defaultValue("true")
+            .addValidator(Validator.VALID)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("All FlowFiles for which the delete operation succeeds are routed to this relationship").build();
+    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+            .description("All FlowFiles for which the delete operation fails are routed to this relationship").build();
+
+    static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.addAll(descriptors);
+        _propertyDescriptors.add(DELETE_MODE);
+        _propertyDescriptors.add(FAIL_ON_NO_DELETE);
+        _propertyDescriptors.add(WRITE_CONCERN);
+        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
+
+        final Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    private static final List<String> ALLOWED_DELETE_VALUES;
+    static {
+        ALLOWED_DELETE_VALUES = new ArrayList<>();
+        ALLOWED_DELETE_VALUES.add("one");
+        ALLOWED_DELETE_VALUES.add("many");
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        final WriteConcern writeConcern = getWriteConcern(context);
+        final MongoCollection<Document> collection = getCollection(context).withWriteConcern(writeConcern);
+        final String deleteMode = context.getProperty(DELETE_MODE).getValue();
+        final String deleteAttr = flowFile.getAttribute("mongodb.delete.mode");
+        final Boolean failMode = context.getProperty(FAIL_ON_NO_DELETE).asBoolean();
+
+        if (deleteMode.equals(DELETE_ATTR.getValue())
+                && (StringUtils.isEmpty(deleteAttr) || !ALLOWED_DELETE_VALUES.contains(deleteAttr.toLowerCase()))) {
+            getLogger().error(String.format("%s is not an allowed value for mongodb.delete.mode", deleteAttr));
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            session.exportTo(flowFile, bos);
+            bos.close();
+
+            String json = new String(bos.toByteArray());
+            Document query = Document.parse(json);
+            DeleteResult result;
+
+            if (deleteMode.equals(DELETE_ONE.getValue())
+                    || (deleteMode.equals(DELETE_ATTR.getValue()) && deleteAttr.toLowerCase().equals("one"))) {
+                result = collection.deleteOne(query);
+            } else {
+                result = collection.deleteMany(query);
+            }
+
+            if (failMode && result.getDeletedCount() == 0) {
+                session.transfer(flowFile, REL_FAILURE);
+            } else {
+                session.transfer(flowFile, REL_SUCCESS);
+            }
+
+        } catch (Exception ex) {
+            getLogger().error("Could not send a delete to MongoDB, failing...", ex);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 8a17ad8e99..4129b05b1e 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -12,6 +12,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+org.apache.nifi.processors.mongodb.DeleteMongo
 org.apache.nifi.processors.mongodb.GetMongo
 org.apache.nifi.processors.mongodb.PutMongo
 org.apache.nifi.processors.mongodb.PutMongoRecord
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/DeleteMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/DeleteMongoTest.java
new file mode 100644
index 0000000000..00cd55e06a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/DeleteMongoTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb;
+
+import org.bson.Document;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Ignore("This is an integration test and should be marked Ignore until someone needs to run it.")
+public class DeleteMongoTest extends MongoWriteTestBase {
+    @Before
+    public void setup() {
+        super.setup(DeleteMongo.class);
+        collection.insertMany(DOCUMENTS);
+    }
+
+    @After
+    public void teardown() {
+        super.teardown();
+    }
+
+    private void testOne(String query, Map<String, String> attrs) {
+        runner.enqueue(query, attrs);
+        runner.run(1, true);
+        runner.assertTransferCount(DeleteMongo.REL_FAILURE, 0);
+        runner.assertTransferCount(DeleteMongo.REL_SUCCESS, 1);
+
+        Assert.assertEquals("Found a document that should have been deleted.",
+                0, collection.count(Document.parse(query)));
+    }
+
+    @Test
+    public void testDeleteOne() {
+        String query = "{ \"_id\": \"doc_1\" }";
+        runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_ONE);
+        testOne(query, new HashMap<>());
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put("mongodb.delete.mode", "one");
+        runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_ATTR);
+        query = "{ \"_id\": \"doc_2\" }";
+        runner.clearTransferState();
+        testOne(query, attrs);
+    }
+
+    private void manyTest(String query, Map<String, String> attrs) {
+        runner.enqueue(query, attrs);
+        runner.run(1, true);
+        runner.assertTransferCount(DeleteMongo.REL_FAILURE, 0);
+        runner.assertTransferCount(DeleteMongo.REL_SUCCESS, 1);
+
+        Assert.assertEquals("Found a document that should have been deleted.",
+                0, collection.count(Document.parse(query)));
+        Assert.assertEquals("One document should have been left.",
+                1, collection.count(Document.parse("{}")));
+    }
+
+    @Test
+    public void testDeleteMany() {
+        String query = "{\n" +
+                "\t\"_id\": {\n" +
+                "\t\t\"$in\": [\"doc_1\", \"doc_2\"]\n" +
+                "\t}\n" +
+                "}";
+        runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_MANY);
+        manyTest(query, new HashMap<>());
+
+        runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_ATTR);
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put("mongodb.delete.mode", "many");
+        collection.drop();
+        collection.insertMany(DOCUMENTS);
+        runner.clearTransferState();
+        manyTest(query, attrs);
+    }
+
+    @Test
+    public void testFailOnNoDeleteOptions() {
+        String query = "{ \"_id\": \"doc_4\"} ";
+        runner.enqueue(query);
+        runner.run(1, true);
+        runner.assertTransferCount(DeleteMongo.REL_FAILURE, 1);
+        runner.assertTransferCount(DeleteMongo.REL_SUCCESS, 0);
+
+        Assert.assertEquals("A document was deleted", 3, collection.count(Document.parse("{}")));
+
+        runner.setProperty(DeleteMongo.FAIL_ON_NO_DELETE, DeleteMongo.NO_FAIL);
+        runner.clearTransferState();
+        runner.enqueue(query);
+        runner.run(1, true, true);
+
+
+        runner.assertTransferCount(DeleteMongo.REL_FAILURE, 0);
+        runner.assertTransferCount(DeleteMongo.REL_SUCCESS, 1);
+
+        Assert.assertEquals("A document was deleted", 3, collection.count(Document.parse("{}")));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/MongoWriteTestBase.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/MongoWriteTestBase.java
new file mode 100644
index 0000000000..76da51eb80
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/MongoWriteTestBase.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mongodb;
+
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoCollection;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.bson.Document;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class MongoWriteTestBase {
+    protected static final String MONGO_URI = "mongodb://localhost";
+    protected static final String COLLECTION_NAME = "test";
+    protected String DATABASE_NAME;
+
+    protected static final List<Document> DOCUMENTS = Arrays.asList(
+        new Document("_id", "doc_1").append("a", 1).append("b", 2).append("c", 3),
+        new Document("_id", "doc_2").append("a", 1).append("b", 2).append("c", 4),
+        new Document("_id", "doc_3").append("a", 1).append("b", 3)
+    );
+
+    protected TestRunner runner;
+    protected MongoClient mongoClient;
+    protected MongoCollection<Document> collection;
+
+    public void setup(Class processor) {
+        DATABASE_NAME = processor.getSimpleName().toLowerCase();
+        runner = TestRunners.newTestRunner(processor);
+        runner.setVariable("uri", MONGO_URI);
+        runner.setVariable("db", DATABASE_NAME);
+        runner.setVariable("collection", COLLECTION_NAME);
+        runner.setProperty(AbstractMongoProcessor.URI, "${uri}");
+        runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}");
+        runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}");
+
+        mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));
+
+        collection = mongoClient.getDatabase(DATABASE_NAME).getCollection(COLLECTION_NAME);
+    }
+
+    public void teardown() {
+        runner = null;
+
+        mongoClient.getDatabase(DATABASE_NAME).drop();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
index 8828333a86..f019704069 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
@@ -16,14 +16,6 @@
  */
 package org.apache.nifi.processors.mongodb;
 
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.assertEquals;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.MockFlowFile;
@@ -37,51 +29,28 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
-import com.mongodb.client.MongoCollection;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
 
 @Ignore("Integration tests that cause failures in some environments. Require that they be run from Maven to run the embedded mongo maven plugin. Maven Plugin also fails in my CentOS 7 environment.")
-public class PutMongoTest {
-    private static final String MONGO_URI = "mongodb://localhost";
-    private static final String DATABASE_NAME = PutMongoTest.class.getSimpleName().toLowerCase();
-    private static final String COLLECTION_NAME = "test";
-
-    private static final List<Document> DOCUMENTS = Lists.newArrayList(
-        new Document("_id", "doc_1").append("a", 1).append("b", 2).append("c", 3),
-        new Document("_id", "doc_2").append("a", 1).append("b", 2).append("c", 4),
-        new Document("_id", "doc_3").append("a", 1).append("b", 3)
-    );
-
-    private TestRunner runner;
-    private MongoClient mongoClient;
-    private MongoCollection<Document> collection;
-
+public class PutMongoTest extends MongoWriteTestBase {
     @Before
     public void setup() {
-        runner = TestRunners.newTestRunner(PutMongo.class);
-        runner.setVariable("uri", MONGO_URI);
-        runner.setVariable("db", DATABASE_NAME);
-        runner.setVariable("collection", COLLECTION_NAME);
-        runner.setProperty(AbstractMongoProcessor.URI, "${uri}");
-        runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}");
-        runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}");
-
-        mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));
-
-        collection = mongoClient.getDatabase(DATABASE_NAME).getCollection(COLLECTION_NAME);
+        super.setup(PutMongo.class);
     }
 
     @After
     public void teardown() {
-        runner = null;
-
-        mongoClient.getDatabase(DATABASE_NAME).drop();
+        super.teardown();
    }
 
     private byte[] documentToByteArray(Document doc) {
-        return doc.toJson().getBytes(UTF_8);
+        return doc.toJson().getBytes(StandardCharsets.UTF_8);
     }
 
     @Test
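
Reviewer note (illustrative only, not part of the diff): with the Delete Mode property set to the attribute-driven option, the flowfile body carries the delete query and the mongodb.delete.mode attribute selects 'one' or 'many'. A minimal sketch of that usage, assuming the imports already used by MongoWriteTestBase and DeleteMongoTest, plus a MongoDB instance reachable at the placeholder URI below:

    // Illustrative sketch; connection values are placeholders, not part of this commit.
    @Test
    public void deleteManyByAttributeSketch() {
        TestRunner runner = TestRunners.newTestRunner(DeleteMongo.class);
        runner.setProperty(AbstractMongoProcessor.URI, "mongodb://localhost");      // placeholder
        runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "deletemongo");    // placeholder
        runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "test");         // placeholder
        runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_ATTR);

        // The flowfile body is the delete query; the attribute picks 'one' or 'many'.
        Map<String, String> attrs = new HashMap<>();
        attrs.put("mongodb.delete.mode", "many");
        runner.enqueue("{ \"a\": 1 }", attrs);
        runner.run();

        runner.assertAllFlowFilesTransferred(DeleteMongo.REL_SUCCESS, 1);
    }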