diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index 279720576c..fd414c4a5c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -371,7 +371,7 @@ public class PutDatabaseRecord extends AbstractProcessor { final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue(); final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); final String updateKeys = context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(flowFile).getValue(); - final SchemaKey schemaKey = new SchemaKey(catalog, tableName); + final SchemaKey schemaKey = new SchemaKey(catalog, schemaName, tableName); // Get the statement type from the attribute if necessary String statementType = statementTypeProperty; @@ -470,7 +470,9 @@ public class PutDatabaseRecord extends AbstractProcessor { return; } - final boolean includePrimaryKeys = UPDATE_TYPE.equalsIgnoreCase(statementType) && updateKeys == null; + // Always get the primary keys if Update Keys is empty. Otherwise if we have an Insert statement first, the table will be + // cached but the primary keys will not be retrieved, causing future UPDATE statements to not have primary keys available + final boolean includePrimaryKeys = (updateKeys == null); // get the database schema from the cache, if one exists. We do this in a synchronized block, rather than // using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if @@ -1056,53 +1058,33 @@ public class PutDatabaseRecord extends AbstractProcessor { static class SchemaKey { private final String catalog; + private final String schemaName; private final String tableName; - public SchemaKey(final String catalog, final String tableName) { + public SchemaKey(final String catalog, final String schemaName, final String tableName) { this.catalog = catalog; + this.schemaName = schemaName; this.tableName = tableName; } @Override public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((catalog == null) ? 0 : catalog.hashCode()); - result = prime * result + ((tableName == null) ? 0 : tableName.hashCode()); + int result = catalog != null ? catalog.hashCode() : 0; + result = 31 * result + (schemaName != null ? schemaName.hashCode() : 0); + result = 31 * result + tableName.hashCode(); return result; } @Override - public boolean equals(final Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; - final SchemaKey other = (SchemaKey) obj; - if (catalog == null) { - if (other.catalog != null) { - return false; - } - } else if (!catalog.equals(other.catalog)) { - return false; - } + SchemaKey schemaKey = (SchemaKey) o; - - if (tableName == null) { - if (other.tableName != null) { - return false; - } - } else if (!tableName.equals(other.tableName)) { - return false; - } - - return true; + if (catalog != null ? !catalog.equals(schemaKey.catalog) : schemaKey.catalog != null) return false; + if (schemaName != null ? !schemaName.equals(schemaKey.schemaName) : schemaKey.schemaName != null) return false; + return tableName.equals(schemaKey.tableName); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy index 6224e0e83f..bb12fb4b09 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy @@ -344,6 +344,66 @@ class TestPutDatabaseRecord { conn.close() } + @Test + void testUpdateAfterInsert() throws InitializationException, ProcessException, SQLException, IOException { + recreateTable("PERSONS", createPersons) + final MockRecordParser parser = new MockRecordParser() + runner.addControllerService("parser", parser) + runner.enableControllerService(parser) + + parser.addSchemaField("id", RecordFieldType.INT) + parser.addSchemaField("name", RecordFieldType.STRING) + parser.addSchemaField("code", RecordFieldType.INT) + + parser.addRecord(1, 'rec1', 101) + parser.addRecord(2, 'rec2', 102) + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser') + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE) + runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS') + + runner.enqueue(new byte[0]) + runner.run() + + runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1) + final Connection conn = dbcp.getConnection() + Statement stmt = conn.createStatement() + stmt = conn.createStatement() + ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS') + assertTrue(rs.next()) + assertEquals(1, rs.getInt(1)) + assertEquals('rec1', rs.getString(2)) + assertEquals(101, rs.getInt(3)) + assertTrue(rs.next()) + assertEquals(2, rs.getInt(1)) + assertEquals('rec2', rs.getString(2)) + assertEquals(102, rs.getInt(3)) + assertFalse(rs.next()) + stmt.close() + runner.clearTransferState() + + parser.addRecord(1, 'rec1', 201) + parser.addRecord(2, 'rec2', 202) + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.UPDATE_TYPE) + runner.enqueue(new byte[0]) + runner.run(1,true,false) + + runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1) + stmt = conn.createStatement() + rs = stmt.executeQuery('SELECT * FROM PERSONS') + assertTrue(rs.next()) + assertEquals(1, rs.getInt(1)) + assertEquals('rec1', rs.getString(2)) + assertEquals(201, rs.getInt(3)) + assertTrue(rs.next()) + assertEquals(2, rs.getInt(1)) + assertEquals('rec2', rs.getString(2)) + assertEquals(202, rs.getInt(3)) + assertFalse(rs.next()) + stmt.close() + conn.close() + } + @Test void testUpdateNoPrimaryKeys() throws InitializationException, ProcessException, SQLException, IOException { recreateTable("PERSONS", 'CREATE TABLE PERSONS (id integer, name varchar(100), code integer)')