NIFI-8146: Ensure that we close the Connection/Statement/PreparedStatement objects in finally blocks or try-with-resources

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4770
Authored by Mark Payne on 2021-01-20 08:57:51 -05:00; committed by Matthew Burgess
parent 25ab050ed7
commit 803ba882aa

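For context on the pattern this commit applies throughout PutDatabaseRecord: JDBC Connection, Statement, and PreparedStatement objects must be released even when an update fails, either via try-with-resources or an explicit finally block. Below is a minimal standalone sketch of both idioms; it is not code from this commit, and the DataSource, table, and column names are hypothetical:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;

public class ResourceCleanupSketch {
    // try-with-resources: both objects are closed automatically, in reverse
    // declaration order, even if executeUpdate() throws.
    static int updateWithTryWithResources(final DataSource dataSource, final String name, final long id) throws SQLException {
        try (final Connection connection = dataSource.getConnection();
             final PreparedStatement statement = connection.prepareStatement("UPDATE users SET name = ? WHERE id = ?")) {
            statement.setString(1, name);
            statement.setLong(2, id);
            return statement.executeUpdate();
        }
    }

    // Explicit finally blocks: the same guarantee, useful when a resource's
    // lifetime extends past a single lexical scope.
    static int updateWithFinally(final DataSource dataSource, final String name, final long id) throws SQLException {
        final Connection connection = dataSource.getConnection();
        try {
            final PreparedStatement statement = connection.prepareStatement("UPDATE users SET name = ? WHERE id = ?");
            try {
                statement.setString(1, name);
                statement.setLong(2, id);
                return statement.executeUpdate();
            } finally {
                statement.close();
            }
        } finally {
            connection.close();
        }
    }
}

The commit uses try-with-resources where a statement lives and dies within one block, and explicit try/catch/finally where the connection or cached statements outlive any single block, as the hunks below show.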

@@ -517,6 +517,12 @@ public class PutDatabaseRecord extends AbstractProcessor {
                     getLogger().warn("Failed to set auto-commit back to true on connection {} after finishing update", connection);
                 }
             }
+
+            try {
+                connection.close();
+            } catch (final Exception e) {
+                getLogger().warn("Failed to close database connection", e);
+            }
         }
     }
@@ -537,10 +543,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
             throw new IllegalArgumentException(format("Record schema does not contain Field Containing SQL: %s, FlowFile %s", sqlField, flowFile));
         }
 
-        try (final Statement s = connection.createStatement()) {
+        try (final Statement statement = connection.createStatement()) {
             final int timeoutMillis = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
             try {
-                s.setQueryTimeout(timeoutMillis); // timeout in seconds
+                statement.setQueryTimeout(timeoutMillis); // timeout in seconds
             } catch (SQLException se) {
                 // If the driver doesn't support query timeout, then assume it is "infinite". Allow a timeout of zero only
                 if (timeoutMillis > 0) {
@@ -557,13 +563,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
             // Execute the statement(s) as-is
             if (context.getProperty(ALLOW_MULTIPLE_STATEMENTS).asBoolean()) {
-                String regex = "(?<!\\\\);";
-                String[] sqlStatements = (sql).split(regex);
-                for (String statement : sqlStatements) {
-                    s.execute(statement);
+                final String regex = "(?<!\\\\);";
+                final String[] sqlStatements = (sql).split(regex);
+                for (String sqlStatement : sqlStatements) {
+                    statement.execute(sqlStatement);
                 }
             } else {
-                s.execute(sql);
+                statement.execute(sql);
             }
         }
     }
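An aside on the split in the hunk above, which this commit leaves unchanged apart from the variable renames: the Java literal "(?<!\\\\);" denotes the regex (?<!\\);, a negative lookbehind that splits a multi-statement script on semicolons not escaped by a backslash. A small self-contained illustration (the SQL text is made up):

public class SplitSketch {
    public static void main(String[] args) {
        // Actual string value: INSERT INTO t VALUES ('a\;b'); DELETE FROM t
        final String sql = "INSERT INTO t VALUES ('a\\;b'); DELETE FROM t";
        // The ';' inside the quoted value is preceded by '\' and is not a separator;
        // the ';' between the statements is.
        final String[] statements = sql.split("(?<!\\\\);");
        for (final String statement : statements) {
            System.out.println(statement.trim());
        }
        // Prints:
        // INSERT INTO t VALUES ('a\;b')
        // DELETE FROM t
    }
}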
@@ -595,7 +601,9 @@ public class PutDatabaseRecord extends AbstractProcessor {
         final SchemaKey schemaKey = new PutDatabaseRecord.SchemaKey(catalog, schemaName, tableName);
         final TableSchema tableSchema = schemaCache.get(schemaKey, key -> {
             try {
-                return TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, includePrimaryKeys);
+                final TableSchema schema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, includePrimaryKeys);
+                getLogger().debug("Fetched Table Schema {} for table name {}", schema, tableName);
+                return schema;
             } catch (SQLException e) {
                 throw new ProcessException(e);
             }
@@ -613,103 +621,109 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 Record outerRecord;
                 PreparedStatement lastPreparedStatement = null;
 
-                while ((outerRecord = recordReader.nextRecord()) != null) {
-                    final String statementType;
-                    if (USE_RECORD_PATH.equalsIgnoreCase(explicitStatementType)) {
-                        statementType = recordPathOperationType.apply(outerRecord);
-                    } else {
-                        statementType = explicitStatementType;
-                    }
+                try {
+                    while ((outerRecord = recordReader.nextRecord()) != null) {
+                        final String statementType;
+                        if (USE_RECORD_PATH.equalsIgnoreCase(explicitStatementType)) {
+                            statementType = recordPathOperationType.apply(outerRecord);
+                        } else {
+                            statementType = explicitStatementType;
+                        }
 
-                    final List<Record> dataRecords = getDataRecords(outerRecord);
-                    for (final Record currentRecord : dataRecords) {
-                        PreparedSqlAndColumns preparedSqlAndColumns = preparedSql.get(statementType);
-                        if (preparedSqlAndColumns == null) {
-                            final RecordSchema recordSchema = currentRecord.getSchema();
+                        final List<Record> dataRecords = getDataRecords(outerRecord);
+                        for (final Record currentRecord : dataRecords) {
+                            PreparedSqlAndColumns preparedSqlAndColumns = preparedSql.get(statementType);
+                            if (preparedSqlAndColumns == null) {
+                                final RecordSchema recordSchema = currentRecord.getSchema();
 
-                            final SqlAndIncludedColumns sqlHolder;
-                            if (INSERT_TYPE.equalsIgnoreCase(statementType)) {
-                                sqlHolder = generateInsert(recordSchema, fqTableName, tableSchema, settings);
-                            } else if (UPDATE_TYPE.equalsIgnoreCase(statementType)) {
-                                sqlHolder = generateUpdate(recordSchema, fqTableName, updateKeys, tableSchema, settings);
-                            } else if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
-                                sqlHolder = generateDelete(recordSchema, fqTableName, tableSchema, settings);
-                            } else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) {
-                                sqlHolder = generateUpsert(recordSchema, fqTableName, updateKeys, tableSchema, settings);
-                            } else {
-                                throw new IllegalArgumentException(format("Statement Type %s is not valid, FlowFile %s", statementType, flowFile));
-                            }
+                                final SqlAndIncludedColumns sqlHolder;
+                                if (INSERT_TYPE.equalsIgnoreCase(statementType)) {
+                                    sqlHolder = generateInsert(recordSchema, fqTableName, tableSchema, settings);
+                                } else if (UPDATE_TYPE.equalsIgnoreCase(statementType)) {
+                                    sqlHolder = generateUpdate(recordSchema, fqTableName, updateKeys, tableSchema, settings);
+                                } else if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
+                                    sqlHolder = generateDelete(recordSchema, fqTableName, tableSchema, settings);
+                                } else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) {
+                                    sqlHolder = generateUpsert(recordSchema, fqTableName, updateKeys, tableSchema, settings);
+                                } else {
+                                    throw new IllegalArgumentException(format("Statement Type %s is not valid, FlowFile %s", statementType, flowFile));
+                                }
 
-                            // Create the Prepared Statement
-                            final PreparedStatement preparedStatement = con.prepareStatement(sqlHolder.getSql());
+                                // Create the Prepared Statement
+                                final PreparedStatement preparedStatement = con.prepareStatement(sqlHolder.getSql());
 
-                            try {
-                                preparedStatement.setQueryTimeout(timeoutMillis); // timeout in seconds
-                            } catch (final SQLException se) {
-                                // If the driver doesn't support query timeout, then assume it is "infinite". Allow a timeout of zero only
-                                if (timeoutMillis > 0) {
-                                    throw se;
-                                }
-                            }
+                                try {
+                                    preparedStatement.setQueryTimeout(timeoutMillis); // timeout in seconds
+                                } catch (final SQLException se) {
+                                    // If the driver doesn't support query timeout, then assume it is "infinite". Allow a timeout of zero only
+                                    if (timeoutMillis > 0) {
+                                        throw se;
+                                    }
+                                }
 
-                            preparedSqlAndColumns = new PreparedSqlAndColumns(sqlHolder, preparedStatement);
-                            preparedSql.put(statementType, preparedSqlAndColumns);
-                        }
+                                preparedSqlAndColumns = new PreparedSqlAndColumns(sqlHolder, preparedStatement);
+                                preparedSql.put(statementType, preparedSqlAndColumns);
+                            }
 
-                        final PreparedStatement ps = preparedSqlAndColumns.getPreparedStatement();
-                        final List<Integer> fieldIndexes = preparedSqlAndColumns.getSqlAndIncludedColumns().getFieldIndexes();
-                        final String sql = preparedSqlAndColumns.getSqlAndIncludedColumns().getSql();
+                            final PreparedStatement ps = preparedSqlAndColumns.getPreparedStatement();
+                            final List<Integer> fieldIndexes = preparedSqlAndColumns.getSqlAndIncludedColumns().getFieldIndexes();
+                            final String sql = preparedSqlAndColumns.getSqlAndIncludedColumns().getSql();
 
-                        if (currentBatchSize > 0 && ps != lastPreparedStatement && lastPreparedStatement != null) {
-                            batchIndex++;
-                            log.debug("Executing query {} because Statement Type changed between Records for {}; fieldIndexes: {}; batch index: {}; batch size: {}",
-                                sql, flowFile, fieldIndexes, batchIndex, currentBatchSize);
-                            lastPreparedStatement.executeBatch();
-                            session.adjustCounter("Batches Executed", 1, false);
-                            currentBatchSize = 0;
-                        }
-                        lastPreparedStatement = ps;
+                            if (currentBatchSize > 0 && ps != lastPreparedStatement && lastPreparedStatement != null) {
+                                batchIndex++;
+                                log.debug("Executing query {} because Statement Type changed between Records for {}; fieldIndexes: {}; batch index: {}; batch size: {}",
+                                    sql, flowFile, fieldIndexes, batchIndex, currentBatchSize);
+                                lastPreparedStatement.executeBatch();
+                                session.adjustCounter("Batches Executed", 1, false);
+                                currentBatchSize = 0;
+                            }
+                            lastPreparedStatement = ps;
 
-                        final Object[] values = currentRecord.getValues();
-                        final List<DataType> dataTypes = currentRecord.getSchema().getDataTypes();
-                        for (int i = 0; i < fieldIndexes.size(); i++) {
-                            final int currentFieldIndex = fieldIndexes.get(i);
-                            final Object currentValue = values[currentFieldIndex];
-                            final DataType dataType = dataTypes.get(currentFieldIndex);
-                            final int sqlType = DataTypeUtils.getSQLTypeValue(dataType);
+                            final Object[] values = currentRecord.getValues();
+                            final List<DataType> dataTypes = currentRecord.getSchema().getDataTypes();
+                            for (int i = 0; i < fieldIndexes.size(); i++) {
+                                final int currentFieldIndex = fieldIndexes.get(i);
+                                final Object currentValue = values[currentFieldIndex];
+                                final DataType dataType = dataTypes.get(currentFieldIndex);
+                                final int sqlType = DataTypeUtils.getSQLTypeValue(dataType);
 
-                            // If DELETE type, insert the object twice because of the null check (see generateDelete for details)
-                            if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
-                                ps.setObject(i * 2 + 1, currentValue, sqlType);
-                                ps.setObject(i * 2 + 2, currentValue, sqlType);
-                            } else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) {
-                                final int timesToAddObjects = databaseAdapter.getTimesToAddColumnObjectsForUpsert();
-                                for (int j = 0; j < timesToAddObjects; j++) {
-                                    ps.setObject(i + (fieldIndexes.size() * j) + 1, currentValue, sqlType);
-                                }
-                            } else {
-                                ps.setObject(i + 1, currentValue, sqlType);
-                            }
-                        }
+                                // If DELETE type, insert the object twice because of the null check (see generateDelete for details)
+                                if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
+                                    ps.setObject(i * 2 + 1, currentValue, sqlType);
+                                    ps.setObject(i * 2 + 2, currentValue, sqlType);
+                                } else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) {
+                                    final int timesToAddObjects = databaseAdapter.getTimesToAddColumnObjectsForUpsert();
+                                    for (int j = 0; j < timesToAddObjects; j++) {
+                                        ps.setObject(i + (fieldIndexes.size() * j) + 1, currentValue, sqlType);
+                                    }
+                                } else {
+                                    ps.setObject(i + 1, currentValue, sqlType);
+                                }
+                            }
 
-                        ps.addBatch();
-                        session.adjustCounter(statementType + " updates performed", 1, false);
-                        if (++currentBatchSize == maxBatchSize) {
-                            batchIndex++;
-                            log.debug("Executing query {} because batch reached max size for {}; fieldIndexes: {}; batch index: {}; batch size: {}",
-                                sql, flowFile, fieldIndexes, batchIndex, currentBatchSize);
-                            session.adjustCounter("Batches Executed", 1, false);
-                            ps.executeBatch();
-                            currentBatchSize = 0;
-                        }
-                    }
-                }
-
-                if (currentBatchSize > 0) {
-                    lastPreparedStatement.executeBatch();
-                    session.adjustCounter("Batches Executed", 1, false);
-                }
+                            ps.addBatch();
+                            session.adjustCounter(statementType + " updates performed", 1, false);
+                            if (++currentBatchSize == maxBatchSize) {
+                                batchIndex++;
+                                log.debug("Executing query {} because batch reached max size for {}; fieldIndexes: {}; batch index: {}; batch size: {}",
+                                    sql, flowFile, fieldIndexes, batchIndex, currentBatchSize);
+                                session.adjustCounter("Batches Executed", 1, false);
+                                ps.executeBatch();
+                                currentBatchSize = 0;
+                            }
+                        }
+                    }
+
+                    if (currentBatchSize > 0) {
+                        lastPreparedStatement.executeBatch();
+                        session.adjustCounter("Batches Executed", 1, false);
+                    }
+                } finally {
+                    for (final PreparedSqlAndColumns preparedSqlAndColumns : preparedSql.values()) {
+                        preparedSqlAndColumns.getPreparedStatement().close();
+                    }
+                }
             }
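The essence of the hunk above: one PreparedStatement per statement type is cached in the preparedSql map and reused across records so batches can accumulate, which means the statements cannot be scoped with try-with-resources one record at a time. Instead the whole processing loop is wrapped in try/finally and every cached statement is closed at the end. A reduced sketch of that shape, with hypothetical map contents and SQL rather than the processor's actual generation logic:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

public class CachedStatementSketch {
    // One PreparedStatement per statement type, reused across records to build
    // batches, then all closed together in finally.
    static void process(final Connection connection, final Iterable<String> statementTypes) throws SQLException {
        final Map<String, PreparedStatement> preparedSql = new HashMap<>();
        try {
            for (final String statementType : statementTypes) {
                PreparedStatement ps = preparedSql.get(statementType);
                if (ps == null) {
                    // Hypothetical SQL; the real processor derives it from the record schema.
                    ps = connection.prepareStatement("INSERT INTO t (c) VALUES (?)");
                    preparedSql.put(statementType, ps);
                }
                ps.setString(1, statementType);
                ps.addBatch();
            }
            for (final PreparedStatement ps : preparedSql.values()) {
                ps.executeBatch();
            }
        } finally {
            // Mirrors the commit: close every cached statement exactly once,
            // whether or not an executeBatch() above threw.
            for (final PreparedStatement ps : preparedSql.values()) {
                ps.close();
            }
        }
    }
}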
@@ -1222,6 +1236,11 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 return new TableSchema(cols, translateColumnNames, primaryKeyColumns, dmd.getIdentifierQuoteString());
             }
         }
+
+        @Override
+        public String toString() {
+            return "TableSchema[columns=" + columns.values() + "]";
+        }
     }
 
     protected static class ColumnDescription {
@@ -1279,6 +1298,11 @@ public class PutDatabaseRecord extends AbstractProcessor {
             return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize);
         }
+
+        @Override
+        public String toString() {
+            return "Column[name=" + columnName + ", dataType=" + dataType + ", required=" + required + ", columnSize=" + columnSize + "]";
+        }
     }
 
     static class SchemaKey {