NIFI-4479 Added DeleteMongo processor.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2295
This commit is contained in:
Mike Thomsen 2017-11-25 08:51:17 -05:00 committed by Matthew Burgess
parent 5b2e2afc17
commit 15eeb22116
5 changed files with 368 additions and 42 deletions

View File

@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.result.DeleteResult;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.StringUtils;
import org.bson.Document;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@EventDriven
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({ "delete", "mongo", "mongodb" })
@CapabilityDescription(
"Executes a delete query against a MongoDB collection. The query is provided in the body of the flowfile " +
"and the user can select whether it will delete one or many documents that match it."
)
@ReadsAttribute(
attribute = "mongodb.delete.mode",
description = "Configurable parameter for controlling delete mode on a per-flowfile basis. The process must be " +
"configured to use this option. Acceptable values are 'one' and 'many.'"
)
public class DeleteMongo extends AbstractMongoProcessor {
private final static Set<Relationship> relationships;
private final static List<PropertyDescriptor> propertyDescriptors;
static final AllowableValue DELETE_ONE = new AllowableValue("one", "Delete One", "Delete only the first document that matches the query.");
static final AllowableValue DELETE_MANY = new AllowableValue("many", "Delete Many", "Delete every document that matches the query.");
static final AllowableValue DELETE_ATTR = new AllowableValue("attr", "Use 'mongodb.delete.mode' attribute",
"Read the 'mongodb.delete.mode attribute and use that mode. Acceptable values are 'many' and 'one.'");
static final AllowableValue YES_FAIL = new AllowableValue("true", "True", "Fail when no documents are deleted.");
static final AllowableValue NO_FAIL = new AllowableValue("false", "False", "Do not fail when nothing is deleted.");
static final PropertyDescriptor DELETE_MODE = new PropertyDescriptor.Builder()
.name("delete-mongo-delete-mode")
.displayName("Delete Mode")
.description("Choose between deleting one document by query or many documents by query.")
.allowableValues(DELETE_ONE, DELETE_MANY, DELETE_ATTR)
.defaultValue("one")
.addValidator(Validator.VALID)
.build();
static final PropertyDescriptor FAIL_ON_NO_DELETE = new PropertyDescriptor.Builder()
.name("delete-mongo-fail-on-no-delete")
.displayName("Fail When Nothing Is Deleted")
.description("Determines whether to send the flowfile to the success or failure relationship if nothing is successfully deleted.")
.allowableValues(YES_FAIL, NO_FAIL)
.defaultValue("true")
.addValidator(Validator.VALID)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("All FlowFiles that are written to MongoDB are routed to this relationship").build();
static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("All FlowFiles that cannot be written to MongoDB are routed to this relationship").build();
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.addAll(descriptors);
_propertyDescriptors.add(DELETE_MODE);
_propertyDescriptors.add(FAIL_ON_NO_DELETE);
_propertyDescriptors.add(WRITE_CONCERN);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
final Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
_relationships.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(_relationships);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
private static final List<String> ALLOWED_DELETE_VALUES;
static {
ALLOWED_DELETE_VALUES = new ArrayList<>();
ALLOWED_DELETE_VALUES.add("one");
ALLOWED_DELETE_VALUES.add("many");
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
final WriteConcern writeConcern = getWriteConcern(context);
final MongoCollection<Document> collection = getCollection(context).withWriteConcern(writeConcern);
final String deleteMode = context.getProperty(DELETE_MODE).getValue();
final String deleteAttr = flowFile.getAttribute("mongodb.delete.mode");
final Boolean failMode = context.getProperty(FAIL_ON_NO_DELETE).asBoolean();
if (deleteMode.equals(DELETE_ATTR.getValue())
&& (StringUtils.isEmpty(deleteAttr) || !ALLOWED_DELETE_VALUES.contains(deleteAttr.toLowerCase()) )) {
getLogger().error(String.format("%s is not an allowed value for mongodb.delete.mode", deleteAttr));
session.transfer(flowFile, REL_FAILURE);
return;
}
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
session.exportTo(flowFile, bos);
bos.close();
String json = new String(bos.toByteArray());
Document query = Document.parse(json);
DeleteResult result;
if (deleteMode.equals(DELETE_ONE.getValue())
|| (deleteMode.equals(DELETE_ATTR.getValue()) && deleteAttr.toLowerCase().equals("one") )) {
result = collection.deleteOne(query);
} else {
result = collection.deleteMany(query);
}
if (failMode && result.getDeletedCount() == 0) {
session.transfer(flowFile, REL_FAILURE);
} else {
session.transfer(flowFile, REL_SUCCESS);
}
} catch (Exception ex) {
getLogger().error("Could not send a delete to MongoDB, failing...", ex);
session.transfer(flowFile, REL_FAILURE);
}
}
}

