mirror of https://github.com/apache/nifi.git
NIFI-7132: fix handling of UUIDs in PutCassandraQL
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #4048.
This commit is contained in:
parent
65b2a9bc2c
commit
a80b2475f7
|
@ -63,6 +63,7 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
@ -327,8 +328,6 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
|
|||
if (mainType.equals(DataType.ascii())
|
||||
|| mainType.equals(DataType.text())
|
||||
|| mainType.equals(DataType.varchar())
|
||||
|| mainType.equals(DataType.timeuuid())
|
||||
|| mainType.equals(DataType.uuid())
|
||||
|| mainType.equals(DataType.inet())
|
||||
|| mainType.equals(DataType.varint())) {
|
||||
// These are strings, so just use the paramValue
|
||||
|
@ -355,6 +354,9 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
|
|||
|
||||
} else if (mainType.equals(DataType.timestamp())) {
|
||||
statement.setTimestamp(paramIndex, (Date) typeCodec.parse(paramValue));
|
||||
} else if (mainType.equals(DataType.timeuuid())
|
||||
|| mainType.equals(DataType.uuid())) {
|
||||
statement.setUUID(paramIndex, (UUID) typeCodec.parse(paramValue));
|
||||
}
|
||||
return;
|
||||
} else {
|
||||
|
|
|
@ -252,6 +252,78 @@ public class PutCassandraQLTest {
|
|||
testRunner.clearTransferState();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProcessorUuid() {
|
||||
setUpStandardTestConfig();
|
||||
|
||||
testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, properties, bits, scaleset, largenum, scale, byteobject, ts) VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?",
|
||||
new HashMap<String, String>() {
|
||||
{
|
||||
put("cql.args.1.type", "int");
|
||||
put("cql.args.1.value", "1");
|
||||
put("cql.args.2.type", "text");
|
||||
put("cql.args.2.value", "Joe");
|
||||
put("cql.args.3.type", "text");
|
||||
// No value for arg 3 to test setNull
|
||||
put("cql.args.4.type", "map<text,text>");
|
||||
put("cql.args.4.value", "{'a':'Hello', 'b':'World'}");
|
||||
put("cql.args.5.type", "list<boolean>");
|
||||
put("cql.args.5.value", "[true,false,true]");
|
||||
put("cql.args.6.type", "set<double>");
|
||||
put("cql.args.6.value", "{1.0, 2.0}");
|
||||
put("cql.args.7.type", "bigint");
|
||||
put("cql.args.7.value", "20000000");
|
||||
put("cql.args.8.type", "float");
|
||||
put("cql.args.8.value", "1.0");
|
||||
put("cql.args.9.type", "blob");
|
||||
put("cql.args.9.value", "0xDEADBEEF");
|
||||
put("cql.args.10.type", "uuid");
|
||||
put("cql.args.10.value", "5442b1f6-4c16-11ea-87f5-45a32dbc5199");
|
||||
|
||||
}
|
||||
});
|
||||
|
||||
testRunner.run(1, true, true);
|
||||
testRunner.assertAllFlowFilesTransferred(PutCassandraQL.REL_SUCCESS, 1);
|
||||
testRunner.clearTransferState();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProcessorBadUuid() {
|
||||
setUpStandardTestConfig();
|
||||
|
||||
testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, properties, bits, scaleset, largenum, scale, byteobject, ts) VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?",
|
||||
new HashMap<String, String>() {
|
||||
{
|
||||
put("cql.args.1.type", "int");
|
||||
put("cql.args.1.value", "1");
|
||||
put("cql.args.2.type", "text");
|
||||
put("cql.args.2.value", "Joe");
|
||||
put("cql.args.3.type", "text");
|
||||
// No value for arg 3 to test setNull
|
||||
put("cql.args.4.type", "map<text,text>");
|
||||
put("cql.args.4.value", "{'a':'Hello', 'b':'World'}");
|
||||
put("cql.args.5.type", "list<boolean>");
|
||||
put("cql.args.5.value", "[true,false,true]");
|
||||
put("cql.args.6.type", "set<double>");
|
||||
put("cql.args.6.value", "{1.0, 2.0}");
|
||||
put("cql.args.7.type", "bigint");
|
||||
put("cql.args.7.value", "20000000");
|
||||
put("cql.args.8.type", "float");
|
||||
put("cql.args.8.value", "1.0");
|
||||
put("cql.args.9.type", "blob");
|
||||
put("cql.args.9.value", "0xDEADBEEF");
|
||||
put("cql.args.10.type", "uuid");
|
||||
put("cql.args.10.value", "bad-uuid");
|
||||
|
||||
}
|
||||
});
|
||||
|
||||
testRunner.run(1, true, true);
|
||||
testRunner.assertAllFlowFilesTransferred(PutCassandraQL.REL_FAILURE, 1);
|
||||
testRunner.clearTransferState();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProcessorInvalidQueryException() {
|
||||
setUpStandardTestConfig();
|
||||
|
|
Loading…
Reference in New Issue