diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java index 4af3aa72e1..1e0c973eb1 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java @@ -55,6 +55,7 @@ import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collections; +import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -318,7 +319,6 @@ public class PutCassandraQL extends AbstractCassandraProcessor { if (mainType.equals(DataType.ascii()) || mainType.equals(DataType.text()) || mainType.equals(DataType.varchar()) - || mainType.equals(DataType.timestamp()) || mainType.equals(DataType.timeuuid()) || mainType.equals(DataType.uuid()) || mainType.equals(DataType.inet()) @@ -345,6 +345,8 @@ public class PutCassandraQL extends AbstractCassandraProcessor { } else if (mainType.equals(DataType.blob())) { statement.setBytes(paramIndex, (ByteBuffer) typeCodec.parse(paramValue)); + } else if (mainType.equals(DataType.timestamp())) { + statement.setTimestamp(paramIndex, (Date) typeCodec.parse(paramValue)); } return; } else { @@ -399,4 +401,4 @@ public class PutCassandraQL extends AbstractCassandraProcessor { super.stop(); } -} \ No newline at end of file +} diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java index 1a405567cc..b3e4fe2bf1 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java @@ -80,7 +80,7 @@ public class PutCassandraQLTest { public void testProcessorHappyPath() { setUpStandardTestConfig(); - testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, properties, bits, scaleset, largenum, scale, byteobject) VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?", + testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, properties, bits, scaleset, largenum, scale, byteobject, ts) VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?", new HashMap() { { put("cql.args.1.type", "int"); @@ -101,6 +101,8 @@ public class PutCassandraQLTest { put("cql.args.8.value", "1.0"); put("cql.args.9.type", "blob"); put("cql.args.9.value", "0xDEADBEEF"); + put("cql.args.10.type", "timestamp"); + put("cql.args.10.value", "2016-07-01T15:21:05Z"); } }); @@ -110,6 +112,43 @@ public class PutCassandraQLTest { testRunner.clearTransferState(); } + @Test + public void testProcessorBadTimestamp() { + setUpStandardTestConfig(); + processor.setExceptionToThrow( + new InvalidQueryException(new InetSocketAddress("localhost", 9042), "invalid timestamp")); + testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, properties, bits, scaleset, largenum, scale, byteobject, ts) VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?", + new HashMap() { + { + put("cql.args.1.type", "int"); + put("cql.args.1.value", "1"); + put("cql.args.2.type", "text"); + put("cql.args.2.value", "Joe"); + put("cql.args.3.type", "text"); + // No value for arg 3 to test setNull + put("cql.args.4.type", "map"); + put("cql.args.4.value", "{'a':'Hello', 'b':'World'}"); + put("cql.args.5.type", "list"); + put("cql.args.5.value", "[true,false,true]"); + put("cql.args.6.type", "set"); + put("cql.args.6.value", "{1.0, 2.0}"); + put("cql.args.7.type", "bigint"); + put("cql.args.7.value", "20000000"); + put("cql.args.8.type", "float"); + put("cql.args.8.value", "1.0"); + put("cql.args.9.type", "blob"); + put("cql.args.9.value", "0xDEADBEEF"); + put("cql.args.10.type", "timestamp"); + put("cql.args.10.value", "not a timestamp"); + + } + }); + + testRunner.run(1, true, true); + testRunner.assertAllFlowFilesTransferred(PutCassandraQL.REL_FAILURE, 1); + testRunner.clearTransferState(); + } + @Test public void testProcessorInvalidQueryException() { setUpStandardTestConfig(); @@ -216,4 +255,4 @@ public class PutCassandraQLTest { } } -} \ No newline at end of file +}