NIFI-6185: ListDatabaseTables processor doesn't close ResultSets

This closes #3405.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Lars Francke 2019-04-03 22:53:39 +02:00 committed by Koji Kawamura
parent 3a63de2ae2
commit 73571ae309

View File

@ -230,79 +230,85 @@ public class ListDatabaseTables extends AbstractProcessor {
try (final Connection con = dbcpService.getConnection(Collections.emptyMap())) {
DatabaseMetaData dbMetaData = con.getMetaData();
ResultSet rs = dbMetaData.getTables(catalog, schemaPattern, tableNamePattern, tableTypes);
while (rs.next()) {
final String tableCatalog = rs.getString(1);
final String tableSchema = rs.getString(2);
final String tableName = rs.getString(3);
final String tableType = rs.getString(4);
final String tableRemarks = rs.getString(5);
try (ResultSet rs = dbMetaData.getTables(catalog, schemaPattern, tableNamePattern, tableTypes)) {
while (rs.next()) {
final String tableCatalog = rs.getString(1);
final String tableSchema = rs.getString(2);
final String tableName = rs.getString(3);
final String tableType = rs.getString(4);
final String tableRemarks = rs.getString(5);
// Build fully-qualified name
String fqn = Stream.of(tableCatalog, tableSchema, tableName)
.filter(segment -> !StringUtils.isEmpty(segment))
.collect(Collectors.joining("."));
// Build fully-qualified name
String fqn = Stream.of(tableCatalog, tableSchema, tableName)
.filter(segment -> !StringUtils.isEmpty(segment))
.collect(Collectors.joining("."));
String lastTimestampForTable = stateMapProperties.get(fqn);
boolean refreshTable = true;
try {
// Refresh state if the interval has elapsed
long lastRefreshed = -1;
final long currentTime = System.currentTimeMillis();
if (!StringUtils.isEmpty(lastTimestampForTable)) {
lastRefreshed = Long.parseLong(lastTimestampForTable);
}
if (lastRefreshed == -1 || (refreshInterval > 0 && currentTime >= (lastRefreshed + refreshInterval))) {
stateMapProperties.remove(lastTimestampForTable);
} else {
refreshTable = false;
}
} catch (final NumberFormatException nfe) {
getLogger().error("Failed to retrieve observed last table fetches from the State Manager. Will not perform "
+ "query until this is accomplished.", nfe);
context.yield();
return;
}
if (refreshTable) {
FlowFile flowFile = session.create();
logger.info("Found {}: {}", new Object[]{tableType, fqn});
if (includeCount) {
try (Statement st = con.createStatement()) {
final String countQuery = "SELECT COUNT(1) FROM " + fqn;
logger.debug("Executing query: {}", new Object[]{countQuery});
ResultSet countResult = st.executeQuery(countQuery);
if (countResult.next()) {
flowFile = session.putAttribute(flowFile, DB_TABLE_COUNT, Long.toString(countResult.getLong(1)));
}
} catch (SQLException se) {
logger.error("Couldn't get row count for {}", new Object[]{fqn});
session.remove(flowFile);
continue;
}
}
if (tableCatalog != null) {
flowFile = session.putAttribute(flowFile, DB_TABLE_CATALOG, tableCatalog);
}
if (tableSchema != null) {
flowFile = session.putAttribute(flowFile, DB_TABLE_SCHEMA, tableSchema);
}
flowFile = session.putAttribute(flowFile, DB_TABLE_NAME, tableName);
flowFile = session.putAttribute(flowFile, DB_TABLE_FULLNAME, fqn);
flowFile = session.putAttribute(flowFile, DB_TABLE_TYPE, tableType);
if (tableRemarks != null) {
flowFile = session.putAttribute(flowFile, DB_TABLE_REMARKS, tableRemarks);
}
String transitUri;
String lastTimestampForTable = stateMapProperties.get(fqn);
boolean refreshTable = true;
try {
transitUri = dbMetaData.getURL();
} catch (SQLException sqle) {
transitUri = "<unknown>";
// Refresh state if the interval has elapsed
long lastRefreshed = -1;
final long currentTime = System.currentTimeMillis();
if (!StringUtils.isEmpty(lastTimestampForTable)) {
lastRefreshed = Long.parseLong(lastTimestampForTable);
}
if (lastRefreshed == -1 || (refreshInterval > 0 && currentTime >= (lastRefreshed
+ refreshInterval))) {
stateMapProperties.remove(lastTimestampForTable);
} else {
refreshTable = false;
}
} catch (final NumberFormatException nfe) {
getLogger().error(
"Failed to retrieve observed last table fetches from the State Manager. Will not perform "
+ "query until this is accomplished.", nfe);
context.yield();
return;
}
if (refreshTable) {
FlowFile flowFile = session.create();
logger.info("Found {}: {}", new Object[] {tableType, fqn});
if (includeCount) {
try (Statement st = con.createStatement()) {
final String countQuery = "SELECT COUNT(1) FROM " + fqn;
logger.debug("Executing query: {}", new Object[] {countQuery});
try (ResultSet countResult = st.executeQuery(countQuery)) {
if (countResult.next()) {
flowFile = session
.putAttribute(flowFile, DB_TABLE_COUNT,
Long.toString(countResult.getLong(1)));
}
}
} catch (SQLException se) {
logger.error("Couldn't get row count for {}", new Object[] {fqn});
session.remove(flowFile);
continue;
}
}
if (tableCatalog != null) {
flowFile = session.putAttribute(flowFile, DB_TABLE_CATALOG, tableCatalog);
}
if (tableSchema != null) {
flowFile = session.putAttribute(flowFile, DB_TABLE_SCHEMA, tableSchema);
}
flowFile = session.putAttribute(flowFile, DB_TABLE_NAME, tableName);
flowFile = session.putAttribute(flowFile, DB_TABLE_FULLNAME, fqn);
flowFile = session.putAttribute(flowFile, DB_TABLE_TYPE, tableType);
if (tableRemarks != null) {
flowFile = session.putAttribute(flowFile, DB_TABLE_REMARKS, tableRemarks);
}
String transitUri;
try {
transitUri = dbMetaData.getURL();
} catch (SQLException sqle) {
transitUri = "<unknown>";
}
session.getProvenanceReporter().receive(flowFile, transitUri);
session.transfer(flowFile, REL_SUCCESS);
stateMapProperties.put(fqn, Long.toString(System.currentTimeMillis()));
}
session.getProvenanceReporter().receive(flowFile, transitUri);
session.transfer(flowFile, REL_SUCCESS);
stateMapProperties.put(fqn, Long.toString(System.currentTimeMillis()));
}
}
// Update the timestamps for listed tables