From 0bb141153282888427656d1471c35a5d83958780 Mon Sep 17 00:00:00 2001 From: mans2singh Date: Fri, 21 Jul 2017 19:07:15 -0700 Subject: [PATCH] NIFI-4212 - RethinkDB Delete Processor Signed-off-by: James Wing This closes #2030. --- .../rethinkdb/AbstractRethinkDBProcessor.java | 31 ++- .../processors/rethinkdb/DeleteRethinkDB.java | 215 +++++++++++++++ .../processors/rethinkdb/GetRethinkDB.java | 17 +- .../processors/rethinkdb/PutRethinkDB.java | 22 +- .../org.apache.nifi.processor.Processor | 1 + .../rethinkdb/ITAbstractRethinkDBTest.java | 55 ++++ .../rethinkdb/ITDeleteRethinkDBTest.java | 207 ++++++++++++++ .../rethinkdb/ITGetRethinkDBTest.java | 32 +-- .../rethinkdb/ITPutRethinkDBTest.java | 33 +-- .../rethinkdb/TestDeleteRethinkDB.java | 254 ++++++++++++++++++ 10 files changed, 785 insertions(+), 82 deletions(-) create mode 100644 nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/DeleteRethinkDB.java create mode 100644 nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITAbstractRethinkDBTest.java create mode 100644 nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITDeleteRethinkDBTest.java create mode 100644 nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/TestDeleteRethinkDB.java diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/AbstractRethinkDBProcessor.java b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/AbstractRethinkDBProcessor.java index cfbf1bdf94..61ad45760c 100644 --- a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/AbstractRethinkDBProcessor.java +++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/AbstractRethinkDBProcessor.java @@ -18,9 +18,10 @@ package org.apache.nifi.processors.rethinkdb; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression.ResultType; import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; @@ -103,6 +104,30 @@ abstract class AbstractRethinkDBProcessor extends AbstractProcessor { .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) .build(); + public static final PropertyDescriptor RETHINKDB_DOCUMENT_ID = new PropertyDescriptor.Builder() + .displayName("Document Identifier") + .name("rethinkdb-document-identifier") + .description("A FlowFile attribute, or attribute expression used " + + "for determining RethinkDB key for the Flow File content") + .required(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) + .expressionLanguageSupported(true) + .build(); + + public static AllowableValue DURABILITY_SOFT = new AllowableValue("soft", "Soft", "Don't save changes to disk before ack"); + + public static AllowableValue DURABILITY_HARD = new AllowableValue("hard", "Hard", "Save change to disk before ack"); + + protected static final PropertyDescriptor DURABILITY = new PropertyDescriptor.Builder() + .name("rethinkdb-durability") + .displayName("Durablity of documents") + .description("Durability of documents being inserted") + .required(true) + .defaultValue("hard") + .allowableValues(DURABILITY_HARD, DURABILITY_SOFT) + .expressionLanguageSupported(true) + .build(); + static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") .description("Sucessful FlowFiles are routed to this relationship").build(); @@ -122,7 +147,10 @@ abstract class AbstractRethinkDBProcessor extends AbstractProcessor { public static final String RESULT_FIRST_ERROR_KEY = "first_error"; public static final String RESULT_WARNINGS_KEY = "warnings"; + public static final String DURABILITY_OPTION_KEY = "durability"; + public static final String RETHINKDB_ERROR_MESSAGE = "rethinkdb.error.message"; + public static final String DOCUMENT_ID_EMPTY_MESSAGE = "Document Id cannot be empty"; protected Connection rethinkDbConnection; protected String databaseName; @@ -156,7 +184,6 @@ abstract class AbstractRethinkDBProcessor extends AbstractProcessor { password = context.getProperty(PASSWORD).getValue(); databaseName = context.getProperty(DB_NAME).getValue(); tableName = context.getProperty(TABLE_NAME).getValue(); - maxDocumentsSize = context.getProperty(MAX_DOCUMENTS_SIZE).asDataSize(DataUnit.B).longValue(); try { rethinkDbConnection = makeConnection(); diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/DeleteRethinkDB.java b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/DeleteRethinkDB.java new file mode 100644 index 0000000000..6fed9e2698 --- /dev/null +++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/DeleteRethinkDB.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.rethinkdb; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; + +import com.google.gson.Gson; + +import java.io.ByteArrayInputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@Tags({"rethinkdb", "delete", "remove"}) +@CapabilityDescription("Processor to remove a JSON document from RethinkDB (https://www.rethinkdb.com/) using the document id.") +@WritesAttributes({ + @WritesAttribute(attribute = DeleteRethinkDB.RETHINKDB_ERROR_MESSAGE, description = "RethinkDB error message"), + @WritesAttribute(attribute = DeleteRethinkDB.RETHINKDB_DELETE_RESULT_ERROR_KEY, description = "Error count while delete documents"), + @WritesAttribute(attribute = DeleteRethinkDB.RETHINKDB_DELETE_RESULT_DELETED_KEY, description = "Number of documents deleted"), + @WritesAttribute(attribute = DeleteRethinkDB.RETHINKDB_DELETE_RESULT_INSERTED_KEY, description = "Number of documents inserted"), + @WritesAttribute(attribute = DeleteRethinkDB.RETHINKDB_DELETE_RESULT_REPLACED_KEY, description = "Number of documents replaced"), + @WritesAttribute(attribute = DeleteRethinkDB.RETHINKDB_DELETE_RESULT_SKIPPED_KEY, description = "Number of documents skipped"), + @WritesAttribute(attribute = DeleteRethinkDB.RETHINKDB_DELETE_RESULT_UNCHANGED_KEY, description = "Number of documents unchanged since they already existed"), + }) +@SeeAlso({PutRethinkDB.class,GetRethinkDB.class}) +public class DeleteRethinkDB extends AbstractRethinkDBProcessor { + + public static AllowableValue RETURN_CHANGES_TRUE = new AllowableValue("true", "True", "Return changed document"); + public static AllowableValue RETURN_CHANGES_FALSE = new AllowableValue("false", "False", "Do not return changed document"); + + protected static final PropertyDescriptor RETURN_CHANGES = new PropertyDescriptor.Builder() + .name("rethinkdb-return-result") + .displayName("Return deleted value") + .description("Return old value which were deleted") + .required(true) + .defaultValue(RETURN_CHANGES_TRUE.getValue()) + .allowableValues(RETURN_CHANGES_TRUE, RETURN_CHANGES_FALSE) + .expressionLanguageSupported(true) + .build(); + + private static final Set relationships; + private static final List propertyDescriptors; + + public static final String RETHINKDB_DELETE_RESULT_ERROR_KEY = "rethinkdb.delete.errors"; + public static final String RETHINKDB_DELETE_RESULT_DELETED_KEY = "rethinkdb.delete.deleted"; + public static final String RETHINKDB_DELETE_RESULT_INSERTED_KEY = "rethinkdb.delete.inserted"; + public static final String RETHINKDB_DELETE_RESULT_REPLACED_KEY = "rethinkdb.delete.replaced"; + public static final String RETHINKDB_DELETE_RESULT_SKIPPED_KEY = "rethinkdb.delete.skipped"; + public static final String RETHINKDB_DELETE_RESULT_UNCHANGED_KEY = "rethinkdb.delete.unchanged"; + + public static final String RESULT_CHANGES_KEY = "changes"; + public static final String RETURN_CHANGES_OPTION_KEY = "return_changes"; + + protected Gson gson = new Gson(); + + static { + final Set tempRelationships = new HashSet<>(); + tempRelationships.add(REL_SUCCESS); + tempRelationships.add(REL_FAILURE); + tempRelationships.add(REL_NOT_FOUND); + relationships = Collections.unmodifiableSet(tempRelationships); + + final List tempDescriptors = new ArrayList<>(); + tempDescriptors.add(DB_NAME); + tempDescriptors.add(DB_HOST); + tempDescriptors.add(DB_PORT); + tempDescriptors.add(USERNAME); + tempDescriptors.add(PASSWORD); + tempDescriptors.add(TABLE_NAME); + tempDescriptors.add(CHARSET); + tempDescriptors.add(RETHINKDB_DOCUMENT_ID); + tempDescriptors.add(RETURN_CHANGES); + tempDescriptors.add(DURABILITY); + propertyDescriptors = Collections.unmodifiableList(tempDescriptors); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + super.onScheduled(context); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue()); + String id = context.getProperty(RETHINKDB_DOCUMENT_ID).evaluateAttributeExpressions(flowFile).getValue(); + String durablity = context.getProperty(DURABILITY).evaluateAttributeExpressions(flowFile).getValue(); + Boolean returnChanges = context.getProperty(RETURN_CHANGES).evaluateAttributeExpressions(flowFile).asBoolean(); + + if ( StringUtils.isEmpty(id) ) { + getLogger().error(DOCUMENT_ID_EMPTY_MESSAGE); + flowFile = session.putAttribute(flowFile, RETHINKDB_ERROR_MESSAGE, DOCUMENT_ID_EMPTY_MESSAGE); + session.transfer(flowFile, REL_FAILURE); + return; + } + + try { + long startTimeMillis = System.currentTimeMillis(); + Map result = deleteDocument(id, durablity, returnChanges); + final long endTimeMillis = System.currentTimeMillis(); + + getLogger().debug("Json document {} deleted Result: {}", new Object[] {id, result}); + + flowFile = populateAttributes(session, flowFile, result); + + Long deletedCount = ((Long)result.get(RESULT_DELETED_KEY)).longValue(); + + if ( deletedCount == 0L ) { + getLogger().debug("Deleted count should be 1 but was " + deletedCount + " for document with id '" + id + "'"); + + flowFile = populateAttributes(session, flowFile, result); + + flowFile = session.putAttribute(flowFile, RETHINKDB_ERROR_MESSAGE, "Deleted count should be 1 but was " + deletedCount + " for document with id '" + id + "'"); + session.transfer(flowFile, REL_NOT_FOUND); + return; + } + + if ( returnChanges ) { + String json = gson.toJson(((List)result.get(RESULT_CHANGES_KEY)).get(0)); + + byte [] documentBytes = json.getBytes(charset); + + ByteArrayInputStream bais = new ByteArrayInputStream(documentBytes); + session.importFrom(bais, flowFile); + + session.getProvenanceReporter().modifyContent(flowFile, + new StringBuilder("rethinkdb://").append(databaseName).append("/").append(tableName).append("/").append(id).toString(), + (endTimeMillis - startTimeMillis)); + } + + session.transfer(flowFile, REL_SUCCESS); + + + } catch (Exception exception) { + getLogger().error("Failed to delete document from RethinkDB due to error {}", + new Object[]{exception.getLocalizedMessage()}, exception); + flowFile = session.putAttribute(flowFile, RETHINKDB_ERROR_MESSAGE, exception.getMessage()); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + } + } + + private FlowFile populateAttributes(final ProcessSession session, FlowFile flowFile, + Map result) { + Map resultAttributes = new HashMap<>(); + resultAttributes.put(RETHINKDB_DELETE_RESULT_ERROR_KEY, String.valueOf(result.get(RESULT_ERROR_KEY))); + resultAttributes.put(RETHINKDB_DELETE_RESULT_DELETED_KEY, String.valueOf(result.get(RESULT_DELETED_KEY))); + resultAttributes.put(RETHINKDB_DELETE_RESULT_INSERTED_KEY, String.valueOf(result.get(RESULT_INSERTED_KEY))); + resultAttributes.put(RETHINKDB_DELETE_RESULT_REPLACED_KEY, String.valueOf(result.get(RESULT_REPLACED_KEY))); + resultAttributes.put(RETHINKDB_DELETE_RESULT_SKIPPED_KEY, String.valueOf(result.get(RESULT_SKIPPED_KEY))); + resultAttributes.put(RETHINKDB_DELETE_RESULT_UNCHANGED_KEY, String.valueOf(result.get(RESULT_UNCHANGED_KEY))); + flowFile = session.putAllAttributes(flowFile, resultAttributes); + return flowFile; + } + + protected Map deleteDocument(String id, String durablity, Boolean returnChanges) { + return getRdbTable().get(id).delete().optArg(DURABILITY_OPTION_KEY,durablity).optArg(RETURN_CHANGES_OPTION_KEY, returnChanges).run(rethinkDbConnection); + } + + @OnStopped + public void close() { + super.close(); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/GetRethinkDB.java b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/GetRethinkDB.java index ac31396da3..aaf10462b6 100644 --- a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/GetRethinkDB.java +++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/GetRethinkDB.java @@ -28,13 +28,12 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.expression.AttributeExpression.ResultType; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; import com.google.gson.Gson; @@ -54,10 +53,9 @@ import java.util.Set; @WritesAttributes({ @WritesAttribute(attribute = GetRethinkDB.RETHINKDB_ERROR_MESSAGE, description = "RethinkDB error message"), }) -@SeeAlso({PutRethinkDB.class}) +@SeeAlso({PutRethinkDB.class,DeleteRethinkDB.class}) public class GetRethinkDB extends AbstractRethinkDBProcessor { - public static final String DOCUMENT_ID_EMPTY_MESSAGE = "Document Id cannot be empty"; public static AllowableValue READ_MODE_SINGLE = new AllowableValue("single", "Single", "Read values from memory from primary replica (Default)"); public static AllowableValue READ_MODE_MAJORITY = new AllowableValue("majority", "Majority", "Read values committed to disk on majority of replicas"); public static AllowableValue READ_MODE_OUTDATED = new AllowableValue("outdated", "Outdated", "Read values from memory from an arbitrary replica "); @@ -72,16 +70,6 @@ public class GetRethinkDB extends AbstractRethinkDBProcessor { .expressionLanguageSupported(true) .build(); - public static final PropertyDescriptor RETHINKDB_DOCUMENT_ID = new PropertyDescriptor.Builder() - .displayName("Document Identifier") - .name("rethinkdb-document-identifier") - .description("A FlowFile attribute, or attribute expression used " + - "for determining RethinkDB key for the Flow File content") - .required(true) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) - .expressionLanguageSupported(true) - .build(); - protected String READ_MODE_KEY = "read_mode"; private static final Set relationships; @@ -122,6 +110,7 @@ public class GetRethinkDB extends AbstractRethinkDBProcessor { @OnScheduled public void onScheduled(final ProcessContext context) { + maxDocumentsSize = context.getProperty(MAX_DOCUMENTS_SIZE).asDataSize(DataUnit.B).longValue(); super.onScheduled(context); } diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/PutRethinkDB.java b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/PutRethinkDB.java index df2bf3c501..ac57957e95 100644 --- a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/PutRethinkDB.java +++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/PutRethinkDB.java @@ -29,6 +29,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; @@ -62,16 +63,13 @@ import java.util.Set; @WritesAttribute(attribute = PutRethinkDB.RETHINKDB_INSERT_RESULT_FIRST_ERROR_KEY, description = "First error while inserting documents"), @WritesAttribute(attribute = PutRethinkDB.RETHINKDB_INSERT_RESULT_WARNINGS_KEY, description = "Warning message in case of large number of ids being returned on insertion") }) -@SeeAlso({GetRethinkDB.class}) +@SeeAlso({GetRethinkDB.class,DeleteRethinkDB.class}) public class PutRethinkDB extends AbstractRethinkDBProcessor { public static AllowableValue CONFLICT_STRATEGY_UPDATE = new AllowableValue("update", "Update", "Update the document having same id with new values"); public static AllowableValue CONFLICT_STRATEGY_REPLACE = new AllowableValue("replace", "Replace", "Replace the document with having same id new document"); public static AllowableValue CONFLICT_STRATEGY_ERROR = new AllowableValue("error", "Error", "Return error if the document with same id exists"); - public static AllowableValue DURABILITY_SOFT = new AllowableValue("soft", "Soft", "Don't save document on disk before ack"); - public static AllowableValue DURABILITY_HARD = new AllowableValue("hard", "Hard", "Save document on disk before ack"); - protected static final PropertyDescriptor CONFLICT_STRATEGY = new PropertyDescriptor.Builder() .name("rethinkdb-conflict-strategy") .displayName("Conflict strategy") @@ -82,19 +80,6 @@ public class PutRethinkDB extends AbstractRethinkDBProcessor { .expressionLanguageSupported(true) .build(); - protected static final PropertyDescriptor DURABILITY = new PropertyDescriptor.Builder() - .name("rethinkdb-durability") - .displayName("Durablity of documents") - .description("Durability of documents being inserted") - .required(true) - .defaultValue("hard") - .allowableValues(DURABILITY_HARD, DURABILITY_SOFT) - .expressionLanguageSupported(true) - .build(); - - protected String CONFLICT_OPTION_KEY = "conflict"; - protected String DURABILITY_OPTION_KEY = "durability"; - private static final Set relationships; private static final List propertyDescriptors; @@ -109,6 +94,8 @@ public class PutRethinkDB extends AbstractRethinkDBProcessor { public static final String RETHINKDB_INSERT_RESULT_FIRST_ERROR_KEY = "rethinkdb.insert.first_error"; public static final String RETHINKDB_INSERT_RESULT_WARNINGS_KEY = "rethinkdb.insert.warnings"; + public final String CONFLICT_OPTION_KEY = "conflict"; + static { final Set tempRelationships = new HashSet<>(); tempRelationships.add(REL_SUCCESS); @@ -141,6 +128,7 @@ public class PutRethinkDB extends AbstractRethinkDBProcessor { @OnScheduled public void onScheduled(final ProcessContext context) { + maxDocumentsSize = context.getProperty(MAX_DOCUMENTS_SIZE).asDataSize(DataUnit.B).longValue(); super.onScheduled(context); } diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 2875277e85..0db3a569ac 100644 --- a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -14,3 +14,4 @@ # limitations under the License. org.apache.nifi.processors.rethinkdb.PutRethinkDB org.apache.nifi.processors.rethinkdb.GetRethinkDB +org.apache.nifi.processors.rethinkdb.DeleteRethinkDB diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITAbstractRethinkDBTest.java b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITAbstractRethinkDBTest.java new file mode 100644 index 0000000000..7131834651 --- /dev/null +++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITAbstractRethinkDBTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.rethinkdb; + +import org.apache.nifi.util.TestRunner; + +import com.rethinkdb.RethinkDB; +import com.rethinkdb.net.Connection; + +/** + * Abstract base class for RethinkDB integration tests + */ +public class ITAbstractRethinkDBTest { + protected TestRunner runner; + protected Connection connection; + protected String dbName = "test"; + protected String dbHost = "localhost"; + protected String dbPort = "28015"; + protected String user = "admin"; + protected String password = "admin"; + protected String table = "test"; + + public void setUp() throws Exception { + runner.setProperty(AbstractRethinkDBProcessor.DB_NAME, dbName); + runner.setProperty(AbstractRethinkDBProcessor.DB_HOST, dbHost); + runner.setProperty(AbstractRethinkDBProcessor.DB_PORT, dbPort); + runner.setProperty(AbstractRethinkDBProcessor.USERNAME, user); + runner.setProperty(AbstractRethinkDBProcessor.PASSWORD, password); + runner.setProperty(AbstractRethinkDBProcessor.TABLE_NAME, table); + runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "UTF-8"); + + connection = RethinkDB.r.connection().user(user, password).db(dbName).hostname(dbHost).port(Integer.parseInt(dbPort)).connect(); + } + + public void tearDown() throws Exception { + runner = null; + connection.close(); + connection = null; + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITDeleteRethinkDBTest.java b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITDeleteRethinkDBTest.java new file mode 100644 index 0000000000..47869d39ba --- /dev/null +++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITDeleteRethinkDBTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.rethinkdb; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import com.rethinkdb.RethinkDB; + +import net.minidev.json.JSONObject; + +/** + * Integration test for deleting documents from RethinkDB. Please ensure that the RethinkDB is running + * on local host with default port and has database test with table test and user + * admin with password admin before running the integration tests or set the attributes in the + * test accordingly. + */ +@Ignore("Comment this out for running tests against a real instance of RethinkDB") +public class ITDeleteRethinkDBTest extends ITAbstractRethinkDBTest { + + @Before + public void setUp() throws Exception { + runner = TestRunners.newTestRunner(DeleteRethinkDB.class); + super.setUp(); + runner.setProperty(DeleteRethinkDB.DURABILITY, "soft"); + runner.setProperty(DeleteRethinkDB.RETURN_CHANGES, "true"); + + connection = RethinkDB.r.connection().user(user, password).db(dbName).hostname(dbHost).port(Integer.parseInt(dbPort)).connect(); + RethinkDB.r.db(dbName).table(table).delete().run(connection); + long count = RethinkDB.r.db(dbName).table(table).count().run(connection); + assertEquals("Count should be same", 0L, count); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + } + + @Test + public void testDeleteDocumentById() { + JSONObject message = new JSONObject(); + message.put("id", "1"); + message.put("value", "one"); + RethinkDB.r.db(dbName).table(table).insert(message).run(connection); + + long count = RethinkDB.r.db(dbName).table(table).count().run(connection); + assertEquals("Count should be same", 1L, count); + + runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}"); + + Map props = new HashMap<>(); + props.put("rethinkdb.id","1"); + + runner.enqueue(new byte [] {},props); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_SUCCESS, 1); + + String changeMessage = ("{\"old_val\":" + message + "}"); + + List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_SUCCESS); + assertEquals("Flow file count should be same", 1, flowFiles.size()); + assertEquals("Error should be null",null, flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE)); + assertEquals("Content should be same size", changeMessage.length(), flowFiles.get(0).getSize()); + flowFiles.get(0).assertContentEquals(changeMessage); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_DELETED_KEY), "1"); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_ERROR_KEY),"0"); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_INSERTED_KEY),"0"); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_REPLACED_KEY),"0"); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_SKIPPED_KEY),"0"); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_UNCHANGED_KEY),"0"); + + Map document = RethinkDB.r.db(dbName).table(table).get("1").run(connection); + assertEquals("Document should be null", document, null); + } + + @Test + public void testDeleteDocumentByIdNoChanges() { + JSONObject message = new JSONObject(); + message.put("id", "11"); + message.put("value", "one"); + RethinkDB.r.db(dbName).table(table).insert(message).run(connection); + + long count = RethinkDB.r.db(dbName).table(table).count().run(connection); + assertEquals("Count should be same", 1L, count); + + runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}"); + runner.setProperty(DeleteRethinkDB.RETURN_CHANGES, "false"); + + Map props = new HashMap<>(); + props.put("rethinkdb.id","11"); + + runner.enqueue(new byte [] {},props); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_SUCCESS, 1); + + List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_SUCCESS); + assertEquals("Flow file count should be same", 1, flowFiles.size()); + assertEquals("Error should be null",null, flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE)); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_DELETED_KEY), "1"); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_ERROR_KEY),"0"); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_INSERTED_KEY),"0"); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_REPLACED_KEY),"0"); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_SKIPPED_KEY),"0"); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_UNCHANGED_KEY),"0"); + assertEquals(flowFiles.get(0).getSize(), 0); + + Map document = RethinkDB.r.db(dbName).table(table).get("1").run(connection); + assertEquals("Document should be null", document, null); + } + + @Test + public void testDeleteDocumentByIdNotFound() { + JSONObject message = new JSONObject(); + message.put("id", "1"); + message.put("value", "one"); + RethinkDB.r.db(dbName).table(table).insert(message).run(connection); + + long count = RethinkDB.r.db(dbName).table(table).count().run(connection); + assertEquals("Count should be same", 1L, count); + + runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}"); + + Map props = new HashMap<>(); + props.put("rethinkdb.id", String.valueOf(System.currentTimeMillis())); + + runner.enqueue(new byte [] {},props); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_NOT_FOUND, 1); + + List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_NOT_FOUND); + assertEquals("Flow file count should be same", 1, flowFiles.size()); + assertNotNull("Error should not be null", flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE)); + assertEquals("Content should be same size", 0, flowFiles.get(0).getSize()); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_DELETED_KEY), "0"); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_ERROR_KEY),"0"); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_INSERTED_KEY),"0"); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_REPLACED_KEY),"0"); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_SKIPPED_KEY),"1"); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_UNCHANGED_KEY),"0"); + + Map document = RethinkDB.r.db(dbName).table(table).get("1").run(connection); + assertNotNull("Document should not be null", document); + assertEquals("id should be same", document.get("id"), "1"); + } + + @Test + public void testDeleteDocumentByHardCodedId() { + JSONObject message = new JSONObject(); + message.put("id", "2"); + message.put("value", "two"); + RethinkDB.r.db(dbName).table(table).insert(message).run(connection); + + long count = RethinkDB.r.db(dbName).table(table).count().run(connection); + assertEquals("Count should be same", 1, count); + + runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, "2"); + + Map props = new HashMap<>(); + + runner.enqueue(new byte [] {},props); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_SUCCESS, 1); + + String changeMessage = ("{\"old_val\":" + message + "}"); + + List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_SUCCESS); + assertEquals("Flow file count should be same", 1, flowFiles.size()); + assertEquals("Error should be null",null, flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE)); + assertEquals("Content should be same size", changeMessage.length(), flowFiles.get(0).getSize()); + flowFiles.get(0).assertContentEquals(changeMessage); + + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_DELETED_KEY), "1"); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_ERROR_KEY),"0"); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_INSERTED_KEY),"0"); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_REPLACED_KEY),"0"); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_SKIPPED_KEY),"0"); + assertEquals(flowFiles.get(0).getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_UNCHANGED_KEY),"0"); + + Map document = RethinkDB.r.db(dbName).table(table).get("2").run(connection); + assertEquals("Document should be null", document, null); + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITGetRethinkDBTest.java b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITGetRethinkDBTest.java index f8e14b08c4..38887001d0 100644 --- a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITGetRethinkDBTest.java +++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITGetRethinkDBTest.java @@ -24,14 +24,12 @@ import java.util.List; import java.util.Map; import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.After; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import com.rethinkdb.RethinkDB; -import com.rethinkdb.net.Connection; import net.minidev.json.JSONObject; @@ -42,29 +40,13 @@ import net.minidev.json.JSONObject; * test accordingly. */ @Ignore("Comment this out for running tests against a real instance of RethinkDB") -public class ITGetRethinkDBTest { - - private TestRunner runner; - private Connection connection; - private String dbName = "test"; - private String dbHost = "localhost"; - private String dbPort = "28015"; - private String user = "admin"; - private String password = "admin"; - private String table = "test"; +public class ITGetRethinkDBTest extends ITAbstractRethinkDBTest { @Before public void setUp() throws Exception { runner = TestRunners.newTestRunner(GetRethinkDB.class); - runner.setProperty(AbstractRethinkDBProcessor.DB_NAME, dbName); - runner.setProperty(AbstractRethinkDBProcessor.DB_HOST, dbHost); - runner.setProperty(AbstractRethinkDBProcessor.DB_PORT, dbPort); - runner.setProperty(AbstractRethinkDBProcessor.USERNAME, user); - runner.setProperty(AbstractRethinkDBProcessor.PASSWORD, password); - runner.setProperty(AbstractRethinkDBProcessor.TABLE_NAME, table); - runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "UTF-8"); - - connection = RethinkDB.r.connection().user(user, password).db(dbName).hostname(dbHost).port(Integer.parseInt(dbPort)).connect(); + super.setUp(); + runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "1 KB"); RethinkDB.r.db(dbName).table(table).delete().run(connection); long count = RethinkDB.r.db(dbName).table(table).count().run(connection); assertEquals("Count should be same", 0L, count); @@ -72,9 +54,7 @@ public class ITGetRethinkDBTest { @After public void tearDown() throws Exception { - runner = null; - connection.close(); - connection = null; + super.tearDown(); } @Test @@ -88,6 +68,7 @@ public class ITGetRethinkDBTest { assertEquals("Count should be same", 1L, count); runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}"); + runner.assertValid(); Map props = new HashMap<>(); props.put("rethinkdb.id","1"); @@ -114,6 +95,7 @@ public class ITGetRethinkDBTest { assertEquals("Count should be same", 1L, count); runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}"); + runner.assertValid(); Map props = new HashMap<>(); props.put("rethinkdb.id", String.valueOf(System.currentTimeMillis())); @@ -139,6 +121,7 @@ public class ITGetRethinkDBTest { assertEquals("Count should be same", 1, count); runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "2"); + runner.assertValid(); Map props = new HashMap<>(); @@ -165,6 +148,7 @@ public class ITGetRethinkDBTest { runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "2"); runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "2B"); + runner.assertValid(); Map props = new HashMap<>(); diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITPutRethinkDBTest.java b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITPutRethinkDBTest.java index 5260696a1d..052d3845be 100644 --- a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITPutRethinkDBTest.java +++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITPutRethinkDBTest.java @@ -20,7 +20,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import java.util.List; import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.json.simple.JSONArray; import org.junit.After; @@ -29,7 +28,6 @@ import org.junit.Ignore; import org.junit.Test; import com.rethinkdb.RethinkDB; -import com.rethinkdb.net.Connection; import net.minidev.json.JSONObject; @@ -40,43 +38,25 @@ import net.minidev.json.JSONObject; * test accordingly. */ @Ignore("Comment this out for running tests against a real instance of RethinkDB") -public class ITPutRethinkDBTest { - private TestRunner runner; - private Connection connection; - private String dbName = "test"; - private String dbHost = "localhost"; - private String dbPort = "28015"; - private String user = "admin"; - private String password = "admin"; - private String table = "test"; +public class ITPutRethinkDBTest extends ITAbstractRethinkDBTest { @Before public void setUp() throws Exception { runner = TestRunners.newTestRunner(PutRethinkDB.class); - runner.setProperty(PutRethinkDB.DB_NAME, dbName); - runner.setProperty(PutRethinkDB.DB_HOST, dbHost); - runner.setProperty(PutRethinkDB.DB_PORT, dbPort); - runner.setProperty(PutRethinkDB.USERNAME, user); - runner.setProperty(PutRethinkDB.PASSWORD, password); - runner.setProperty(PutRethinkDB.TABLE_NAME, table); - runner.setProperty(PutRethinkDB.CHARSET, "UTF-8"); + super.setUp(); + runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "1 KB"); runner.setProperty(PutRethinkDB.CONFLICT_STRATEGY, PutRethinkDB.CONFLICT_STRATEGY_UPDATE); runner.setProperty(PutRethinkDB.DURABILITY, PutRethinkDB.DURABILITY_HARD); - runner.setProperty(PutRethinkDB.MAX_DOCUMENTS_SIZE, "1 KB"); - runner.assertValid(); - - connection = RethinkDB.r.connection().user(user, password).db(dbName).hostname(dbHost).port(Integer.parseInt(dbPort)).connect(); } @After public void tearDown() throws Exception { - runner = null; - connection.close(); - connection = null; + super.tearDown(); } @Test public void testValidSingleMessage() { + runner.assertValid(); RethinkDB.r.db(dbName).table(table).delete().run(connection); long count = RethinkDB.r.db(dbName).table(table).count().run(connection); assertEquals("Count should be same", 0L, count); @@ -104,6 +84,7 @@ public class ITPutRethinkDBTest { @Test public void testValidSingleMessageTwiceConflictUpdate() { + runner.assertValid(); RethinkDB.r.db(dbName).table(table).delete().run(connection); long count = RethinkDB.r.db(dbName).table(table).count().run(connection); assertEquals("Count should be same", 0L, count); @@ -144,6 +125,7 @@ public class ITPutRethinkDBTest { @Test public void testValidSingleMessageTwiceConflictError() { runner.setProperty(PutRethinkDB.CONFLICT_STRATEGY, PutRethinkDB.CONFLICT_STRATEGY_ERROR); + runner.assertValid(); RethinkDB.r.db(dbName).table(table).delete().run(connection); long count = RethinkDB.r.db(dbName).table(table).count().run(connection); assertEquals("Count should be same", 0L, count); @@ -183,6 +165,7 @@ public class ITPutRethinkDBTest { @Test public void testValidArrayMessage() { + runner.assertValid(); RethinkDB.r.db(dbName).table(table).delete().run(connection); long count = RethinkDB.r.db(dbName).table(table).count().run(connection); assertEquals("Count should be same", 0L, count); diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/TestDeleteRethinkDB.java b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/TestDeleteRethinkDB.java new file mode 100644 index 0000000000..4c06f489f1 --- /dev/null +++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/TestDeleteRethinkDB.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.rethinkdb; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Lists; +import com.google.gson.Gson; +import com.rethinkdb.net.Connection; + +public class TestDeleteRethinkDB { + private static final String DOCUMENT_ID = "id1"; + private TestRunner runner; + private AbstractRethinkDBProcessor mockDeleteRethinkDB; + private Map document = new HashMap<>(); + + @Before + public void setUp() throws Exception { + mockDeleteRethinkDB = new DeleteRethinkDB() { + @Override + protected Connection makeConnection() { + return null; + } + + @Override + protected Map deleteDocument(String id, String durablity, Boolean returnChanges) { + return document; + } + + }; + + document.put(DeleteRethinkDB.RESULT_DELETED_KEY, 1L); + document.put(DeleteRethinkDB.RESULT_ERROR_KEY, 0L); + document.put(DeleteRethinkDB.RESULT_CHANGES_KEY, Lists.asList( + "[{new_val=null, old_val={id=1, value=one}}]", new String[] {})); + document.put(DeleteRethinkDB.RESULT_INSERTED_KEY, 0L); + document.put(DeleteRethinkDB.RESULT_REPLACED_KEY, 0L); + document.put(DeleteRethinkDB.RESULT_SKIPPED_KEY, 0L); + document.put(DeleteRethinkDB.RESULT_UNCHANGED_KEY, 0L); + document.put(DeleteRethinkDB.RESULT_FIRST_ERROR_KEY, ""); + + runner = TestRunners.newTestRunner(mockDeleteRethinkDB); + runner.setProperty(AbstractRethinkDBProcessor.DB_NAME, "test"); + runner.setProperty(AbstractRethinkDBProcessor.DB_HOST, "host1"); + runner.setProperty(AbstractRethinkDBProcessor.DB_PORT, "1234"); + runner.setProperty(AbstractRethinkDBProcessor.USERNAME, "u1"); + runner.setProperty(AbstractRethinkDBProcessor.PASSWORD, "p1"); + runner.setProperty(AbstractRethinkDBProcessor.TABLE_NAME, "t1"); + runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "UTF-8"); + runner.setProperty(AbstractRethinkDBProcessor.DURABILITY, "soft"); + runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}"); + runner.assertValid(); + } + + @After + public void tearDown() throws Exception { + runner = null; + } + + @Test + public void testDefaultValid() { + runner.assertValid(); + } + + @Test + public void testBlankHost() { + runner.setProperty(AbstractRethinkDBProcessor.DB_HOST, ""); + runner.assertNotValid(); + } + + @Test + public void testEmptyPort() { + runner.setProperty(AbstractRethinkDBProcessor.DB_PORT, ""); + runner.assertNotValid(); + } + + @Test + public void testEmptyDBName() { + runner.setProperty(AbstractRethinkDBProcessor.DB_NAME, ""); + runner.assertNotValid(); + } + + @Test + public void testEmptyUsername() { + runner.setProperty(AbstractRethinkDBProcessor.USERNAME, ""); + runner.assertNotValid(); + } + + @Test + public void testEmptyPassword() { + runner.setProperty(AbstractRethinkDBProcessor.PASSWORD, "p1"); + runner.assertValid(); + } + + @Test + public void testCharsetUTF8() { + runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "UTF-8"); + runner.assertValid(); + } + + @Test + public void testCharsetBlank() { + runner.setProperty(AbstractRethinkDBProcessor.CHARSET, ""); + runner.assertNotValid(); + } + @Test + public void testZeroMaxDocumentSize() { + runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "0"); + runner.assertNotValid(); + } + + @Test + public void testBlankDurability() { + runner.setProperty(DeleteRethinkDB.DURABILITY, ""); + runner.assertNotValid(); + } + + @Test + public void testNotFound() { + runner.assertValid(); + document.put(DeleteRethinkDB.RESULT_DELETED_KEY, 0L); + + HashMap props = new HashMap<>(); + props.put("rethinkdb.id", DOCUMENT_ID); + + runner.enqueue(new byte[]{}, props); + + runner.run(1,true,true); + + runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_NOT_FOUND, 1); + List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_NOT_FOUND); + assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE)); + flowFiles.get(0).assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE, + "Deleted count should be 1 but was 0 for document with id 'id1'"); + } + + @Test + public void testBlankId() { + runner.assertValid(); + runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}"); + + Map props = new HashMap<>(); + + runner.enqueue(new byte[]{},props); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_FAILURE, 1); + List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE); + assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE)); + flowFiles.get(0).assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,AbstractRethinkDBProcessor.DOCUMENT_ID_EMPTY_MESSAGE); + } + + @Test + public void testNullId() { + runner.assertValid(); + runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}"); + + Map props = new HashMap<>(); + props.put("rethinkdb.id", null); + + runner.enqueue(new byte[]{},props); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_FAILURE, 1); + List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE); + assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE)); + flowFiles.get(0).assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,AbstractRethinkDBProcessor.DOCUMENT_ID_EMPTY_MESSAGE); + } + + @Test + public void testValidSingleDelete() { + runner.assertValid(); + + HashMap props = new HashMap<>(); + props.put("rethinkdb.id", DOCUMENT_ID); + + runner.enqueue(new byte[]{}, props); + + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_SUCCESS, 1); + Gson gson = new Gson(); + + String json = gson.toJson(((List)document.get(DeleteRethinkDB.RESULT_CHANGES_KEY)).get(0)); + + List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_SUCCESS); + flowFiles.get(0).assertContentEquals(json.toString()); + assertNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE)); + + } + + @Test + public void testGetThrowsException() { + mockDeleteRethinkDB = new DeleteRethinkDB() { + @Override + protected Connection makeConnection() { + return null; + } + + @Override + protected Map deleteDocument(String id, String durablity, Boolean returnChanges) { + throw new RuntimeException("testException"); + } + }; + + runner = TestRunners.newTestRunner(mockDeleteRethinkDB); + runner.setProperty(AbstractRethinkDBProcessor.DB_NAME, "test"); + runner.setProperty(AbstractRethinkDBProcessor.DB_HOST, "host1"); + runner.setProperty(AbstractRethinkDBProcessor.DB_PORT, "1234"); + runner.setProperty(AbstractRethinkDBProcessor.USERNAME, "u1"); + runner.setProperty(AbstractRethinkDBProcessor.PASSWORD, "p1"); + runner.setProperty(AbstractRethinkDBProcessor.TABLE_NAME, "t1"); + runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "UTF-8"); + + runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, DOCUMENT_ID); + + runner.assertValid(); + + HashMap props = new HashMap<>(); + props.put("rethinkdb.id", DOCUMENT_ID); + + runner.enqueue(new byte[]{}, props); + + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_FAILURE, 1); + + List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE); + assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE)); + flowFiles.get(0).assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,"testException"); + } +} \ No newline at end of file