From 953327cdf587c6b68765c0d32508873d8a0031e7 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Wed, 20 Jan 2021 09:48:24 -0500 Subject: [PATCH] NIFI-8156 Fixed byte handling bug in cassandra. Signed-off-by: Pierre Villard This closes #4771. --- .../cassandra/PutCassandraRecord.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java index 37fefa851c..cedeeeb7bc 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java @@ -52,6 +52,7 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.util.StopWatch; import java.io.InputStream; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; import java.util.Collection; @@ -419,7 +420,21 @@ public class PutCassandraRecord extends AbstractCassandraProcessor { insertQuery = QueryBuilder.insertInto(cassandraTable); } for (String fieldName : schema.getFieldNames()) { - insertQuery.value(fieldName, recordContentMap.get(fieldName)); + Object value = recordContentMap.get(fieldName); + + if (value != null && value.getClass().isArray()) { + Object[] array = (Object[])value; + + if (array.length > 0 && array[0] instanceof Byte) { + Object[] temp = (Object[]) value; + byte[] newArray = new byte[temp.length]; + for (int x = 0; x < temp.length; x++) { + newArray[x] = (Byte) temp[x]; + } + value = ByteBuffer.wrap(newArray); + } + } + insertQuery.value(fieldName, value); } return insertQuery; }