NIFI-4188 New RethinkDB Get processor

Signed-off-by: James Wing <jvwing@gmail.com>

This closes #2012.
Author: mans2singh, 2017-07-15 19:18:09 -07:00 (committed by James Wing)
Parent: 451a88df43
Commit: ea1fe1a9c0
8 changed files with 676 additions and 7 deletions

File: NOTICE

@@ -20,3 +20,12 @@ The following binary components are provided under the Apache Software License v
The following NOTICE information applies:
Apache Commons IO
Copyright 2002-2016 The Apache Software Foundation
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2015 The Apache Software Foundation
(ASLv2) Google GSON
The following NOTICE information applies:
Copyright 2008 Google Inc.

File: pom.xml

@@ -42,6 +42,14 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
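The two dependencies added above back the new Get processor: commons-lang3 supplies the StringUtils empty-check applied to the document id, and Gson serializes the Map returned by the RethinkDB driver into the JSON written to the FlowFile. A small illustrative sketch of that usage (class and variable names here are mine, not part of the commit):

import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of how the new pom dependencies are used by GetRethinkDB.
public class DependencyUsageSketch {
    public static void main(String[] args) {
        Map<String, Object> document = new LinkedHashMap<>();
        document.put("id", "1");
        document.put("value", "one");

        // Gson turns the driver's Map result into the FlowFile content.
        String json = new Gson().toJson(document);
        System.out.println(json);                      // {"id":"1","value":"one"}

        // StringUtils backs the empty-document-id check; empty ids are routed to failure.
        System.out.println(StringUtils.isEmpty(""));   // true
    }
}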

File: AbstractRethinkDBProcessor.java

@@ -17,6 +17,7 @@
package org.apache.nifi.processors.rethinkdb;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
@@ -28,7 +29,7 @@ import com.rethinkdb.gen.ast.Table;
import com.rethinkdb.net.Connection;
/**
* Abstract base class for RethinkDB processors
*/
abstract class AbstractRethinkDBProcessor extends AbstractProcessor {
@@ -95,7 +96,7 @@ abstract class AbstractRethinkDBProcessor extends AbstractProcessor {
protected static final PropertyDescriptor MAX_DOCUMENTS_SIZE = new PropertyDescriptor.Builder()
.name("rethinkdb-max-document-size")
.displayName("Max size of documents")
.description("Maximum size of documents allowed to be posted in one batch")
.defaultValue("1 MB")
.required(true)
@@ -108,6 +109,9 @@ abstract class AbstractRethinkDBProcessor extends AbstractProcessor {
static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("Failed FlowFiles are routed to this relationship").build();
static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not_found")
.description("Document not found are routed to this relationship").build();
public static final String RESULT_ERROR_KEY = "errors";
public static final String RESULT_DELETED_KEY = "deleted";
public static final String RESULT_GENERATED_KEYS_KEY = "generated_keys";
@@ -118,6 +122,8 @@ abstract class AbstractRethinkDBProcessor extends AbstractProcessor {
public static final String RESULT_FIRST_ERROR_KEY = "first_error";
public static final String RESULT_WARNINGS_KEY = "warnings";
public static final String RETHINKDB_ERROR_MESSAGE = "rethinkdb.error.message";
protected Connection rethinkDbConnection;
protected String databaseName;
protected String tableName;
@@ -158,7 +164,7 @@ abstract class AbstractRethinkDBProcessor extends AbstractProcessor {
getLogger().error("Error while getting connection " + e.getLocalizedMessage(),e);
throw new RuntimeException("Error while getting connection" + e.getLocalizedMessage(),e);
}
getLogger().info("RethinkDB connection created for host {} port {} and db {}",
new Object[] {hostname, port,databaseName});
}
@@ -167,4 +173,11 @@ abstract class AbstractRethinkDBProcessor extends AbstractProcessor {
.port(port).user(username,
password).connect();
}
@OnStopped
public void close() {
getLogger().info("Closing connection");
if ( rethinkDbConnection != null )
rethinkDbConnection.close();
}
}

File: GetRethinkDB.java (new file)

@@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.rethinkdb;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import com.google.gson.Gson;
import java.io.ByteArrayInputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@EventDriven
@Tags({"rethinkdb", "get", "read", "fetch"})
@CapabilityDescription("Processor to get a JSON document from RethinkDB (https://www.rethinkdb.com/) using the document id. The FlowFile will contain the retrieved document")
@WritesAttributes({
@WritesAttribute(attribute = GetRethinkDB.RETHINKDB_ERROR_MESSAGE, description = "RethinkDB error message"),
})
@SeeAlso({PutRethinkDB.class})
public class GetRethinkDB extends AbstractRethinkDBProcessor {
public static final String DOCUMENT_ID_EMPTY_MESSAGE = "Document Id cannot be empty";
public static AllowableValue READ_MODE_SINGLE = new AllowableValue("single", "Single", "Read values from memory from primary replica (Default)");
public static AllowableValue READ_MODE_MAJORITY = new AllowableValue("majority", "Majority", "Read values committed to disk on majority of replicas");
public static AllowableValue READ_MODE_OUTDATED = new AllowableValue("outdated", "Outdated", "Read values from memory from an arbitrary replica ");
protected static final PropertyDescriptor READ_MODE = new PropertyDescriptor.Builder()
.name("rethinkdb-read-mode")
.displayName("Read Mode")
.description("Read mode used for consistency")
.required(true)
.defaultValue(READ_MODE_SINGLE.getValue())
.allowableValues(READ_MODE_SINGLE, READ_MODE_MAJORITY, READ_MODE_OUTDATED)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor RETHINKDB_DOCUMENT_ID = new PropertyDescriptor.Builder()
.displayName("Document Identifier")
.name("rethinkdb-document-identifier")
.description("A FlowFile attribute, or attribute expression used " +
"for determining RethinkDB key for the Flow File content")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(true)
.build();
protected String READ_MODE_KEY = "read_mode";
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;
protected Gson gson = new Gson();
static {
final Set<Relationship> tempRelationships = new HashSet<>();
tempRelationships.add(REL_SUCCESS);
tempRelationships.add(REL_FAILURE);
tempRelationships.add(REL_NOT_FOUND);
relationships = Collections.unmodifiableSet(tempRelationships);
final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
tempDescriptors.add(DB_NAME);
tempDescriptors.add(DB_HOST);
tempDescriptors.add(DB_PORT);
tempDescriptors.add(USERNAME);
tempDescriptors.add(PASSWORD);
tempDescriptors.add(TABLE_NAME);
tempDescriptors.add(CHARSET);
tempDescriptors.add(RETHINKDB_DOCUMENT_ID);
tempDescriptors.add(READ_MODE);
tempDescriptors.add(MAX_DOCUMENTS_SIZE);
propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
super.onScheduled(context);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
String id = context.getProperty(RETHINKDB_DOCUMENT_ID).evaluateAttributeExpressions(flowFile).getValue();
String readMode = context.getProperty(READ_MODE).evaluateAttributeExpressions(flowFile).getValue();
if ( StringUtils.isEmpty(id) ) {
getLogger().error(DOCUMENT_ID_EMPTY_MESSAGE);
flowFile = session.putAttribute(flowFile, RETHINKDB_ERROR_MESSAGE, DOCUMENT_ID_EMPTY_MESSAGE);
session.transfer(flowFile, REL_FAILURE);
return;
}
try {
long startTimeMillis = System.currentTimeMillis();
Map<String,Object> document = getDocument(id, readMode);
if ( document == null ) {
getLogger().debug("Document with id '" + id + "' not found");
flowFile = session.putAttribute(flowFile, RETHINKDB_ERROR_MESSAGE, "Document with id '" + id + "' not found");
session.transfer(flowFile, REL_NOT_FOUND);
return;
}
String json = gson.toJson(document);
byte [] documentBytes = json.getBytes(charset);
if ( documentBytes.length > maxDocumentsSize ) {
getLogger().error("Document too big with size " + documentBytes.length + " and max limit is " + maxDocumentsSize );
flowFile = session.putAttribute(flowFile, RETHINKDB_ERROR_MESSAGE, "Document too big size " + documentBytes.length + " bytes");
session.transfer(flowFile, REL_FAILURE);
return;
}
ByteArrayInputStream bais = new ByteArrayInputStream(documentBytes);
flowFile = session.importFrom(bais, flowFile);
final long endTimeMillis = System.currentTimeMillis();
getLogger().debug("Json document {} retrieved Result: {}", new Object[] {id, document});
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().fetch(flowFile,
new StringBuilder("rethinkdb://").append(databaseName).append("/").append(tableName).append("/").append(id).toString(),
(endTimeMillis - startTimeMillis));
} catch (Exception exception) {
getLogger().error("Failed to get document from RethinkDB due to error {}",
new Object[]{exception.getLocalizedMessage()}, exception);
flowFile = session.putAttribute(flowFile, RETHINKDB_ERROR_MESSAGE, exception.getMessage() + "");
session.transfer(flowFile, REL_FAILURE);
context.yield();
}
}
protected Map<String,Object> getDocument(String id, String readMode) {
return getRdbTable().optArg(READ_MODE_KEY,readMode).get(id).run(rethinkDbConnection);
}
@OnStopped
public void close() {
super.close();
}
}

File: PutRethinkDB.java

@@ -22,6 +22,7 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
@@ -61,6 +62,7 @@ import java.util.Set;
@WritesAttribute(attribute = PutRethinkDB.RETHINKDB_INSERT_RESULT_FIRST_ERROR_KEY, description = "First error while inserting documents"),
@WritesAttribute(attribute = PutRethinkDB.RETHINKDB_INSERT_RESULT_WARNINGS_KEY, description = "Warning message in case of large number of ids being returned on insertion")
})
@SeeAlso({GetRethinkDB.class})
public class PutRethinkDB extends AbstractRethinkDBProcessor {
public static AllowableValue CONFLICT_STRATEGY_UPDATE = new AllowableValue("update", "Update", "Update the document having same id with new values");
@@ -96,7 +98,6 @@ public class PutRethinkDB extends AbstractRethinkDBProcessor {
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;
public static final String RETHINKDB_INSERT_RESULT = "rethinkdb.insert.result";
public static final String RETHINKDB_INSERT_RESULT_ERROR_KEY = "rethinkdb.insert.errors";
public static final String RETHINKDB_INSERT_RESULT_DELETED_KEY = "rethinkdb.insert.deleted";
@@ -230,8 +231,6 @@ public class PutRethinkDB extends AbstractRethinkDBProcessor {
*/
@OnStopped
public void close() {
super.close();
}
}

File: META-INF/services/org.apache.nifi.processor.Processor

@@ -13,3 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.rethinkdb.PutRethinkDB
org.apache.nifi.processors.rethinkdb.GetRethinkDB
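The line added above registers the new processor in META-INF/services/org.apache.nifi.processor.Processor, which is how NiFi discovers processors inside a NAR via the standard java.util.ServiceLoader mechanism. A hedged, standalone illustration of that lookup (not part of the commit):

import java.util.ServiceLoader;
import org.apache.nifi.processor.Processor;

// Illustrative only: enumerate processors registered through META-INF/services on the
// current classpath, the same ServiceLoader mechanism NiFi applies per NAR classloader.
public class ListRegisteredProcessors {
    public static void main(String[] args) {
        for (Processor processor : ServiceLoader.load(Processor.class)) {
            // With the rethinkdb processors module on the classpath this should list
            // org.apache.nifi.processors.rethinkdb.GetRethinkDB and PutRethinkDB.
            System.out.println(processor.getClass().getName());
        }
    }
}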

File: ITGetRethinkDBTest.java (new file)

@@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.rethinkdb;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import com.rethinkdb.RethinkDB;
import com.rethinkdb.net.Connection;
import net.minidev.json.JSONObject;
/**
* Integration test for getting documents from RethinkDB. Before running these tests, ensure that
* RethinkDB is running on localhost with the default port and has a database <code>test</code>
* containing a table <code>test</code> and a user <code>admin</code> with password <code>admin</code>,
* or adjust the attributes in the test accordingly. (A minimal setup sketch follows this test class.)
*/
@Ignore("Comment this out for running tests against a real instance of RethinkDB")
public class ITGetRethinkDBTest {
private TestRunner runner;
private Connection connection;
private String dbName = "test";
private String dbHost = "localhost";
private String dbPort = "28015";
private String user = "admin";
private String password = "admin";
private String table = "test";
@Before
public void setUp() throws Exception {
runner = TestRunners.newTestRunner(GetRethinkDB.class);
runner.setProperty(AbstractRethinkDBProcessor.DB_NAME, dbName);
runner.setProperty(AbstractRethinkDBProcessor.DB_HOST, dbHost);
runner.setProperty(AbstractRethinkDBProcessor.DB_PORT, dbPort);
runner.setProperty(AbstractRethinkDBProcessor.USERNAME, user);
runner.setProperty(AbstractRethinkDBProcessor.PASSWORD, password);
runner.setProperty(AbstractRethinkDBProcessor.TABLE_NAME, table);
runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "UTF-8");
connection = RethinkDB.r.connection().user(user, password).db(dbName).hostname(dbHost).port(Integer.parseInt(dbPort)).connect();
RethinkDB.r.db(dbName).table(table).delete().run(connection);
long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
assertEquals("Count should be same", 0L, count);
}
@After
public void tearDown() throws Exception {
runner = null;
connection.close();
connection = null;
}
@Test
public void testGetDocumentById() {
JSONObject message = new JSONObject();
message.put("id", "1");
message.put("value", "one");
RethinkDB.r.db(dbName).table(table).insert(message).run(connection);
long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
assertEquals("Count should be same", 1L, count);
runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
Map<String, String> props = new HashMap<>();
props.put("rethinkdb.id","1");
runner.enqueue(new byte [] {},props);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_SUCCESS);
assertEquals("Flow file count should be same", 1, flowFiles.size());
assertEquals("Error should be null",null, flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
assertEquals("Content should be same size", message.toString().length(), flowFiles.get(0).getSize());
flowFiles.get(0).assertContentEquals(message.toString());
}
@Test
public void testGetDocumentByIdNotFound() {
JSONObject message = new JSONObject();
message.put("id", "1");
message.put("value", "one");
RethinkDB.r.db(dbName).table(table).insert(message).run(connection);
long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
assertEquals("Count should be same", 1L, count);
runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
Map<String, String> props = new HashMap<>();
props.put("rethinkdb.id", String.valueOf(System.currentTimeMillis()));
runner.enqueue(new byte [] {},props);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_NOT_FOUND, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_NOT_FOUND);
assertEquals("Flow file count should be same", 1, flowFiles.size());
assertNotNull("Error should not be null", flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
assertEquals("Content should be same size", 0, flowFiles.get(0).getSize());
}
@Test
public void testGetDocumentByHardCodedId() {
JSONObject message = new JSONObject();
message.put("id", "2");
message.put("value", "two");
RethinkDB.r.db(dbName).table(table).insert(message).run(connection);
long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
assertEquals("Count should be same", 1, count);
runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "2");
Map<String, String> props = new HashMap<>();
runner.enqueue(new byte [] {},props);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_SUCCESS);
assertEquals("Flow file count should be same", 1, flowFiles.size());
assertEquals("Error should be null",null, flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
assertEquals("Content should be same size", message.toString().length(), flowFiles.get(0).getSize());
flowFiles.get(0).assertContentEquals(message.toString());
}
@Test
public void testGetDocumentTooBig() {
JSONObject message = new JSONObject();
message.put("id", "2");
message.put("value", "two");
RethinkDB.r.db(dbName).table(table).insert(message).run(connection);
long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
assertEquals("Count should be same", 1, count);
runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "2");
runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "2B");
Map<String, String> props = new HashMap<>();
runner.enqueue(new byte [] {},props);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE);
assertEquals("Flow file count should be same", 1, flowFiles.size());
String errorMessage = flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE);
assertNotNull("Error should be not be null", errorMessage);
assertEquals("Error message should be same", "Document too big size " + message.toJSONString().length() + " bytes",errorMessage);
}
}
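As the class javadoc above notes, these integration tests expect a local RethinkDB with a test database and table and admin/admin credentials. A minimal setup sketch under those assumptions (hostname, port, and credentials mirror the test fields; the class name is mine and this is not part of the commit):

import com.rethinkdb.RethinkDB;
import com.rethinkdb.net.Connection;

// Hedged setup sketch for ITGetRethinkDBTest: creates the "test" database and "test"
// table the tests expect on a local RethinkDB instance. RethinkDB ships with a "test"
// database by default, so the dbCreate call may be unnecessary on a fresh install.
public class PrepareRethinkDBForIT {
    public static void main(String[] args) {
        RethinkDB r = RethinkDB.r;
        Connection conn = r.connection()
                .hostname("localhost")
                .port(28015)
                .user("admin", "admin")   // assumes the admin password has been set to "admin"
                .connect();
        try {
            Boolean dbExists = r.dbList().contains("test").run(conn);
            if (!dbExists) {
                r.dbCreate("test").run(conn);
            }
            Boolean tableExists = r.db("test").tableList().contains("test").run(conn);
            if (!tableExists) {
                r.db("test").tableCreate("test").run(conn);
            }
        } finally {
            conn.close();
        }
    }
}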

File: TestGetRethinkDB.java (new file)

@@ -0,0 +1,262 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.rethinkdb;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.gson.Gson;
import com.rethinkdb.net.Connection;
public class TestGetRethinkDB {
private static final String DOCUMENT_ID = "id1";
private TestRunner runner;
private AbstractRethinkDBProcessor mockGetRethinkDB;
private Map<String,Object> document;
@Before
public void setUp() throws Exception {
mockGetRethinkDB = new GetRethinkDB() {
@Override
protected Connection makeConnection() {
return null;
}
@Override
protected Map<String,Object> getDocument(String id, String readMode) {
return document;
}
};
runner = TestRunners.newTestRunner(mockGetRethinkDB);
runner.setProperty(GetRethinkDB.DB_NAME, "test");
runner.setProperty(GetRethinkDB.DB_HOST, "host1");
runner.setProperty(GetRethinkDB.DB_PORT, "1234");
runner.setProperty(GetRethinkDB.USERNAME, "u1");
runner.setProperty(GetRethinkDB.PASSWORD, "p1");
runner.setProperty(GetRethinkDB.TABLE_NAME, "t1");
runner.setProperty(GetRethinkDB.CHARSET, "UTF-8");
runner.setProperty(GetRethinkDB.READ_MODE, "single");
runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
runner.setProperty(GetRethinkDB.MAX_DOCUMENTS_SIZE, "1 KB");
runner.assertValid();
}
@After
public void tearDown() throws Exception {
runner = null;
}
@Test
public void testDefaultValid() {
runner.assertValid();
}
@Test
public void testBlankHost() {
runner.setProperty(GetRethinkDB.DB_HOST, "");
runner.assertNotValid();
}
@Test
public void testEmptyPort() {
runner.setProperty(GetRethinkDB.DB_PORT, "");
runner.assertNotValid();
}
@Test
public void testEmptyDBName() {
runner.setProperty(GetRethinkDB.DB_NAME, "");
runner.assertNotValid();
}
@Test
public void testEmptyUsername() {
runner.setProperty(GetRethinkDB.USERNAME, "");
runner.assertNotValid();
}
@Test
public void testEmptyPassword() {
runner.setProperty(GetRethinkDB.PASSWORD, "p1");
runner.assertValid();
}
@Test
public void testCharsetUTF8() {
runner.setProperty(GetRethinkDB.CHARSET, "UTF-8");
runner.assertValid();
}
@Test
public void testCharsetBlank() {
runner.setProperty(GetRethinkDB.CHARSET, "");
runner.assertNotValid();
}
@Test
public void testZeroMaxDocumentSize() {
runner.setProperty(GetRethinkDB.MAX_DOCUMENTS_SIZE, "0");
runner.assertNotValid();
}
@Test
public void testBlankReadMode() {
runner.setProperty(GetRethinkDB.READ_MODE, "");
runner.assertNotValid();
}
@Test
public void testSizeGreaterThanThreshold() {
runner.setProperty(GetRethinkDB.MAX_DOCUMENTS_SIZE, "1 B");
runner.assertValid();
document = new HashMap<>();
document.put("hello", "rethinkdb");
HashMap<String,String> props = new HashMap<>();
props.put("rethinkdb.id", DOCUMENT_ID);
runner.enqueue(new byte[]{}, props);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE);
assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
}
@Test
public void testNotFound() {
runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "1 B");
runner.assertValid();
HashMap<String,String> props = new HashMap<>();
props.put("rethinkdb.id", DOCUMENT_ID);
runner.enqueue(new byte[]{}, props);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_NOT_FOUND, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_NOT_FOUND);
assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
flowFiles.get(0).assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,"Document with id '" + DOCUMENT_ID + "' not found");
}
@Test
public void testBlankId() {
runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "1 B");
runner.assertValid();
runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
Map<String,String> props = new HashMap<>();
runner.enqueue(new byte[]{},props);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE);
assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
flowFiles.get(0).assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,GetRethinkDB.DOCUMENT_ID_EMPTY_MESSAGE);
}
@Test
public void testNullId() {
runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "1 B");
runner.assertValid();
runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
Map<String,String> props = new HashMap<>();
props.put("rethinkdb.id", null);
runner.enqueue(new byte[]{},props);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE);
assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
flowFiles.get(0).assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,GetRethinkDB.DOCUMENT_ID_EMPTY_MESSAGE);
}
@Test
public void testValidSingleMessage() {
runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "1 MB");
runner.assertValid();
document = new HashMap<>();
document.put("hello", "rethinkdb");
HashMap<String,String> props = new HashMap<>();
props.put("rethinkdb.id", DOCUMENT_ID);
runner.enqueue(new byte[]{}, props);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_SUCCESS, 1);
Gson gson = new Gson();
String json = gson.toJson(document);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_SUCCESS);
flowFiles.get(0).assertContentEquals(json.toString());
assertNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
}
@Test
public void testGetThrowsException() {
mockGetRethinkDB = new GetRethinkDB() {
@Override
protected Connection makeConnection() {
return null;
}
@Override
protected Map<String,Object> getDocument(String id, String readMode) {
throw new RuntimeException("testException");
}
};
runner = TestRunners.newTestRunner(mockGetRethinkDB);
runner.setProperty(AbstractRethinkDBProcessor.DB_NAME, "test");
runner.setProperty(AbstractRethinkDBProcessor.DB_HOST, "host1");
runner.setProperty(AbstractRethinkDBProcessor.DB_PORT, "1234");
runner.setProperty(AbstractRethinkDBProcessor.USERNAME, "u1");
runner.setProperty(AbstractRethinkDBProcessor.PASSWORD, "p1");
runner.setProperty(AbstractRethinkDBProcessor.TABLE_NAME, "t1");
runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "UTF-8");
runner.setProperty(GetRethinkDB.READ_MODE, "single");
runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "1 KB");
runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, DOCUMENT_ID);
runner.assertValid();
HashMap<String,String> props = new HashMap<>();
props.put("rethinkdb.id", DOCUMENT_ID);
runner.enqueue(new byte[]{}, props);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE);
assertNotNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
flowFiles.get(0).assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,"testException");
}
}