diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/pom.xml b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/pom.xml index cbd9125d5f..848d3625fd 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/pom.xml +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/pom.xml @@ -98,5 +98,18 @@ nifi-ssl-context-service-api test + + + org.testcontainers + cassandra + ${testcontainers.version} + test + + + org.testcontainers + junit-jupiter + ${testcontainers.version} + test + diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/CassandraDistributedMapCache.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/CassandraDistributedMapCache.java index a921ec1ad9..64bc38b079 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/CassandraDistributedMapCache.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/CassandraDistributedMapCache.java @@ -33,6 +33,7 @@ import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.util.StandardValidators; import java.io.ByteArrayOutputStream; @@ -66,6 +67,7 @@ public class CassandraDistributedMapCache extends AbstractControllerService impl .description("The name of the table where the cache will be stored.") .required(true) .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); public static final PropertyDescriptor KEY_FIELD_NAME = new PropertyDescriptor.Builder() @@ -74,6 +76,7 @@ public class CassandraDistributedMapCache extends AbstractControllerService impl .description("The name of the field that acts as the unique key. (The CQL type should be \"blob\")") .required(true) .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); public static final PropertyDescriptor VALUE_FIELD_NAME = new PropertyDescriptor.Builder() @@ -82,6 +85,7 @@ public class CassandraDistributedMapCache extends AbstractControllerService impl .description("The name of the field that will store the value. (The CQL type should be \"blob\")") .required(true) .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder() @@ -90,6 +94,7 @@ public class CassandraDistributedMapCache extends AbstractControllerService impl .description("If configured, this will set a TTL (Time to Live) for each row inserted into the table so that " + "old cache items expire after a certain period of time.") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .required(false) .build(); diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/groovy/org/apache/nifi/CassandraDistributedMapCacheIT.groovy b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/groovy/org/apache/nifi/CassandraDistributedMapCacheIT.groovy index 459106dd17..2fdc3e6657 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/groovy/org/apache/nifi/CassandraDistributedMapCacheIT.groovy +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/groovy/org/apache/nifi/CassandraDistributedMapCacheIT.groovy @@ -16,6 +16,7 @@ */ package org.apache.nifi +import com.datastax.driver.core.Cluster import com.datastax.driver.core.Session import org.apache.nifi.controller.cassandra.CassandraDistributedMapCache import org.apache.nifi.distributed.cache.client.Deserializer @@ -30,6 +31,10 @@ import org.apache.nifi.util.TestRunners import org.junit.jupiter.api.AfterAll import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Test +import org.testcontainers.containers.CassandraContainer +import org.testcontainers.junit.jupiter.Container +import org.testcontainers.junit.jupiter.Testcontainers +import org.testcontainers.utility.DockerImageName /** * Setup instructions: @@ -42,11 +47,16 @@ import org.junit.jupiter.api.Test * * Table SQL: create table dmc (id blob, value blob, primary key(id)); */ +@Testcontainers class CassandraDistributedMapCacheIT { + @Container + static final CassandraContainer CASSANDRA_CONTAINER = new CassandraContainer(DockerImageName.parse("cassandra:4.1")) static TestRunner runner static CassandraDistributedMapCache distributedMapCache static Session session + static final String KEYSPACE = "sample_keyspace" + @BeforeAll static void setup() { runner = TestRunners.newTestRunner(new AbstractProcessor() { @@ -57,10 +67,20 @@ class CassandraDistributedMapCacheIT { }) distributedMapCache = new CassandraDistributedMapCache() + InetSocketAddress contactPoint = CASSANDRA_CONTAINER.getContactPoint() + String connectionString = String.format("%s:%d", contactPoint.getHostName(), contactPoint.getPort()) + + Cluster cluster = Cluster.builder().addContactPoint(contactPoint.getHostName()) + .withPort(contactPoint.getPort()).build(); + session = cluster.connect(); + + session.execute("create keyspace nifi_test with replication = { 'replication_factor': 1, 'class': 'SimpleStrategy' }"); + session.execute("create table nifi_test.dmc (id blob, value blob, primary key(id))"); + def cassandraService = new CassandraSessionProvider() runner.addControllerService("provider", cassandraService) runner.addControllerService("dmc", distributedMapCache) - runner.setProperty(cassandraService, CassandraSessionProvider.CONTACT_POINTS, "localhost:9042") + runner.setProperty(cassandraService, CassandraSessionProvider.CONTACT_POINTS, connectionString) runner.setProperty(cassandraService, CassandraSessionProvider.KEYSPACE, "nifi_test") runner.setProperty(distributedMapCache, CassandraDistributedMapCache.SESSION_PROVIDER, "provider") runner.setProperty(distributedMapCache, CassandraDistributedMapCache.TABLE_NAME, "dmc") @@ -85,7 +105,7 @@ class CassandraDistributedMapCacheIT { @AfterAll static void cleanup() { - session.execute("TRUNCATE dmc") + session.execute("TRUNCATE nifi_test.dmc") } Serializer serializer = { str, os -> diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml index f7f9ad7873..84fee14a78 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml @@ -93,5 +93,18 @@ org.apache.commons commons-text + + + org.testcontainers + cassandra + ${testcontainers.version} + test + + + org.testcontainers + junit-jupiter + ${testcontainers.version} + test + diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordIT.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordIT.java index cee7fd1656..f0943c2afb 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordIT.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordIT.java @@ -30,13 +30,21 @@ import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.testcontainers.containers.CassandraContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import java.net.InetSocketAddress; import java.util.List; import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; +@Testcontainers public class PutCassandraRecordIT { + @Container + private static final CassandraContainer CASSANDRA_CONTAINER = new CassandraContainer(DockerImageName.parse("cassandra:4.1")); private static TestRunner testRunner; private static MockRecordParser recordReader; @@ -46,16 +54,15 @@ public class PutCassandraRecordIT { private static final String KEYSPACE = "sample_keyspace"; private static final String TABLE = "sample_table"; - private static final String HOST = "localhost"; - private static final int PORT = 9042; @BeforeAll public static void setup() throws InitializationException { recordReader = new MockRecordParser(); testRunner = TestRunners.newTestRunner(PutCassandraRecord.class); + InetSocketAddress contactPoint = CASSANDRA_CONTAINER.getContactPoint(); testRunner.setProperty(PutCassandraRecord.RECORD_READER_FACTORY, "reader"); - testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, HOST + ":" + PORT); + testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, contactPoint.getHostString() + ":" + contactPoint.getPort()); testRunner.setProperty(PutCassandraRecord.KEYSPACE, KEYSPACE); testRunner.setProperty(PutCassandraRecord.TABLE, TABLE); testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL"); @@ -63,7 +70,8 @@ public class PutCassandraRecordIT { testRunner.addControllerService("reader", recordReader); testRunner.enableControllerService(recordReader); - cluster = Cluster.builder().addContactPoint(HOST).withPort(PORT).build(); + cluster = Cluster.builder().addContactPoint(contactPoint.getHostName()) + .withPort(contactPoint.getPort()).build(); session = cluster.connect(); String createKeyspace = "CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication = {'class':'SimpleStrategy','replication_factor':1};";