PutSQLBinary

NIFI-2591 - Added Format option for binary data types. Updated unit tests.

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #883
Peter Wicks, 2016-08-17 09:06:09 -06:00; committed by Matt Burgess
commit e210172d93 (parent afb9a0016f)
2 changed files with 267 additions and 74 deletions
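
In short: this commit adds an optional sql.args.N.format attribute to PutSQL for binary JDBC types (BINARY, VARBINARY, LONGVARBINARY), accepting 'ascii' (the default), 'base64' or 'hex'. As a hypothetical illustration (the attribute names come from the processor documentation in the diff below; the statement and values are made up), a FlowFile carrying a hex-encoded binary parameter might look like:

    import java.sql.Types;
    import java.util.HashMap;
    import java.util.Map;

    // FlowFile content: "INSERT INTO BINARYTESTS (ID, BN1) VALUES (?, ?)"
    final Map<String, String> attributes = new HashMap<>();
    attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
    attributes.put("sql.args.1.value", "1");
    attributes.put("sql.args.2.type", String.valueOf(Types.VARBINARY));
    attributes.put("sql.args.2.value", "48656C6C6F"); // "Hello" as upper-case hex, no 0x prefix
    attributes.put("sql.args.2.format", "hex");       // omit this attribute for the 'ascii' default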

PutSQL.java

@@ -39,8 +39,11 @@ import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import javax.xml.bind.DatatypeConverter;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.BatchUpdateException;
@@ -76,78 +79,84 @@ import java.util.regex.Pattern;
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"})
@CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command "
+ "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
+ "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
+ "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
+ "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
+ "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
+ "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
@ReadsAttributes({
@ReadsAttribute(attribute = "fragment.identifier", description = "If the <Support Fragment Transactions> property is true, this attribute is used to determine whether or "
+ "not two FlowFiles belong to the same transaction."),
@ReadsAttribute(attribute = "fragment.count", description = "If the <Support Fragment Transactions> property is true, this attribute is used to determine how many FlowFiles "
+ "are needed to complete the transaction."),
@ReadsAttribute(attribute = "fragment.index", description = "If the <Support Fragment Transactions> property is true, this attribute is used to determine the order that the FlowFiles "
+ "in a transaction should be evaluated."),
@ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The type of each Parameter is specified as an integer "
+ "that represents the JDBC Type of the parameter."),
@ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The value of the Parameters are specified as "
+ "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute.")
@ReadsAttribute(attribute = "fragment.identifier", description = "If the <Support Fragment Transactions> property is true, this attribute is used to determine whether or "
+ "not two FlowFiles belong to the same transaction."),
@ReadsAttribute(attribute = "fragment.count", description = "If the <Support Fragment Transactions> property is true, this attribute is used to determine how many FlowFiles "
+ "are needed to complete the transaction."),
@ReadsAttribute(attribute = "fragment.index", description = "If the <Support Fragment Transactions> property is true, this attribute is used to determine the order that the FlowFiles "
+ "in a transaction should be evaluated."),
@ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The type of each Parameter is specified as an integer "
+ "that represents the JDBC Type of the parameter."),
@ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The value of the Parameters are specified as "
+ "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."),
@ReadsAttribute(attribute = "sql.args.N.format", description = "This attribute is always optional, but default options may not always work for your data. "
+ "Incoming FlowFiles are expected to be parameterized SQL statements. In some cases "
+ "a format option needs to be specified, currently this is only applicable for binary data types. For binary data types "
+ "available options are 'ascii', 'base64' and 'hex'. In 'ascii' format each string character in your attribute value represents a single byte, this is the default format "
+ "and the format provided by Avro Processors. In 'base64' format your string is a Base64 encoded string. In 'hex' format the string is hex encoded with all "
+ "letters in upper case and no '0x' at the beginning.")
})
@WritesAttributes({
@WritesAttribute(attribute = "sql.generated.key", description = "If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, "
+ "this attribute will be added to indicate the generated key, if possible. This feature is not supported by all database vendors.")
})
public class PutSQL extends AbstractProcessor {
static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
.name("JDBC Connection Pool")
.description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. "
+ "The Connection Pool is necessary in order to determine the appropriate database column types.")
.identifiesControllerService(DBCPService.class)
.required(true)
.build();
.name("JDBC Connection Pool")
.description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. "
+ "The Connection Pool is necessary in order to determine the appropriate database column types.")
.identifiesControllerService(DBCPService.class)
.required(true)
.build();
static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder()
.name("Support Fragmented Transactions")
.description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. "
+ "If the fragment.count value is greater than 1, the Processor will not process any FlowFile will that fragment.identifier until all are available; "
+ "at that point, it will process all FlowFiles with that fragment.identifier as a single transaction, in the order specified by the FlowFiles' fragment.index attributes. "
+ "This Provides atomicity of those SQL statements. If this value is false, these attributes will be ignored and the updates will occur independent of one another.")
.allowableValues("true", "false")
.defaultValue("true")
.build();
.name("Support Fragmented Transactions")
.description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. "
+ "If the fragment.count value is greater than 1, the Processor will not process any FlowFile will that fragment.identifier until all are available; "
+ "at that point, it will process all FlowFiles with that fragment.identifier as a single transaction, in the order specified by the FlowFiles' fragment.index attributes. "
+ "This Provides atomicity of those SQL statements. If this value is false, these attributes will be ignored and the updates will occur independent of one another.")
.allowableValues("true", "false")
.defaultValue("true")
.build();
static final PropertyDescriptor TRANSACTION_TIMEOUT = new PropertyDescriptor.Builder()
.name("Transaction Timeout")
.description("If the <Support Fragmented Transactions> property is set to true, specifies how long to wait for all FlowFiles for a particular fragment.identifier attribute "
+ "to arrive before just transferring all of the FlowFiles with that identifier to the 'failure' relationship")
.required(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
.name("Transaction Timeout")
.description("If the <Support Fragmented Transactions> property is set to true, specifies how long to wait for all FlowFiles for a particular fragment.identifier attribute "
+ "to arrive before just transferring all of the FlowFiles with that identifier to the 'failure' relationship")
.required(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The preferred number of FlowFiles to put to the database in a single transaction")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("100")
.build();
.name("Batch Size")
.description("The preferred number of FlowFiles to put to the database in a single transaction")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("100")
.build();
static final PropertyDescriptor OBTAIN_GENERATED_KEYS = new PropertyDescriptor.Builder()
.name("Obtain Generated Keys")
.description("If true, any key that is automatically generated by the database will be added to the FlowFile that generated it using the sql.generate.key attribute. "
+ "This may result in slightly slower performance and is not supported by all databases.")
.allowableValues("true", "false")
.defaultValue("false")
.build();
.name("Obtain Generated Keys")
.description("If true, any key that is automatically generated by the database will be added to the FlowFile that generated it using the sql.generate.key attribute. "
+ "This may result in slightly slower performance and is not supported by all databases.")
.allowableValues("true", "false")
.defaultValue("false")
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after the database is successfully updated")
.build();
.name("success")
.description("A FlowFile is routed to this relationship after the database is successfully updated")
.build();
static final Relationship REL_RETRY = new Relationship.Builder()
.name("retry")
.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
.build();
.name("retry")
.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, "
+ "such as an invalid query or an integrity constraint violation")
.build();
.name("failure")
.description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, "
+ "such as an invalid query or an integrity constraint violation")
.build();
private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type");
private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
@@ -294,7 +303,7 @@ public class PutSQL extends AbstractProcessor {
conn.rollback();
final FlowFile offendingFlowFile = batchFlowFiles.get(offendingFlowFileIndex);
getLogger().error("Failed to update database due to a failed batch update. A total of {} FlowFiles are required for this transaction, so routing all to failure. "
+ "Offending FlowFile was {}, which caused the following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, e});
+ "Offending FlowFile was {}, which caused the following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, e});
session.transfer(flowFiles, REL_FAILURE);
return;
}
@@ -337,7 +346,7 @@ public class PutSQL extends AbstractProcessor {
}
getLogger().error("Failed to update database due to a failed batch update. There were a total of {} FlowFiles that failed, {} that succeeded, "
+ "and {} that were not execute and will be routed to retry; ", new Object[] {failureCount, successCount, retryCount});
+ "and {} that were not execute and will be routed to retry; ", new Object[] {failureCount, successCount, retryCount});
} catch (final SQLNonTransientException e) {
getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {enclosure.getFlowFiles(), e});
@@ -347,7 +356,7 @@ public class PutSQL extends AbstractProcessor {
continue;
} catch (final SQLException e) {
getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry",
new Object[] {enclosure.getFlowFiles(), e});
for (final FlowFile flowFile : enclosure.getFlowFiles()) {
destinationRelationships.put(flowFile, REL_RETRY);
@@ -522,7 +531,7 @@ public class PutSQL extends AbstractProcessor {
* @throws SQLException if unable to create the appropriate PreparedStatement
*/
private StatementFlowFileEnclosure getEnclosure(final String sql, final Connection conn, final Map<String, StatementFlowFileEnclosure> stmtMap,
final boolean obtainKeys, final boolean fragmentedTransaction) throws SQLException {
StatementFlowFileEnclosure enclosure = stmtMap.get(sql);
if (enclosure != null) {
return enclosure;
@@ -610,13 +619,17 @@ public class PutSQL extends AbstractProcessor {
final int jdbcType = Integer.parseInt(entry.getValue());
final String valueAttrName = "sql.args." + parameterIndex + ".value";
final String parameterValue = attributes.get(valueAttrName);
final String formatAttrName = "sql.args." + parameterIndex + ".format";
final String parameterFormat = attributes.containsKey(formatAttrName) ? attributes.get(formatAttrName) : "";
try {
setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType, parameterFormat);
} catch (final NumberFormatException nfe) {
throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe);
} catch (ParseException pe) {
throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to a timestamp", pe);
} catch (UnsupportedEncodingException uee) {
throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to UTF-8", uee);
}
}
}
@@ -644,7 +657,7 @@ public class PutSQL extends AbstractProcessor {
return null;
} else if (fragmentCount == null) {
getLogger().error("Cannot process {} because there are {} FlowFiles with the same fragment.identifier "
+ "attribute but not all FlowFiles have a fragment.count attribute; routing all to failure", new Object[] {flowFile, flowFiles.size()});
+ "attribute but not all FlowFiles have a fragment.count attribute; routing all to failure", new Object[] {flowFile, flowFiles.size()});
return REL_FAILURE;
}
@@ -653,13 +666,13 @@ public class PutSQL extends AbstractProcessor {
numFragments = Integer.parseInt(fragmentCount);
} catch (final NumberFormatException nfe) {
getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not an integer; "
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount});
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount});
return REL_FAILURE;
}
if (numFragments < 1) {
getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not a positive integer; "
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount});
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount});
return REL_FAILURE;
}
@@ -667,14 +680,14 @@ public class PutSQL extends AbstractProcessor {
selectedNumFragments = numFragments;
} else if (numFragments != selectedNumFragments) {
getLogger().error("Cannot process {} because the fragment.count attribute has different values for different FlowFiles with the same fragment.identifier; "
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
return REL_FAILURE;
}
final String fragmentIndex = flowFile.getAttribute(FRAGMENT_INDEX_ATTR);
if (fragmentIndex == null) {
getLogger().error("Cannot process {} because the fragment.index attribute is missing; "
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
return REL_FAILURE;
}
@@ -683,19 +696,19 @@ public class PutSQL extends AbstractProcessor {
idx = Integer.parseInt(fragmentIndex);
} catch (final NumberFormatException nfe) {
getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not an integer; "
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex});
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex});
return REL_FAILURE;
}
if (idx < 0) {
getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not a positive integer; "
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex});
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex});
return REL_FAILURE;
}
if (bitSet.get(idx)) {
getLogger().error("Cannot process {} because it has the same value for the fragment.index attribute as another FlowFile with the same fragment.identifier; "
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
return REL_FAILURE;
}
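
To make the fragment bookkeeping above concrete, here is a hypothetical pair of attribute maps (illustrative values only; the attribute names are the ones validated above) for two FlowFiles that PutSQL would execute as a single transaction, ordered by fragment.index:

    import java.util.HashMap;
    import java.util.Map;

    // First FlowFile of the transaction
    Map<String, String> first = new HashMap<>();
    first.put("fragment.identifier", "txn-1"); // same identifier groups the FlowFiles
    first.put("fragment.count", "2");          // the transaction is complete once 2 arrive
    first.put("fragment.index", "0");          // executed first

    // Second FlowFile of the same transaction
    Map<String, String> second = new HashMap<>();
    second.put("fragment.identifier", "txn-1");
    second.put("fragment.count", "2");
    second.put("fragment.index", "1");         // executed second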
@@ -735,7 +748,9 @@ public class PutSQL extends AbstractProcessor {
* @param jdbcType the JDBC Type of the SQL parameter to set
* @param valueFormat the format of the parameter value; for binary types one of 'ascii', 'base64' or 'hex'
* @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter
*/
private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType,
final String valueFormat)
throws SQLException, ParseException, UnsupportedEncodingException {
if (parameterValue == null) {
stmt.setNull(parameterIndex, jdbcType);
} else {
@@ -786,6 +801,29 @@ public class PutSQL extends AbstractProcessor {
stmt.setTimestamp(parameterIndex, new Timestamp(lTimestamp));
break;
case Types.BINARY:
case Types.VARBINARY:
case Types.LONGVARBINARY:
byte[] bValue;
switch (valueFormat) {
case "":
case "ascii":
bValue = parameterValue.getBytes("ASCII");
break;
case "hex":
bValue = DatatypeConverter.parseHexBinary(parameterValue);
break;
case "base64":
bValue = DatatypeConverter.parseBase64Binary(parameterValue);
break;
default:
throw new ParseException("Unable to parse binary data using the formatter `" + valueFormat + "`.", 0);
}
stmt.setBinaryStream(parameterIndex, new ByteArrayInputStream(bValue), bValue.length);
break;
case Types.CHAR:
case Types.VARCHAR:
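
For context, all three formats decode to the same byte array before stmt.setBinaryStream is called. A minimal standalone sketch (not part of the commit) using the same javax.xml.bind.DatatypeConverter calls as the patch:

    import javax.xml.bind.DatatypeConverter;
    import java.io.UnsupportedEncodingException;
    import java.util.Arrays;

    public class BinaryFormatCheck {
        public static void main(String[] args) throws UnsupportedEncodingException {
            // The same four bytes, expressed in each supported sql.args.N.format value
            byte[] fromAscii  = "NiFi".getBytes("ASCII");                        // 'ascii' (default)
            byte[] fromHex    = DatatypeConverter.parseHexBinary("4E694669");    // 'hex': upper case, no 0x
            byte[] fromBase64 = DatatypeConverter.parseBase64Binary("TmlGaQ=="); // 'base64'
            System.out.println(Arrays.equals(fromAscii, fromHex));    // prints true
            System.out.println(Arrays.equals(fromAscii, fromBase64)); // prints true
        }
    }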

TestPutSQL.java

@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
@@ -30,9 +31,11 @@ import java.sql.Statement;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.RandomUtils;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
@@ -46,6 +49,8 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import javax.xml.bind.DatatypeConverter;
public class TestPutSQL {
private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
private static final String createPersonsAutoId = "CREATE TABLE PERSONS_AI (id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1), name VARCHAR(100), code INTEGER check(code <= 100))";
@@ -304,6 +309,136 @@ public class TestPutSQL {
}
}
@Test
public void testBinaryColumnTypes() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE BINARYTESTS (id integer primary key, bn1 CHAR(8) FOR BIT DATA, bn2 VARCHAR(100) FOR BIT DATA, " +
"bn3 LONG VARCHAR FOR BIT DATA)");
}
}
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final byte[] insertStatement = "INSERT INTO BINARYTESTS (ID, bn1, bn2, bn3) VALUES (?, ?, ?, ?)".getBytes();
final String arg2BIN = fixedSizeByteArrayAsASCIIString(8);
final String art3VARBIN = fixedSizeByteArrayAsASCIIString(50);
final String art4LongBin = fixedSizeByteArrayAsASCIIString(32700); //max size supported by Derby
//ASCII (default) binary format
Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.BINARY));
attributes.put("sql.args.2.value", arg2BIN);
attributes.put("sql.args.3.type", String.valueOf(Types.VARBINARY));
attributes.put("sql.args.3.value", art3VARBIN);
attributes.put("sql.args.4.type", String.valueOf(Types.LONGVARBINARY));
attributes.put("sql.args.4.value", art4LongBin);
runner.enqueue(insertStatement, attributes);
//ASCII with specified format
attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "2");
attributes.put("sql.args.2.type", String.valueOf(Types.BINARY));
attributes.put("sql.args.2.value", arg2BIN);
attributes.put("sql.args.2.format", "ascii");
attributes.put("sql.args.3.type", String.valueOf(Types.VARBINARY));
attributes.put("sql.args.3.value", art3VARBIN);
attributes.put("sql.args.3.format", "ascii");
attributes.put("sql.args.4.type", String.valueOf(Types.LONGVARBINARY));
attributes.put("sql.args.4.value", art4LongBin);
attributes.put("sql.args.4.format", "ascii");
runner.enqueue(insertStatement, attributes);
//Hex
final String arg2HexBIN = fixedSizeByteArrayAsHexString(8);
final String art3HexVARBIN = fixedSizeByteArrayAsHexString(50);
final String art4HexLongBin = fixedSizeByteArrayAsHexString(32700);
attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "3");
attributes.put("sql.args.2.type", String.valueOf(Types.BINARY));
attributes.put("sql.args.2.value", arg2HexBIN);
attributes.put("sql.args.2.format", "hex");
attributes.put("sql.args.3.type", String.valueOf(Types.VARBINARY));
attributes.put("sql.args.3.value", art3HexVARBIN);
attributes.put("sql.args.3.format", "hex");
attributes.put("sql.args.4.type", String.valueOf(Types.LONGVARBINARY));
attributes.put("sql.args.4.value", art4HexLongBin);
attributes.put("sql.args.4.format", "hex");
runner.enqueue(insertStatement, attributes);
//Base64
final String arg2Base64BIN = fixedSizeByteArrayAsBase64String(8);
final String art3Base64VARBIN = fixedSizeByteArrayAsBase64String(50);
final String art4Base64LongBin = fixedSizeByteArrayAsBase64String(32700);
attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "4");
attributes.put("sql.args.2.type", String.valueOf(Types.BINARY));
attributes.put("sql.args.2.value", arg2Base64BIN);
attributes.put("sql.args.2.format", "base64");
attributes.put("sql.args.3.type", String.valueOf(Types.VARBINARY));
attributes.put("sql.args.3.value", art3Base64VARBIN);
attributes.put("sql.args.3.format", "base64");
attributes.put("sql.args.4.type", String.valueOf(Types.LONGVARBINARY));
attributes.put("sql.args.4.value", art4Base64LongBin);
attributes.put("sql.args.4.format", "base64");
runner.enqueue(insertStatement, attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 4);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM BINARYTESTS");
//First Batch
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertTrue(Arrays.equals(arg2BIN.getBytes("ASCII"), rs.getBytes(2)));
assertTrue(Arrays.equals(art3VARBIN.getBytes("ASCII"), rs.getBytes(3)));
assertTrue(Arrays.equals(art4LongBin.getBytes("ASCII"), rs.getBytes(4)));
//Second batch
assertTrue(rs.next());
assertEquals(2, rs.getInt(1));
assertTrue(Arrays.equals(arg2BIN.getBytes("ASCII"), rs.getBytes(2)));
assertTrue(Arrays.equals(art3VARBIN.getBytes("ASCII"), rs.getBytes(3)));
assertTrue(Arrays.equals(art4LongBin.getBytes("ASCII"), rs.getBytes(4)));
//Third Batch (Hex)
assertTrue(rs.next());
assertEquals(3, rs.getInt(1));
assertTrue(Arrays.equals(DatatypeConverter.parseHexBinary(arg2HexBIN), rs.getBytes(2)));
assertTrue(Arrays.equals(DatatypeConverter.parseHexBinary(art3HexVARBIN), rs.getBytes(3)));
assertTrue(Arrays.equals(DatatypeConverter.parseHexBinary(art4HexLongBin), rs.getBytes(4)));
//Fourth Batch (Base64)
assertTrue(rs.next());
assertEquals(4, rs.getInt(1));
assertTrue(Arrays.equals(DatatypeConverter.parseBase64Binary(arg2Base64BIN), rs.getBytes(2)));
assertTrue(Arrays.equals(DatatypeConverter.parseBase64Binary(art3Base64VARBIN), rs.getBytes(3)));
assertTrue(Arrays.equals(DatatypeConverter.parseBase64Binary(art4Base64LongBin), rs.getBytes(4)));
assertFalse(rs.next());
}
}
}
@Test
public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
@@ -373,7 +508,7 @@ public class TestPutSQL {
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
"UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
"UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
@@ -444,7 +579,7 @@ public class TestPutSQL {
recreateTable("PERSONS", createPersons);
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
"UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ";
"UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
@@ -483,7 +618,7 @@ public class TestPutSQL {
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
"UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
"UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
@@ -671,4 +806,24 @@ public class TestPutSQL {
}
}
}
private String fixedSizeByteArrayAsASCIIString(int length){
byte[] bBinary = RandomUtils.nextBytes(length);
ByteBuffer bytes = ByteBuffer.wrap(bBinary);
StringBuffer sbBytes = new StringBuffer();
for (int i = bytes.position(); i < bytes.limit(); i++)
sbBytes.append((char)bytes.get(i));
return sbBytes.toString();
}
private String fixedSizeByteArrayAsHexString(int length){
byte[] bBinary = RandomUtils.nextBytes(length);
return DatatypeConverter.printHexBinary(bBinary);
}
private String fixedSizeByteArrayAsBase64String(int length){
byte[] bBinary = RandomUtils.nextBytes(length);
return DatatypeConverter.printBase64Binary(bBinary);
}
}
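
As an aside, the assertions above depend on DatatypeConverter's print/parse pairs being exact inverses. A minimal sketch of that round trip (assumptions: a plain main method, commons-lang3 RandomUtils as already used by this test):

    import javax.xml.bind.DatatypeConverter;
    import java.util.Arrays;
    import org.apache.commons.lang3.RandomUtils;

    public class RoundTripSketch {
        public static void main(String[] args) {
            byte[] original = RandomUtils.nextBytes(16);
            // printHexBinary emits upper-case hex without a 0x prefix, matching the 'hex' format
            String hex = DatatypeConverter.printHexBinary(original);
            // printBase64Binary emits a standard Base64 string, matching the 'base64' format
            String b64 = DatatypeConverter.printBase64Binary(original);
            System.out.println(Arrays.equals(original, DatatypeConverter.parseHexBinary(hex)));    // true
            System.out.println(Arrays.equals(original, DatatypeConverter.parseBase64Binary(b64))); // true
        }
    }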