NIFI-8156 Fixed byte handling bug in cassandra.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4771.
This commit is contained in:
Mike Thomsen 2021-01-20 09:48:24 -05:00 committed by Pierre Villard
parent fb2a8b5820
commit 953327cdf5
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
1 changed files with 16 additions and 1 deletions

View File

@ -52,6 +52,7 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Collection; import java.util.Collection;
@ -419,7 +420,21 @@ public class PutCassandraRecord extends AbstractCassandraProcessor {
insertQuery = QueryBuilder.insertInto(cassandraTable); insertQuery = QueryBuilder.insertInto(cassandraTable);
} }
for (String fieldName : schema.getFieldNames()) { for (String fieldName : schema.getFieldNames()) {
insertQuery.value(fieldName, recordContentMap.get(fieldName)); Object value = recordContentMap.get(fieldName);
if (value != null && value.getClass().isArray()) {
Object[] array = (Object[])value;
if (array.length > 0 && array[0] instanceof Byte) {
Object[] temp = (Object[]) value;
byte[] newArray = new byte[temp.length];
for (int x = 0; x < temp.length; x++) {
newArray[x] = (Byte) temp[x];
}
value = ByteBuffer.wrap(newArray);
}
}
insertQuery.value(fieldName, value);
} }
return insertQuery; return insertQuery;
} }