NIFI-3745: Fixed Table caching / primary key logic in PutDatabaseRecord

This closes #1700.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Matt Burgess 2017-04-26 14:44:49 -04:00 committed by Koji Kawamura
parent d66eac2ea1
commit 097548da9d
2 changed files with 77 additions and 35 deletions

View File

@ -371,7 +371,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String updateKeys = context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(flowFile).getValue();
final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
final SchemaKey schemaKey = new SchemaKey(catalog, schemaName, tableName);
// Get the statement type from the attribute if necessary
String statementType = statementTypeProperty;
@ -470,7 +470,9 @@ public class PutDatabaseRecord extends AbstractProcessor {
return;
}
final boolean includePrimaryKeys = UPDATE_TYPE.equalsIgnoreCase(statementType) && updateKeys == null;
// Always get the primary keys if Update Keys is empty. Otherwise if we have an Insert statement first, the table will be
// cached but the primary keys will not be retrieved, causing future UPDATE statements to not have primary keys available
final boolean includePrimaryKeys = (updateKeys == null);
// get the database schema from the cache, if one exists. We do this in a synchronized block, rather than
// using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if
@ -1056,53 +1058,33 @@ public class PutDatabaseRecord extends AbstractProcessor {
static class SchemaKey {
private final String catalog;
private final String schemaName;
private final String tableName;
public SchemaKey(final String catalog, final String tableName) {
public SchemaKey(final String catalog, final String schemaName, final String tableName) {
this.catalog = catalog;
this.schemaName = schemaName;
this.tableName = tableName;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((catalog == null) ? 0 : catalog.hashCode());
result = prime * result + ((tableName == null) ? 0 : tableName.hashCode());
int result = catalog != null ? catalog.hashCode() : 0;
result = 31 * result + (schemaName != null ? schemaName.hashCode() : 0);
result = 31 * result + tableName.hashCode();
return result;
}
@Override
public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final SchemaKey other = (SchemaKey) obj;
if (catalog == null) {
if (other.catalog != null) {
return false;
}
} else if (!catalog.equals(other.catalog)) {
return false;
}
SchemaKey schemaKey = (SchemaKey) o;
if (tableName == null) {
if (other.tableName != null) {
return false;
}
} else if (!tableName.equals(other.tableName)) {
return false;
}
return true;
if (catalog != null ? !catalog.equals(schemaKey.catalog) : schemaKey.catalog != null) return false;
if (schemaName != null ? !schemaName.equals(schemaKey.schemaName) : schemaKey.schemaName != null) return false;
return tableName.equals(schemaKey.tableName);
}
}

View File

@ -344,6 +344,66 @@ class TestPutDatabaseRecord {
conn.close()
}
@Test
void testUpdateAfterInsert() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons)
final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser)
runner.enableControllerService(parser)
parser.addSchemaField("id", RecordFieldType.INT)
parser.addSchemaField("name", RecordFieldType.STRING)
parser.addSchemaField("code", RecordFieldType.INT)
parser.addRecord(1, 'rec1', 101)
parser.addRecord(2, 'rec2', 102)
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
runner.enqueue(new byte[0])
runner.run()
runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
final Connection conn = dbcp.getConnection()
Statement stmt = conn.createStatement()
stmt = conn.createStatement()
ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
assertTrue(rs.next())
assertEquals(1, rs.getInt(1))
assertEquals('rec1', rs.getString(2))
assertEquals(101, rs.getInt(3))
assertTrue(rs.next())
assertEquals(2, rs.getInt(1))
assertEquals('rec2', rs.getString(2))
assertEquals(102, rs.getInt(3))
assertFalse(rs.next())
stmt.close()
runner.clearTransferState()
parser.addRecord(1, 'rec1', 201)
parser.addRecord(2, 'rec2', 202)
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.UPDATE_TYPE)
runner.enqueue(new byte[0])
runner.run(1,true,false)
runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
stmt = conn.createStatement()
rs = stmt.executeQuery('SELECT * FROM PERSONS')
assertTrue(rs.next())
assertEquals(1, rs.getInt(1))
assertEquals('rec1', rs.getString(2))
assertEquals(201, rs.getInt(3))
assertTrue(rs.next())
assertEquals(2, rs.getInt(1))
assertEquals('rec2', rs.getString(2))
assertEquals(202, rs.getInt(3))
assertFalse(rs.next())
stmt.close()
conn.close()
}
@Test
void testUpdateNoPrimaryKeys() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", 'CREATE TABLE PERSONS (id integer, name varchar(100), code integer)')