NIFI-4212 - RethinkDB Delete Processor

Signed-off-by: James Wing <jvwing@gmail.com>

This closes #2030.
This commit is contained in:
mans2singh 2017-07-21 19:07:15 -07:00 committed by James Wing
parent 70878fe6d6
commit 0bb1411532
10 changed files with 785 additions and 82 deletions

View File

@ -18,9 +18,10 @@ package org.apache.nifi.processors.rethinkdb;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
@ -103,6 +104,30 @@ abstract class AbstractRethinkDBProcessor extends AbstractProcessor {
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR) .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor RETHINKDB_DOCUMENT_ID = new PropertyDescriptor.Builder()
.displayName("Document Identifier")
.name("rethinkdb-document-identifier")
.description("A FlowFile attribute, or attribute expression used " +
"for determining RethinkDB key for the Flow File content")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(true)
.build();
public static AllowableValue DURABILITY_SOFT = new AllowableValue("soft", "Soft", "Don't save changes to disk before ack");
public static AllowableValue DURABILITY_HARD = new AllowableValue("hard", "Hard", "Save change to disk before ack");
protected static final PropertyDescriptor DURABILITY = new PropertyDescriptor.Builder()
.name("rethinkdb-durability")
.displayName("Durablity of documents")
.description("Durability of documents being inserted")
.required(true)
.defaultValue("hard")
.allowableValues(DURABILITY_HARD, DURABILITY_SOFT)
.expressionLanguageSupported(true)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("Sucessful FlowFiles are routed to this relationship").build(); .description("Sucessful FlowFiles are routed to this relationship").build();
@ -122,7 +147,10 @@ abstract class AbstractRethinkDBProcessor extends AbstractProcessor {
public static final String RESULT_FIRST_ERROR_KEY = "first_error"; public static final String RESULT_FIRST_ERROR_KEY = "first_error";
public static final String RESULT_WARNINGS_KEY = "warnings"; public static final String RESULT_WARNINGS_KEY = "warnings";
public static final String DURABILITY_OPTION_KEY = "durability";
public static final String RETHINKDB_ERROR_MESSAGE = "rethinkdb.error.message"; public static final String RETHINKDB_ERROR_MESSAGE = "rethinkdb.error.message";
public static final String DOCUMENT_ID_EMPTY_MESSAGE = "Document Id cannot be empty";
protected Connection rethinkDbConnection; protected Connection rethinkDbConnection;
protected String databaseName; protected String databaseName;
@ -156,7 +184,6 @@ abstract class AbstractRethinkDBProcessor extends AbstractProcessor {
password = context.getProperty(PASSWORD).getValue(); password = context.getProperty(PASSWORD).getValue();
databaseName = context.getProperty(DB_NAME).getValue(); databaseName = context.getProperty(DB_NAME).getValue();
tableName = context.getProperty(TABLE_NAME).getValue(); tableName = context.getProperty(TABLE_NAME).getValue();
maxDocumentsSize = context.getProperty(MAX_DOCUMENTS_SIZE).asDataSize(DataUnit.B).longValue();
try { try {
rethinkDbConnection = makeConnection(); rethinkDbConnection = makeConnection();

View File

@ -0,0 +1,215 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.rethinkdb;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import com.google.gson.Gson;
import java.io.ByteArrayInputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@EventDriven
@Tags({"rethinkdb", "delete", "remove"})
@CapabilityDescription("Processor to remove a JSON document from RethinkDB (https://www.rethinkdb.com/) using the document id.")
@WritesAttributes({
@WritesAttribute(attribute = DeleteRethinkDB.RETHINKDB_ERROR_MESSAGE, description = "RethinkDB error message"),
@WritesAttribute(attribute = DeleteRethinkDB.RETHINKDB_DELETE_RESULT_ERROR_KEY, description = "Error count while delete documents"),
@WritesAttribute(attribute = DeleteRethinkDB.RETHINKDB_DELETE_RESULT_DELETED_KEY, description = "Number of documents deleted"),
@WritesAttribute(attribute = DeleteRethinkDB.RETHINKDB_DELETE_RESULT_INSERTED_KEY, description = "Number of documents inserted"),
@WritesAttribute(attribute = DeleteRethinkDB.RETHINKDB_DELETE_RESULT_REPLACED_KEY, description = "Number of documents replaced"),
@WritesAttribute(attribute = DeleteRethinkDB.RETHINKDB_DELETE_RESULT_SKIPPED_KEY, description = "Number of documents skipped"),
@WritesAttribute(attribute = DeleteRethinkDB.RETHINKDB_DELETE_RESULT_UNCHANGED_KEY, description = "Number of documents unchanged since they already existed"),
})
@SeeAlso({PutRethinkDB.class,GetRethinkDB.class})
public class DeleteRethinkDB extends AbstractRethinkDBProcessor {
public static AllowableValue RETURN_CHANGES_TRUE = new AllowableValue("true", "True", "Return changed document");
public static AllowableValue RETURN_CHANGES_FALSE = new AllowableValue("false", "False", "Do not return changed document");
protected static final PropertyDescriptor RETURN_CHANGES = new PropertyDescriptor.Builder()
.name("rethinkdb-return-result")
.displayName("Return deleted value")
.description("Return old value which were deleted")
.required(true)
.defaultValue(RETURN_CHANGES_TRUE.getValue())
.allowableValues(RETURN_CHANGES_TRUE, RETURN_CHANGES_FALSE)
.expressionLanguageSupported(true)
.build();
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;
public static final String RETHINKDB_DELETE_RESULT_ERROR_KEY = "rethinkdb.delete.errors";
public static final String RETHINKDB_DELETE_RESULT_DELETED_KEY = "rethinkdb.delete.deleted";
public static final String RETHINKDB_DELETE_RESULT_INSERTED_KEY = "rethinkdb.delete.inserted";
public static final String RETHINKDB_DELETE_RESULT_REPLACED_KEY = "rethinkdb.delete.replaced";
public static final String RETHINKDB_DELETE_RESULT_SKIPPED_KEY = "rethinkdb.delete.skipped";
public static final String RETHINKDB_DELETE_RESULT_UNCHANGED_KEY = "rethinkdb.delete.unchanged";
public static final String RESULT_CHANGES_KEY = "changes";
public static final String RETURN_CHANGES_OPTION_KEY = "return_changes";
protected Gson gson = new Gson();
static {
final Set<Relationship> tempRelationships = new HashSet<>();
tempRelationships.add(REL_SUCCESS);
tempRelationships.add(REL_FAILURE);
tempRelationships.add(REL_NOT_FOUND);
relationships = Collections.unmodifiableSet(tempRelationships);
final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
tempDescriptors.add(DB_NAME);
tempDescriptors.add(DB_HOST);
tempDescriptors.add(DB_PORT);
tempDescriptors.add(USERNAME);
tempDescriptors.add(PASSWORD);
tempDescriptors.add(TABLE_NAME);
tempDescriptors.add(CHARSET);
tempDescriptors.add(RETHINKDB_DOCUMENT_ID);
tempDescriptors.add(RETURN_CHANGES);
tempDescriptors.add(DURABILITY);
propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
super.onScheduled(context);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
String id = context.getProperty(RETHINKDB_DOCUMENT_ID).evaluateAttributeExpressions(flowFile).getValue();
String durablity = context.getProperty(DURABILITY).evaluateAttributeExpressions(flowFile).getValue();
Boolean returnChanges = context.getProperty(RETURN_CHANGES).evaluateAttributeExpressions(flowFile).asBoolean();
if ( StringUtils.isEmpty(id) ) {
getLogger().error(DOCUMENT_ID_EMPTY_MESSAGE);
flowFile = session.putAttribute(flowFile, RETHINKDB_ERROR_MESSAGE, DOCUMENT_ID_EMPTY_MESSAGE);
session.transfer(flowFile, REL_FAILURE);
return;
}
try {
long startTimeMillis = System.currentTimeMillis();
Map<String,Object> result = deleteDocument(id, durablity, returnChanges);
final long endTimeMillis = System.currentTimeMillis();
getLogger().debug("Json document {} deleted Result: {}", new Object[] {id, result});
flowFile = populateAttributes(session, flowFile, result);
Long deletedCount = ((Long)result.get(RESULT_DELETED_KEY)).longValue();
if ( deletedCount == 0L ) {
getLogger().debug("Deleted count should be 1 but was " + deletedCount + " for document with id '" + id + "'");
flowFile = populateAttributes(session, flowFile, result);
flowFile = session.putAttribute(flowFile, RETHINKDB_ERROR_MESSAGE, "Deleted count should be 1 but was " + deletedCount + " for document with id '" + id + "'");
session.transfer(flowFile, REL_NOT_FOUND);
return;
}
if ( returnChanges ) {
String json = gson.toJson(((List)result.get(RESULT_CHANGES_KEY)).get(0));
byte [] documentBytes = json.getBytes(charset);
ByteArrayInputStream bais = new ByteArrayInputStream(documentBytes);
session.importFrom(bais, flowFile);
session.getProvenanceReporter().modifyContent(flowFile,
new StringBuilder("rethinkdb://").append(databaseName).append("/").append(tableName).append("/").append(id).toString(),
(endTimeMillis - startTimeMillis));
}
session.transfer(flowFile, REL_SUCCESS);
} catch (Exception exception) {
getLogger().error("Failed to delete document from RethinkDB due to error {}",
new Object[]{exception.getLocalizedMessage()}, exception);
flowFile = session.putAttribute(flowFile, RETHINKDB_ERROR_MESSAGE, exception.getMessage());
session.transfer(flowFile, REL_FAILURE);
context.yield();
}
}
private FlowFile populateAttributes(final ProcessSession session, FlowFile flowFile,
Map<String, Object> result) {
Map<String,String> resultAttributes = new HashMap<>();
resultAttributes.put(RETHINKDB_DELETE_RESULT_ERROR_KEY, String.valueOf(result.get(RESULT_ERROR_KEY)));
resultAttributes.put(RETHINKDB_DELETE_RESULT_DELETED_KEY, String.valueOf(result.get(RESULT_DELETED_KEY)));
resultAttributes.put(RETHINKDB_DELETE_RESULT_INSERTED_KEY, String.valueOf(result.get(RESULT_INSERTED_KEY)));
resultAttributes.put(RETHINKDB_DELETE_RESULT_REPLACED_KEY, String.valueOf(result.get(RESULT_REPLACED_KEY)));
resultAttributes.put(RETHINKDB_DELETE_RESULT_SKIPPED_KEY, String.valueOf(result.get(RESULT_SKIPPED_KEY)));
resultAttributes.put(RETHINKDB_DELETE_RESULT_UNCHANGED_KEY, String.valueOf(result.get(RESULT_UNCHANGED_KEY)));
flowFile = session.putAllAttributes(flowFile, resultAttributes);
return flowFile;
}
protected Map<String,Object> deleteDocument(String id, String durablity, Boolean returnChanges) {
return getRdbTable().get(id).delete().optArg(DURABILITY_OPTION_KEY,durablity).optArg(RETURN_CHANGES_OPTION_KEY, returnChanges).run(rethinkDbConnection);
}
@OnStopped
public void close() {
super.close();
}
}

View File

@ -28,13 +28,12 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import com.google.gson.Gson; import com.google.gson.Gson;
@ -54,10 +53,9 @@ import java.util.Set;
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = GetRethinkDB.RETHINKDB_ERROR_MESSAGE, description = "RethinkDB error message"), @WritesAttribute(attribute = GetRethinkDB.RETHINKDB_ERROR_MESSAGE, description = "RethinkDB error message"),
}) })
@SeeAlso({PutRethinkDB.class}) @SeeAlso({PutRethinkDB.class,DeleteRethinkDB.class})
public class GetRethinkDB extends AbstractRethinkDBProcessor { public class GetRethinkDB extends AbstractRethinkDBProcessor {
public static final String DOCUMENT_ID_EMPTY_MESSAGE = "Document Id cannot be empty";
public static AllowableValue READ_MODE_SINGLE = new AllowableValue("single", "Single", "Read values from memory from primary replica (Default)"); public static AllowableValue READ_MODE_SINGLE = new AllowableValue("single", "Single", "Read values from memory from primary replica (Default)");
public static AllowableValue READ_MODE_MAJORITY = new AllowableValue("majority", "Majority", "Read values committed to disk on majority of replicas"); public static AllowableValue READ_MODE_MAJORITY = new AllowableValue("majority", "Majority", "Read values committed to disk on majority of replicas");
public static AllowableValue READ_MODE_OUTDATED = new AllowableValue("outdated", "Outdated", "Read values from memory from an arbitrary replica "); public static AllowableValue READ_MODE_OUTDATED = new AllowableValue("outdated", "Outdated", "Read values from memory from an arbitrary replica ");
@ -72,16 +70,6 @@ public class GetRethinkDB extends AbstractRethinkDBProcessor {
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor RETHINKDB_DOCUMENT_ID = new PropertyDescriptor.Builder()
.displayName("Document Identifier")
.name("rethinkdb-document-identifier")
.description("A FlowFile attribute, or attribute expression used " +
"for determining RethinkDB key for the Flow File content")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(true)
.build();
protected String READ_MODE_KEY = "read_mode"; protected String READ_MODE_KEY = "read_mode";
private static final Set<Relationship> relationships; private static final Set<Relationship> relationships;
@ -122,6 +110,7 @@ public class GetRethinkDB extends AbstractRethinkDBProcessor {
@OnScheduled @OnScheduled
public void onScheduled(final ProcessContext context) { public void onScheduled(final ProcessContext context) {
maxDocumentsSize = context.getProperty(MAX_DOCUMENTS_SIZE).asDataSize(DataUnit.B).longValue();
super.onScheduled(context); super.onScheduled(context);
} }

View File

@ -29,6 +29,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
@ -62,16 +63,13 @@ import java.util.Set;
@WritesAttribute(attribute = PutRethinkDB.RETHINKDB_INSERT_RESULT_FIRST_ERROR_KEY, description = "First error while inserting documents"), @WritesAttribute(attribute = PutRethinkDB.RETHINKDB_INSERT_RESULT_FIRST_ERROR_KEY, description = "First error while inserting documents"),
@WritesAttribute(attribute = PutRethinkDB.RETHINKDB_INSERT_RESULT_WARNINGS_KEY, description = "Warning message in case of large number of ids being returned on insertion") @WritesAttribute(attribute = PutRethinkDB.RETHINKDB_INSERT_RESULT_WARNINGS_KEY, description = "Warning message in case of large number of ids being returned on insertion")
}) })
@SeeAlso({GetRethinkDB.class}) @SeeAlso({GetRethinkDB.class,DeleteRethinkDB.class})
public class PutRethinkDB extends AbstractRethinkDBProcessor { public class PutRethinkDB extends AbstractRethinkDBProcessor {
public static AllowableValue CONFLICT_STRATEGY_UPDATE = new AllowableValue("update", "Update", "Update the document having same id with new values"); public static AllowableValue CONFLICT_STRATEGY_UPDATE = new AllowableValue("update", "Update", "Update the document having same id with new values");
public static AllowableValue CONFLICT_STRATEGY_REPLACE = new AllowableValue("replace", "Replace", "Replace the document with having same id new document"); public static AllowableValue CONFLICT_STRATEGY_REPLACE = new AllowableValue("replace", "Replace", "Replace the document with having same id new document");
public static AllowableValue CONFLICT_STRATEGY_ERROR = new AllowableValue("error", "Error", "Return error if the document with same id exists"); public static AllowableValue CONFLICT_STRATEGY_ERROR = new AllowableValue("error", "Error", "Return error if the document with same id exists");
public static AllowableValue DURABILITY_SOFT = new AllowableValue("soft", "Soft", "Don't save document on disk before ack");
public static AllowableValue DURABILITY_HARD = new AllowableValue("hard", "Hard", "Save document on disk before ack");
protected static final PropertyDescriptor CONFLICT_STRATEGY = new PropertyDescriptor.Builder() protected static final PropertyDescriptor CONFLICT_STRATEGY = new PropertyDescriptor.Builder()
.name("rethinkdb-conflict-strategy") .name("rethinkdb-conflict-strategy")
.displayName("Conflict strategy") .displayName("Conflict strategy")
@ -82,19 +80,6 @@ public class PutRethinkDB extends AbstractRethinkDBProcessor {
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.build(); .build();
protected static final PropertyDescriptor DURABILITY = new PropertyDescriptor.Builder()
.name("rethinkdb-durability")
.displayName("Durablity of documents")
.description("Durability of documents being inserted")
.required(true)
.defaultValue("hard")
.allowableValues(DURABILITY_HARD, DURABILITY_SOFT)
.expressionLanguageSupported(true)
.build();
protected String CONFLICT_OPTION_KEY = "conflict";
protected String DURABILITY_OPTION_KEY = "durability";
private static final Set<Relationship> relationships; private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors; private static final List<PropertyDescriptor> propertyDescriptors;
@ -109,6 +94,8 @@ public class PutRethinkDB extends AbstractRethinkDBProcessor {
public static final String RETHINKDB_INSERT_RESULT_FIRST_ERROR_KEY = "rethinkdb.insert.first_error"; public static final String RETHINKDB_INSERT_RESULT_FIRST_ERROR_KEY = "rethinkdb.insert.first_error";
public static final String RETHINKDB_INSERT_RESULT_WARNINGS_KEY = "rethinkdb.insert.warnings"; public static final String RETHINKDB_INSERT_RESULT_WARNINGS_KEY = "rethinkdb.insert.warnings";
public final String CONFLICT_OPTION_KEY = "conflict";
static { static {
final Set<Relationship> tempRelationships = new HashSet<>(); final Set<Relationship> tempRelationships = new HashSet<>();
tempRelationships.add(REL_SUCCESS); tempRelationships.add(REL_SUCCESS);
@ -141,6 +128,7 @@ public class PutRethinkDB extends AbstractRethinkDBProcessor {
@OnScheduled @OnScheduled
public void onScheduled(final ProcessContext context) { public void onScheduled(final ProcessContext context) {
maxDocumentsSize = context.getProperty(MAX_DOCUMENTS_SIZE).asDataSize(DataUnit.B).longValue();
super.onScheduled(context); super.onScheduled(context);
} }

View File

@ -14,3 +14,4 @@
# limitations under the License. # limitations under the License.
org.apache.nifi.processors.rethinkdb.PutRethinkDB org.apache.nifi.processors.rethinkdb.PutRethinkDB
org.apache.nifi.processors.rethinkdb.GetRethinkDB org.apache.nifi.processors.rethinkdb.GetRethinkDB
org.apache.nifi.processors.rethinkdb.DeleteRethinkDB

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.rethinkdb;
import org.apache.nifi.util.TestRunner;
import com.rethinkdb.RethinkDB;
import com.rethinkdb.net.Connection;
/**
* Abstract base class for RethinkDB integration tests
*/
public class ITAbstractRethinkDBTest {
protected TestRunner runner;
protected Connection connection;
protected String dbName = "test";
protected String dbHost = "localhost";
protected String dbPort = "28015";
protected String user = "admin";
protected String password = "admin";
protected String table = "test";
public void setUp() throws Exception {
runner.setProperty(AbstractRethinkDBProcessor.DB_NAME, dbName);
runner.setProperty(AbstractRethinkDBProcessor.DB_HOST, dbHost);
runner.setProperty(AbstractRethinkDBProcessor.DB_PORT, dbPort);
runner.setProperty(AbstractRethinkDBProcessor.USERNAME, user);
runner.setProperty(AbstractRethinkDBProcessor.PASSWORD, password);
runner.setProperty(AbstractRethinkDBProcessor.TABLE_NAME, table);
runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "UTF-8");
connection = RethinkDB.r.connection().user(user, password).db(dbName).hostname(dbHost).port(Integer.parseInt(dbPort)).connect();
}
public void tearDown() throws Exception {
runner = null;
connection.close();
connection = null;
}
}

View File

@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.rethinkdb;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import com.rethinkdb.RethinkDB;
import net.minidev.json.JSONObject;
/**
* Integration test for deleting documents from RethinkDB. Please ensure that the RethinkDB is running
* on local host with default port and has database test with table test and user
* <code>admin</code> with password <code>admin</code> before running the integration tests or set the attributes in the
* test accordingly.
*/
@Ignore("Comment this out for running tests against a real instance of RethinkDB")
public class ITDeleteRethinkDBTest extends ITAbstractRethinkDBTest {
@Before
public void setUp() throws Exception {
runner = TestRunners.newTestRunner(DeleteRethinkDB.class);
super.setUp();
runner.setProperty(DeleteRethinkDB.DURABILITY, "soft");
runner.setProperty(DeleteRethinkDB.RETURN_CHANGES, "true");
connection = RethinkDB.r.connection().user(user, password).db(dbName).hostname(dbHost).port(Integer.parseInt(dbPort)).connect();
RethinkDB.r.db(dbName).table(table).delete().run(connection);
long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
assertEquals("Count should be same", 0L, count);
}
@After
public void tearDown() throws Exception {
super.tearDown();
}
@Test
public void testDeleteDocumentById() {
JSONObject message = new JSONObject();
message.put("id", "1");
message.put("value", "one");
RethinkDB.r.db(dbName).table(table).insert(message).run(connection);
long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
assertEquals("Count should be same", 1L, count);
runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
Map<String, String> props = new HashMap<>();
props.put("rethinkdb.id","1");
runner.enqueue(new byte [] {},props);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_SUCCESS, 1);
String changeMessage = ("{\"old_val\":" + message + "}");
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_SUCCESS);
assertEquals("Flow file count should be same", 1, flowFiles.size());
assertEquals("Error should be null",null, flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
assertEquals("Content should be same size", changeMessage.length(), flowFiles.get(0).getSize());
flowFiles.get(0).assertContentEquals(changeMessage);
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_DELETED_KEY), "1");
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_ERROR_KEY),"0");
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_INSERTED_KEY),"0");
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_REPLACED_KEY),"0");
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_SKIPPED_KEY),"0");
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_UNCHANGED_KEY),"0");
Map<String,Object> document = RethinkDB.r.db(dbName).table(table).get("1").run(connection);
assertEquals("Document should be null", document, null);
}
@Test
public void testDeleteDocumentByIdNoChanges() {
JSONObject message = new JSONObject();
message.put("id", "11");
message.put("value", "one");
RethinkDB.r.db(dbName).table(table).insert(message).run(connection);
long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
assertEquals("Count should be same", 1L, count);
runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
runner.setProperty(DeleteRethinkDB.RETURN_CHANGES, "false");
Map<String, String> props = new HashMap<>();
props.put("rethinkdb.id","11");
runner.enqueue(new byte [] {},props);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_SUCCESS);
assertEquals("Flow file count should be same", 1, flowFiles.size());
assertEquals("Error should be null",null, flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_DELETED_KEY), "1");
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_ERROR_KEY),"0");
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_INSERTED_KEY),"0");
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_REPLACED_KEY),"0");
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_SKIPPED_KEY),"0");
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_UNCHANGED_KEY),"0");
assertEquals(flowFiles.get(0).getSize(), 0);
Map<String,Object> document = RethinkDB.r.db(dbName).table(table).get("1").run(connection);
assertEquals("Document should be null", document, null);
}
@Test
public void testDeleteDocumentByIdNotFound() {
JSONObject message = new JSONObject();
message.put("id", "1");
message.put("value", "one");
RethinkDB.r.db(dbName).table(table).insert(message).run(connection);
long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
assertEquals("Count should be same", 1L, count);
runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
Map<String, String> props = new HashMap<>();
props.put("rethinkdb.id", String.valueOf(System.currentTimeMillis()));
runner.enqueue(new byte [] {},props);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_NOT_FOUND, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_NOT_FOUND);
assertEquals("Flow file count should be same", 1, flowFiles.size());
assertNotNull("Error should not be null", flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
assertEquals("Content should be same size", 0, flowFiles.get(0).getSize());
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_DELETED_KEY), "0");
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_ERROR_KEY),"0");
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_INSERTED_KEY),"0");
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_REPLACED_KEY),"0");
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_SKIPPED_KEY),"1");
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_UNCHANGED_KEY),"0");
Map<String,Object> document = RethinkDB.r.db(dbName).table(table).get("1").run(connection);
assertNotNull("Document should not be null", document);
assertEquals("id should be same", document.get("id"), "1");
}
@Test
public void testDeleteDocumentByHardCodedId() {
JSONObject message = new JSONObject();
message.put("id", "2");
message.put("value", "two");
RethinkDB.r.db(dbName).table(table).insert(message).run(connection);
long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
assertEquals("Count should be same", 1, count);
runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, "2");
Map<String, String> props = new HashMap<>();
runner.enqueue(new byte [] {},props);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_SUCCESS, 1);
String changeMessage = ("{\"old_val\":" + message + "}");
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_SUCCESS);
assertEquals("Flow file count should be same", 1, flowFiles.size());
assertEquals("Error should be null",null, flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
assertEquals("Content should be same size", changeMessage.length(), flowFiles.get(0).getSize());
flowFiles.get(0).assertContentEquals(changeMessage);
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_DELETED_KEY), "1");
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_ERROR_KEY),"0");
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_INSERTED_KEY),"0");
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_REPLACED_KEY),"0");
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_SKIPPED_KEY),"0");
assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_UNCHANGED_KEY),"0");
Map<String,Object> document = RethinkDB.r.db(dbName).table(table).get("2").run(connection);
assertEquals("Document should be null", document, null);
}
}

