From ea1fe1a9c0a645d5ba2e81a519847e56a46aeb89 Mon Sep 17 00:00:00 2001 From: mans2singh Date: Sat, 15 Jul 2017 19:18:09 -0700 Subject: [PATCH] NIFI-4188 New RethinkDB Get processor Signed-off-by: James Wing This closes #2012. --- .../src/main/resources/META-INF/NOTICE | 9 + .../nifi-rethinkdb-processors/pom.xml | 8 + .../rethinkdb/AbstractRethinkDBProcessor.java | 19 +- .../processors/rethinkdb/GetRethinkDB.java | 196 +++++++++++++ .../processors/rethinkdb/PutRethinkDB.java | 7 +- .../org.apache.nifi.processor.Processor | 1 + .../rethinkdb/ITGetRethinkDBTest.java | 181 ++++++++++++ .../rethinkdb/TestGetRethinkDB.java | 262 ++++++++++++++++++ 8 files changed, 676 insertions(+), 7 deletions(-) create mode 100644 nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/GetRethinkDB.java create mode 100644 nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITGetRethinkDBTest.java create mode 100644 nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/TestGetRethinkDB.java diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-nar/src/main/resources/META-INF/NOTICE index 0801e8b8b4..1a2f743480 100644 --- a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-nar/src/main/resources/META-INF/NOTICE +++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-nar/src/main/resources/META-INF/NOTICE @@ -20,3 +20,12 @@ The following binary components are provided under the Apache Software License v The following NOTICE information applies: Apache Commons IO Copyright 2002-2016 The Apache Software Foundation + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2015 The Apache Software Foundation + + (ASLv2) Google GSON + The following NOTICE information applies: + Copyright 2008 Google Inc. diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/pom.xml b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/pom.xml index bba931b432..023d487945 100644 --- a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/pom.xml +++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/pom.xml @@ -42,6 +42,14 @@ commons-io commons-io + + org.apache.commons + commons-lang3 + + + com.google.code.gson + gson + org.apache.nifi nifi-mock diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/AbstractRethinkDBProcessor.java b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/AbstractRethinkDBProcessor.java index b75b6ad5f4..cfbf1bdf94 100644 --- a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/AbstractRethinkDBProcessor.java +++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/AbstractRethinkDBProcessor.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.rethinkdb; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; @@ -28,7 +29,7 @@ import com.rethinkdb.gen.ast.Table; import com.rethinkdb.net.Connection; /** - * Abstract base class for RethinkDb processors + * Abstract base class for RethinkDB processors */ abstract class AbstractRethinkDBProcessor extends AbstractProcessor { @@ -95,7 +96,7 @@ abstract class AbstractRethinkDBProcessor extends AbstractProcessor { protected static final PropertyDescriptor MAX_DOCUMENTS_SIZE = new PropertyDescriptor.Builder() .name("rethinkdb-max-document-size") - .displayName("Max size of documents in MBs") + .displayName("Max size of documents") .description("Maximum size of documents allowed to be posted in one batch") .defaultValue("1 MB") .required(true) @@ -108,6 +109,9 @@ abstract class AbstractRethinkDBProcessor extends AbstractProcessor { static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") .description("Failed FlowFiles are routed to this relationship").build(); + static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not_found") + .description("Document not found are routed to this relationship").build(); + public static final String RESULT_ERROR_KEY = "errors"; public static final String RESULT_DELETED_KEY = "deleted"; public static final String RESULT_GENERATED_KEYS_KEY = "generated_keys"; @@ -118,6 +122,8 @@ abstract class AbstractRethinkDBProcessor extends AbstractProcessor { public static final String RESULT_FIRST_ERROR_KEY = "first_error"; public static final String RESULT_WARNINGS_KEY = "warnings"; + public static final String RETHINKDB_ERROR_MESSAGE = "rethinkdb.error.message"; + protected Connection rethinkDbConnection; protected String databaseName; protected String tableName; @@ -158,7 +164,7 @@ abstract class AbstractRethinkDBProcessor extends AbstractProcessor { getLogger().error("Error while getting connection " + e.getLocalizedMessage(),e); throw new RuntimeException("Error while getting connection" + e.getLocalizedMessage(),e); } - getLogger().info("RethinkDb connection created for host {} port {} and db {}", + getLogger().info("RethinkDB connection created for host {} port {} and db {}", new Object[] {hostname, port,databaseName}); } @@ -167,4 +173,11 @@ abstract class AbstractRethinkDBProcessor extends AbstractProcessor { .port(port).user(username, password).connect(); } + + @OnStopped + public void close() { + getLogger().info("Closing connection"); + if ( rethinkDbConnection != null ) + rethinkDbConnection.close(); + } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/GetRethinkDB.java b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/GetRethinkDB.java new file mode 100644 index 0000000000..ac31396da3 --- /dev/null +++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/GetRethinkDB.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.rethinkdb; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression.ResultType; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import com.google.gson.Gson; + +import java.io.ByteArrayInputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@Tags({"rethinkdb", "get", "read", "fetch"}) +@CapabilityDescription("Processor to get a JSON document from RethinkDB (https://www.rethinkdb.com/) using the document id. The FlowFile will contain the retrieved document") +@WritesAttributes({ + @WritesAttribute(attribute = GetRethinkDB.RETHINKDB_ERROR_MESSAGE, description = "RethinkDB error message"), + }) +@SeeAlso({PutRethinkDB.class}) +public class GetRethinkDB extends AbstractRethinkDBProcessor { + + public static final String DOCUMENT_ID_EMPTY_MESSAGE = "Document Id cannot be empty"; + public static AllowableValue READ_MODE_SINGLE = new AllowableValue("single", "Single", "Read values from memory from primary replica (Default)"); + public static AllowableValue READ_MODE_MAJORITY = new AllowableValue("majority", "Majority", "Read values committed to disk on majority of replicas"); + public static AllowableValue READ_MODE_OUTDATED = new AllowableValue("outdated", "Outdated", "Read values from memory from an arbitrary replica "); + + protected static final PropertyDescriptor READ_MODE = new PropertyDescriptor.Builder() + .name("rethinkdb-read-mode") + .displayName("Read Mode") + .description("Read mode used for consistency") + .required(true) + .defaultValue(READ_MODE_SINGLE.getValue()) + .allowableValues(READ_MODE_SINGLE, READ_MODE_MAJORITY, READ_MODE_OUTDATED) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor RETHINKDB_DOCUMENT_ID = new PropertyDescriptor.Builder() + .displayName("Document Identifier") + .name("rethinkdb-document-identifier") + .description("A FlowFile attribute, or attribute expression used " + + "for determining RethinkDB key for the Flow File content") + .required(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) + .expressionLanguageSupported(true) + .build(); + + protected String READ_MODE_KEY = "read_mode"; + + private static final Set relationships; + private static final List propertyDescriptors; + + protected Gson gson = new Gson(); + + static { + final Set tempRelationships = new HashSet<>(); + tempRelationships.add(REL_SUCCESS); + tempRelationships.add(REL_FAILURE); + tempRelationships.add(REL_NOT_FOUND); + relationships = Collections.unmodifiableSet(tempRelationships); + + final List tempDescriptors = new ArrayList<>(); + tempDescriptors.add(DB_NAME); + tempDescriptors.add(DB_HOST); + tempDescriptors.add(DB_PORT); + tempDescriptors.add(USERNAME); + tempDescriptors.add(PASSWORD); + tempDescriptors.add(TABLE_NAME); + tempDescriptors.add(CHARSET); + tempDescriptors.add(RETHINKDB_DOCUMENT_ID); + tempDescriptors.add(READ_MODE); + tempDescriptors.add(MAX_DOCUMENTS_SIZE); + propertyDescriptors = Collections.unmodifiableList(tempDescriptors); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + super.onScheduled(context); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue()); + String id = context.getProperty(RETHINKDB_DOCUMENT_ID).evaluateAttributeExpressions(flowFile).getValue(); + String readMode = context.getProperty(READ_MODE).evaluateAttributeExpressions(flowFile).getValue(); + + if ( StringUtils.isEmpty(id) ) { + getLogger().error(DOCUMENT_ID_EMPTY_MESSAGE); + flowFile = session.putAttribute(flowFile, RETHINKDB_ERROR_MESSAGE, DOCUMENT_ID_EMPTY_MESSAGE); + session.transfer(flowFile, REL_FAILURE); + return; + } + + try { + long startTimeMillis = System.currentTimeMillis(); + Map document = getDocument(id, readMode); + + if ( document == null ) { + getLogger().debug("Document with id '" + id + "' not found"); + flowFile = session.putAttribute(flowFile, RETHINKDB_ERROR_MESSAGE, "Document with id '" + id + "' not found"); + session.transfer(flowFile, REL_NOT_FOUND); + return; + } + + String json = gson.toJson(document); + + byte [] documentBytes = json.getBytes(charset); + + if ( documentBytes.length > maxDocumentsSize ) { + getLogger().error("Document too big with size " + documentBytes.length + " and max limit is " + maxDocumentsSize ); + flowFile = session.putAttribute(flowFile, RETHINKDB_ERROR_MESSAGE, "Document too big size " + documentBytes.length + " bytes"); + session.transfer(flowFile, REL_FAILURE); + return; + } + + ByteArrayInputStream bais = new ByteArrayInputStream(documentBytes); + session.importFrom(bais, flowFile); + final long endTimeMillis = System.currentTimeMillis(); + + getLogger().debug("Json document {} retrieved Result: {}", new Object[] {id, document}); + + session.transfer(flowFile, REL_SUCCESS); + session.getProvenanceReporter().fetch(flowFile, + new StringBuilder("rethinkdb://").append(databaseName).append("/").append(tableName).append("/").append(id).toString(), + (endTimeMillis - startTimeMillis)); + + } catch (Exception exception) { + getLogger().error("Failed to get document from RethinkDB due to error {}", + new Object[]{exception.getLocalizedMessage()}, exception); + flowFile = session.putAttribute(flowFile, RETHINKDB_ERROR_MESSAGE, exception.getMessage() + ""); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + } + } + + protected Map getDocument(String id, String readMode) { + return getRdbTable().optArg(READ_MODE_KEY,readMode).get(id).run(rethinkDbConnection); + } + + @OnStopped + public void close() { + super.close(); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/PutRethinkDB.java b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/PutRethinkDB.java index af1749beab..df2bf3c501 100644 --- a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/PutRethinkDB.java +++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/PutRethinkDB.java @@ -22,6 +22,7 @@ import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; @@ -61,6 +62,7 @@ import java.util.Set; @WritesAttribute(attribute = PutRethinkDB.RETHINKDB_INSERT_RESULT_FIRST_ERROR_KEY, description = "First error while inserting documents"), @WritesAttribute(attribute = PutRethinkDB.RETHINKDB_INSERT_RESULT_WARNINGS_KEY, description = "Warning message in case of large number of ids being returned on insertion") }) +@SeeAlso({GetRethinkDB.class}) public class PutRethinkDB extends AbstractRethinkDBProcessor { public static AllowableValue CONFLICT_STRATEGY_UPDATE = new AllowableValue("update", "Update", "Update the document having same id with new values"); @@ -96,7 +98,6 @@ public class PutRethinkDB extends AbstractRethinkDBProcessor { private static final Set relationships; private static final List propertyDescriptors; - public static final String RETHINKDB_ERROR_MESSAGE = "rethinkdb.error.message"; public static final String RETHINKDB_INSERT_RESULT = "rethinkdb.insert.result"; public static final String RETHINKDB_INSERT_RESULT_ERROR_KEY = "rethinkdb.insert.errors"; public static final String RETHINKDB_INSERT_RESULT_DELETED_KEY = "rethinkdb.insert.deleted"; @@ -230,8 +231,6 @@ public class PutRethinkDB extends AbstractRethinkDBProcessor { */ @OnStopped public void close() { - getLogger().info("Closing connection"); - if ( rethinkDbConnection != null ) - rethinkDbConnection.close(); + super.close(); } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index fc3b8996e3..2875277e85 100644 --- a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -13,3 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. org.apache.nifi.processors.rethinkdb.PutRethinkDB +org.apache.nifi.processors.rethinkdb.GetRethinkDB diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITGetRethinkDBTest.java b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITGetRethinkDBTest.java new file mode 100644 index 0000000000..f8e14b08c4 --- /dev/null +++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITGetRethinkDBTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.rethinkdb; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import com.rethinkdb.RethinkDB; +import com.rethinkdb.net.Connection; + +import net.minidev.json.JSONObject; + +/** + * Integration test for getting documents from RethinkDB. Please ensure that the RethinkDB is running + * on local host with default port and has database test with table test and user + * admin with password admin before running the integration tests or set the attributes in the + * test accordingly. + */ +@Ignore("Comment this out for running tests against a real instance of RethinkDB") +public class ITGetRethinkDBTest { + + private TestRunner runner; + private Connection connection; + private String dbName = "test"; + private String dbHost = "localhost"; + private String dbPort = "28015"; + private String user = "admin"; + private String password = "admin"; + private String table = "test"; + + @Before + public void setUp() throws Exception { + runner = TestRunners.newTestRunner(GetRethinkDB.class); + runner.setProperty(AbstractRethinkDBProcessor.DB_NAME, dbName); + runner.setProperty(AbstractRethinkDBProcessor.DB_HOST, dbHost); + runner.setProperty(AbstractRethinkDBProcessor.DB_PORT, dbPort); + runner.setProperty(AbstractRethinkDBProcessor.USERNAME, user); + runner.setProperty(AbstractRethinkDBProcessor.PASSWORD, password); + runner.setProperty(AbstractRethinkDBProcessor.TABLE_NAME, table); + runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "UTF-8"); + + connection = RethinkDB.r.connection().user(user, password).db(dbName).hostname(dbHost).port(Integer.parseInt(dbPort)).connect(); + RethinkDB.r.db(dbName).table(table).delete().run(connection); + long count = RethinkDB.r.db(dbName).table(table).count().run(connection); + assertEquals("Count should be same", 0L, count); + } + + @After + public void tearDown() throws Exception { + runner = null; + connection.close(); + connection = null; + } + + @Test + public void testGetDocumentById() { + JSONObject message = new JSONObject(); + message.put("id", "1"); + message.put("value", "one"); + RethinkDB.r.db(dbName).table(table).insert(message).run(connection); + + long count = RethinkDB.r.db(dbName).table(table).count().run(connection); + assertEquals("Count should be same", 1L, count); + + runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}"); + + Map props = new HashMap<>(); + props.put("rethinkdb.id","1"); + + runner.enqueue(new byte [] {},props); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_SUCCESS, 1); + + List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_SUCCESS); + assertEquals("Flow file count should be same", 1, flowFiles.size()); + assertEquals("Error should be null",null, flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE)); + assertEquals("Content should be same size", message.toString().length(), flowFiles.get(0).getSize()); + flowFiles.get(0).assertContentEquals(message.toString()); + } + + @Test + public void testGetDocumentByIdNotFound() { + JSONObject message = new JSONObject(); + message.put("id", "1"); + message.put("value", "one"); + RethinkDB.r.db(dbName).table(table).insert(message).run(connection); + + long count = RethinkDB.r.db(dbName).table(table).count().run(connection); + assertEquals("Count should be same", 1L, count); + + runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}"); + + Map props = new HashMap<>(); + props.put("rethinkdb.id", String.valueOf(System.currentTimeMillis())); + + runner.enqueue(new byte [] {},props); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_NOT_FOUND, 1); + + List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_NOT_FOUND); + assertEquals("Flow file count should be same", 1, flowFiles.size()); + assertNotNull("Error should not be null", flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE)); + assertEquals("Content should be same size", 0, flowFiles.get(0).getSize()); + } + + @Test + public void testGetDocumentByHardCodedId() { + JSONObject message = new JSONObject(); + message.put("id", "2"); + message.put("value", "two"); + RethinkDB.r.db(dbName).table(table).insert(message).run(connection); + + long count = RethinkDB.r.db(dbName).table(table).count().run(connection); + assertEquals("Count should be same", 1, count); + + runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "2"); + + Map props = new HashMap<>(); + + runner.enqueue(new byte [] {},props); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_SUCCESS, 1); + + List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_SUCCESS); + assertEquals("Flow file count should be same", 1, flowFiles.size()); + assertEquals("Error should be null",null, flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE)); + assertEquals("Content should be same size", message.toString().length(), flowFiles.get(0).getSize()); + flowFiles.get(0).assertContentEquals(message.toString()); + } + + @Test + public void testGetDocumentTooBig() { + JSONObject message = new JSONObject(); + message.put("id", "2"); + message.put("value", "two"); + RethinkDB.r.db(dbName).table(table).insert(message).run(connection); + + long count = RethinkDB.r.db(dbName).table(table).count().run(connection); + assertEquals("Count should be same", 1, count); + + runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "2"); + runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "2B"); + + Map props = new HashMap<>(); + + runner.enqueue(new byte [] {},props); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_FAILURE, 1); + + List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE); + assertEquals("Flow file count should be same", 1, flowFiles.size()); + String errorMessage = flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE); + assertNotNull("Error should be not be null", errorMessage); + assertEquals("Error message should be same", "Document too big size " + message.toJSONString().length() + " bytes",errorMessage); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/TestGetRethinkDB.java b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/TestGetRethinkDB.java new file mode 100644 index 0000000000..710954980d --- /dev/null +++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/TestGetRethinkDB.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.rethinkdb; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.gson.Gson; +import com.rethinkdb.net.Connection; + +public class TestGetRethinkDB { + private static final String DOCUMENT_ID = "id1"; + private TestRunner runner; + private AbstractRethinkDBProcessor mockGetRethinkDB; + private Map document; + + @Before + public void setUp() throws Exception { + mockGetRethinkDB = new GetRethinkDB() { + @Override + protected Connection makeConnection() { + return null; + } + + @Override + protected Map getDocument(String id, String readMode) { + return document; + } + }; + runner = TestRunners.newTestRunner(mockGetRethinkDB); + runner.setProperty(GetRethinkDB.DB_NAME, "test"); + runner.setProperty(GetRethinkDB.DB_HOST, "host1"); + runner.setProperty(GetRethinkDB.DB_PORT, "1234"); + runner.setProperty(GetRethinkDB.USERNAME, "u1"); + runner.setProperty(GetRethinkDB.PASSWORD, "p1"); + runner.setProperty(GetRethinkDB.TABLE_NAME, "t1"); + runner.setProperty(GetRethinkDB.CHARSET, "UTF-8"); + runner.setProperty(GetRethinkDB.READ_MODE, "single"); + runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}"); + runner.setProperty(GetRethinkDB.MAX_DOCUMENTS_SIZE, "1 KB"); + runner.assertValid(); + } + + @After + public void tearDown() throws Exception { + runner = null; + } + + @Test + public void testDefaultValid() { + runner.assertValid(); + } + + @Test + public void testBlankHost() { + runner.setProperty(GetRethinkDB.DB_HOST, ""); + runner.assertNotValid(); + } + + @Test + public void testEmptyPort() { + runner.setProperty(GetRethinkDB.DB_PORT, ""); + runner.assertNotValid(); + } + + @Test + public void testEmptyDBName() { + runner.setProperty(GetRethinkDB.DB_NAME, ""); + runner.assertNotValid(); + } + + @Test + public void testEmptyUsername() { + runner.setProperty(GetRethinkDB.USERNAME, ""); + runner.assertNotValid(); + } + + @Test + public void testEmptyPassword() { + runner.setProperty(GetRethinkDB.PASSWORD, "p1"); + runner.assertValid(); + } + + @Test + public void testCharsetUTF8() { + runner.setProperty(GetRethinkDB.CHARSET, "UTF-8"); + runner.assertValid(); + } + + @Test + public void testCharsetBlank() { + runner.setProperty(GetRethinkDB.CHARSET, ""); + runner.assertNotValid(); + } + @Test + public void testZeroMaxDocumentSize() { + runner.setProperty(GetRethinkDB.MAX_DOCUMENTS_SIZE, "0"); + runner.assertNotValid(); + } + + @Test + public void testBlankReadMode() { + runner.setProperty(GetRethinkDB.READ_MODE, ""); + runner.assertNotValid(); + } + + @Test + public void testSizeGreaterThanThreshold() { + runner.setProperty(GetRethinkDB.MAX_DOCUMENTS_SIZE, "1 B"); + runner.assertValid(); + document = new HashMap<>(); + document.put("hello", "rethinkdb"); + + HashMap props = new HashMap<>(); + props.put("rethinkdb.id", DOCUMENT_ID); + + runner.enqueue(new byte[]{}, props); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_FAILURE, 1); + List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE); + assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE)); + } + + @Test + public void testNotFound() { + runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "1 B"); + runner.assertValid(); + + HashMap props = new HashMap<>(); + props.put("rethinkdb.id", DOCUMENT_ID); + + runner.enqueue(new byte[]{}, props); + + runner.run(1,true,true); + + runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_NOT_FOUND, 1); + List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_NOT_FOUND); + assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE)); + flowFiles.get(0).assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,"Document with id '" + DOCUMENT_ID + "' not found"); + } + + @Test + public void testBlankId() { + runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "1 B"); + runner.assertValid(); + runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}"); + Map props = new HashMap<>(); + + runner.enqueue(new byte[]{},props); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_FAILURE, 1); + List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE); + assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE)); + flowFiles.get(0).assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,GetRethinkDB.DOCUMENT_ID_EMPTY_MESSAGE); + } + + @Test + public void testNullId() { + runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "1 B"); + runner.assertValid(); + runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}"); + Map props = new HashMap<>(); + props.put("rethinkdb.id", null); + runner.enqueue(new byte[]{},props); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_FAILURE, 1); + List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE); + assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE)); + flowFiles.get(0).assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,GetRethinkDB.DOCUMENT_ID_EMPTY_MESSAGE); + } + + @Test + public void testValidSingleMessage() { + runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "1 MB"); + runner.assertValid(); + document = new HashMap<>(); + document.put("hello", "rethinkdb"); + + HashMap props = new HashMap<>(); + props.put("rethinkdb.id", DOCUMENT_ID); + + runner.enqueue(new byte[]{}, props); + + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_SUCCESS, 1); + Gson gson = new Gson(); + + String json = gson.toJson(document); + + List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_SUCCESS); + flowFiles.get(0).assertContentEquals(json.toString()); + assertNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE)); + + } + + @Test + public void testGetThrowsException() { + mockGetRethinkDB = new GetRethinkDB() { + @Override + protected Connection makeConnection() { + return null; + } + + @Override + protected Map getDocument(String id, String readMode) { + throw new RuntimeException("testException"); + } + }; + + runner = TestRunners.newTestRunner(mockGetRethinkDB); + runner.setProperty(AbstractRethinkDBProcessor.DB_NAME, "test"); + runner.setProperty(AbstractRethinkDBProcessor.DB_HOST, "host1"); + runner.setProperty(AbstractRethinkDBProcessor.DB_PORT, "1234"); + runner.setProperty(AbstractRethinkDBProcessor.USERNAME, "u1"); + runner.setProperty(AbstractRethinkDBProcessor.PASSWORD, "p1"); + runner.setProperty(AbstractRethinkDBProcessor.TABLE_NAME, "t1"); + runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "UTF-8"); + runner.setProperty(GetRethinkDB.READ_MODE, "single"); + + runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "1 KB"); + runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, DOCUMENT_ID); + + runner.assertValid(); + + HashMap props = new HashMap<>(); + props.put("rethinkdb.id", DOCUMENT_ID); + + runner.enqueue(new byte[]{}, props); + + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(PutRethinkDB.REL_FAILURE, 1); + + List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE); + assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE)); + flowFiles.get(0).assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,"testException"); + } +} \ No newline at end of file