View File

@ -12,6 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
org.apache.nifi.processors.mongodb.DeleteMongo
org.apache.nifi.processors.mongodb.GetMongo org.apache.nifi.processors.mongodb.GetMongo
org.apache.nifi.processors.mongodb.PutMongo org.apache.nifi.processors.mongodb.PutMongo
org.apache.nifi.processors.mongodb.PutMongoRecord org.apache.nifi.processors.mongodb.PutMongoRecord

View File

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb;
import org.bson.Document;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
@Ignore("This is an integration test and should be marked Ignore until someone needs to run it.")
public class DeleteMongoTest extends MongoWriteTestBase {
@Before
public void setup() {
super.setup(DeleteMongo.class);
collection.insertMany(DOCUMENTS);
}
@After
public void teardown() {
super.teardown();
}
private void testOne(String query, Map<String, String> attrs) {
runner.enqueue(query, attrs);
runner.run(1, true);
runner.assertTransferCount(DeleteMongo.REL_FAILURE, 0);
runner.assertTransferCount(DeleteMongo.REL_SUCCESS, 1);
Assert.assertEquals("Found a document that should have been deleted.",
0, collection.count(Document.parse(query)));
}
@Test
public void testDeleteOne() {
String query = "{ \"_id\": \"doc_1\" }";
runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_ONE);
testOne(query, new HashMap<>());
Map<String, String> attrs = new HashMap<>();
attrs.put("mongodb.delete.mode", "one");
runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_ATTR);
query = "{ \"_id\": \"doc_2\" }";
runner.clearTransferState();
testOne(query, attrs);
}
private void manyTest(String query, Map<String, String> attrs) {
runner.enqueue(query, attrs);
runner.run(1, true);
runner.assertTransferCount(DeleteMongo.REL_FAILURE, 0);
runner.assertTransferCount(DeleteMongo.REL_SUCCESS, 1);
Assert.assertEquals("Found a document that should have been deleted.",
0, collection.count(Document.parse(query)));
Assert.assertEquals("One document should have been left.",
1, collection.count(Document.parse("{}")));
}
@Test
public void testDeleteMany() {
String query = "{\n" +
"\t\"_id\": {\n" +
"\t\t\"$in\": [\"doc_1\", \"doc_2\"]\n" +
"\t}\n" +
"}";
runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_MANY);
manyTest(query, new HashMap<>());
runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_ATTR);
Map<String, String> attrs = new HashMap<>();
attrs.put("mongodb.delete.mode", "many");
collection.drop();
collection.insertMany(DOCUMENTS);
runner.clearTransferState();
manyTest(query, attrs);
}
@Test
public void testFailOnNoDeleteOptions() {
String query = "{ \"_id\": \"doc_4\"} ";
runner.enqueue(query);
runner.run(1, true);
runner.assertTransferCount(DeleteMongo.REL_FAILURE, 1);
runner.assertTransferCount(DeleteMongo.REL_SUCCESS, 0);
Assert.assertEquals("A document was deleted", 3, collection.count(Document.parse("{}")));
runner.setProperty(DeleteMongo.FAIL_ON_NO_DELETE, DeleteMongo.NO_FAIL);
runner.clearTransferState();
runner.enqueue(query);
runner.run(1, true, true);
runner.assertTransferCount(DeleteMongo.REL_FAILURE, 0);
runner.assertTransferCount(DeleteMongo.REL_SUCCESS, 1);
Assert.assertEquals("A document was deleted", 3, collection.count(Document.parse("{}")));
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mongodb;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.bson.Document;
import java.util.Arrays;
import java.util.List;
public class MongoWriteTestBase {
protected static final String MONGO_URI = "mongodb://localhost";
protected static final String COLLECTION_NAME = "test";
protected String DATABASE_NAME;
protected static final List<Document> DOCUMENTS = Arrays.asList(
new Document("_id", "doc_1").append("a", 1).append("b", 2).append("c", 3),
new Document("_id", "doc_2").append("a", 1).append("b", 2).append("c", 4),
new Document("_id", "doc_3").append("a", 1).append("b", 3)
);
protected TestRunner runner;
protected MongoClient mongoClient;
protected MongoCollection<Document> collection;
public void setup(Class processor) {
DATABASE_NAME = processor.getSimpleName().toLowerCase();
runner = TestRunners.newTestRunner(processor);
runner.setVariable("uri", MONGO_URI);
runner.setVariable("db", DATABASE_NAME);
runner.setVariable("collection", COLLECTION_NAME);
runner.setProperty(AbstractMongoProcessor.URI, "${uri}");
runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}");
runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}");
mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));
collection = mongoClient.getDatabase(DATABASE_NAME).getCollection(COLLECTION_NAME);
}
public void teardown() {
runner = null;
mongoClient.getDatabase(DATABASE_NAME).drop();
}
}

