NIFI-11691 Support VARBINARY and LONGVARBINARY in PutDatabaseRecord

This closes #7380

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Matt Burgess 2023-06-14 15:48:41 -04:00 committed by exceptionfactory
parent c037c693bd
commit 760949922c
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 77 additions and 3 deletions

View File

@ -750,13 +750,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
targetDataType = DataTypeUtils.getDataTypeFromSQLTypeValue(fieldSqlType);
}
if (targetDataType != null) {
if (sqlType == Types.BLOB || sqlType == Types.BINARY) {
if (sqlType == Types.BLOB || sqlType == Types.BINARY || sqlType == Types.VARBINARY || sqlType == Types.LONGVARBINARY) {
if (currentValue instanceof Object[]) {
// Convert Object[Byte] arrays to byte[]
Object[] src = (Object[]) currentValue;
if (src.length > 0) {
if (!(src[0] instanceof Byte)) {
throw new IllegalTypeConversionException("Cannot convert value " + currentValue + " to BLOB/BINARY");
throw new IllegalTypeConversionException("Cannot convert value " + currentValue + " to BLOB/BINARY/VARBINARY/LONGVARBINARY");
}
}
byte[] dest = new byte[src.length];
@ -767,7 +767,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
} else if (currentValue instanceof String) {
currentValue = ((String) currentValue).getBytes(StandardCharsets.UTF_8);
} else if (currentValue != null && !(currentValue instanceof byte[])) {
throw new IllegalTypeConversionException("Cannot convert value " + currentValue + " to BLOB/BINARY");
throw new IllegalTypeConversionException("Cannot convert value " + currentValue + " to BLOB/BINARY/VARBINARY/LONGVARBINARY");
}
} else {
currentValue = DataTypeUtils.convertType(
@ -868,6 +868,35 @@ public class PutDatabaseRecord extends AbstractProcessor {
throw new IOException("Unable to parse data as CLOB/String " + value, e.getCause());
}
}
} else if (sqlType == Types.VARBINARY || sqlType == Types.LONGVARBINARY) {
if (fieldSqlType == Types.ARRAY || fieldSqlType == Types.VARCHAR) {
if (!(value instanceof byte[])) {
if (value == null) {
try {
ps.setNull(index, Types.BLOB);
return;
} catch (SQLException e) {
throw new IOException("Unable to setNull() on prepared statement" , e);
}
} else {
throw new IOException("Expected VARBINARY/LONGVARBINARY to be of type byte[] but is instead " + value.getClass().getName());
}
}
byte[] byteArray = (byte[]) value;
try {
ps.setBytes(index, byteArray);
} catch (SQLException e) {
throw new IOException("Unable to parse binary data with size" + byteArray.length, e.getCause());
}
} else {
byte[] byteArray = new byte[0];
try {
byteArray = value.toString().getBytes(StandardCharsets.UTF_8);
ps.setBytes(index, byteArray);
} catch (SQLException e) {
throw new IOException("Unable to parse binary data with size" + byteArray.length, e.getCause());
}
}
} else {
try {
// If the specified field type is OTHER and the SQL type is VARCHAR, the conversion went ok as a string literal but try the OTHER type when setting the parameter. If an error occurs,

View File

@ -65,6 +65,7 @@ import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -95,6 +96,8 @@ public class PutDatabaseRecordTest {
private static final String createUUIDSchema = "CREATE TABLE UUID_TEST (id integer primary key, name VARCHAR(100))";
private static final String createLongVarBinarySchema = "CREATE TABLE LONGVARBINARY_TEST (id integer primary key, name LONG VARCHAR FOR BIT DATA)";
private final static String DB_LOCATION = "target/db_pdr";
TestRunner runner;
@ -1844,6 +1847,48 @@ public class PutDatabaseRecordTest {
conn.close();
}
@Test
void testInsertLongVarBinaryColumn() throws InitializationException, ProcessException, SQLException {
// Manually create and drop the tables and schemas
final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement();
stmt.execute(createLongVarBinarySchema);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
runner.enableControllerService(parser);
parser.addSchemaField("id", RecordFieldType.INT);
parser.addSchemaField("name", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()).getFieldType());
byte[] longVarBinaryValue1 = new byte[] {97,98,99};
byte[] longVarBinaryValue2 = new byte[] {100,101,102};
parser.addRecord(1, longVarBinaryValue1);
parser.addRecord(2, longVarBinaryValue2);
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "LONGVARBINARY_TEST");
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
ResultSet rs = stmt.executeQuery("SELECT * FROM LONGVARBINARY_TEST");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertArrayEquals(longVarBinaryValue1, rs.getBytes(2));
assertTrue(rs.next());
assertEquals(2, rs.getInt(1));
assertArrayEquals(longVarBinaryValue2, rs.getBytes(2));
assertFalse(rs.next());
// Drop the schemas here so as not to interfere with other tests
stmt.execute("drop table LONGVARBINARY_TEST");
stmt.close();
conn.close();
}
private void recreateTable() throws ProcessException {
try (final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement()) {