From 84db3725386fe44bbef8a18d84852a4f716addb6 Mon Sep 17 00:00:00 2001
From: Mark Payne
Date: Wed, 14 Oct 2015 13:12:10 -0400
Subject: [PATCH] NIFI-977: Allow SQL Data Types with numerals that are negative
---
 .../nifi/processors/standard/PutSQL.java     | 156 +++++++++---------
 .../nifi/processors/standard/TestPutSQL.java |  46 +++++-
 2 files changed, 119 insertions(+), 83 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index b087737c9b..5c2bbc23d8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -70,81 +70,81 @@ import org.apache.nifi.stream.io.StreamUtils;
 @SeeAlso(ConvertJSONToSQL.class)
 @Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"})
 @CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command "
- + "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
- + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
- + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
+ + "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
+ + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
+ + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
 @ReadsAttributes({
- @ReadsAttribute(attribute="fragment.identifier", description="If the <Support Fragmented Transactions> property is true, this attribute is used to determine whether or "
- + "not two FlowFiles belong to the same transaction."),
- @ReadsAttribute(attribute="fragment.count", description="If the <Support Fragmented Transactions> property is true, this attribute is used to determine how many FlowFiles "
- + "are needed to complete the transaction."),
- @ReadsAttribute(attribute="fragment.index", description="If the <Support Fragmented Transactions> property is true, this attribute is used to determine the order that the FlowFiles "
- + "in a transaction should be evaluated."),
- @ReadsAttribute(attribute="sql.args.N.type", description="Incoming FlowFiles are expected to be parameterized SQL statements. The type of each Parameter is specified as an integer "
- + "that represents the JDBC Type of the parameter."),
- @ReadsAttribute(attribute="sql.args.N.value", description="Incoming FlowFiles are expected to be parameterized SQL statements. The values of the Parameters are specified as "
- + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute.")
+ @ReadsAttribute(attribute = "fragment.identifier", description = "If the <Support Fragmented Transactions> property is true, this attribute is used to determine whether or "
+ + "not two FlowFiles belong to the same transaction."),
+ @ReadsAttribute(attribute = "fragment.count", description = "If the <Support Fragmented Transactions> property is true, this attribute is used to determine how many FlowFiles "
+ + "are needed to complete the transaction."),
+ @ReadsAttribute(attribute = "fragment.index", description = "If the <Support Fragmented Transactions> property is true, this attribute is used to determine the order that the FlowFiles "
+ + "in a transaction should be evaluated."),
+ @ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The type of each Parameter is specified as an integer "
+ + "that represents the JDBC Type of the parameter."),
+ @ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The values of the Parameters are specified as "
+ + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute.")
 })
 @WritesAttributes({
- @WritesAttribute(attribute="sql.generated.key", description="If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, "
- + "this attribute will be added to indicate the generated key, if possible. This feature is not supported by all database vendors.")
+ @WritesAttribute(attribute = "sql.generated.key", description = "If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, "
+ + "this attribute will be added to indicate the generated key, if possible. This feature is not supported by all database vendors.")
 })
 public class PutSQL extends AbstractProcessor {
 static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
- .name("JDBC Connection Pool")
- .description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. "
- + "The Connection Pool is necessary in order to determine the appropriate database column types.")
- .identifiesControllerService(DBCPService.class)
- .required(true)
- .build();
+ .name("JDBC Connection Pool")
+ .description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. "
+ + "The Connection Pool is necessary in order to determine the appropriate database column types.")
+ .identifiesControllerService(DBCPService.class)
+ .required(true)
+ .build();
 static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder()
- .name("Support Fragmented Transactions")
- .description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. "
- + "If the fragment.count value is greater than 1, the Processor will not process any FlowFile with that fragment.identifier until all are available; "
- + "at that point, it will process all FlowFiles with that fragment.identifier as a single transaction, in the order specified by the FlowFiles' fragment.index attributes. "
- + "This provides atomicity of those SQL statements. If this value is false, these attributes will be ignored and the updates will occur independent of one another.")
- .allowableValues("true", "false")
- .defaultValue("true")
- .build();
+ .name("Support Fragmented Transactions")
+ .description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. "
+ + "If the fragment.count value is greater than 1, the Processor will not process any FlowFile with that fragment.identifier until all are available; "
+ + "at that point, it will process all FlowFiles with that fragment.identifier as a single transaction, in the order specified by the FlowFiles' fragment.index attributes. "
+ + "This provides atomicity of those SQL statements. If this value is false, these attributes will be ignored and the updates will occur independent of one another.")
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
 static final PropertyDescriptor TRANSACTION_TIMEOUT = new PropertyDescriptor.Builder()
- .name("Transaction Timeout")
- .description("If the <Support Fragmented Transactions> property is set to true, specifies how long to wait for all FlowFiles for a particular fragment.identifier attribute "
- + "to arrive before just transferring all of the FlowFiles with that identifier to the 'failure' relationship")
- .required(false)
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .build();
+ .name("Transaction Timeout")
+ .description("If the <Support Fragmented Transactions> property is set to true, specifies how long to wait for all FlowFiles for a particular fragment.identifier attribute "
+ + "to arrive before just transferring all of the FlowFiles with that identifier to the 'failure' relationship")
+ .required(false)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
 static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
- .name("Batch Size")
- .description("The preferred number of FlowFiles to put to the database in a single transaction")
- .required(true)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .defaultValue("100")
- .build();
+ .name("Batch Size")
+ .description("The preferred number of FlowFiles to put to the database in a single transaction")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("100")
+ .build();
 static final PropertyDescriptor OBTAIN_GENERATED_KEYS = new PropertyDescriptor.Builder()
- .name("Obtain Generated Keys")
- .description("If true, any key that is automatically generated by the database will be added to the FlowFile that generated it using the sql.generated.key attribute. "
- + "This may result in slightly slower performance and is not supported by all databases.")
- .allowableValues("true", "false")
- .defaultValue("false")
- .build();
+ .name("Obtain Generated Keys")
+ .description("If true, any key that is automatically generated by the database will be added to the FlowFile that generated it using the sql.generated.key attribute. "
" + + "This may result in slightly slower performance and is not supported by all databases.") + .allowableValues("true", "false") + .defaultValue("false") + .build(); static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("A FlowFile is routed to this relationship after the database is successfully updated") - .build(); + .name("success") + .description("A FlowFile is routed to this relationship after the database is successfully updated") + .build(); static final Relationship REL_RETRY = new Relationship.Builder() - .name("retry") - .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") - .build(); + .name("retry") + .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") + .build(); static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, " - + "such as an invalid query or an integrity constraint violation") - .build(); + .name("failure") + .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, " + + "such as an invalid query or an integrity constraint violation") + .build(); private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type"); - private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+"); + private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+"); private static final String FRAGMENT_ID_ATTR = "fragment.identifier"; private static final String FRAGMENT_INDEX_ATTR = "fragment.index"; @@ -187,7 +187,7 @@ public class PutSQL extends AbstractProcessor { final long startNanos = System.nanoTime(); final boolean obtainKeys = context.getProperty(OBTAIN_GENERATED_KEYS).asBoolean(); final Map statementMap = new HashMap<>(); // Map SQL to a PreparedStatement and FlowFiles - final List sentFlowFiles = new ArrayList<>(); // flowfiles that have been sent + final List sentFlowFiles = new ArrayList<>(); // flowfiles that have been sent final List processedFlowFiles = new ArrayList<>(); // all flowfiles that we have processed final Set enclosuresToExecute = new LinkedHashSet<>(); // the enclosures that we've processed @@ -286,7 +286,7 @@ public class PutSQL extends AbstractProcessor { conn.rollback(); final FlowFile offendingFlowFile = batchFlowFiles.get(offendingFlowFileIndex); getLogger().error("Failed to update database due to a failed batch update. A total of {} FlowFiles are required for this transaction, so routing all to failure. 
" - + "Offending FlowFile was {}, which caused the following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, e}); + + "Offending FlowFile was {}, which caused the following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, e}); session.transfer(flowFiles, REL_FAILURE); return; } @@ -300,7 +300,7 @@ public class PutSQL extends AbstractProcessor { int failureCount = 0; int successCount = 0; int retryCount = 0; - for (int i=0; i < updateCounts.length; i++) { + for (int i = 0; i < updateCounts.length; i++) { final int updateCount = updateCounts[i]; final FlowFile flowFile = batchFlowFiles.get(i); if (updateCount == Statement.EXECUTE_FAILED) { @@ -329,7 +329,7 @@ public class PutSQL extends AbstractProcessor { } getLogger().error("Failed to update database due to a failed batch update. There were a total of {} FlowFiles that failed, {} that succeeded, " - + "and {} that were not execute and will be routed to retry; ", new Object[] {failureCount, successCount, retryCount}); + + "and {} that were not execute and will be routed to retry; ", new Object[] {failureCount, successCount, retryCount}); } catch (final SQLNonTransientException e) { getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {enclosure.getFlowFiles(), e}); @@ -339,7 +339,7 @@ public class PutSQL extends AbstractProcessor { continue; } catch (final SQLException e) { getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", - new Object[] {enclosure.getFlowFiles(), e}); + new Object[] {enclosure.getFlowFiles(), e}); for (final FlowFile flowFile : enclosure.getFlowFiles()) { destinationRelationships.put(flowFile, REL_RETRY); @@ -484,7 +484,7 @@ public class PutSQL extends AbstractProcessor { * * @param stmt the statement that generated a key * @return the key that was generated from the given statement, or null if no key - * was generated or it could not be determined. + * was generated or it could not be determined. 
 */
 private String determineGeneratedKey(final PreparedStatement stmt) {
 try {
@@ -514,7 +514,7 @@ public class PutSQL extends AbstractProcessor {
 * @throws SQLException if unable to create the appropriate PreparedStatement
 */
 private StatementFlowFileEnclosure getEnclosure(final String sql, final Connection conn, final Map<String, StatementFlowFileEnclosure> stmtMap,
- final boolean obtainKeys, final boolean fragmentedTransaction) throws SQLException {
+ final boolean obtainKeys, final boolean fragmentedTransaction) throws SQLException {
 StatementFlowFileEnclosure enclosure = stmtMap.get(sql);
 if (enclosure != null) {
 return enclosure;
 }
@@ -620,9 +620,9 @@ public class PutSQL extends AbstractProcessor {
 *
 * @param flowFiles the FlowFiles whose relationship is to be determined
 * @param transactionTimeoutMillis the maximum amount of time (in milliseconds) that we should wait
- * for all FlowFiles in a transaction to be present before routing to failure
+ * for all FlowFiles in a transaction to be present before routing to failure
 * @return the appropriate relationship to route the FlowFiles to, or null if the FlowFiles
- * should instead be processed
+ * should instead be processed
 */
 Relationship determineRelationship(final List<FlowFile> flowFiles, final Long transactionTimeoutMillis) {
 int selectedNumFragments = 0;
@@ -634,7 +634,7 @@ public class PutSQL extends AbstractProcessor {
 return null;
 } else if (fragmentCount == null) {
 getLogger().error("Cannot process {} because there are {} FlowFiles with the same fragment.identifier "
- + "attribute but not all FlowFiles have a fragment.count attribute; routing all to failure", new Object[] {flowFile, flowFiles.size()});
+ + "attribute but not all FlowFiles have a fragment.count attribute; routing all to failure", new Object[] {flowFile, flowFiles.size()});
 return REL_FAILURE;
 }
@@ -643,13 +643,13 @@ public class PutSQL extends AbstractProcessor {
 numFragments = Integer.parseInt(fragmentCount);
 } catch (final NumberFormatException nfe) {
 getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not an integer; "
- + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount});
+ + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount});
 return REL_FAILURE;
 }
 if (numFragments < 1) {
 getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not a positive integer; "
- + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount});
+ + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount});
 return REL_FAILURE;
 }
@@ -657,14 +657,14 @@ public class PutSQL extends AbstractProcessor {
 selectedNumFragments = numFragments;
 } else if (numFragments != selectedNumFragments) {
 getLogger().error("Cannot process {} because the fragment.count attribute has different values for different FlowFiles with the same fragment.identifier; "
- + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
+ + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
 return REL_FAILURE;
 }
 final String fragmentIndex = flowFile.getAttribute(FRAGMENT_INDEX_ATTR);
 if (fragmentIndex == null) {
 getLogger().error("Cannot process {} because the fragment.index attribute is missing; "
- + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); return REL_FAILURE; } @@ -673,19 +673,19 @@ public class PutSQL extends AbstractProcessor { idx = Integer.parseInt(fragmentIndex); } catch (final NumberFormatException nfe) { getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not an integer; " - + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex}); + + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex}); return REL_FAILURE; } if (idx < 0) { getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not a positive integer; " - + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex}); + + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex}); return REL_FAILURE; } if (bitSet.get(idx)) { getLogger().error("Cannot process {} because it has the same value for the fragment.index attribute as another FlowFile with the same fragment.identifier; " - + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); + + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); return REL_FAILURE; } @@ -693,7 +693,7 @@ public class PutSQL extends AbstractProcessor { } if (selectedNumFragments == flowFiles.size()) { - return null; // no relationship to route FlowFiles to yet - process the FlowFiles. + return null; // no relationship to route FlowFiles to yet - process the FlowFiles. } long latestQueueTime = 0L; @@ -711,7 +711,7 @@ public class PutSQL extends AbstractProcessor { } getLogger().debug("Not enough FlowFiles for transaction. Returning all FlowFiles to queue"); - return Relationship.SELF; // not enough FlowFiles for this transaction. Return them all to queue. + return Relationship.SELF; // not enough FlowFiles for this transaction. Return them all to queue. 
 }
 /**
@@ -769,7 +769,7 @@ public class PutSQL extends AbstractProcessor {
 break;
 default:
 throw new SQLException("The '" + attrName + "' attribute has a value of '" + parameterValue
- + "' and a type of '" + jdbcType + "' but this is not a known data type");
+ + "' and a type of '" + jdbcType + "' but this is not a known data type");
 }
 }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
index a348c9e60f..17506f7e19 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
@@ -103,6 +103,7 @@ public class TestPutSQL {
 }
 }
+ @Test
 public void testInsertWithGeneratedKeys() throws InitializationException, ProcessException, SQLException, IOException {
 final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
@@ -158,7 +159,7 @@ public class TestPutSQL {
 runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
 runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
 runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 84)".getBytes());
- runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally wrong syntax
+ runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally wrong syntax
 runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Tom', 3)".getBytes());
 runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Harry', 44)".getBytes());
 runner.run();
@@ -256,6 +257,41 @@ public class TestPutSQL {
 }
+ @Test
+ public void testUsingSqlDataTypesWithNegativeValues() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate("CREATE TABLE PERSONS (id integer primary key, name varchar(100), code bigint)");
+ }
+ }
+ runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("sql.args.1.type", "-5");
+ attributes.put("sql.args.1.value", "84");
+ runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', ?)".getBytes(), attributes);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertEquals("Mark", rs.getString(2));
+ assertEquals(84, rs.getInt(3));
+ assertFalse(rs.next());
+ }
+ }
+ }
 @Test
 public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException {
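The new test above supplies the type code as the literal string "-5", which is the value of java.sql.Types.BIGINT. An equivalent way to build the same attributes, shown here as a hedged sketch (not part of the patch, and assuming the same runner and PERSONS table as the test above), is to derive the string from the Types constant, as the neighboring tests already do for Types.INTEGER:

    final Map<String, String> attributes = new HashMap<>();
    attributes.put("sql.args.1.type", String.valueOf(Types.BIGINT)); // "-5"; matched only by the new -?\d+ pattern
    attributes.put("sql.args.1.value", "84");
    runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', ?)".getBytes(), attributes);

Using the constant avoids the magic number and makes the intended JDBC type explicit at the call site.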
@@ -343,7 +379,7 @@ public class TestPutSQL {
 runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
 final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
- "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+ "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
 final Map<String, String> attributes = new HashMap<>();
 attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
 attributes.put("sql.args.1.value", "1");
@@ -432,7+468,7 @@ public class TestPutSQL {
 runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
 final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
- "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ";
+ "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ";
 final Map<String, String> attributes = new HashMap<>();
 attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
 attributes.put("sql.args.1.value", "1");
@@ -471,7 +507,7 @@ public class TestPutSQL {
 runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
 final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
- "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+ "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
 final Map<String, String> attributes = new HashMap<>();
 attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
 attributes.put("sql.args.1.value", "1");
@@ -579,7 +615,7 @@ public class TestPutSQL {
 final MockFlowFile mff = new MockFlowFile(0L) {
 @Override
 public Long getLastQueueDate() {
- return System.currentTimeMillis() - 10000L; // return 10 seconds ago
+ return System.currentTimeMillis() - 10000L; // return 10 seconds ago
 }
 @Override