NIFI-12887 Added Binary String Format property to PutDatabaseRecord

- Supports handling Strings as hexadecimal character sequences or base64-encoded binary data when inserting into a binary type column

This closes #8493

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
tpalfy 2024-03-12 13:33:01 +01:00 committed by exceptionfactory
parent 27fa595b25
commit 42bd5243bb
No known key found for this signature in database
2 changed files with 130 additions and 2 deletions

View File

@ -77,10 +77,12 @@ import java.sql.SQLTransientException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.HexFormat;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -238,6 +240,34 @@ public class PutDatabaseRecord extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final AllowableValue BINARY_STRING_FORMAT_UTF8 = new AllowableValue(
"UTF-8",
"UTF-8",
"String values for binary columns contain the original value as text via UTF-8 character encoding"
);
static final AllowableValue BINARY_STRING_FORMAT_HEXADECIMAL = new AllowableValue(
"Hexadecimal",
"Hexadecimal",
"String values for binary columns contain the original value in hexadecimal format"
);
static final AllowableValue BINARY_STRING_FORMAT_BASE64 = new AllowableValue(
"Base64",
"Base64",
"String values for binary columns contain the original value in Base64 encoded format"
);
static final PropertyDescriptor BINARY_STRING_FORMAT = new Builder()
.name("put-db-record-binary-format")
.displayName("Binary String Format")
.description("The format to be applied when decoding string values to binary.")
.required(true)
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.allowableValues(BINARY_STRING_FORMAT_UTF8, BINARY_STRING_FORMAT_HEXADECIMAL, BINARY_STRING_FORMAT_BASE64)
.defaultValue(BINARY_STRING_FORMAT_UTF8.getValue())
.build();
static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new Builder()
.name("put-db-record-translate-field-names")
.displayName("Translate Field Names")
@ -388,6 +418,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
pds.add(CATALOG_NAME);
pds.add(SCHEMA_NAME);
pds.add(TABLE_NAME);
pds.add(BINARY_STRING_FORMAT);
pds.add(TRANSLATE_FIELD_NAMES);
pds.add(UNMATCHED_FIELD_BEHAVIOR);
pds.add(UNMATCHED_COLUMN_BEHAVIOR);
@ -606,6 +637,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
final int timeoutMillis = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final String binaryStringFormat = context.getProperty(BINARY_STRING_FORMAT).evaluateAttributeExpressions(flowFile).getValue();
// Ensure the table name has been set, the generated SQL statements (and TableSchema cache) will need it
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException(format("Cannot process %s because Table Name is null or empty", flowFile));
@ -764,7 +797,15 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
currentValue = dest;
} else if (currentValue instanceof String) {
currentValue = ((String) currentValue).getBytes(StandardCharsets.UTF_8);
final String stringValue = (String) currentValue;
if (BINARY_STRING_FORMAT_BASE64.getValue().equals(binaryStringFormat)) {
currentValue = Base64.getDecoder().decode(stringValue);
} else if (BINARY_STRING_FORMAT_HEXADECIMAL.getValue().equals(binaryStringFormat)) {
currentValue = HexFormat.of().parseHex(stringValue);
} else {
currentValue = stringValue.getBytes(StandardCharsets.UTF_8);
}
} else if (currentValue != null && !(currentValue instanceof byte[])) {
throw new IllegalTypeConversionException("Cannot convert value " + currentValue + " to BLOB/BINARY/VARBINARY/LONGVARBINARY");
}

View File

@ -61,6 +61,7 @@ import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -1686,6 +1687,88 @@ public class PutDatabaseRecordTest {
conn.close();
}
@Test
void testInsertHexStringIntoBinary() throws Exception {
runner.setProperty(PutDatabaseRecord.BINARY_STRING_FORMAT, PutDatabaseRecord.BINARY_STRING_FORMAT_HEXADECIMAL);
String tableName = "HEX_STRING_TEST";
String createTable = "CREATE TABLE " + tableName + " (id integer primary key, binary_data blob)";
String hexStringData = "abCDef";
recreateTable(tableName, createTable);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
runner.enableControllerService(parser);
parser.addSchemaField("id", RecordFieldType.INT);
parser.addSchemaField("binaryData", RecordFieldType.STRING);
parser.addRecord(1, hexStringData);
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, tableName);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement();
final ResultSet resultSet = stmt.executeQuery("SELECT * FROM " + tableName);
assertTrue(resultSet.next());
assertEquals(1, resultSet.getInt(1));
Blob blob = resultSet.getBlob(2);
assertArrayEquals(new byte[]{(byte)171, (byte)205, (byte)239}, blob.getBytes(1, (int)blob.length()));
stmt.close();
conn.close();
}
@Test
void testInsertBase64StringIntoBinary() throws Exception {
runner.setProperty(PutDatabaseRecord.BINARY_STRING_FORMAT, PutDatabaseRecord.BINARY_STRING_FORMAT_BASE64);
String tableName = "BASE64_STRING_TEST";
String createTable = "CREATE TABLE " + tableName + " (id integer primary key, binary_data blob)";
byte[] binaryData = {(byte) 10, (byte) 103, (byte) 234};
recreateTable(tableName, createTable);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
runner.enableControllerService(parser);
parser.addSchemaField("id", RecordFieldType.INT);
parser.addSchemaField("binaryData", RecordFieldType.STRING);
parser.addRecord(1, Base64.getEncoder().encodeToString(binaryData));
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, tableName);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement();
final ResultSet resultSet = stmt.executeQuery("SELECT * FROM " + tableName);
assertTrue(resultSet.next());
assertEquals(1, resultSet.getInt(1));
Blob blob = resultSet.getBlob(2);
assertArrayEquals(binaryData, blob.getBytes(1, (int)blob.length()));
stmt.close();
conn.close();
}
@Test
void testInsertWithBlobClobObjectArraySource() throws Exception {
String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," +
@ -1959,10 +2042,14 @@ public class PutDatabaseRecordTest {
}
private void recreateTable(String createSQL) throws ProcessException, SQLException {
recreateTable("PERSONS", createSQL);
}
private void recreateTable(String tableName, String createSQL) throws ProcessException, SQLException {
final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement();
try {
stmt.execute("drop table PERSONS");
stmt.execute("drop table " + tableName);
} catch (SQLException ignore) {
// Do nothing, may not have existed
}