View File

@ -24,14 +24,12 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import com.rethinkdb.RethinkDB; import com.rethinkdb.RethinkDB;
import com.rethinkdb.net.Connection;
import net.minidev.json.JSONObject; import net.minidev.json.JSONObject;
@ -42,29 +40,13 @@ import net.minidev.json.JSONObject;
* test accordingly. * test accordingly.
*/ */
@Ignore("Comment this out for running tests against a real instance of RethinkDB") @Ignore("Comment this out for running tests against a real instance of RethinkDB")
public class ITGetRethinkDBTest { public class ITGetRethinkDBTest extends ITAbstractRethinkDBTest {
private TestRunner runner;
private Connection connection;
private String dbName = "test";
private String dbHost = "localhost";
private String dbPort = "28015";
private String user = "admin";
private String password = "admin";
private String table = "test";
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
runner = TestRunners.newTestRunner(GetRethinkDB.class); runner = TestRunners.newTestRunner(GetRethinkDB.class);
runner.setProperty(AbstractRethinkDBProcessor.DB_NAME, dbName); super.setUp();
runner.setProperty(AbstractRethinkDBProcessor.DB_HOST, dbHost); runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "1 KB");
runner.setProperty(AbstractRethinkDBProcessor.DB_PORT, dbPort);
runner.setProperty(AbstractRethinkDBProcessor.USERNAME, user);
runner.setProperty(AbstractRethinkDBProcessor.PASSWORD, password);
runner.setProperty(AbstractRethinkDBProcessor.TABLE_NAME, table);
runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "UTF-8");
connection = RethinkDB.r.connection().user(user, password).db(dbName).hostname(dbHost).port(Integer.parseInt(dbPort)).connect();
RethinkDB.r.db(dbName).table(table).delete().run(connection); RethinkDB.r.db(dbName).table(table).delete().run(connection);
long count = RethinkDB.r.db(dbName).table(table).count().run(connection); long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
assertEquals("Count should be same", 0L, count); assertEquals("Count should be same", 0L, count);
@ -72,9 +54,7 @@ public class ITGetRethinkDBTest {
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
runner = null; super.tearDown();
connection.close();
connection = null;
} }
@Test @Test
@ -88,6 +68,7 @@ public class ITGetRethinkDBTest {
assertEquals("Count should be same", 1L, count); assertEquals("Count should be same", 1L, count);
runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}"); runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
runner.assertValid();
Map<String, String> props = new HashMap<>(); Map<String, String> props = new HashMap<>();
props.put("rethinkdb.id","1"); props.put("rethinkdb.id","1");
@ -114,6 +95,7 @@ public class ITGetRethinkDBTest {
assertEquals("Count should be same", 1L, count); assertEquals("Count should be same", 1L, count);
runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}"); runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
runner.assertValid();
Map<String, String> props = new HashMap<>(); Map<String, String> props = new HashMap<>();
props.put("rethinkdb.id", String.valueOf(System.currentTimeMillis())); props.put("rethinkdb.id", String.valueOf(System.currentTimeMillis()));
@ -139,6 +121,7 @@ public class ITGetRethinkDBTest {
assertEquals("Count should be same", 1, count); assertEquals("Count should be same", 1, count);
runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "2"); runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "2");
runner.assertValid();
Map<String, String> props = new HashMap<>(); Map<String, String> props = new HashMap<>();
@ -165,6 +148,7 @@ public class ITGetRethinkDBTest {
runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "2"); runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "2");
runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "2B"); runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "2B");
runner.assertValid();
Map<String, String> props = new HashMap<>(); Map<String, String> props = new HashMap<>();

