NIFI-2165: fix support for inserting timestamps into cassandra

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #602
This commit is contained in:
jeffoxenberg 2016-07-01 13:26:27 -05:00 committed by Matt Burgess
parent e7e349631f
commit 7e63b00364
2 changed files with 45 additions and 4 deletions

View File

@ -55,6 +55,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -318,7 +319,6 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
if (mainType.equals(DataType.ascii())
|| mainType.equals(DataType.text())
|| mainType.equals(DataType.varchar())
|| mainType.equals(DataType.timestamp())
|| mainType.equals(DataType.timeuuid())
|| mainType.equals(DataType.uuid())
|| mainType.equals(DataType.inet())
@ -345,6 +345,8 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
} else if (mainType.equals(DataType.blob())) {
statement.setBytes(paramIndex, (ByteBuffer) typeCodec.parse(paramValue));
} else if (mainType.equals(DataType.timestamp())) {
statement.setTimestamp(paramIndex, (Date) typeCodec.parse(paramValue));
}
return;
} else {
@ -399,4 +401,4 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
super.stop();
}
}
}

View File

@ -80,7 +80,7 @@ public class PutCassandraQLTest {
public void testProcessorHappyPath() {
setUpStandardTestConfig();
testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, properties, bits, scaleset, largenum, scale, byteobject) VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?",
testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, properties, bits, scaleset, largenum, scale, byteobject, ts) VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?",
new HashMap<String, String>() {
{
put("cql.args.1.type", "int");
@ -101,6 +101,8 @@ public class PutCassandraQLTest {
put("cql.args.8.value", "1.0");
put("cql.args.9.type", "blob");
put("cql.args.9.value", "0xDEADBEEF");
put("cql.args.10.type", "timestamp");
put("cql.args.10.value", "2016-07-01T15:21:05Z");
}
});
@ -110,6 +112,43 @@ public class PutCassandraQLTest {
testRunner.clearTransferState();
}
@Test
public void testProcessorBadTimestamp() {
setUpStandardTestConfig();
processor.setExceptionToThrow(
new InvalidQueryException(new InetSocketAddress("localhost", 9042), "invalid timestamp"));
testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, properties, bits, scaleset, largenum, scale, byteobject, ts) VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?",
new HashMap<String, String>() {
{
put("cql.args.1.type", "int");
put("cql.args.1.value", "1");
put("cql.args.2.type", "text");
put("cql.args.2.value", "Joe");
put("cql.args.3.type", "text");
// No value for arg 3 to test setNull
put("cql.args.4.type", "map<text,text>");
put("cql.args.4.value", "{'a':'Hello', 'b':'World'}");
put("cql.args.5.type", "list<boolean>");
put("cql.args.5.value", "[true,false,true]");
put("cql.args.6.type", "set<double>");
put("cql.args.6.value", "{1.0, 2.0}");
put("cql.args.7.type", "bigint");
put("cql.args.7.value", "20000000");
put("cql.args.8.type", "float");
put("cql.args.8.value", "1.0");
put("cql.args.9.type", "blob");
put("cql.args.9.value", "0xDEADBEEF");
put("cql.args.10.type", "timestamp");
put("cql.args.10.value", "not a timestamp");
}
});
testRunner.run(1, true, true);
testRunner.assertAllFlowFilesTransferred(PutCassandraQL.REL_FAILURE, 1);
testRunner.clearTransferState();
}
@Test
public void testProcessorInvalidQueryException() {
setUpStandardTestConfig();
@ -216,4 +255,4 @@ public class PutCassandraQLTest {
}
}
}
}