From 803ba882aa15142a9986dc0c23bbf4db11fe15a7 Mon Sep 17 00:00:00 2001
From: Mark Payne
Date: Wed, 20 Jan 2021 08:57:51 -0500
Subject: [PATCH] NIFI-8146: Ensure that we close the Connection/Statement/PreparedStatement objects in finally blocks or try-with-resources

Signed-off-by: Matthew Burgess

This closes #4770
---
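[Reviewer note -- not part of the commit; `git am` ignores text placed here,
between the "---" marker and the diffstat. The cleanup pattern the patch
enforces is sketched below against an assumed javax.sql.DataSource; the class
name, query, and wiring are illustrative placeholders (NiFi itself obtains
connections from a DBCPService), not code from this patch.]

// A minimal sketch of the two closing patterns the patch applies.
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;

class JdbcCleanupSketch {

    // Pattern 1: try-with-resources, as used for the Statement in executeSQL().
    // Both resources are closed automatically, even if execute() throws.
    static void withTryWithResources(final DataSource dataSource) throws SQLException {
        try (final Connection connection = dataSource.getConnection();
             final PreparedStatement statement = connection.prepareStatement("SELECT 1")) {
            statement.execute();
        }
    }

    // Pattern 2: an explicit finally block, as used for the Connection in this
    // patch, so a failure to close is logged and swallowed rather than allowed
    // to mask an exception thrown by the real work.
    static void withFinally(final DataSource dataSource) throws SQLException {
        final Connection connection = dataSource.getConnection();
        try (final PreparedStatement statement = connection.prepareStatement("SELECT 1")) {
            statement.execute();
        } finally {
            try {
                connection.close();
            } catch (final Exception e) {
                // In the processor this is getLogger().warn(...); never rethrown.
            }
        }
    }
}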
 .../standard/PutDatabaseRecord.java          | 210 ++++++++++--------
 1 file changed, 117 insertions(+), 93 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index e32e2ee973..d926b3e22b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -517,6 +517,12 @@ public class PutDatabaseRecord extends AbstractProcessor {
                     getLogger().warn("Failed to set auto-commit back to true on connection {} after finishing update", connection);
                 }
             }
+
+            try {
+                connection.close();
+            } catch (final Exception e) {
+                getLogger().warn("Failed to close database connection", e);
+            }
         }
     }
 
@@ -537,10 +543,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
             throw new IllegalArgumentException(format("Record schema does not contain Field Containing SQL: %s, FlowFile %s", sqlField, flowFile));
         }
 
-        try (final Statement s = connection.createStatement()) {
+        try (final Statement statement = connection.createStatement()) {
             final int timeoutMillis = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
             try {
-                s.setQueryTimeout(timeoutMillis); // timeout in seconds
+                statement.setQueryTimeout(timeoutMillis); // timeout in seconds
             } catch (SQLException se) {
                 // If the driver doesn't support query timeout, then assume it is "infinite". Allow a timeout of zero only
                 if (timeoutMillis > 0) {
@@ -557,13 +563,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
             // Execute the statement(s) as-is
             if (context.getProperty(ALLOW_MULTIPLE_STATEMENTS).asBoolean()) {
-                String regex = "(?<!\\\\);";
@@ ... @@ public class PutDatabaseRecord extends AbstractProcessor {
             tableSchema = schemaCache.get(schemaKey, key -> {
                 try {
-                    return TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, includePrimaryKeys);
+                    final TableSchema schema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, includePrimaryKeys);
+                    getLogger().debug("Fetched Table Schema {} for table name {}", schema, tableName);
+                    return schema;
                 } catch (SQLException e) {
                     throw new ProcessException(e);
                 }
@@ -613,103 +621,109 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
         Record outerRecord;
         PreparedStatement lastPreparedStatement = null;
 
-        while ((outerRecord = recordReader.nextRecord()) != null) {
-            final String statementType;
-            if (USE_RECORD_PATH.equalsIgnoreCase(explicitStatementType)) {
-                statementType = recordPathOperationType.apply(outerRecord);
-            } else {
-                statementType = explicitStatementType;
-            }
+        try {
+            while ((outerRecord = recordReader.nextRecord()) != null) {
+                final String statementType;
+                if (USE_RECORD_PATH.equalsIgnoreCase(explicitStatementType)) {
+                    statementType = recordPathOperationType.apply(outerRecord);
+                } else {
+                    statementType = explicitStatementType;
+                }
 
-            final List<Record> dataRecords = getDataRecords(outerRecord);
-            for (final Record currentRecord : dataRecords) {
-                PreparedSqlAndColumns preparedSqlAndColumns = preparedSql.get(statementType);
-                if (preparedSqlAndColumns == null) {
-                    final RecordSchema recordSchema = currentRecord.getSchema();
+                final List<Record> dataRecords = getDataRecords(outerRecord);
+                for (final Record currentRecord : dataRecords) {
+                    PreparedSqlAndColumns preparedSqlAndColumns = preparedSql.get(statementType);
+                    if (preparedSqlAndColumns == null) {
+                        final RecordSchema recordSchema = currentRecord.getSchema();
 
-                    final SqlAndIncludedColumns sqlHolder;
-                    if (INSERT_TYPE.equalsIgnoreCase(statementType)) {
-                        sqlHolder = generateInsert(recordSchema, fqTableName, tableSchema, settings);
-                    } else if (UPDATE_TYPE.equalsIgnoreCase(statementType)) {
-                        sqlHolder = generateUpdate(recordSchema, fqTableName, updateKeys, tableSchema, settings);
-                    } else if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
-                        sqlHolder = generateDelete(recordSchema, fqTableName, tableSchema, settings);
-                    } else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) {
-                        sqlHolder = generateUpsert(recordSchema, fqTableName, updateKeys, tableSchema, settings);
-                    } else {
-                        throw new IllegalArgumentException(format("Statement Type %s is not valid, FlowFile %s", statementType, flowFile));
-                    }
+                        final SqlAndIncludedColumns sqlHolder;
+                        if (INSERT_TYPE.equalsIgnoreCase(statementType)) {
+                            sqlHolder = generateInsert(recordSchema, fqTableName, tableSchema, settings);
+                        } else if (UPDATE_TYPE.equalsIgnoreCase(statementType)) {
+                            sqlHolder = generateUpdate(recordSchema, fqTableName, updateKeys, tableSchema, settings);
+                        } else if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
+                            sqlHolder = generateDelete(recordSchema, fqTableName, tableSchema, settings);
+                        } else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) {
+                            sqlHolder = generateUpsert(recordSchema, fqTableName, updateKeys, tableSchema, settings);
+                        } else {
+                            throw new IllegalArgumentException(format("Statement Type %s is not valid, FlowFile %s", statementType, flowFile));
+                        }
 
-                    // Create the Prepared Statement
-                    final PreparedStatement preparedStatement = con.prepareStatement(sqlHolder.getSql());
+                        // Create the Prepared Statement
+                        final PreparedStatement preparedStatement = con.prepareStatement(sqlHolder.getSql());
 
-                    try {
-                        preparedStatement.setQueryTimeout(timeoutMillis); // timeout in seconds
-                    } catch (final SQLException se) {
-                        // If the driver doesn't support query timeout, then assume it is "infinite". Allow a timeout of zero only
-                        if (timeoutMillis > 0) {
-                            throw se;
-                        }
-                    }
+                        try {
+                            preparedStatement.setQueryTimeout(timeoutMillis); // timeout in seconds
+                        } catch (final SQLException se) {
+                            // If the driver doesn't support query timeout, then assume it is "infinite". Allow a timeout of zero only
+                            if (timeoutMillis > 0) {
+                                throw se;
+                            }
+                        }
 
-                    preparedSqlAndColumns = new PreparedSqlAndColumns(sqlHolder, preparedStatement);
-                    preparedSql.put(statementType, preparedSqlAndColumns);
-                }
+                        preparedSqlAndColumns = new PreparedSqlAndColumns(sqlHolder, preparedStatement);
+                        preparedSql.put(statementType, preparedSqlAndColumns);
+                    }
 
-                final PreparedStatement ps = preparedSqlAndColumns.getPreparedStatement();
-                final List<Integer> fieldIndexes = preparedSqlAndColumns.getSqlAndIncludedColumns().getFieldIndexes();
-                final String sql = preparedSqlAndColumns.getSqlAndIncludedColumns().getSql();
+                    final PreparedStatement ps = preparedSqlAndColumns.getPreparedStatement();
+                    final List<Integer> fieldIndexes = preparedSqlAndColumns.getSqlAndIncludedColumns().getFieldIndexes();
+                    final String sql = preparedSqlAndColumns.getSqlAndIncludedColumns().getSql();
 
-                if (currentBatchSize > 0 && ps != lastPreparedStatement && lastPreparedStatement != null) {
-                    batchIndex++;
-                    log.debug("Executing query {} because Statement Type changed between Records for {}; fieldIndexes: {}; batch index: {}; batch size: {}",
-                            sql, flowFile, fieldIndexes, batchIndex, currentBatchSize);
-                    lastPreparedStatement.executeBatch();
+                    if (currentBatchSize > 0 && ps != lastPreparedStatement && lastPreparedStatement != null) {
+                        batchIndex++;
+                        log.debug("Executing query {} because Statement Type changed between Records for {}; fieldIndexes: {}; batch index: {}; batch size: {}",
+                                sql, flowFile, fieldIndexes, batchIndex, currentBatchSize);
+                        lastPreparedStatement.executeBatch();
 
-                    session.adjustCounter("Batches Executed", 1, false);
-                    currentBatchSize = 0;
-                }
-                lastPreparedStatement = ps;
+                        session.adjustCounter("Batches Executed", 1, false);
+                        currentBatchSize = 0;
+                    }
+                    lastPreparedStatement = ps;
 
-                final Object[] values = currentRecord.getValues();
-                final List<DataType> dataTypes = currentRecord.getSchema().getDataTypes();
+                    final Object[] values = currentRecord.getValues();
+                    final List<DataType> dataTypes = currentRecord.getSchema().getDataTypes();
 
-                for (int i = 0; i < fieldIndexes.size(); i++) {
-                    final int currentFieldIndex = fieldIndexes.get(i);
-                    final Object currentValue = values[currentFieldIndex];
-                    final DataType dataType = dataTypes.get(currentFieldIndex);
-                    final int sqlType = DataTypeUtils.getSQLTypeValue(dataType);
+                    for (int i = 0; i < fieldIndexes.size(); i++) {
+                        final int currentFieldIndex = fieldIndexes.get(i);
+                        final Object currentValue = values[currentFieldIndex];
+                        final DataType dataType = dataTypes.get(currentFieldIndex);
+                        final int sqlType = DataTypeUtils.getSQLTypeValue(dataType);
 
-                    // If DELETE type, insert the object twice because of the null check (see generateDelete for details)
-                    if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
-                        ps.setObject(i * 2 + 1, currentValue, sqlType);
-                        ps.setObject(i * 2 + 2, currentValue, sqlType);
-                    } else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) {
-                        final int timesToAddObjects = databaseAdapter.getTimesToAddColumnObjectsForUpsert();
-                        for (int j = 0; j < timesToAddObjects; j++) {
-                            ps.setObject(i + (fieldIndexes.size() * j) + 1, currentValue, sqlType);
-                        }
-                    } else {
-                        ps.setObject(i + 1, currentValue, sqlType);
-                    }
-                }
+                        // If DELETE type, insert the object twice because of the null check (see generateDelete for details)
+                        if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
+                            ps.setObject(i * 2 + 1, currentValue, sqlType);
+                            ps.setObject(i * 2 + 2, currentValue, sqlType);
+                        } else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) {
+                            final int timesToAddObjects = databaseAdapter.getTimesToAddColumnObjectsForUpsert();
+                            for (int j = 0; j < timesToAddObjects; j++) {
+                                ps.setObject(i + (fieldIndexes.size() * j) + 1, currentValue, sqlType);
+                            }
+                        } else {
+                            ps.setObject(i + 1, currentValue, sqlType);
+                        }
+                    }
 
-                ps.addBatch();
-                session.adjustCounter(statementType + " updates performed", 1, false);
-                if (++currentBatchSize == maxBatchSize) {
-                    batchIndex++;
-                    log.debug("Executing query {} because batch reached max size for {}; fieldIndexes: {}; batch index: {}; batch size: {}",
-                            sql, flowFile, fieldIndexes, batchIndex, currentBatchSize);
-                    session.adjustCounter("Batches Executed", 1, false);
-                    ps.executeBatch();
-                    currentBatchSize = 0;
-                }
-            }
-        }
+                    ps.addBatch();
+                    session.adjustCounter(statementType + " updates performed", 1, false);
+                    if (++currentBatchSize == maxBatchSize) {
+                        batchIndex++;
+                        log.debug("Executing query {} because batch reached max size for {}; fieldIndexes: {}; batch index: {}; batch size: {}",
+                                sql, flowFile, fieldIndexes, batchIndex, currentBatchSize);
+                        session.adjustCounter("Batches Executed", 1, false);
+                        ps.executeBatch();
+                        currentBatchSize = 0;
+                    }
+                }
+            }
 
-        if (currentBatchSize > 0) {
-            lastPreparedStatement.executeBatch();
-            session.adjustCounter("Batches Executed", 1, false);
+            if (currentBatchSize > 0) {
+                lastPreparedStatement.executeBatch();
+                session.adjustCounter("Batches Executed", 1, false);
+            }
+        } finally {
+            for (final PreparedSqlAndColumns preparedSqlAndColumns : preparedSql.values()) {
+                preparedSqlAndColumns.getPreparedStatement().close();
+            }
         }
     }
 
@@ -1222,6 +1236,11 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 return new TableSchema(cols, translateColumnNames, primaryKeyColumns, dmd.getIdentifierQuoteString());
             }
         }
+
+        @Override
+        public String toString() {
+            return "TableSchema[columns=" + columns.values() + "]";
+        }
     }
 
     protected static class ColumnDescription {
@@ -1279,6 +1298,11 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
             return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize);
         }
+
+        @Override
+        public String toString() {
+            return "Column[name=" + columnName + ", dataType=" + dataType + ", required=" + required + ", columnSize=" + columnSize + "]";
+        }
     }
 
     static class SchemaKey {
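[Reviewer note -- not part of the commit.] The large hunk above boils down to
one pattern: cache one PreparedStatement per statement type, flush a batch
whenever the statement type changes mid-stream or the batch reaches its
maximum size, flush the remainder after the loop, and close every cached
statement in a finally block so that a failed executeBatch() cannot leak them.
A minimal sketch under those assumptions follows; `statementTypes`,
`sqlByType`, and `maxBatchSize` are illustrative placeholder inputs, not NiFi
APIs.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BatchFlushSketch {

    static void write(final Connection con, final List<String> statementTypes,
                      final Map<String, String> sqlByType, final int maxBatchSize) throws SQLException {
        final Map<String, PreparedStatement> preparedByType = new HashMap<>();
        PreparedStatement lastPreparedStatement = null;
        int currentBatchSize = 0;

        try {
            for (final String statementType : statementTypes) {
                // Lazily prepare and cache one statement per statement type.
                PreparedStatement ps = preparedByType.get(statementType);
                if (ps == null) {
                    ps = con.prepareStatement(sqlByType.get(statementType));
                    preparedByType.put(statementType, ps);
                }

                // Statement type changed mid-stream: flush the previous batch first.
                if (currentBatchSize > 0 && ps != lastPreparedStatement && lastPreparedStatement != null) {
                    lastPreparedStatement.executeBatch();
                    currentBatchSize = 0;
                }
                lastPreparedStatement = ps;

                ps.addBatch();
                // Batch is full: execute it and start a new one.
                if (++currentBatchSize == maxBatchSize) {
                    ps.executeBatch();
                    currentBatchSize = 0;
                }
            }

            // Flush whatever remains after the final record.
            if (currentBatchSize > 0) {
                lastPreparedStatement.executeBatch();
            }
        } finally {
            // Mirrors the patch's finally block: close every cached statement,
            // whether or not the loop above completed normally.
            for (final PreparedStatement ps : preparedByType.values()) {
                ps.close();
            }
        }
    }
}

Closing in finally rather than after the loop is the point of the patch:
statements are released even when executeBatch() throws partway through.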