View File

@ -20,7 +20,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import java.util.List; import java.util.List;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.json.simple.JSONArray; import org.json.simple.JSONArray;
import org.junit.After; import org.junit.After;
@ -29,7 +28,6 @@ import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import com.rethinkdb.RethinkDB; import com.rethinkdb.RethinkDB;
import com.rethinkdb.net.Connection;
import net.minidev.json.JSONObject; import net.minidev.json.JSONObject;
@ -40,43 +38,25 @@ import net.minidev.json.JSONObject;
* test accordingly. * test accordingly.
*/ */
@Ignore("Comment this out for running tests against a real instance of RethinkDB") @Ignore("Comment this out for running tests against a real instance of RethinkDB")
public class ITPutRethinkDBTest { public class ITPutRethinkDBTest extends ITAbstractRethinkDBTest {
private TestRunner runner;
private Connection connection;
private String dbName = "test";
private String dbHost = "localhost";
private String dbPort = "28015";
private String user = "admin";
private String password = "admin";
private String table = "test";
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
runner = TestRunners.newTestRunner(PutRethinkDB.class); runner = TestRunners.newTestRunner(PutRethinkDB.class);
runner.setProperty(PutRethinkDB.DB_NAME, dbName); super.setUp();
runner.setProperty(PutRethinkDB.DB_HOST, dbHost); runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "1 KB");
runner.setProperty(PutRethinkDB.DB_PORT, dbPort);
runner.setProperty(PutRethinkDB.USERNAME, user);
runner.setProperty(PutRethinkDB.PASSWORD, password);
runner.setProperty(PutRethinkDB.TABLE_NAME, table);
runner.setProperty(PutRethinkDB.CHARSET, "UTF-8");
runner.setProperty(PutRethinkDB.CONFLICT_STRATEGY, PutRethinkDB.CONFLICT_STRATEGY_UPDATE); runner.setProperty(PutRethinkDB.CONFLICT_STRATEGY, PutRethinkDB.CONFLICT_STRATEGY_UPDATE);
runner.setProperty(PutRethinkDB.DURABILITY, PutRethinkDB.DURABILITY_HARD); runner.setProperty(PutRethinkDB.DURABILITY, PutRethinkDB.DURABILITY_HARD);
runner.setProperty(PutRethinkDB.MAX_DOCUMENTS_SIZE, "1 KB");
runner.assertValid();
connection = RethinkDB.r.connection().user(user, password).db(dbName).hostname(dbHost).port(Integer.parseInt(dbPort)).connect();
} }
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
runner = null; super.tearDown();
connection.close();
connection = null;
} }
@Test @Test
public void testValidSingleMessage() { public void testValidSingleMessage() {
runner.assertValid();
RethinkDB.r.db(dbName).table(table).delete().run(connection); RethinkDB.r.db(dbName).table(table).delete().run(connection);
long count = RethinkDB.r.db(dbName).table(table).count().run(connection); long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
assertEquals("Count should be same", 0L, count); assertEquals("Count should be same", 0L, count);
@ -104,6 +84,7 @@ public class ITPutRethinkDBTest {
@Test @Test
public void testValidSingleMessageTwiceConflictUpdate() { public void testValidSingleMessageTwiceConflictUpdate() {
runner.assertValid();
RethinkDB.r.db(dbName).table(table).delete().run(connection); RethinkDB.r.db(dbName).table(table).delete().run(connection);
long count = RethinkDB.r.db(dbName).table(table).count().run(connection); long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
assertEquals("Count should be same", 0L, count); assertEquals("Count should be same", 0L, count);
@ -144,6 +125,7 @@ public class ITPutRethinkDBTest {
@Test @Test
public void testValidSingleMessageTwiceConflictError() { public void testValidSingleMessageTwiceConflictError() {
runner.setProperty(PutRethinkDB.CONFLICT_STRATEGY, PutRethinkDB.CONFLICT_STRATEGY_ERROR); runner.setProperty(PutRethinkDB.CONFLICT_STRATEGY, PutRethinkDB.CONFLICT_STRATEGY_ERROR);
runner.assertValid();
RethinkDB.r.db(dbName).table(table).delete().run(connection); RethinkDB.r.db(dbName).table(table).delete().run(connection);
long count = RethinkDB.r.db(dbName).table(table).count().run(connection); long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
assertEquals("Count should be same", 0L, count); assertEquals("Count should be same", 0L, count);
@ -183,6 +165,7 @@ public class ITPutRethinkDBTest {
@Test @Test
public void testValidArrayMessage() { public void testValidArrayMessage() {
runner.assertValid();
RethinkDB.r.db(dbName).table(table).delete().run(connection); RethinkDB.r.db(dbName).table(table).delete().run(connection);
long count = RethinkDB.r.db(dbName).table(table).count().run(connection); long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
assertEquals("Count should be same", 0L, count); assertEquals("Count should be same", 0L, count);

View File

@ -0,0 +1,254 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.rethinkdb;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.rethinkdb.net.Connection;
public class TestDeleteRethinkDB {
private static final String DOCUMENT_ID = "id1";
private TestRunner runner;
private AbstractRethinkDBProcessor mockDeleteRethinkDB;
private Map<String,Object> document = new HashMap<>();
@Before
public void setUp() throws Exception {
mockDeleteRethinkDB = new DeleteRethinkDB() {
@Override
protected Connection makeConnection() {
return null;
}
@Override
protected Map<String, Object> deleteDocument(String id, String durablity, Boolean returnChanges) {
return document;
}
};
document.put(DeleteRethinkDB.RESULT_DELETED_KEY, 1L);
document.put(DeleteRethinkDB.RESULT_ERROR_KEY, 0L);
document.put(DeleteRethinkDB.RESULT_CHANGES_KEY, Lists.asList(
"[{new_val=null, old_val={id=1, value=one}}]", new String[] {}));
document.put(DeleteRethinkDB.RESULT_INSERTED_KEY, 0L);
document.put(DeleteRethinkDB.RESULT_REPLACED_KEY, 0L);
document.put(DeleteRethinkDB.RESULT_SKIPPED_KEY, 0L);
document.put(DeleteRethinkDB.RESULT_UNCHANGED_KEY, 0L);
document.put(DeleteRethinkDB.RESULT_FIRST_ERROR_KEY, "");
runner = TestRunners.newTestRunner(mockDeleteRethinkDB);
runner.setProperty(AbstractRethinkDBProcessor.DB_NAME, "test");
runner.setProperty(AbstractRethinkDBProcessor.DB_HOST, "host1");
runner.setProperty(AbstractRethinkDBProcessor.DB_PORT, "1234");
runner.setProperty(AbstractRethinkDBProcessor.USERNAME, "u1");
runner.setProperty(AbstractRethinkDBProcessor.PASSWORD, "p1");
runner.setProperty(AbstractRethinkDBProcessor.TABLE_NAME, "t1");
runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "UTF-8");
runner.setProperty(AbstractRethinkDBProcessor.DURABILITY, "soft");
runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
runner.assertValid();
}
@After
public void tearDown() throws Exception {
runner = null;
}
@Test
public void testDefaultValid() {
runner.assertValid();
}
@Test
public void testBlankHost() {
runner.setProperty(AbstractRethinkDBProcessor.DB_HOST, "");
runner.assertNotValid();
}
@Test
public void testEmptyPort() {
runner.setProperty(AbstractRethinkDBProcessor.DB_PORT, "");
runner.assertNotValid();
}
@Test
public void testEmptyDBName() {
runner.setProperty(AbstractRethinkDBProcessor.DB_NAME, "");
runner.assertNotValid();
}
@Test
public void testEmptyUsername() {
runner.setProperty(AbstractRethinkDBProcessor.USERNAME, "");
runner.assertNotValid();
}
@Test
public void testEmptyPassword() {
runner.setProperty(AbstractRethinkDBProcessor.PASSWORD, "p1");
runner.assertValid();
}
@Test
public void testCharsetUTF8() {
runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "UTF-8");
runner.assertValid();
}
@Test
public void testCharsetBlank() {
runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "");
runner.assertNotValid();
}
@Test
public void testZeroMaxDocumentSize() {
runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "0");
runner.assertNotValid();
}
@Test
public void testBlankDurability() {
runner.setProperty(DeleteRethinkDB.DURABILITY, "");
runner.assertNotValid();
}
@Test
public void testNotFound() {
runner.assertValid();
document.put(DeleteRethinkDB.RESULT_DELETED_KEY, 0L);
HashMap<String,String> props = new HashMap<>();
props.put("rethinkdb.id", DOCUMENT_ID);
runner.enqueue(new byte[]{}, props);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_NOT_FOUND, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_NOT_FOUND);
assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
flowFiles.get(0).assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,
"Deleted count should be 1 but was 0 for document with id 'id1'");
}
@Test
public void testBlankId() {
runner.assertValid();
runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
Map<String,String> props = new HashMap<>();
runner.enqueue(new byte[]{},props);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE);
assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
flowFiles.get(0).assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,AbstractRethinkDBProcessor.DOCUMENT_ID_EMPTY_MESSAGE);
}
@Test
public void testNullId() {
runner.assertValid();
runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
Map<String,String> props = new HashMap<>();
props.put("rethinkdb.id", null);
runner.enqueue(new byte[]{},props);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE);
assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
flowFiles.get(0).assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,AbstractRethinkDBProcessor.DOCUMENT_ID_EMPTY_MESSAGE);
}
@Test
public void testValidSingleDelete() {
runner.assertValid();
HashMap<String,String> props = new HashMap<>();
props.put("rethinkdb.id", DOCUMENT_ID);
runner.enqueue(new byte[]{}, props);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_SUCCESS, 1);
Gson gson = new Gson();
String json = gson.toJson(((List)document.get(DeleteRethinkDB.RESULT_CHANGES_KEY)).get(0));
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_SUCCESS);
flowFiles.get(0).assertContentEquals(json.toString());
assertNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
}
@Test
public void testGetThrowsException() {
mockDeleteRethinkDB = new DeleteRethinkDB() {
@Override
protected Connection makeConnection() {
return null;
}
@Override
protected Map<String,Object> deleteDocument(String id, String durablity, Boolean returnChanges) {
throw new RuntimeException("testException");
}
};
runner = TestRunners.newTestRunner(mockDeleteRethinkDB);
runner.setProperty(AbstractRethinkDBProcessor.DB_NAME, "test");
runner.setProperty(AbstractRethinkDBProcessor.DB_HOST, "host1");
runner.setProperty(AbstractRethinkDBProcessor.DB_PORT, "1234");
runner.setProperty(AbstractRethinkDBProcessor.USERNAME, "u1");
runner.setProperty(AbstractRethinkDBProcessor.PASSWORD, "p1");
runner.setProperty(AbstractRethinkDBProcessor.TABLE_NAME, "t1");
runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "UTF-8");
runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, DOCUMENT_ID);
runner.assertValid();
HashMap<String,String> props = new HashMap<>();
props.put("rethinkdb.id", DOCUMENT_ID);
runner.enqueue(new byte[]{}, props);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE);
assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
flowFiles.get(0).assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,"testException");
}
}