Merge branch 'NIFI-977'

This commit is contained in:
Mark Payne 2015-10-27 16:53:38 -04:00
commit dc4004de64
2 changed files with 119 additions and 83 deletions

View File

@ -73,81 +73,81 @@ import org.apache.nifi.stream.io.StreamUtils;
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"}) @Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"})
@CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command " @CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command "
+ "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes " + "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
+ "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be " + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
+ "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.") + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
@ReadsAttributes({ @ReadsAttributes({
@ReadsAttribute(attribute="fragment.identifier", description="If the <Support Fragment Transactions> property is true, this attribute is used to determine whether or " @ReadsAttribute(attribute = "fragment.identifier", description = "If the <Support Fragment Transactions> property is true, this attribute is used to determine whether or "
+ "not two FlowFiles belong to the same transaction."), + "not two FlowFiles belong to the same transaction."),
@ReadsAttribute(attribute="fragment.count", description="If the <Support Fragment Transactions> property is true, this attribute is used to determine how many FlowFiles " @ReadsAttribute(attribute = "fragment.count", description = "If the <Support Fragment Transactions> property is true, this attribute is used to determine how many FlowFiles "
+ "are needed to complete the transaction."), + "are needed to complete the transaction."),
@ReadsAttribute(attribute="fragment.index", description="If the <Support Fragment Transactions> property is true, this attribute is used to determine the order that the FlowFiles " @ReadsAttribute(attribute = "fragment.index", description = "If the <Support Fragment Transactions> property is true, this attribute is used to determine the order that the FlowFiles "
+ "in a transaction should be evaluated."), + "in a transaction should be evaluated."),
@ReadsAttribute(attribute="sql.args.N.type", description="Incoming FlowFiles are expected to be parameterized SQL statements. The type of each Parameter is specified as an integer " @ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The type of each Parameter is specified as an integer "
+ "that represents the JDBC Type of the parameter."), + "that represents the JDBC Type of the parameter."),
@ReadsAttribute(attribute="sql.args.N.value", description="Incoming FlowFiles are expected to be parameterized SQL statements. The value of the Parameters are specified as " @ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The value of the Parameters are specified as "
+ "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute.") + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute.")
}) })
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute="sql.generated.key", description="If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, " @WritesAttribute(attribute = "sql.generated.key", description = "If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, "
+ "this attribute will be added to indicate the generated key, if possible. This feature is not supported by all database vendors.") + "this attribute will be added to indicate the generated key, if possible. This feature is not supported by all database vendors.")
}) })
public class PutSQL extends AbstractProcessor { public class PutSQL extends AbstractProcessor {
static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder() static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
.name("JDBC Connection Pool") .name("JDBC Connection Pool")
.description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. " .description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. "
+ "The Connection Pool is necessary in order to determine the appropriate database column types.") + "The Connection Pool is necessary in order to determine the appropriate database column types.")
.identifiesControllerService(DBCPService.class) .identifiesControllerService(DBCPService.class)
.required(true) .required(true)
.build(); .build();
static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder() static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder()
.name("Support Fragmented Transactions") .name("Support Fragmented Transactions")
.description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. " .description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. "
+ "If the fragment.count value is greater than 1, the Processor will not process any FlowFile will that fragment.identifier until all are available; " + "If the fragment.count value is greater than 1, the Processor will not process any FlowFile will that fragment.identifier until all are available; "
+ "at that point, it will process all FlowFiles with that fragment.identifier as a single transaction, in the order specified by the FlowFiles' fragment.index attributes. " + "at that point, it will process all FlowFiles with that fragment.identifier as a single transaction, in the order specified by the FlowFiles' fragment.index attributes. "
+ "This Provides atomicity of those SQL statements. If this value is false, these attributes will be ignored and the updates will occur independent of one another.") + "This Provides atomicity of those SQL statements. If this value is false, these attributes will be ignored and the updates will occur independent of one another.")
.allowableValues("true", "false") .allowableValues("true", "false")
.defaultValue("true") .defaultValue("true")
.build(); .build();
static final PropertyDescriptor TRANSACTION_TIMEOUT = new PropertyDescriptor.Builder() static final PropertyDescriptor TRANSACTION_TIMEOUT = new PropertyDescriptor.Builder()
.name("Transaction Timeout") .name("Transaction Timeout")
.description("If the <Support Fragmented Transactions> property is set to true, specifies how long to wait for all FlowFiles for a particular fragment.identifier attribute " .description("If the <Support Fragmented Transactions> property is set to true, specifies how long to wait for all FlowFiles for a particular fragment.identifier attribute "
+ "to arrive before just transferring all of the FlowFiles with that identifier to the 'failure' relationship") + "to arrive before just transferring all of the FlowFiles with that identifier to the 'failure' relationship")
.required(false) .required(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build(); .build();
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size") .name("Batch Size")
.description("The preferred number of FlowFiles to put to the database in a single transaction") .description("The preferred number of FlowFiles to put to the database in a single transaction")
.required(true) .required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("100") .defaultValue("100")
.build(); .build();
static final PropertyDescriptor OBTAIN_GENERATED_KEYS = new PropertyDescriptor.Builder() static final PropertyDescriptor OBTAIN_GENERATED_KEYS = new PropertyDescriptor.Builder()
.name("Obtain Generated Keys") .name("Obtain Generated Keys")
.description("If true, any key that is automatically generated by the database will be added to the FlowFile that generated it using the sql.generate.key attribute. " .description("If true, any key that is automatically generated by the database will be added to the FlowFile that generated it using the sql.generate.key attribute. "
+ "This may result in slightly slower performance and is not supported by all databases.") + "This may result in slightly slower performance and is not supported by all databases.")
.allowableValues("true", "false") .allowableValues("true", "false")
.defaultValue("false") .defaultValue("false")
.build(); .build();
static final Relationship REL_SUCCESS = new Relationship.Builder() static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
.description("A FlowFile is routed to this relationship after the database is successfully updated") .description("A FlowFile is routed to this relationship after the database is successfully updated")
.build(); .build();
static final Relationship REL_RETRY = new Relationship.Builder() static final Relationship REL_RETRY = new Relationship.Builder()
.name("retry") .name("retry")
.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
.build(); .build();
static final Relationship REL_FAILURE = new Relationship.Builder() static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure") .name("failure")
.description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, " .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, "
+ "such as an invalid query or an integrity constraint violation") + "such as an invalid query or an integrity constraint violation")
.build(); .build();
private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type"); private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type");
private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+"); private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
private static final String FRAGMENT_ID_ATTR = "fragment.identifier"; private static final String FRAGMENT_ID_ATTR = "fragment.identifier";
private static final String FRAGMENT_INDEX_ATTR = "fragment.index"; private static final String FRAGMENT_INDEX_ATTR = "fragment.index";
@ -190,7 +190,7 @@ public class PutSQL extends AbstractProcessor {
final long startNanos = System.nanoTime(); final long startNanos = System.nanoTime();
final boolean obtainKeys = context.getProperty(OBTAIN_GENERATED_KEYS).asBoolean(); final boolean obtainKeys = context.getProperty(OBTAIN_GENERATED_KEYS).asBoolean();
final Map<String, StatementFlowFileEnclosure> statementMap = new HashMap<>(); // Map SQL to a PreparedStatement and FlowFiles final Map<String, StatementFlowFileEnclosure> statementMap = new HashMap<>(); // Map SQL to a PreparedStatement and FlowFiles
final List<FlowFile> sentFlowFiles = new ArrayList<>(); // flowfiles that have been sent final List<FlowFile> sentFlowFiles = new ArrayList<>(); // flowfiles that have been sent
final List<FlowFile> processedFlowFiles = new ArrayList<>(); // all flowfiles that we have processed final List<FlowFile> processedFlowFiles = new ArrayList<>(); // all flowfiles that we have processed
final Set<StatementFlowFileEnclosure> enclosuresToExecute = new LinkedHashSet<>(); // the enclosures that we've processed final Set<StatementFlowFileEnclosure> enclosuresToExecute = new LinkedHashSet<>(); // the enclosures that we've processed
@ -289,7 +289,7 @@ public class PutSQL extends AbstractProcessor {
conn.rollback(); conn.rollback();
final FlowFile offendingFlowFile = batchFlowFiles.get(offendingFlowFileIndex); final FlowFile offendingFlowFile = batchFlowFiles.get(offendingFlowFileIndex);
getLogger().error("Failed to update database due to a failed batch update. A total of {} FlowFiles are required for this transaction, so routing all to failure. " getLogger().error("Failed to update database due to a failed batch update. A total of {} FlowFiles are required for this transaction, so routing all to failure. "
+ "Offending FlowFile was {}, which caused the following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, e}); + "Offending FlowFile was {}, which caused the following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, e});
session.transfer(flowFiles, REL_FAILURE); session.transfer(flowFiles, REL_FAILURE);
return; return;
} }
@ -303,7 +303,7 @@ public class PutSQL extends AbstractProcessor {
int failureCount = 0; int failureCount = 0;
int successCount = 0; int successCount = 0;
int retryCount = 0; int retryCount = 0;
for (int i=0; i < updateCounts.length; i++) { for (int i = 0; i < updateCounts.length; i++) {
final int updateCount = updateCounts[i]; final int updateCount = updateCounts[i];
final FlowFile flowFile = batchFlowFiles.get(i); final FlowFile flowFile = batchFlowFiles.get(i);
if (updateCount == Statement.EXECUTE_FAILED) { if (updateCount == Statement.EXECUTE_FAILED) {
@ -332,7 +332,7 @@ public class PutSQL extends AbstractProcessor {
} }
getLogger().error("Failed to update database due to a failed batch update. There were a total of {} FlowFiles that failed, {} that succeeded, " getLogger().error("Failed to update database due to a failed batch update. There were a total of {} FlowFiles that failed, {} that succeeded, "
+ "and {} that were not execute and will be routed to retry; ", new Object[] {failureCount, successCount, retryCount}); + "and {} that were not execute and will be routed to retry; ", new Object[] {failureCount, successCount, retryCount});
} catch (final SQLNonTransientException e) { } catch (final SQLNonTransientException e) {
getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {enclosure.getFlowFiles(), e}); getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {enclosure.getFlowFiles(), e});
@ -342,7 +342,7 @@ public class PutSQL extends AbstractProcessor {
continue; continue;
} catch (final SQLException e) { } catch (final SQLException e) {
getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry",
new Object[] {enclosure.getFlowFiles(), e}); new Object[] {enclosure.getFlowFiles(), e});
for (final FlowFile flowFile : enclosure.getFlowFiles()) { for (final FlowFile flowFile : enclosure.getFlowFiles()) {
destinationRelationships.put(flowFile, REL_RETRY); destinationRelationships.put(flowFile, REL_RETRY);
@ -487,7 +487,7 @@ public class PutSQL extends AbstractProcessor {
* *
* @param stmt the statement that generated a key * @param stmt the statement that generated a key
* @return the key that was generated from the given statement, or <code>null</code> if no key * @return the key that was generated from the given statement, or <code>null</code> if no key
* was generated or it could not be determined. * was generated or it could not be determined.
*/ */
private String determineGeneratedKey(final PreparedStatement stmt) { private String determineGeneratedKey(final PreparedStatement stmt) {
try { try {
@ -517,7 +517,7 @@ public class PutSQL extends AbstractProcessor {
* @throws SQLException if unable to create the appropriate PreparedStatement * @throws SQLException if unable to create the appropriate PreparedStatement
*/ */
private StatementFlowFileEnclosure getEnclosure(final String sql, final Connection conn, final Map<String, StatementFlowFileEnclosure> stmtMap, private StatementFlowFileEnclosure getEnclosure(final String sql, final Connection conn, final Map<String, StatementFlowFileEnclosure> stmtMap,
final boolean obtainKeys, final boolean fragmentedTransaction) throws SQLException { final boolean obtainKeys, final boolean fragmentedTransaction) throws SQLException {
StatementFlowFileEnclosure enclosure = stmtMap.get(sql); StatementFlowFileEnclosure enclosure = stmtMap.get(sql);
if (enclosure != null) { if (enclosure != null) {
return enclosure; return enclosure;
@ -623,9 +623,9 @@ public class PutSQL extends AbstractProcessor {
* *
* @param flowFiles the FlowFiles whose relationship is to be determined * @param flowFiles the FlowFiles whose relationship is to be determined
* @param transactionTimeoutMillis the maximum amount of time (in milliseconds) that we should wait * @param transactionTimeoutMillis the maximum amount of time (in milliseconds) that we should wait
* for all FlowFiles in a transaction to be present before routing to failure * for all FlowFiles in a transaction to be present before routing to failure
* @return the appropriate relationship to route the FlowFiles to, or <code>null</code> if the FlowFiles * @return the appropriate relationship to route the FlowFiles to, or <code>null</code> if the FlowFiles
* should instead be processed * should instead be processed
*/ */
Relationship determineRelationship(final List<FlowFile> flowFiles, final Long transactionTimeoutMillis) { Relationship determineRelationship(final List<FlowFile> flowFiles, final Long transactionTimeoutMillis) {
int selectedNumFragments = 0; int selectedNumFragments = 0;
@ -637,7 +637,7 @@ public class PutSQL extends AbstractProcessor {
return null; return null;
} else if (fragmentCount == null) { } else if (fragmentCount == null) {
getLogger().error("Cannot process {} because there are {} FlowFiles with the same fragment.identifier " getLogger().error("Cannot process {} because there are {} FlowFiles with the same fragment.identifier "
+ "attribute but not all FlowFiles have a fragment.count attribute; routing all to failure", new Object[] {flowFile, flowFiles.size()}); + "attribute but not all FlowFiles have a fragment.count attribute; routing all to failure", new Object[] {flowFile, flowFiles.size()});
return REL_FAILURE; return REL_FAILURE;
} }
@ -646,13 +646,13 @@ public class PutSQL extends AbstractProcessor {
numFragments = Integer.parseInt(fragmentCount); numFragments = Integer.parseInt(fragmentCount);
} catch (final NumberFormatException nfe) { } catch (final NumberFormatException nfe) {
getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not an integer; " getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not an integer; "
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount}); + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount});
return REL_FAILURE; return REL_FAILURE;
} }
if (numFragments < 1) { if (numFragments < 1) {
getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not a positive integer; " getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not a positive integer; "
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount}); + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount});
return REL_FAILURE; return REL_FAILURE;
} }
@ -660,14 +660,14 @@ public class PutSQL extends AbstractProcessor {
selectedNumFragments = numFragments; selectedNumFragments = numFragments;
} else if (numFragments != selectedNumFragments) { } else if (numFragments != selectedNumFragments) {
getLogger().error("Cannot process {} because the fragment.count attribute has different values for different FlowFiles with the same fragment.identifier; " getLogger().error("Cannot process {} because the fragment.count attribute has different values for different FlowFiles with the same fragment.identifier; "
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
return REL_FAILURE; return REL_FAILURE;
} }
final String fragmentIndex = flowFile.getAttribute(FRAGMENT_INDEX_ATTR); final String fragmentIndex = flowFile.getAttribute(FRAGMENT_INDEX_ATTR);
if (fragmentIndex == null) { if (fragmentIndex == null) {
getLogger().error("Cannot process {} because the fragment.index attribute is missing; " getLogger().error("Cannot process {} because the fragment.index attribute is missing; "
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
return REL_FAILURE; return REL_FAILURE;
} }
@ -676,19 +676,19 @@ public class PutSQL extends AbstractProcessor {
idx = Integer.parseInt(fragmentIndex); idx = Integer.parseInt(fragmentIndex);
} catch (final NumberFormatException nfe) { } catch (final NumberFormatException nfe) {
getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not an integer; " getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not an integer; "
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex}); + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex});
return REL_FAILURE; return REL_FAILURE;
} }
if (idx < 0) { if (idx < 0) {
getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not a positive integer; " getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not a positive integer; "
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex}); + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex});
return REL_FAILURE; return REL_FAILURE;
} }
if (bitSet.get(idx)) { if (bitSet.get(idx)) {
getLogger().error("Cannot process {} because it has the same value for the fragment.index attribute as another FlowFile with the same fragment.identifier; " getLogger().error("Cannot process {} because it has the same value for the fragment.index attribute as another FlowFile with the same fragment.identifier; "
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
return REL_FAILURE; return REL_FAILURE;
} }
@ -696,7 +696,7 @@ public class PutSQL extends AbstractProcessor {
} }
if (selectedNumFragments == flowFiles.size()) { if (selectedNumFragments == flowFiles.size()) {
return null; // no relationship to route FlowFiles to yet - process the FlowFiles. return null; // no relationship to route FlowFiles to yet - process the FlowFiles.
} }
long latestQueueTime = 0L; long latestQueueTime = 0L;
@ -714,7 +714,7 @@ public class PutSQL extends AbstractProcessor {
} }
getLogger().debug("Not enough FlowFiles for transaction. Returning all FlowFiles to queue"); getLogger().debug("Not enough FlowFiles for transaction. Returning all FlowFiles to queue");
return Relationship.SELF; // not enough FlowFiles for this transaction. Return them all to queue. return Relationship.SELF; // not enough FlowFiles for this transaction. Return them all to queue.
} }
/** /**
@ -772,7 +772,7 @@ public class PutSQL extends AbstractProcessor {
break; break;
default: default:
throw new SQLException("The '" + attrName + "' attribute has a value of '" + parameterValue throw new SQLException("The '" + attrName + "' attribute has a value of '" + parameterValue
+ "' and a type of '" + jdbcType + "' but this is not a known data type"); + "' and a type of '" + jdbcType + "' but this is not a known data type");
} }
} }
} }

View File

@ -103,6 +103,7 @@ public class TestPutSQL {
} }
} }
@Test @Test
public void testInsertWithGeneratedKeys() throws InitializationException, ProcessException, SQLException, IOException { public void testInsertWithGeneratedKeys() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
@ -158,7 +159,7 @@ public class TestPutSQL {
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 84)".getBytes()); runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 84)".getBytes());
runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally wrong syntax runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally wrong syntax
runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Tom', 3)".getBytes()); runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Tom', 3)".getBytes());
runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Harry', 44)".getBytes()); runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Harry', 44)".getBytes());
runner.run(); runner.run();
@ -256,6 +257,41 @@ public class TestPutSQL {
} }
@Test
public void testUsingSqlDataTypesWithNegativeValues() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE PERSONS (id integer primary key, name varchar(100), code bigint)");
}
}
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", "-5");
attributes.put("sql.args.1.value", "84");
runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', ?)".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("Mark", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
}
@Test @Test
public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException { public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException {
@ -343,7 +379,7 @@ public class TestPutSQL {
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
"UPDATE PERSONS SET NAME='George' WHERE ID=?; "; "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1"); attributes.put("sql.args.1.value", "1");
@ -432,7 +468,7 @@ public class TestPutSQL {
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
"UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; "; "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1"); attributes.put("sql.args.1.value", "1");
@ -471,7 +507,7 @@ public class TestPutSQL {
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
"UPDATE PERSONS SET NAME='George' WHERE ID=?; "; "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1"); attributes.put("sql.args.1.value", "1");
@ -579,7 +615,7 @@ public class TestPutSQL {
final MockFlowFile mff = new MockFlowFile(0L) { final MockFlowFile mff = new MockFlowFile(0L) {
@Override @Override
public Long getLastQueueDate() { public Long getLastQueueDate() {
return System.currentTimeMillis() - 10000L; // return 10 seconds ago return System.currentTimeMillis() - 10000L; // return 10 seconds ago
} }
@Override @Override