From 73571ae30933063b121f9a757a64cf5f7f9c500e Mon Sep 17 00:00:00 2001 From: Lars Francke Date: Wed, 3 Apr 2019 22:53:39 +0200 Subject: [PATCH] NIFI-6185: ListDatabaseTables processor doesn't close ResultSets This closes #3405. Signed-off-by: Koji Kawamura --- .../standard/ListDatabaseTables.java | 144 +++++++++--------- 1 file changed, 75 insertions(+), 69 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java index ec2d3c1ab2..fa7dc5c2bb 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java @@ -230,79 +230,85 @@ public class ListDatabaseTables extends AbstractProcessor { try (final Connection con = dbcpService.getConnection(Collections.emptyMap())) { DatabaseMetaData dbMetaData = con.getMetaData(); - ResultSet rs = dbMetaData.getTables(catalog, schemaPattern, tableNamePattern, tableTypes); - while (rs.next()) { - final String tableCatalog = rs.getString(1); - final String tableSchema = rs.getString(2); - final String tableName = rs.getString(3); - final String tableType = rs.getString(4); - final String tableRemarks = rs.getString(5); + try (ResultSet rs = dbMetaData.getTables(catalog, schemaPattern, tableNamePattern, tableTypes)) { + while (rs.next()) { + final String tableCatalog = rs.getString(1); + final String tableSchema = rs.getString(2); + final String tableName = rs.getString(3); + final String tableType = rs.getString(4); + final String tableRemarks = rs.getString(5); - // Build fully-qualified name - String fqn = Stream.of(tableCatalog, tableSchema, tableName) - .filter(segment -> !StringUtils.isEmpty(segment)) - .collect(Collectors.joining(".")); + // Build fully-qualified name + String fqn = Stream.of(tableCatalog, tableSchema, tableName) + .filter(segment -> !StringUtils.isEmpty(segment)) + .collect(Collectors.joining(".")); - String lastTimestampForTable = stateMapProperties.get(fqn); - boolean refreshTable = true; - try { - // Refresh state if the interval has elapsed - long lastRefreshed = -1; - final long currentTime = System.currentTimeMillis(); - if (!StringUtils.isEmpty(lastTimestampForTable)) { - lastRefreshed = Long.parseLong(lastTimestampForTable); - } - if (lastRefreshed == -1 || (refreshInterval > 0 && currentTime >= (lastRefreshed + refreshInterval))) { - stateMapProperties.remove(lastTimestampForTable); - } else { - refreshTable = false; - } - } catch (final NumberFormatException nfe) { - getLogger().error("Failed to retrieve observed last table fetches from the State Manager. Will not perform " - + "query until this is accomplished.", nfe); - context.yield(); - return; - } - if (refreshTable) { - FlowFile flowFile = session.create(); - logger.info("Found {}: {}", new Object[]{tableType, fqn}); - if (includeCount) { - try (Statement st = con.createStatement()) { - final String countQuery = "SELECT COUNT(1) FROM " + fqn; - - logger.debug("Executing query: {}", new Object[]{countQuery}); - ResultSet countResult = st.executeQuery(countQuery); - if (countResult.next()) { - flowFile = session.putAttribute(flowFile, DB_TABLE_COUNT, Long.toString(countResult.getLong(1))); - } - } catch (SQLException se) { - logger.error("Couldn't get row count for {}", new Object[]{fqn}); - session.remove(flowFile); - continue; - } - } - if (tableCatalog != null) { - flowFile = session.putAttribute(flowFile, DB_TABLE_CATALOG, tableCatalog); - } - if (tableSchema != null) { - flowFile = session.putAttribute(flowFile, DB_TABLE_SCHEMA, tableSchema); - } - flowFile = session.putAttribute(flowFile, DB_TABLE_NAME, tableName); - flowFile = session.putAttribute(flowFile, DB_TABLE_FULLNAME, fqn); - flowFile = session.putAttribute(flowFile, DB_TABLE_TYPE, tableType); - if (tableRemarks != null) { - flowFile = session.putAttribute(flowFile, DB_TABLE_REMARKS, tableRemarks); - } - - String transitUri; + String lastTimestampForTable = stateMapProperties.get(fqn); + boolean refreshTable = true; try { - transitUri = dbMetaData.getURL(); - } catch (SQLException sqle) { - transitUri = ""; + // Refresh state if the interval has elapsed + long lastRefreshed = -1; + final long currentTime = System.currentTimeMillis(); + if (!StringUtils.isEmpty(lastTimestampForTable)) { + lastRefreshed = Long.parseLong(lastTimestampForTable); + } + if (lastRefreshed == -1 || (refreshInterval > 0 && currentTime >= (lastRefreshed + + refreshInterval))) { + stateMapProperties.remove(lastTimestampForTable); + } else { + refreshTable = false; + } + } catch (final NumberFormatException nfe) { + getLogger().error( + "Failed to retrieve observed last table fetches from the State Manager. Will not perform " + + "query until this is accomplished.", nfe); + context.yield(); + return; + } + if (refreshTable) { + FlowFile flowFile = session.create(); + logger.info("Found {}: {}", new Object[] {tableType, fqn}); + if (includeCount) { + try (Statement st = con.createStatement()) { + final String countQuery = "SELECT COUNT(1) FROM " + fqn; + + logger.debug("Executing query: {}", new Object[] {countQuery}); + try (ResultSet countResult = st.executeQuery(countQuery)) { + if (countResult.next()) { + flowFile = session + .putAttribute(flowFile, DB_TABLE_COUNT, + Long.toString(countResult.getLong(1))); + } + } + } catch (SQLException se) { + logger.error("Couldn't get row count for {}", new Object[] {fqn}); + session.remove(flowFile); + continue; + } + } + if (tableCatalog != null) { + flowFile = session.putAttribute(flowFile, DB_TABLE_CATALOG, tableCatalog); + } + if (tableSchema != null) { + flowFile = session.putAttribute(flowFile, DB_TABLE_SCHEMA, tableSchema); + } + flowFile = session.putAttribute(flowFile, DB_TABLE_NAME, tableName); + flowFile = session.putAttribute(flowFile, DB_TABLE_FULLNAME, fqn); + flowFile = session.putAttribute(flowFile, DB_TABLE_TYPE, tableType); + if (tableRemarks != null) { + flowFile = session.putAttribute(flowFile, DB_TABLE_REMARKS, tableRemarks); + } + + String transitUri; + try { + transitUri = dbMetaData.getURL(); + } catch (SQLException sqle) { + transitUri = ""; + } + session.getProvenanceReporter().receive(flowFile, transitUri); + session.transfer(flowFile, REL_SUCCESS); + stateMapProperties.put(fqn, Long.toString(System.currentTimeMillis())); } - session.getProvenanceReporter().receive(flowFile, transitUri); - session.transfer(flowFile, REL_SUCCESS); - stateMapProperties.put(fqn, Long.toString(System.currentTimeMillis())); } } // Update the timestamps for listed tables