View File

@ -16,14 +16,6 @@
*/ */
package org.apache.nifi.processors.mongodb; package org.apache.nifi.processors.mongodb;
import static com.google.common.base.Charsets.UTF_8;
import static org.junit.Assert.assertEquals;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
@ -37,51 +29,28 @@ import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import com.google.common.collect.Lists; import java.nio.charset.StandardCharsets;
import com.mongodb.MongoClient; import java.util.Collection;
import com.mongodb.MongoClientURI; import java.util.HashSet;
import com.mongodb.client.MongoCollection; import java.util.Iterator;
import java.util.List;
import static org.junit.Assert.assertEquals;
@Ignore("Integration tests that cause failures in some environments. Require that they be run from Maven to run the embedded mongo maven plugin. Maven Plugin also fails in my CentOS 7 environment.") @Ignore("Integration tests that cause failures in some environments. Require that they be run from Maven to run the embedded mongo maven plugin. Maven Plugin also fails in my CentOS 7 environment.")
public class PutMongoTest { public class PutMongoTest extends MongoWriteTestBase {
private static final String MONGO_URI = "mongodb://localhost";
private static final String DATABASE_NAME = PutMongoTest.class.getSimpleName().toLowerCase();
private static final String COLLECTION_NAME = "test";
private static final List<Document> DOCUMENTS = Lists.newArrayList(
new Document("_id", "doc_1").append("a", 1).append("b", 2).append("c", 3),
new Document("_id", "doc_2").append("a", 1).append("b", 2).append("c", 4),
new Document("_id", "doc_3").append("a", 1).append("b", 3)
);
private TestRunner runner;
private MongoClient mongoClient;
private MongoCollection<Document> collection;
@Before @Before
public void setup() { public void setup() {
runner = TestRunners.newTestRunner(PutMongo.class); super.setup(PutMongo.class);
runner.setVariable("uri", MONGO_URI);
runner.setVariable("db", DATABASE_NAME);
runner.setVariable("collection", COLLECTION_NAME);
runner.setProperty(AbstractMongoProcessor.URI, "${uri}");
runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}");
runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}");
mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));
collection = mongoClient.getDatabase(DATABASE_NAME).getCollection(COLLECTION_NAME);
} }
@After @After
public void teardown() { public void teardown() {
runner = null; super.teardown();
mongoClient.getDatabase(DATABASE_NAME).drop();
} }
private byte[] documentToByteArray(Document doc) { private byte[] documentToByteArray(Document doc) {
return doc.toJson().getBytes(UTF_8); return doc.toJson().getBytes(StandardCharsets.UTF_8);
} }
@Test @Test