NIFI-10729 Added Cassandra testcontainers.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #6604
This commit is contained in:
Mike Thomsen 2022-10-29 19:40:52 -04:00 committed by Matthew Burgess
parent b6d95faa95
commit 2a45d4ac89
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
5 changed files with 65 additions and 6 deletions

View File

@ -98,5 +98,18 @@
<artifactId>nifi-ssl-context-service-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>cassandra</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -33,6 +33,7 @@ import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.ByteArrayOutputStream;
@ -66,6 +67,7 @@ public class CassandraDistributedMapCache extends AbstractControllerService impl
.description("The name of the table where the cache will be stored.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor KEY_FIELD_NAME = new PropertyDescriptor.Builder()
@ -74,6 +76,7 @@ public class CassandraDistributedMapCache extends AbstractControllerService impl
.description("The name of the field that acts as the unique key. (The CQL type should be \"blob\")")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor VALUE_FIELD_NAME = new PropertyDescriptor.Builder()
@ -82,6 +85,7 @@ public class CassandraDistributedMapCache extends AbstractControllerService impl
.description("The name of the field that will store the value. (The CQL type should be \"blob\")")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
@ -90,6 +94,7 @@ public class CassandraDistributedMapCache extends AbstractControllerService impl
.description("If configured, this will set a TTL (Time to Live) for each row inserted into the table so that " +
"old cache items expire after a certain period of time.")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false)
.build();

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi
import com.datastax.driver.core.Cluster
import com.datastax.driver.core.Session
import org.apache.nifi.controller.cassandra.CassandraDistributedMapCache
import org.apache.nifi.distributed.cache.client.Deserializer
@ -30,6 +31,10 @@ import org.apache.nifi.util.TestRunners
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.testcontainers.containers.CassandraContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
/**
* Setup instructions:
@ -42,11 +47,16 @@ import org.junit.jupiter.api.Test
*
* Table SQL: create table dmc (id blob, value blob, primary key(id));
*/
@Testcontainers
class CassandraDistributedMapCacheIT {
@Container
static final CassandraContainer CASSANDRA_CONTAINER = new CassandraContainer(DockerImageName.parse("cassandra:4.1"))
static TestRunner runner
static CassandraDistributedMapCache distributedMapCache
static Session session
static final String KEYSPACE = "sample_keyspace"
@BeforeAll
static void setup() {
runner = TestRunners.newTestRunner(new AbstractProcessor() {
@ -57,10 +67,20 @@ class CassandraDistributedMapCacheIT {
})
distributedMapCache = new CassandraDistributedMapCache()
InetSocketAddress contactPoint = CASSANDRA_CONTAINER.getContactPoint()
String connectionString = String.format("%s:%d", contactPoint.getHostName(), contactPoint.getPort())
Cluster cluster = Cluster.builder().addContactPoint(contactPoint.getHostName())
.withPort(contactPoint.getPort()).build();
session = cluster.connect();
session.execute("create keyspace nifi_test with replication = { 'replication_factor': 1, 'class': 'SimpleStrategy' }");
session.execute("create table nifi_test.dmc (id blob, value blob, primary key(id))");
def cassandraService = new CassandraSessionProvider()
runner.addControllerService("provider", cassandraService)
runner.addControllerService("dmc", distributedMapCache)
runner.setProperty(cassandraService, CassandraSessionProvider.CONTACT_POINTS, "localhost:9042")
runner.setProperty(cassandraService, CassandraSessionProvider.CONTACT_POINTS, connectionString)
runner.setProperty(cassandraService, CassandraSessionProvider.KEYSPACE, "nifi_test")
runner.setProperty(distributedMapCache, CassandraDistributedMapCache.SESSION_PROVIDER, "provider")
runner.setProperty(distributedMapCache, CassandraDistributedMapCache.TABLE_NAME, "dmc")
@ -85,7 +105,7 @@ class CassandraDistributedMapCacheIT {
@AfterAll
static void cleanup() {
session.execute("TRUNCATE dmc")
session.execute("TRUNCATE nifi_test.dmc")
}
Serializer<String> serializer = { str, os ->

View File

@ -93,5 +93,18 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>cassandra</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -30,13 +30,21 @@ import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.CassandraContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Testcontainers
public class PutCassandraRecordIT {
@Container
private static final CassandraContainer CASSANDRA_CONTAINER = new CassandraContainer(DockerImageName.parse("cassandra:4.1"));
private static TestRunner testRunner;
private static MockRecordParser recordReader;
@ -46,16 +54,15 @@ public class PutCassandraRecordIT {
private static final String KEYSPACE = "sample_keyspace";
private static final String TABLE = "sample_table";
private static final String HOST = "localhost";
private static final int PORT = 9042;
@BeforeAll
public static void setup() throws InitializationException {
recordReader = new MockRecordParser();
testRunner = TestRunners.newTestRunner(PutCassandraRecord.class);
InetSocketAddress contactPoint = CASSANDRA_CONTAINER.getContactPoint();
testRunner.setProperty(PutCassandraRecord.RECORD_READER_FACTORY, "reader");
testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, HOST + ":" + PORT);
testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, contactPoint.getHostString() + ":" + contactPoint.getPort());
testRunner.setProperty(PutCassandraRecord.KEYSPACE, KEYSPACE);
testRunner.setProperty(PutCassandraRecord.TABLE, TABLE);
testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL");
@ -63,7 +70,8 @@ public class PutCassandraRecordIT {
testRunner.addControllerService("reader", recordReader);
testRunner.enableControllerService(recordReader);
cluster = Cluster.builder().addContactPoint(HOST).withPort(PORT).build();
cluster = Cluster.builder().addContactPoint(contactPoint.getHostName())
.withPort(contactPoint.getPort()).build();
session = cluster.connect();
String createKeyspace = "CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication = {'class':'SimpleStrategy','replication_factor':1};";