NIFI-3852 Add expression language support to Cassandra processors

Remove unused import

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #1770
This commit is contained in:
Tim Reardon 2017-05-09 12:39:18 -04:00 committed by Matt Burgess
parent b1901d5fe0
commit afd2b04afd
6 changed files with 137 additions and 15 deletions

View File

@ -47,7 +47,6 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -66,7 +65,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
+ "comma-separated and in hostname:port format. Example node1:port,node2:port,...." + "comma-separated and in hostname:port format. Example node1:port,node2:port,...."
+ " The default client port for Cassandra is 9042, but the port(s) must be explicitly specified.") + " The default client port for Cassandra is 9042, but the port(s) must be explicitly specified.")
.required(true) .required(true)
.expressionLanguageSupported(false) .expressionLanguageSupported(true)
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
.build(); .build();
@ -75,6 +74,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
.description("The Cassandra Keyspace to connect to. If no keyspace is specified, the query will need to " .description("The Cassandra Keyspace to connect to. If no keyspace is specified, the query will need to "
+ "include the keyspace name before any table reference.") + "include the keyspace name before any table reference.")
.required(false) .required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
@ -100,6 +100,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
.name("Username") .name("Username")
.description("Username to access the Cassandra cluster") .description("Username to access the Cassandra cluster")
.required(false) .required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
@ -108,6 +109,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
.description("Password to access the Cassandra cluster") .description("Password to access the Cassandra cluster")
.required(false) .required(false)
.sensitive(true) .sensitive(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
@ -123,6 +125,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
.name("Character Set") .name("Character Set")
.description("Specifies the character set of the record data.") .description("Specifies the character set of the record data.")
.required(true) .required(true)
.expressionLanguageSupported(true)
.defaultValue("UTF-8") .defaultValue("UTF-8")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build(); .build();
@ -150,8 +153,9 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
Set<ValidationResult> results = new HashSet<>(); Set<ValidationResult> results = new HashSet<>();
// Ensure that if username or password is set, then the other is too // Ensure that if username or password is set, then the other is too
Map<PropertyDescriptor, String> propertyMap = validationContext.getProperties(); String userName = validationContext.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
if (StringUtils.isEmpty(propertyMap.get(USERNAME)) != StringUtils.isEmpty(propertyMap.get(PASSWORD))) { String password = validationContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
if (StringUtils.isEmpty(userName) != StringUtils.isEmpty(password)) {
results.add(new ValidationResult.Builder().valid(false).explanation( results.add(new ValidationResult.Builder().valid(false).explanation(
"If username or password is specified, then the other must be specified as well").build()); "If username or password is specified, then the other must be specified as well").build());
} }
@ -162,7 +166,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
protected void connectToCassandra(ProcessContext context) { protected void connectToCassandra(ProcessContext context) {
if (cluster.get() == null) { if (cluster.get() == null) {
ComponentLog log = getLogger(); ComponentLog log = getLogger();
final String contactPointList = context.getProperty(CONTACT_POINTS).getValue(); final String contactPointList = context.getProperty(CONTACT_POINTS).evaluateAttributeExpressions().getValue();
final String consistencyLevel = context.getProperty(CONSISTENCY_LEVEL).getValue(); final String consistencyLevel = context.getProperty(CONSISTENCY_LEVEL).getValue();
List<InetSocketAddress> contactPoints = getContactPoints(contactPointList); List<InetSocketAddress> contactPoints = getContactPoints(contactPointList);
@ -190,8 +194,8 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
} }
final String username, password; final String username, password;
PropertyValue usernameProperty = context.getProperty(USERNAME); PropertyValue usernameProperty = context.getProperty(USERNAME).evaluateAttributeExpressions();
PropertyValue passwordProperty = context.getProperty(PASSWORD); PropertyValue passwordProperty = context.getProperty(PASSWORD).evaluateAttributeExpressions();
if (usernameProperty != null && passwordProperty != null) { if (usernameProperty != null && passwordProperty != null) {
username = usernameProperty.getValue(); username = usernameProperty.getValue();
@ -203,7 +207,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
// Create the cluster and connect to it // Create the cluster and connect to it
Cluster newCluster = createCluster(contactPoints, sslContext, username, password); Cluster newCluster = createCluster(contactPoints, sslContext, username, password);
PropertyValue keyspaceProperty = context.getProperty(KEYSPACE); PropertyValue keyspaceProperty = context.getProperty(KEYSPACE).evaluateAttributeExpressions();
final Session newSession; final Session newSession;
if (keyspaceProperty != null) { if (keyspaceProperty != null) {
newSession = newCluster.connect(keyspaceProperty.getValue()); newSession = newCluster.connect(keyspaceProperty.getValue());

View File

@ -95,6 +95,7 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
+ "Time Unit, such as: nanos, millis, secs, mins, hrs, days. A value of zero means there is no limit. ") + "Time Unit, such as: nanos, millis, secs, mins, hrs, days. A value of zero means there is no limit. ")
.defaultValue("0 seconds") .defaultValue("0 seconds")
.required(true) .required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build(); .build();
@ -177,8 +178,8 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
} }
final long startNanos = System.nanoTime(); final long startNanos = System.nanoTime();
final long statementTimeout = context.getProperty(STATEMENT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); final long statementTimeout = context.getProperty(STATEMENT_TIMEOUT).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS);
final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
// The documentation for the driver recommends the session remain open the entire time the processor is running // The documentation for the driver recommends the session remain open the entire time the processor is running
// and states that it is thread-safe. This is why connectionSession is not in a try-with-resources. // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.

View File

@ -103,6 +103,7 @@ public class QueryCassandra extends AbstractCassandraProcessor {
+ "Time Unit, such as: nanos, millis, secs, mins, hrs, days. A value of zero means there is no limit. ") + "Time Unit, such as: nanos, millis, secs, mins, hrs, days. A value of zero means there is no limit. ")
.defaultValue("0 seconds") .defaultValue("0 seconds")
.required(true) .required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build(); .build();
@ -112,6 +113,7 @@ public class QueryCassandra extends AbstractCassandraProcessor {
+ "and means there is no limit.") + "and means there is no limit.")
.defaultValue("0") .defaultValue("0")
.required(true) .required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.INTEGER_VALIDATOR) .addValidator(StandardValidators.INTEGER_VALIDATOR)
.build(); .build();
@ -178,7 +180,7 @@ public class QueryCassandra extends AbstractCassandraProcessor {
ComponentLog log = getLogger(); ComponentLog log = getLogger();
try { try {
connectToCassandra(context); connectToCassandra(context);
final int fetchSize = context.getProperty(FETCH_SIZE).asInteger(); final int fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
if (fetchSize > 0) { if (fetchSize > 0) {
synchronized (cluster.get()) { synchronized (cluster.get()) {
cluster.get().getConfiguration().getQueryOptions().setFetchSize(fetchSize); cluster.get().getConfiguration().getQueryOptions().setFetchSize(fetchSize);
@ -214,9 +216,9 @@ public class QueryCassandra extends AbstractCassandraProcessor {
final ComponentLog logger = getLogger(); final ComponentLog logger = getLogger();
final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
final long queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue(); final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
final StopWatch stopWatch = new StopWatch(true); final StopWatch stopWatch = new StopWatch(true);
if (fileToProcess == null) { if (fileToProcess == null) {

View File

@ -88,6 +88,16 @@ public class AbstractCassandraProcessorTest {
testRunner.assertValid(); testRunner.assertValid();
} }
@Test
public void testCustomValidateEL() throws Exception {
testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "${host}");
testRunner.setProperty(AbstractCassandraProcessor.KEYSPACE, "${keyspace}");
testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "${user}");
testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "${password}");
testRunner.setProperty(AbstractCassandraProcessor.CHARSET, "${charset}");
testRunner.assertValid();
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test @Test
public void testGetCassandraObject() throws Exception { public void testGetCassandraObject() throws Exception {
@ -247,7 +257,6 @@ public class AbstractCassandraProcessorTest {
assertNotNull(processor.getCluster()); assertNotNull(processor.getCluster());
} }
/** /**
* Provides a stubbed processor instance for testing * Provides a stubbed processor instance for testing
*/ */
@ -255,7 +264,7 @@ public class AbstractCassandraProcessorTest {
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Arrays.asList(CONTACT_POINTS, USERNAME, PASSWORD, CONSISTENCY_LEVEL); return Arrays.asList(CONTACT_POINTS, KEYSPACE, USERNAME, PASSWORD, CONSISTENCY_LEVEL, CHARSET);
} }
@Override @Override

View File

@ -76,6 +76,17 @@ public class PutCassandraQLTest {
testRunner.assertValid(); testRunner.assertValid();
} }
@Test
public void testProcessorELConfigValidity() {
testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "${hosts}");
testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "${pass}");
testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "${user}");
testRunner.setProperty(AbstractCassandraProcessor.CHARSET, "${charset}");
testRunner.setProperty(PutCassandraQL.STATEMENT_TIMEOUT, "${timeout}");
testRunner.assertValid();
}
@Test @Test
public void testProcessorHappyPath() { public void testProcessorHappyPath() {
setUpStandardTestConfig(); setUpStandardTestConfig();
@ -112,6 +123,54 @@ public class PutCassandraQLTest {
testRunner.clearTransferState(); testRunner.clearTransferState();
} }
@Test
public void testProcessorHappyPathELConfig() {
testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "${hosts}");
testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "${pass}");
testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "${user}");
testRunner.setProperty(AbstractCassandraProcessor.CONSISTENCY_LEVEL, "ONE");
testRunner.setProperty(AbstractCassandraProcessor.CHARSET, "${charset}");
testRunner.setProperty(PutCassandraQL.STATEMENT_TIMEOUT, "${timeout}");
testRunner.assertValid();
testRunner.setVariable("hosts", "localhost:9042");
testRunner.setVariable("user", "username");
testRunner.setVariable("pass", "password");
testRunner.setVariable("charset", "UTF-8");
testRunner.setVariable("timeout", "30 sec");
testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, properties, bits, scaleset, largenum, scale, byteobject, ts) VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?",
new HashMap<String, String>() {
{
put("cql.args.1.type", "int");
put("cql.args.1.value", "1");
put("cql.args.2.type", "text");
put("cql.args.2.value", "Joe");
put("cql.args.3.type", "text");
// No value for arg 3 to test setNull
put("cql.args.4.type", "map<text,text>");
put("cql.args.4.value", "{'a':'Hello', 'b':'World'}");
put("cql.args.5.type", "list<boolean>");
put("cql.args.5.value", "[true,false,true]");
put("cql.args.6.type", "set<double>");
put("cql.args.6.value", "{1.0, 2.0}");
put("cql.args.7.type", "bigint");
put("cql.args.7.value", "20000000");
put("cql.args.8.type", "float");
put("cql.args.8.value", "1.0");
put("cql.args.9.type", "blob");
put("cql.args.9.value", "0xDEADBEEF");
put("cql.args.10.type", "timestamp");
put("cql.args.10.value", "2016-07-01T15:21:05Z");
}
});
testRunner.run(1, true, true);
testRunner.assertAllFlowFilesTransferred(PutCassandraQL.REL_SUCCESS, 1);
testRunner.clearTransferState();
}
@Test @Test
public void testProcessorBadTimestamp() { public void testProcessorBadTimestamp() {
setUpStandardTestConfig(); setUpStandardTestConfig();

View File

@ -78,6 +78,16 @@ public class QueryCassandraTest {
testRunner.assertValid(); testRunner.assertValid();
} }
@Test
public void testProcessorELConfigValid() {
testRunner.setProperty(AbstractCassandraProcessor.CONSISTENCY_LEVEL, "ONE");
testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "${hosts}");
testRunner.setProperty(QueryCassandra.CQL_SELECT_QUERY, "${query}");
testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "${pass}");
testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "${user}");
testRunner.assertValid();
}
@Test @Test
public void testProcessorNoInputFlowFileAndExceptions() { public void testProcessorNoInputFlowFileAndExceptions() {
setUpStandardProcessorConfig(); setUpStandardProcessorConfig();
@ -139,6 +149,43 @@ public class QueryCassandraTest {
new String(files.get(0).toByteArray())); new String(files.get(0).toByteArray()));
} }
@Test
public void testProcessorELConfigJsonOutput() {
testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "${hosts}");
testRunner.setProperty(QueryCassandra.CQL_SELECT_QUERY, "${query}");
testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "${pass}");
testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "${user}");
testRunner.setProperty(AbstractCassandraProcessor.CHARSET, "${charset}");
testRunner.setProperty(QueryCassandra.QUERY_TIMEOUT, "${timeout}");
testRunner.setProperty(QueryCassandra.FETCH_SIZE, "${fetch}");
testRunner.setIncomingConnection(false);
testRunner.assertValid();
testRunner.setVariable("hosts", "localhost:9042");
testRunner.setVariable("user", "username");
testRunner.setVariable("pass", "password");
testRunner.setVariable("charset", "UTF-8");
testRunner.setVariable("timeout", "30 sec");
testRunner.setVariable("fetch", "0");
// Test JSON output
testRunner.setProperty(QueryCassandra.OUTPUT_FORMAT, QueryCassandra.JSON_FORMAT);
testRunner.run(1, true, true);
testRunner.assertAllFlowFilesTransferred(QueryCassandra.REL_SUCCESS, 1);
List<MockFlowFile> files = testRunner.getFlowFilesForRelationship(QueryCassandra.REL_SUCCESS);
assertNotNull(files);
assertEquals("One file should be transferred to success", 1, files.size());
assertEquals("{\"results\":[{\"user_id\":\"user1\",\"first_name\":\"Joe\",\"last_name\":\"Smith\","
+ "\"emails\":[\"jsmith@notareal.com\"],\"top_places\":[\"New York, NY\",\"Santa Clara, CA\"],"
+ "\"todo\":{\"2016-01-03 05:00:00+0000\":\"Set my alarm for a month from now\"},"
+ "\"registered\":\"false\",\"scale\":1.0,\"metric\":2.0},"
+ "{\"user_id\":\"user2\",\"first_name\":\"Mary\",\"last_name\":\"Jones\","
+ "\"emails\":[\"mjones@notareal.com\"],\"top_places\":[\"Orlando, FL\"],"
+ "\"todo\":{\"2016-02-03 05:00:00+0000\":\"Get milk and bread\"},"
+ "\"registered\":\"true\",\"scale\":3.0,\"metric\":4.0}]}",
new String(files.get(0).toByteArray()));
}
@Test @Test
public void testProcessorJsonOutputWithQueryTimeout() { public void testProcessorJsonOutputWithQueryTimeout() {
setUpStandardProcessorConfig(); setUpStandardProcessorConfig();