diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-nar/src/main/resources/META-INF/NOTICE
index 0801e8b8b4..1a2f743480 100644
--- a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-nar/src/main/resources/META-INF/NOTICE
+++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-nar/src/main/resources/META-INF/NOTICE
@@ -20,3 +20,12 @@ The following binary components are provided under the Apache Software License v
The following NOTICE information applies:
Apache Commons IO
Copyright 2002-2016 The Apache Software Foundation
+
+ (ASLv2) Apache Commons Lang
+ The following NOTICE information applies:
+ Apache Commons Lang
+ Copyright 2001-2015 The Apache Software Foundation
+
+ (ASLv2) Google GSON
+ The following NOTICE information applies:
+ Copyright 2008 Google Inc.
diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/pom.xml b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/pom.xml
index bba931b432..023d487945 100644
--- a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/pom.xml
@@ -42,6 +42,14 @@
commons-io
commons-io
+
+ org.apache.commons
+ commons-lang3
+
+
+ com.google.code.gson
+ gson
+
org.apache.nifi
nifi-mock
diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/AbstractRethinkDBProcessor.java b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/AbstractRethinkDBProcessor.java
index b75b6ad5f4..cfbf1bdf94 100644
--- a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/AbstractRethinkDBProcessor.java
+++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/AbstractRethinkDBProcessor.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.rethinkdb;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
@@ -28,7 +29,7 @@ import com.rethinkdb.gen.ast.Table;
import com.rethinkdb.net.Connection;
/**
- * Abstract base class for RethinkDb processors
+ * Abstract base class for RethinkDB processors
*/
abstract class AbstractRethinkDBProcessor extends AbstractProcessor {
@@ -95,7 +96,7 @@ abstract class AbstractRethinkDBProcessor extends AbstractProcessor {
protected static final PropertyDescriptor MAX_DOCUMENTS_SIZE = new PropertyDescriptor.Builder()
.name("rethinkdb-max-document-size")
- .displayName("Max size of documents in MBs")
+ .displayName("Max size of documents")
.description("Maximum size of documents allowed to be posted in one batch")
.defaultValue("1 MB")
.required(true)
@@ -108,6 +109,9 @@ abstract class AbstractRethinkDBProcessor extends AbstractProcessor {
static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("Failed FlowFiles are routed to this relationship").build();
+ static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not_found")
+ .description("Document not found are routed to this relationship").build();
+
public static final String RESULT_ERROR_KEY = "errors";
public static final String RESULT_DELETED_KEY = "deleted";
public static final String RESULT_GENERATED_KEYS_KEY = "generated_keys";
@@ -118,6 +122,8 @@ abstract class AbstractRethinkDBProcessor extends AbstractProcessor {
public static final String RESULT_FIRST_ERROR_KEY = "first_error";
public static final String RESULT_WARNINGS_KEY = "warnings";
+ public static final String RETHINKDB_ERROR_MESSAGE = "rethinkdb.error.message";
+
protected Connection rethinkDbConnection;
protected String databaseName;
protected String tableName;
@@ -158,7 +164,7 @@ abstract class AbstractRethinkDBProcessor extends AbstractProcessor {
getLogger().error("Error while getting connection " + e.getLocalizedMessage(),e);
throw new RuntimeException("Error while getting connection" + e.getLocalizedMessage(),e);
}
- getLogger().info("RethinkDb connection created for host {} port {} and db {}",
+ getLogger().info("RethinkDB connection created for host {} port {} and db {}",
new Object[] {hostname, port,databaseName});
}
@@ -167,4 +173,11 @@ abstract class AbstractRethinkDBProcessor extends AbstractProcessor {
.port(port).user(username,
password).connect();
}
+
+ @OnStopped
+ public void close() {
+ getLogger().info("Closing connection");
+ if ( rethinkDbConnection != null )
+ rethinkDbConnection.close();
+ }
}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/GetRethinkDB.java b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/GetRethinkDB.java
new file mode 100644
index 0000000000..ac31396da3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/GetRethinkDB.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.rethinkdb;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.AttributeExpression.ResultType;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.google.gson.Gson;
+
+import java.io.ByteArrayInputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@Tags({"rethinkdb", "get", "read", "fetch"})
+@CapabilityDescription("Processor to get a JSON document from RethinkDB (https://www.rethinkdb.com/) using the document id. The FlowFile will contain the retrieved document")
+@WritesAttributes({
+ @WritesAttribute(attribute = GetRethinkDB.RETHINKDB_ERROR_MESSAGE, description = "RethinkDB error message"),
+ })
+@SeeAlso({PutRethinkDB.class})
+public class GetRethinkDB extends AbstractRethinkDBProcessor {
+
+ public static final String DOCUMENT_ID_EMPTY_MESSAGE = "Document Id cannot be empty";
+ public static AllowableValue READ_MODE_SINGLE = new AllowableValue("single", "Single", "Read values from memory from primary replica (Default)");
+ public static AllowableValue READ_MODE_MAJORITY = new AllowableValue("majority", "Majority", "Read values committed to disk on majority of replicas");
+ public static AllowableValue READ_MODE_OUTDATED = new AllowableValue("outdated", "Outdated", "Read values from memory from an arbitrary replica ");
+
+ protected static final PropertyDescriptor READ_MODE = new PropertyDescriptor.Builder()
+ .name("rethinkdb-read-mode")
+ .displayName("Read Mode")
+ .description("Read mode used for consistency")
+ .required(true)
+ .defaultValue(READ_MODE_SINGLE.getValue())
+ .allowableValues(READ_MODE_SINGLE, READ_MODE_MAJORITY, READ_MODE_OUTDATED)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor RETHINKDB_DOCUMENT_ID = new PropertyDescriptor.Builder()
+ .displayName("Document Identifier")
+ .name("rethinkdb-document-identifier")
+ .description("A FlowFile attribute, or attribute expression used " +
+ "for determining RethinkDB key for the Flow File content")
+ .required(true)
+ .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
+ .expressionLanguageSupported(true)
+ .build();
+
+ protected String READ_MODE_KEY = "read_mode";
+
+ private static final Set relationships;
+ private static final List propertyDescriptors;
+
+ protected Gson gson = new Gson();
+
+ static {
+ final Set tempRelationships = new HashSet<>();
+ tempRelationships.add(REL_SUCCESS);
+ tempRelationships.add(REL_FAILURE);
+ tempRelationships.add(REL_NOT_FOUND);
+ relationships = Collections.unmodifiableSet(tempRelationships);
+
+ final List tempDescriptors = new ArrayList<>();
+ tempDescriptors.add(DB_NAME);
+ tempDescriptors.add(DB_HOST);
+ tempDescriptors.add(DB_PORT);
+ tempDescriptors.add(USERNAME);
+ tempDescriptors.add(PASSWORD);
+ tempDescriptors.add(TABLE_NAME);
+ tempDescriptors.add(CHARSET);
+ tempDescriptors.add(RETHINKDB_DOCUMENT_ID);
+ tempDescriptors.add(READ_MODE);
+ tempDescriptors.add(MAX_DOCUMENTS_SIZE);
+ propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
+ }
+
+ @Override
+ public Set getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ public final List getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ super.onScheduled(context);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
+ String id = context.getProperty(RETHINKDB_DOCUMENT_ID).evaluateAttributeExpressions(flowFile).getValue();
+ String readMode = context.getProperty(READ_MODE).evaluateAttributeExpressions(flowFile).getValue();
+
+ if ( StringUtils.isEmpty(id) ) {
+ getLogger().error(DOCUMENT_ID_EMPTY_MESSAGE);
+ flowFile = session.putAttribute(flowFile, RETHINKDB_ERROR_MESSAGE, DOCUMENT_ID_EMPTY_MESSAGE);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ try {
+ long startTimeMillis = System.currentTimeMillis();
+ Map document = getDocument(id, readMode);
+
+ if ( document == null ) {
+ getLogger().debug("Document with id '" + id + "' not found");
+ flowFile = session.putAttribute(flowFile, RETHINKDB_ERROR_MESSAGE, "Document with id '" + id + "' not found");
+ session.transfer(flowFile, REL_NOT_FOUND);
+ return;
+ }
+
+ String json = gson.toJson(document);
+
+ byte [] documentBytes = json.getBytes(charset);
+
+ if ( documentBytes.length > maxDocumentsSize ) {
+ getLogger().error("Document too big with size " + documentBytes.length + " and max limit is " + maxDocumentsSize );
+ flowFile = session.putAttribute(flowFile, RETHINKDB_ERROR_MESSAGE, "Document too big size " + documentBytes.length + " bytes");
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(documentBytes);
+ session.importFrom(bais, flowFile);
+ final long endTimeMillis = System.currentTimeMillis();
+
+ getLogger().debug("Json document {} retrieved Result: {}", new Object[] {id, document});
+
+ session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().fetch(flowFile,
+ new StringBuilder("rethinkdb://").append(databaseName).append("/").append(tableName).append("/").append(id).toString(),
+ (endTimeMillis - startTimeMillis));
+
+ } catch (Exception exception) {
+ getLogger().error("Failed to get document from RethinkDB due to error {}",
+ new Object[]{exception.getLocalizedMessage()}, exception);
+ flowFile = session.putAttribute(flowFile, RETHINKDB_ERROR_MESSAGE, exception.getMessage() + "");
+ session.transfer(flowFile, REL_FAILURE);
+ context.yield();
+ }
+ }
+
+ protected Map getDocument(String id, String readMode) {
+ return getRdbTable().optArg(READ_MODE_KEY,readMode).get(id).run(rethinkDbConnection);
+ }
+
+ @OnStopped
+ public void close() {
+ super.close();
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/PutRethinkDB.java b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/PutRethinkDB.java
index af1749beab..df2bf3c501 100644
--- a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/PutRethinkDB.java
+++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/PutRethinkDB.java
@@ -22,6 +22,7 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
@@ -61,6 +62,7 @@ import java.util.Set;
@WritesAttribute(attribute = PutRethinkDB.RETHINKDB_INSERT_RESULT_FIRST_ERROR_KEY, description = "First error while inserting documents"),
@WritesAttribute(attribute = PutRethinkDB.RETHINKDB_INSERT_RESULT_WARNINGS_KEY, description = "Warning message in case of large number of ids being returned on insertion")
})
+@SeeAlso({GetRethinkDB.class})
public class PutRethinkDB extends AbstractRethinkDBProcessor {
public static AllowableValue CONFLICT_STRATEGY_UPDATE = new AllowableValue("update", "Update", "Update the document having same id with new values");
@@ -96,7 +98,6 @@ public class PutRethinkDB extends AbstractRethinkDBProcessor {
private static final Set relationships;
private static final List propertyDescriptors;
- public static final String RETHINKDB_ERROR_MESSAGE = "rethinkdb.error.message";
public static final String RETHINKDB_INSERT_RESULT = "rethinkdb.insert.result";
public static final String RETHINKDB_INSERT_RESULT_ERROR_KEY = "rethinkdb.insert.errors";
public static final String RETHINKDB_INSERT_RESULT_DELETED_KEY = "rethinkdb.insert.deleted";
@@ -230,8 +231,6 @@ public class PutRethinkDB extends AbstractRethinkDBProcessor {
*/
@OnStopped
public void close() {
- getLogger().info("Closing connection");
- if ( rethinkDbConnection != null )
- rethinkDbConnection.close();
+ super.close();
}
}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index fc3b8996e3..2875277e85 100644
--- a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,3 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.rethinkdb.PutRethinkDB
+org.apache.nifi.processors.rethinkdb.GetRethinkDB
diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITGetRethinkDBTest.java b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITGetRethinkDBTest.java
new file mode 100644
index 0000000000..f8e14b08c4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITGetRethinkDBTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.rethinkdb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import com.rethinkdb.RethinkDB;
+import com.rethinkdb.net.Connection;
+
+import net.minidev.json.JSONObject;
+
+/**
+ * Integration test for getting documents from RethinkDB. Please ensure that the RethinkDB is running
+ * on local host with default port and has database test with table test and user
+ * admin
with password admin
before running the integration tests or set the attributes in the
+ * test accordingly.
+ */
+@Ignore("Comment this out for running tests against a real instance of RethinkDB")
+public class ITGetRethinkDBTest {
+
+ private TestRunner runner;
+ private Connection connection;
+ private String dbName = "test";
+ private String dbHost = "localhost";
+ private String dbPort = "28015";
+ private String user = "admin";
+ private String password = "admin";
+ private String table = "test";
+
+ @Before
+ public void setUp() throws Exception {
+ runner = TestRunners.newTestRunner(GetRethinkDB.class);
+ runner.setProperty(AbstractRethinkDBProcessor.DB_NAME, dbName);
+ runner.setProperty(AbstractRethinkDBProcessor.DB_HOST, dbHost);
+ runner.setProperty(AbstractRethinkDBProcessor.DB_PORT, dbPort);
+ runner.setProperty(AbstractRethinkDBProcessor.USERNAME, user);
+ runner.setProperty(AbstractRethinkDBProcessor.PASSWORD, password);
+ runner.setProperty(AbstractRethinkDBProcessor.TABLE_NAME, table);
+ runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "UTF-8");
+
+ connection = RethinkDB.r.connection().user(user, password).db(dbName).hostname(dbHost).port(Integer.parseInt(dbPort)).connect();
+ RethinkDB.r.db(dbName).table(table).delete().run(connection);
+ long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
+ assertEquals("Count should be same", 0L, count);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ runner = null;
+ connection.close();
+ connection = null;
+ }
+
+ @Test
+ public void testGetDocumentById() {
+ JSONObject message = new JSONObject();
+ message.put("id", "1");
+ message.put("value", "one");
+ RethinkDB.r.db(dbName).table(table).insert(message).run(connection);
+
+ long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
+ assertEquals("Count should be same", 1L, count);
+
+ runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
+
+ Map props = new HashMap<>();
+ props.put("rethinkdb.id","1");
+
+ runner.enqueue(new byte [] {},props);
+ runner.run(1,true,true);
+ runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_SUCCESS, 1);
+
+ List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_SUCCESS);
+ assertEquals("Flow file count should be same", 1, flowFiles.size());
+ assertEquals("Error should be null",null, flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
+ assertEquals("Content should be same size", message.toString().length(), flowFiles.get(0).getSize());
+ flowFiles.get(0).assertContentEquals(message.toString());
+ }
+
+ @Test
+ public void testGetDocumentByIdNotFound() {
+ JSONObject message = new JSONObject();
+ message.put("id", "1");
+ message.put("value", "one");
+ RethinkDB.r.db(dbName).table(table).insert(message).run(connection);
+
+ long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
+ assertEquals("Count should be same", 1L, count);
+
+ runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
+
+ Map props = new HashMap<>();
+ props.put("rethinkdb.id", String.valueOf(System.currentTimeMillis()));
+
+ runner.enqueue(new byte [] {},props);
+ runner.run(1,true,true);
+ runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_NOT_FOUND, 1);
+
+ List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_NOT_FOUND);
+ assertEquals("Flow file count should be same", 1, flowFiles.size());
+ assertNotNull("Error should not be null", flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
+ assertEquals("Content should be same size", 0, flowFiles.get(0).getSize());
+ }
+
+ @Test
+ public void testGetDocumentByHardCodedId() {
+ JSONObject message = new JSONObject();
+ message.put("id", "2");
+ message.put("value", "two");
+ RethinkDB.r.db(dbName).table(table).insert(message).run(connection);
+
+ long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
+ assertEquals("Count should be same", 1, count);
+
+ runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "2");
+
+ Map props = new HashMap<>();
+
+ runner.enqueue(new byte [] {},props);
+ runner.run(1,true,true);
+ runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_SUCCESS, 1);
+
+ List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_SUCCESS);
+ assertEquals("Flow file count should be same", 1, flowFiles.size());
+ assertEquals("Error should be null",null, flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
+ assertEquals("Content should be same size", message.toString().length(), flowFiles.get(0).getSize());
+ flowFiles.get(0).assertContentEquals(message.toString());
+ }
+
+ @Test
+ public void testGetDocumentTooBig() {
+ JSONObject message = new JSONObject();
+ message.put("id", "2");
+ message.put("value", "two");
+ RethinkDB.r.db(dbName).table(table).insert(message).run(connection);
+
+ long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
+ assertEquals("Count should be same", 1, count);
+
+ runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "2");
+ runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "2B");
+
+ Map props = new HashMap<>();
+
+ runner.enqueue(new byte [] {},props);
+ runner.run(1,true,true);
+ runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_FAILURE, 1);
+
+ List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE);
+ assertEquals("Flow file count should be same", 1, flowFiles.size());
+ String errorMessage = flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE);
+ assertNotNull("Error should be not be null", errorMessage);
+ assertEquals("Error message should be same", "Document too big size " + message.toJSONString().length() + " bytes",errorMessage);
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/TestGetRethinkDB.java b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/TestGetRethinkDB.java
new file mode 100644
index 0000000000..710954980d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/TestGetRethinkDB.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.rethinkdb;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.gson.Gson;
+import com.rethinkdb.net.Connection;
+
+public class TestGetRethinkDB {
+ private static final String DOCUMENT_ID = "id1";
+ private TestRunner runner;
+ private AbstractRethinkDBProcessor mockGetRethinkDB;
+ private Map document;
+
+ @Before
+ public void setUp() throws Exception {
+ mockGetRethinkDB = new GetRethinkDB() {
+ @Override
+ protected Connection makeConnection() {
+ return null;
+ }
+
+ @Override
+ protected Map getDocument(String id, String readMode) {
+ return document;
+ }
+ };
+ runner = TestRunners.newTestRunner(mockGetRethinkDB);
+ runner.setProperty(GetRethinkDB.DB_NAME, "test");
+ runner.setProperty(GetRethinkDB.DB_HOST, "host1");
+ runner.setProperty(GetRethinkDB.DB_PORT, "1234");
+ runner.setProperty(GetRethinkDB.USERNAME, "u1");
+ runner.setProperty(GetRethinkDB.PASSWORD, "p1");
+ runner.setProperty(GetRethinkDB.TABLE_NAME, "t1");
+ runner.setProperty(GetRethinkDB.CHARSET, "UTF-8");
+ runner.setProperty(GetRethinkDB.READ_MODE, "single");
+ runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
+ runner.setProperty(GetRethinkDB.MAX_DOCUMENTS_SIZE, "1 KB");
+ runner.assertValid();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ runner = null;
+ }
+
+ @Test
+ public void testDefaultValid() {
+ runner.assertValid();
+ }
+
+ @Test
+ public void testBlankHost() {
+ runner.setProperty(GetRethinkDB.DB_HOST, "");
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testEmptyPort() {
+ runner.setProperty(GetRethinkDB.DB_PORT, "");
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testEmptyDBName() {
+ runner.setProperty(GetRethinkDB.DB_NAME, "");
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testEmptyUsername() {
+ runner.setProperty(GetRethinkDB.USERNAME, "");
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testEmptyPassword() {
+ runner.setProperty(GetRethinkDB.PASSWORD, "p1");
+ runner.assertValid();
+ }
+
+ @Test
+ public void testCharsetUTF8() {
+ runner.setProperty(GetRethinkDB.CHARSET, "UTF-8");
+ runner.assertValid();
+ }
+
+ @Test
+ public void testCharsetBlank() {
+ runner.setProperty(GetRethinkDB.CHARSET, "");
+ runner.assertNotValid();
+ }
+ @Test
+ public void testZeroMaxDocumentSize() {
+ runner.setProperty(GetRethinkDB.MAX_DOCUMENTS_SIZE, "0");
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testBlankReadMode() {
+ runner.setProperty(GetRethinkDB.READ_MODE, "");
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testSizeGreaterThanThreshold() {
+ runner.setProperty(GetRethinkDB.MAX_DOCUMENTS_SIZE, "1 B");
+ runner.assertValid();
+ document = new HashMap<>();
+ document.put("hello", "rethinkdb");
+
+ HashMap props = new HashMap<>();
+ props.put("rethinkdb.id", DOCUMENT_ID);
+
+ runner.enqueue(new byte[]{}, props);
+ runner.run(1,true,true);
+ runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_FAILURE, 1);
+ List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE);
+ assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
+ }
+
+ @Test
+ public void testNotFound() {
+ runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "1 B");
+ runner.assertValid();
+
+ HashMap props = new HashMap<>();
+ props.put("rethinkdb.id", DOCUMENT_ID);
+
+ runner.enqueue(new byte[]{}, props);
+
+ runner.run(1,true,true);
+
+ runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_NOT_FOUND, 1);
+ List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_NOT_FOUND);
+ assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
+ flowFiles.get(0).assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,"Document with id '" + DOCUMENT_ID + "' not found");
+ }
+
+ @Test
+ public void testBlankId() {
+ runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "1 B");
+ runner.assertValid();
+ runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
+ Map props = new HashMap<>();
+
+ runner.enqueue(new byte[]{},props);
+ runner.run(1,true,true);
+ runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_FAILURE, 1);
+ List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE);
+ assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
+ flowFiles.get(0).assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,GetRethinkDB.DOCUMENT_ID_EMPTY_MESSAGE);
+ }
+
+ @Test
+ public void testNullId() {
+ runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "1 B");
+ runner.assertValid();
+ runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
+ Map props = new HashMap<>();
+ props.put("rethinkdb.id", null);
+ runner.enqueue(new byte[]{},props);
+ runner.run(1,true,true);
+ runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_FAILURE, 1);
+ List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE);
+ assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
+ flowFiles.get(0).assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,GetRethinkDB.DOCUMENT_ID_EMPTY_MESSAGE);
+ }
+
+ @Test
+ public void testValidSingleMessage() {
+ runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "1 MB");
+ runner.assertValid();
+ document = new HashMap<>();
+ document.put("hello", "rethinkdb");
+
+ HashMap props = new HashMap<>();
+ props.put("rethinkdb.id", DOCUMENT_ID);
+
+ runner.enqueue(new byte[]{}, props);
+
+ runner.run(1,true,true);
+ runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_SUCCESS, 1);
+ Gson gson = new Gson();
+
+ String json = gson.toJson(document);
+
+ List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_SUCCESS);
+ flowFiles.get(0).assertContentEquals(json.toString());
+ assertNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
+
+ }
+
+ @Test
+ public void testGetThrowsException() {
+ mockGetRethinkDB = new GetRethinkDB() {
+ @Override
+ protected Connection makeConnection() {
+ return null;
+ }
+
+ @Override
+ protected Map getDocument(String id, String readMode) {
+ throw new RuntimeException("testException");
+ }
+ };
+
+ runner = TestRunners.newTestRunner(mockGetRethinkDB);
+ runner.setProperty(AbstractRethinkDBProcessor.DB_NAME, "test");
+ runner.setProperty(AbstractRethinkDBProcessor.DB_HOST, "host1");
+ runner.setProperty(AbstractRethinkDBProcessor.DB_PORT, "1234");
+ runner.setProperty(AbstractRethinkDBProcessor.USERNAME, "u1");
+ runner.setProperty(AbstractRethinkDBProcessor.PASSWORD, "p1");
+ runner.setProperty(AbstractRethinkDBProcessor.TABLE_NAME, "t1");
+ runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "UTF-8");
+ runner.setProperty(GetRethinkDB.READ_MODE, "single");
+
+ runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "1 KB");
+ runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, DOCUMENT_ID);
+
+ runner.assertValid();
+
+ HashMap props = new HashMap<>();
+ props.put("rethinkdb.id", DOCUMENT_ID);
+
+ runner.enqueue(new byte[]{}, props);
+
+ runner.run(1,true,true);
+ runner.assertAllFlowFilesTransferred(PutRethinkDB.REL_FAILURE, 1);
+
+ List flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE);
+ assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
+ flowFiles.get(0).assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,"testException");
+ }
+}
\ No newline at end of file