From e210172d93d6d5be45c061039bb7e0a2896c6e14 Mon Sep 17 00:00:00 2001 From: Peter Wicks Date: Wed, 17 Aug 2016 09:06:09 -0600 Subject: [PATCH] PutSQLBinary Signed-off-by: Matt Burgess NIFI-2591 Signed-off-by: Matt Burgess NIFI-2591 Signed-off-by: Matt Burgess NIFI-2591 - Added Format option for binary data types. Updated unit tests. Signed-off-by: Matt Burgess This closes #883 --- .../nifi/processors/standard/PutSQL.java | 180 +++++++++++------- .../nifi/processors/standard/TestPutSQL.java | 161 +++++++++++++++- 2 files changed, 267 insertions(+), 74 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java index 910112e5d0..8e87c56727 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -39,8 +39,11 @@ import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.StreamUtils; +import javax.xml.bind.DatatypeConverter; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.UnsupportedEncodingException; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.sql.BatchUpdateException; @@ -76,78 +79,84 @@ import java.util.regex.Pattern; @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"}) @CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command " - + "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes " - + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be " - + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.") + + "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes " + + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be " + + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.") @ReadsAttributes({ - @ReadsAttribute(attribute = "fragment.identifier", description = "If the property is true, this attribute is used to determine whether or " - + "not two FlowFiles belong to the same transaction."), - @ReadsAttribute(attribute = "fragment.count", description = "If the property is true, this attribute is used to determine how many FlowFiles " - + "are needed to complete the transaction."), - @ReadsAttribute(attribute = "fragment.index", description = "If the property is true, this attribute is used to determine the order that the FlowFiles " - + "in a transaction should be evaluated."), - @ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The type of each Parameter is specified as an integer " - + "that represents the JDBC Type of the parameter."), - @ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The value of the Parameters are specified as " - + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute.") + @ReadsAttribute(attribute = "fragment.identifier", description = "If the property is true, this attribute is used to determine whether or " + + "not two FlowFiles belong to the same transaction."), + @ReadsAttribute(attribute = "fragment.count", description = "If the property is true, this attribute is used to determine how many FlowFiles " + + "are needed to complete the transaction."), + @ReadsAttribute(attribute = "fragment.index", description = "If the property is true, this attribute is used to determine the order that the FlowFiles " + + "in a transaction should be evaluated."), + @ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The type of each Parameter is specified as an integer " + + "that represents the JDBC Type of the parameter."), + @ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The value of the Parameters are specified as " + + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."), + @ReadsAttribute(attribute = "sql.args.N.format", description = "This attribute is always optional, but default options may not always work for your data. " + + "Incoming FlowFiles are expected to be parameterized SQL statements. In some cases " + + "a format option needs to be specified, currently this is only applicable for binary data types. For binary data types " + + "available options are 'ascii', 'base64' and 'hex'. In 'ascii' format each string character in your attribute value represents a single byte, this is the default format " + + "and the format provided by Avro Processors. In 'base64' format your string is a Base64 encoded string. In 'hex' format the string is hex encoded with all " + + "letters in upper case and no '0x' at the beginning.") }) @WritesAttributes({ - @WritesAttribute(attribute = "sql.generated.key", description = "If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, " - + "this attribute will be added to indicate the generated key, if possible. This feature is not supported by all database vendors.") + @WritesAttribute(attribute = "sql.generated.key", description = "If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, " + + "this attribute will be added to indicate the generated key, if possible. This feature is not supported by all database vendors.") }) public class PutSQL extends AbstractProcessor { static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder() - .name("JDBC Connection Pool") - .description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. " - + "The Connection Pool is necessary in order to determine the appropriate database column types.") - .identifiesControllerService(DBCPService.class) - .required(true) - .build(); + .name("JDBC Connection Pool") + .description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. " + + "The Connection Pool is necessary in order to determine the appropriate database column types.") + .identifiesControllerService(DBCPService.class) + .required(true) + .build(); static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder() - .name("Support Fragmented Transactions") - .description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. " - + "If the fragment.count value is greater than 1, the Processor will not process any FlowFile will that fragment.identifier until all are available; " - + "at that point, it will process all FlowFiles with that fragment.identifier as a single transaction, in the order specified by the FlowFiles' fragment.index attributes. " - + "This Provides atomicity of those SQL statements. If this value is false, these attributes will be ignored and the updates will occur independent of one another.") - .allowableValues("true", "false") - .defaultValue("true") - .build(); + .name("Support Fragmented Transactions") + .description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. " + + "If the fragment.count value is greater than 1, the Processor will not process any FlowFile will that fragment.identifier until all are available; " + + "at that point, it will process all FlowFiles with that fragment.identifier as a single transaction, in the order specified by the FlowFiles' fragment.index attributes. " + + "This Provides atomicity of those SQL statements. If this value is false, these attributes will be ignored and the updates will occur independent of one another.") + .allowableValues("true", "false") + .defaultValue("true") + .build(); static final PropertyDescriptor TRANSACTION_TIMEOUT = new PropertyDescriptor.Builder() - .name("Transaction Timeout") - .description("If the property is set to true, specifies how long to wait for all FlowFiles for a particular fragment.identifier attribute " - + "to arrive before just transferring all of the FlowFiles with that identifier to the 'failure' relationship") - .required(false) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); + .name("Transaction Timeout") + .description("If the property is set to true, specifies how long to wait for all FlowFiles for a particular fragment.identifier attribute " + + "to arrive before just transferring all of the FlowFiles with that identifier to the 'failure' relationship") + .required(false) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() - .name("Batch Size") - .description("The preferred number of FlowFiles to put to the database in a single transaction") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("100") - .build(); + .name("Batch Size") + .description("The preferred number of FlowFiles to put to the database in a single transaction") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("100") + .build(); static final PropertyDescriptor OBTAIN_GENERATED_KEYS = new PropertyDescriptor.Builder() - .name("Obtain Generated Keys") - .description("If true, any key that is automatically generated by the database will be added to the FlowFile that generated it using the sql.generate.key attribute. " - + "This may result in slightly slower performance and is not supported by all databases.") - .allowableValues("true", "false") - .defaultValue("false") - .build(); + .name("Obtain Generated Keys") + .description("If true, any key that is automatically generated by the database will be added to the FlowFile that generated it using the sql.generate.key attribute. " + + "This may result in slightly slower performance and is not supported by all databases.") + .allowableValues("true", "false") + .defaultValue("false") + .build(); static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("A FlowFile is routed to this relationship after the database is successfully updated") - .build(); + .name("success") + .description("A FlowFile is routed to this relationship after the database is successfully updated") + .build(); static final Relationship REL_RETRY = new Relationship.Builder() - .name("retry") - .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") - .build(); + .name("retry") + .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") + .build(); static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, " - + "such as an invalid query or an integrity constraint violation") - .build(); + .name("failure") + .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, " + + "such as an invalid query or an integrity constraint violation") + .build(); private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type"); private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+"); @@ -294,7 +303,7 @@ public class PutSQL extends AbstractProcessor { conn.rollback(); final FlowFile offendingFlowFile = batchFlowFiles.get(offendingFlowFileIndex); getLogger().error("Failed to update database due to a failed batch update. A total of {} FlowFiles are required for this transaction, so routing all to failure. " - + "Offending FlowFile was {}, which caused the following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, e}); + + "Offending FlowFile was {}, which caused the following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, e}); session.transfer(flowFiles, REL_FAILURE); return; } @@ -337,7 +346,7 @@ public class PutSQL extends AbstractProcessor { } getLogger().error("Failed to update database due to a failed batch update. There were a total of {} FlowFiles that failed, {} that succeeded, " - + "and {} that were not execute and will be routed to retry; ", new Object[] {failureCount, successCount, retryCount}); + + "and {} that were not execute and will be routed to retry; ", new Object[] {failureCount, successCount, retryCount}); } catch (final SQLNonTransientException e) { getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {enclosure.getFlowFiles(), e}); @@ -347,7 +356,7 @@ public class PutSQL extends AbstractProcessor { continue; } catch (final SQLException e) { getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", - new Object[] {enclosure.getFlowFiles(), e}); + new Object[] {enclosure.getFlowFiles(), e}); for (final FlowFile flowFile : enclosure.getFlowFiles()) { destinationRelationships.put(flowFile, REL_RETRY); @@ -522,7 +531,7 @@ public class PutSQL extends AbstractProcessor { * @throws SQLException if unable to create the appropriate PreparedStatement */ private StatementFlowFileEnclosure getEnclosure(final String sql, final Connection conn, final Map stmtMap, - final boolean obtainKeys, final boolean fragmentedTransaction) throws SQLException { + final boolean obtainKeys, final boolean fragmentedTransaction) throws SQLException { StatementFlowFileEnclosure enclosure = stmtMap.get(sql); if (enclosure != null) { return enclosure; @@ -610,13 +619,17 @@ public class PutSQL extends AbstractProcessor { final int jdbcType = Integer.parseInt(entry.getValue()); final String valueAttrName = "sql.args." + parameterIndex + ".value"; final String parameterValue = attributes.get(valueAttrName); + final String formatAttrName = "sql.args." + parameterIndex + ".format"; + final String parameterFormat = attributes.containsKey(formatAttrName)? attributes.get(formatAttrName):""; try { - setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType); + setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType, parameterFormat); } catch (final NumberFormatException nfe) { throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe); } catch (ParseException pe) { throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to a timestamp", pe); + } catch (UnsupportedEncodingException uee) { + throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to UTF-8", uee); } } } @@ -644,7 +657,7 @@ public class PutSQL extends AbstractProcessor { return null; } else if (fragmentCount == null) { getLogger().error("Cannot process {} because there are {} FlowFiles with the same fragment.identifier " - + "attribute but not all FlowFiles have a fragment.count attribute; routing all to failure", new Object[] {flowFile, flowFiles.size()}); + + "attribute but not all FlowFiles have a fragment.count attribute; routing all to failure", new Object[] {flowFile, flowFiles.size()}); return REL_FAILURE; } @@ -653,13 +666,13 @@ public class PutSQL extends AbstractProcessor { numFragments = Integer.parseInt(fragmentCount); } catch (final NumberFormatException nfe) { getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not an integer; " - + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount}); + + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount}); return REL_FAILURE; } if (numFragments < 1) { getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not a positive integer; " - + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount}); + + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount}); return REL_FAILURE; } @@ -667,14 +680,14 @@ public class PutSQL extends AbstractProcessor { selectedNumFragments = numFragments; } else if (numFragments != selectedNumFragments) { getLogger().error("Cannot process {} because the fragment.count attribute has different values for different FlowFiles with the same fragment.identifier; " - + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); + + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); return REL_FAILURE; } final String fragmentIndex = flowFile.getAttribute(FRAGMENT_INDEX_ATTR); if (fragmentIndex == null) { getLogger().error("Cannot process {} because the fragment.index attribute is missing; " - + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); + + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); return REL_FAILURE; } @@ -683,19 +696,19 @@ public class PutSQL extends AbstractProcessor { idx = Integer.parseInt(fragmentIndex); } catch (final NumberFormatException nfe) { getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not an integer; " - + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex}); + + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex}); return REL_FAILURE; } if (idx < 0) { getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not a positive integer; " - + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex}); + + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex}); return REL_FAILURE; } if (bitSet.get(idx)) { getLogger().error("Cannot process {} because it has the same value for the fragment.index attribute as another FlowFile with the same fragment.identifier; " - + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); + + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); return REL_FAILURE; } @@ -735,7 +748,9 @@ public class PutSQL extends AbstractProcessor { * @param jdbcType the JDBC Type of the SQL parameter to set * @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter */ - private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException, ParseException { + private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType, + final String valueFormat) + throws SQLException, ParseException, UnsupportedEncodingException { if (parameterValue == null) { stmt.setNull(parameterIndex, jdbcType); } else { @@ -786,6 +801,29 @@ public class PutSQL extends AbstractProcessor { stmt.setTimestamp(parameterIndex, new Timestamp(lTimestamp)); + break; + case Types.BINARY: + case Types.VARBINARY: + case Types.LONGVARBINARY: + byte[] bValue; + + switch(valueFormat){ + case "": + case "ascii": + bValue = parameterValue.getBytes("ASCII"); + break; + case "hex": + bValue = DatatypeConverter.parseHexBinary(parameterValue); + break; + case "base64": + bValue = DatatypeConverter.parseBase64Binary(parameterValue); + break; + default: + throw new ParseException("Unable to parse binary data using the formatter `" + valueFormat + "`.",0); + } + + stmt.setBinaryStream(parameterIndex, new ByteArrayInputStream(bValue), bValue.length); + break; case Types.CHAR: case Types.VARCHAR: diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java index 533961c71d..321bac783f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; @@ -30,9 +31,11 @@ import java.sql.Statement; import java.sql.Types; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import org.apache.commons.lang3.RandomUtils; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.processor.exception.ProcessException; @@ -46,6 +49,8 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.mockito.Mockito; +import javax.xml.bind.DatatypeConverter; + public class TestPutSQL { private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)"; private static final String createPersonsAutoId = "CREATE TABLE PERSONS_AI (id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1), name VARCHAR(100), code INTEGER check(code <= 100))"; @@ -304,6 +309,136 @@ public class TestPutSQL { } } + @Test + public void testBinaryColumnTypes() throws InitializationException, ProcessException, SQLException, IOException, ParseException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate("CREATE TABLE BINARYTESTS (id integer primary key, bn1 CHAR(8) FOR BIT DATA, bn2 VARCHAR(100) FOR BIT DATA, " + + "bn3 LONG VARCHAR FOR BIT DATA)"); + } + } + + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final byte[] insertStatement = "INSERT INTO BINARYTESTS (ID, bn1, bn2, bn3) VALUES (?, ?, ?, ?)".getBytes(); + + final String arg2BIN = fixedSizeByteArrayAsASCIIString(8); + final String art3VARBIN = fixedSizeByteArrayAsASCIIString(50); + final String art4LongBin = fixedSizeByteArrayAsASCIIString(32700); //max size supported by Derby + + //ASCII (default) binary formatn + Map attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + attributes.put("sql.args.2.type", String.valueOf(Types.BINARY)); + attributes.put("sql.args.2.value", arg2BIN); + attributes.put("sql.args.3.type", String.valueOf(Types.VARBINARY)); + attributes.put("sql.args.3.value", art3VARBIN); + attributes.put("sql.args.4.type", String.valueOf(Types.LONGVARBINARY)); + attributes.put("sql.args.4.value", art4LongBin); + + runner.enqueue(insertStatement, attributes); + + //ASCII with specified format + attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "2"); + attributes.put("sql.args.2.type", String.valueOf(Types.BINARY)); + attributes.put("sql.args.2.value", arg2BIN); + attributes.put("sql.args.2.format", "ascii"); + attributes.put("sql.args.3.type", String.valueOf(Types.VARBINARY)); + attributes.put("sql.args.3.value", art3VARBIN); + attributes.put("sql.args.3.format", "ascii"); + attributes.put("sql.args.4.type", String.valueOf(Types.LONGVARBINARY)); + attributes.put("sql.args.4.value", art4LongBin); + attributes.put("sql.args.4.format", "ascii"); + + runner.enqueue(insertStatement, attributes); + + //Hex + final String arg2HexBIN = fixedSizeByteArrayAsHexString(8); + final String art3HexVARBIN = fixedSizeByteArrayAsHexString(50); + final String art4HexLongBin = fixedSizeByteArrayAsHexString(32700); + + attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "3"); + attributes.put("sql.args.2.type", String.valueOf(Types.BINARY)); + attributes.put("sql.args.2.value", arg2HexBIN); + attributes.put("sql.args.2.format", "hex"); + attributes.put("sql.args.3.type", String.valueOf(Types.VARBINARY)); + attributes.put("sql.args.3.value", art3HexVARBIN); + attributes.put("sql.args.3.format", "hex"); + attributes.put("sql.args.4.type", String.valueOf(Types.LONGVARBINARY)); + attributes.put("sql.args.4.value", art4HexLongBin); + attributes.put("sql.args.4.format", "hex"); + + runner.enqueue(insertStatement, attributes); + + //Base64 + final String arg2Base64BIN = fixedSizeByteArrayAsBase64String(8); + final String art3Base64VARBIN = fixedSizeByteArrayAsBase64String(50); + final String art4Base64LongBin = fixedSizeByteArrayAsBase64String(32700); + + attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "4"); + attributes.put("sql.args.2.type", String.valueOf(Types.BINARY)); + attributes.put("sql.args.2.value", arg2Base64BIN); + attributes.put("sql.args.2.format", "base64"); + attributes.put("sql.args.3.type", String.valueOf(Types.VARBINARY)); + attributes.put("sql.args.3.value", art3Base64VARBIN); + attributes.put("sql.args.3.format", "base64"); + attributes.put("sql.args.4.type", String.valueOf(Types.LONGVARBINARY)); + attributes.put("sql.args.4.value", art4Base64LongBin); + attributes.put("sql.args.4.format", "base64"); + + runner.enqueue(insertStatement, attributes); + + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 4); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM BINARYTESTS"); + + //First Batch + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertTrue(Arrays.equals(arg2BIN.getBytes("ASCII"), rs.getBytes(2))); + assertTrue(Arrays.equals(art3VARBIN.getBytes("ASCII"), rs.getBytes(3))); + assertTrue(Arrays.equals(art4LongBin.getBytes("ASCII"), rs.getBytes(4))); + + //Second batch + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertTrue(Arrays.equals(arg2BIN.getBytes("ASCII"), rs.getBytes(2))); + assertTrue(Arrays.equals(art3VARBIN.getBytes("ASCII"), rs.getBytes(3))); + assertTrue(Arrays.equals(art4LongBin.getBytes("ASCII"), rs.getBytes(4))); + + //Third Batch (Hex) + assertTrue(rs.next()); + assertEquals(3, rs.getInt(1)); + assertTrue(Arrays.equals(DatatypeConverter.parseHexBinary(arg2HexBIN), rs.getBytes(2))); + assertTrue(Arrays.equals(DatatypeConverter.parseHexBinary(art3HexVARBIN), rs.getBytes(3))); + assertTrue(Arrays.equals(DatatypeConverter.parseHexBinary(art4HexLongBin), rs.getBytes(4))); + + //Fourth Batch (Base64) + assertTrue(rs.next()); + assertEquals(4, rs.getInt(1)); + assertTrue(Arrays.equals(DatatypeConverter.parseBase64Binary(arg2Base64BIN), rs.getBytes(2))); + assertTrue(Arrays.equals(DatatypeConverter.parseBase64Binary(art3Base64VARBIN), rs.getBytes(3))); + assertTrue(Arrays.equals(DatatypeConverter.parseBase64Binary(art4Base64LongBin), rs.getBytes(4))); + + assertFalse(rs.next()); + } + } + } + @Test public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); @@ -373,7 +508,7 @@ public class TestPutSQL { runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + - "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; + "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; final Map attributes = new HashMap<>(); attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); attributes.put("sql.args.1.value", "1"); @@ -444,7 +579,7 @@ public class TestPutSQL { recreateTable("PERSONS", createPersons); final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + - "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; "; + "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; "; final Map attributes = new HashMap<>(); attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); attributes.put("sql.args.1.value", "1"); @@ -483,7 +618,7 @@ public class TestPutSQL { runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + - "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; + "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; final Map attributes = new HashMap<>(); attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); attributes.put("sql.args.1.value", "1"); @@ -671,4 +806,24 @@ public class TestPutSQL { } } } + + private String fixedSizeByteArrayAsASCIIString(int length){ + byte[] bBinary = RandomUtils.nextBytes(length); + ByteBuffer bytes = ByteBuffer.wrap(bBinary); + StringBuffer sbBytes = new StringBuffer(); + for (int i = bytes.position(); i < bytes.limit(); i++) + sbBytes.append((char)bytes.get(i)); + + return sbBytes.toString(); + } + + private String fixedSizeByteArrayAsHexString(int length){ + byte[] bBinary = RandomUtils.nextBytes(length); + return DatatypeConverter.printHexBinary(bBinary); + } + + private String fixedSizeByteArrayAsBase64String(int length){ + byte[] bBinary = RandomUtils.nextBytes(length); + return DatatypeConverter.printBase64Binary(bBinary); + } }