diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index 1f60208f40..26b4281cd0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -77,10 +77,12 @@ import java.sql.SQLTransientException; import java.sql.Statement; import java.sql.Types; import java.util.ArrayList; +import java.util.Base64; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.HexFormat; import java.util.List; import java.util.Map; import java.util.Optional; @@ -238,6 +240,34 @@ public class PutDatabaseRecord extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + static final AllowableValue BINARY_STRING_FORMAT_UTF8 = new AllowableValue( + "UTF-8", + "UTF-8", + "String values for binary columns contain the original value as text via UTF-8 character encoding" + ); + + static final AllowableValue BINARY_STRING_FORMAT_HEXADECIMAL = new AllowableValue( + "Hexadecimal", + "Hexadecimal", + "String values for binary columns contain the original value in hexadecimal format" + ); + + static final AllowableValue BINARY_STRING_FORMAT_BASE64 = new AllowableValue( + "Base64", + "Base64", + "String values for binary columns contain the original value in Base64 encoded format" + ); + + static final PropertyDescriptor BINARY_STRING_FORMAT = new Builder() + .name("put-db-record-binary-format") + .displayName("Binary String Format") + .description("The format to be applied when decoding string values to binary.") + .required(true) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .allowableValues(BINARY_STRING_FORMAT_UTF8, BINARY_STRING_FORMAT_HEXADECIMAL, BINARY_STRING_FORMAT_BASE64) + .defaultValue(BINARY_STRING_FORMAT_UTF8.getValue()) + .build(); + static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new Builder() .name("put-db-record-translate-field-names") .displayName("Translate Field Names") @@ -388,6 +418,7 @@ public class PutDatabaseRecord extends AbstractProcessor { pds.add(CATALOG_NAME); pds.add(SCHEMA_NAME); pds.add(TABLE_NAME); + pds.add(BINARY_STRING_FORMAT); pds.add(TRANSLATE_FIELD_NAMES); pds.add(UNMATCHED_FIELD_BEHAVIOR); pds.add(UNMATCHED_COLUMN_BEHAVIOR); @@ -606,6 +637,8 @@ public class PutDatabaseRecord extends AbstractProcessor { final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger(); final int timeoutMillis = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + final String binaryStringFormat = context.getProperty(BINARY_STRING_FORMAT).evaluateAttributeExpressions(flowFile).getValue(); + // Ensure the table name has been set, the generated SQL statements (and TableSchema cache) will need it if (StringUtils.isEmpty(tableName)) { throw new IllegalArgumentException(format("Cannot process %s because Table Name is null or empty", flowFile)); @@ -764,7 +797,15 @@ public class PutDatabaseRecord extends AbstractProcessor { } currentValue = dest; } else if (currentValue instanceof String) { - currentValue = ((String) currentValue).getBytes(StandardCharsets.UTF_8); + final String stringValue = (String) currentValue; + + if (BINARY_STRING_FORMAT_BASE64.getValue().equals(binaryStringFormat)) { + currentValue = Base64.getDecoder().decode(stringValue); + } else if (BINARY_STRING_FORMAT_HEXADECIMAL.getValue().equals(binaryStringFormat)) { + currentValue = HexFormat.of().parseHex(stringValue); + } else { + currentValue = stringValue.getBytes(StandardCharsets.UTF_8); + } } else if (currentValue != null && !(currentValue instanceof byte[])) { throw new IllegalTypeConversionException("Cannot convert value " + currentValue + " to BLOB/BINARY/VARBINARY/LONGVARBINARY"); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java index e6483efc7f..70a908b377 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java @@ -61,6 +61,7 @@ import java.time.LocalDate; import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -1686,6 +1687,88 @@ public class PutDatabaseRecordTest { conn.close(); } + @Test + void testInsertHexStringIntoBinary() throws Exception { + runner.setProperty(PutDatabaseRecord.BINARY_STRING_FORMAT, PutDatabaseRecord.BINARY_STRING_FORMAT_HEXADECIMAL); + + String tableName = "HEX_STRING_TEST"; + String createTable = "CREATE TABLE " + tableName + " (id integer primary key, binary_data blob)"; + String hexStringData = "abCDef"; + + recreateTable(tableName, createTable); + final MockRecordParser parser = new MockRecordParser(); + runner.addControllerService("parser", parser); + runner.enableControllerService(parser); + + parser.addSchemaField("id", RecordFieldType.INT); + parser.addSchemaField("binaryData", RecordFieldType.STRING); + + parser.addRecord(1, hexStringData); + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE); + runner.setProperty(PutDatabaseRecord.TABLE_NAME, tableName); + + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1); + final Connection conn = dbcp.getConnection(); + final Statement stmt = conn.createStatement(); + + final ResultSet resultSet = stmt.executeQuery("SELECT * FROM " + tableName); + assertTrue(resultSet.next()); + + assertEquals(1, resultSet.getInt(1)); + + Blob blob = resultSet.getBlob(2); + assertArrayEquals(new byte[]{(byte)171, (byte)205, (byte)239}, blob.getBytes(1, (int)blob.length())); + + stmt.close(); + conn.close(); + } + + @Test + void testInsertBase64StringIntoBinary() throws Exception { + runner.setProperty(PutDatabaseRecord.BINARY_STRING_FORMAT, PutDatabaseRecord.BINARY_STRING_FORMAT_BASE64); + + String tableName = "BASE64_STRING_TEST"; + String createTable = "CREATE TABLE " + tableName + " (id integer primary key, binary_data blob)"; + byte[] binaryData = {(byte) 10, (byte) 103, (byte) 234}; + + recreateTable(tableName, createTable); + final MockRecordParser parser = new MockRecordParser(); + runner.addControllerService("parser", parser); + runner.enableControllerService(parser); + + parser.addSchemaField("id", RecordFieldType.INT); + parser.addSchemaField("binaryData", RecordFieldType.STRING); + + parser.addRecord(1, Base64.getEncoder().encodeToString(binaryData)); + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE); + runner.setProperty(PutDatabaseRecord.TABLE_NAME, tableName); + + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1); + final Connection conn = dbcp.getConnection(); + final Statement stmt = conn.createStatement(); + + final ResultSet resultSet = stmt.executeQuery("SELECT * FROM " + tableName); + assertTrue(resultSet.next()); + + assertEquals(1, resultSet.getInt(1)); + + Blob blob = resultSet.getBlob(2); + assertArrayEquals(binaryData, blob.getBytes(1, (int)blob.length())); + + stmt.close(); + conn.close(); + } + @Test void testInsertWithBlobClobObjectArraySource() throws Exception { String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," + @@ -1959,10 +2042,14 @@ public class PutDatabaseRecordTest { } private void recreateTable(String createSQL) throws ProcessException, SQLException { + recreateTable("PERSONS", createSQL); + } + + private void recreateTable(String tableName, String createSQL) throws ProcessException, SQLException { final Connection conn = dbcp.getConnection(); final Statement stmt = conn.createStatement(); try { - stmt.execute("drop table PERSONS"); + stmt.execute("drop table " + tableName); } catch (SQLException ignore) { // Do nothing, may not have existed }