NIFI-8996: Close JDBC statements in PutHive*QL processors.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #5280
commit a239eea8ff
parent 633cdab121
Author: Peter Turcsanyi (2021-08-03 20:41:43 +02:00)
Committer: Matthew Burgess
3 changed files with 65 additions and 62 deletions
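
All three hunks make the same change: the PreparedStatement, previously left open after execution, is wrapped in a Java 7+ try-with-resources block so it is closed on every exit path. As a standalone illustration of the idiom — the class name and JDBC URL below are invented for the sketch, not taken from the commit, and the sketch also closes the connection for completeness, whereas in the processors the connection comes from a pool and only the statement is newly scoped:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class TryWithResourcesSketch {

    // Illustrative HiveServer2 URL; any JDBC driver has the same close() semantics.
    private static final String JDBC_URL = "jdbc:hive2://localhost:10000/default";

    static void execute(final String hiveQL) throws SQLException {
        try (final Connection conn = DriverManager.getConnection(JDBC_URL);
             final PreparedStatement stmt = conn.prepareStatement(hiveQL)) {
            // stmt (then conn) is closed automatically when this block exits,
            // whether execute() returns normally or throws.
            stmt.execute();
        }
    }
}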

PutHiveQL.java

@@ -230,29 +230,30 @@ public class PutHiveQL extends AbstractHiveQLProcessor {
         final String hiveQL = hiveQLStr.trim();
         if (!StringUtils.isEmpty(hiveQL)) {
-            final PreparedStatement stmt = conn.prepareStatement(hiveQL);
+            try (final PreparedStatement stmt = conn.prepareStatement(hiveQL)) {
 
-            // Get ParameterMetadata
-            // Hive JDBC Doesn't support this yet:
-            // ParameterMetaData pmd = stmt.getParameterMetaData();
-            // int paramCount = pmd.getParameterCount();
-            int paramCount = StringUtils.countMatches(hiveQL, "?");
+                // Get ParameterMetadata
+                // Hive JDBC Doesn't support this yet:
+                // ParameterMetaData pmd = stmt.getParameterMetaData();
+                // int paramCount = pmd.getParameterCount();
+                int paramCount = StringUtils.countMatches(hiveQL, "?");
 
-            if (paramCount > 0) {
-                loc = setParameters(loc, stmt, paramCount, flowFile.getAttributes());
-            }
+                if (paramCount > 0) {
+                    loc = setParameters(loc, stmt, paramCount, flowFile.getAttributes());
+                }
 
-            // Parse hiveQL and extract input/output tables
-            try {
-                tableNames.addAll(findTableNames(hiveQL));
-            } catch (Exception e) {
-                // If failed to parse the query, just log a warning message, but continue.
-                getLogger().warn("Failed to parse hiveQL: {} due to {}", new Object[]{hiveQL, e}, e);
-            }
+                // Parse hiveQL and extract input/output tables
+                try {
+                    tableNames.addAll(findTableNames(hiveQL));
+                } catch (Exception e) {
+                    // If failed to parse the query, just log a warning message, but continue.
+                    getLogger().warn("Failed to parse hiveQL: {} due to {}", new Object[]{hiveQL, e}, e);
+                }
 
-            // Execute the statement
-            stmt.execute();
-            fc.proceed();
+                // Execute the statement
+                stmt.execute();
+                fc.proceed();
+            }
         }
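
Why this matters: in the removed lines, stmt was never closed at all — neither on the success path nor when execute() threw — so each FlowFile leaked the statement and whatever server-side resources backed it. The new block behaves roughly like the following pre-Java-7 form (a simplified sketch; the real try-with-resources desugaring also tracks suppressed exceptions from close()):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

final class ManualCloseSketch {

    // Pre-Java-7 equivalent of the new block.
    static void execute(final Connection conn, final String hiveQL) throws SQLException {
        final PreparedStatement stmt = conn.prepareStatement(hiveQL);
        try {
            stmt.execute();
        } finally {
            // The removed code had no finally at all, so this close()
            // never ran and the statement leaked on every path.
            stmt.close();
        }
    }
}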

PutHive3QL.java

@@ -231,31 +231,32 @@ public class PutHive3QL extends AbstractHive3QLProcessor {
         final String hiveQL = hiveQLStr.trim();
         if (!StringUtils.isEmpty(hiveQL)) {
-            final PreparedStatement stmt = conn.prepareStatement(hiveQL);
+            try (final PreparedStatement stmt = conn.prepareStatement(hiveQL)) {
 
-            // Get ParameterMetadata
-            // Hive JDBC Doesn't support this yet:
-            // ParameterMetaData pmd = stmt.getParameterMetaData();
-            // int paramCount = pmd.getParameterCount();
-            int paramCount = StringUtils.countMatches(hiveQL, "?");
+                // Get ParameterMetadata
+                // Hive JDBC Doesn't support this yet:
+                // ParameterMetaData pmd = stmt.getParameterMetaData();
+                // int paramCount = pmd.getParameterCount();
+                int paramCount = StringUtils.countMatches(hiveQL, "?");
 
-            if (paramCount > 0) {
-                loc = setParameters(loc, stmt, paramCount, flowFile.getAttributes());
-            }
+                if (paramCount > 0) {
+                    loc = setParameters(loc, stmt, paramCount, flowFile.getAttributes());
+                }
 
-            // Parse hiveQL and extract input/output tables
-            try {
-                tableNames.addAll(findTableNames(hiveQL));
-            } catch (Exception e) {
-                // If failed to parse the query, just log a warning message, but continue.
-                getLogger().warn("Failed to parse hiveQL: {} due to {}", new Object[]{hiveQL, e}, e);
-            }
+                // Parse hiveQL and extract input/output tables
+                try {
+                    tableNames.addAll(findTableNames(hiveQL));
+                } catch (Exception e) {
+                    // If failed to parse the query, just log a warning message, but continue.
+                    getLogger().warn("Failed to parse hiveQL: {} due to {}", new Object[]{hiveQL, e}, e);
+                }
 
-            stmt.setQueryTimeout(context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(flowFile).asInteger());
+                stmt.setQueryTimeout(context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(flowFile).asInteger());
 
-            // Execute the statement
-            stmt.execute();
-            fc.proceed();
+                // Execute the statement
+                stmt.execute();
+                fc.proceed();
+            }
         }
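
Of the three processors, only PutHive3QL sets a query timeout, which is why this hunk is two lines longer. JDBC's Statement.setQueryTimeout takes a value in seconds (0 means no limit) and must be called before execute(); the ordering is sketched below with an illustrative helper, not code from the commit:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

final class QueryTimeoutSketch {

    static void execute(final Connection conn, final String hiveQL,
                        final int timeoutSeconds) throws SQLException {
        try (final PreparedStatement stmt = conn.prepareStatement(hiveQL)) {
            // JDBC query timeouts are expressed in seconds; 0 means no limit.
            // The timeout must be set before execute(), as PutHive3QL does
            // from its QUERY_TIMEOUT property.
            stmt.setQueryTimeout(timeoutSeconds);
            stmt.execute();
        }
    }
}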

PutHive_1_1QL.java

@@ -230,29 +230,30 @@ public class PutHive_1_1QL extends AbstractHive_1_1QLProcessor {
         final String hiveQL = hiveQLStr.trim();
         if (!StringUtils.isEmpty(hiveQL)) {
-            final PreparedStatement stmt = conn.prepareStatement(hiveQL);
+            try (final PreparedStatement stmt = conn.prepareStatement(hiveQL)) {
 
-            // Get ParameterMetadata
-            // Hive JDBC Doesn't support this yet:
-            // ParameterMetaData pmd = stmt.getParameterMetaData();
-            // int paramCount = pmd.getParameterCount();
-            int paramCount = StringUtils.countMatches(hiveQL, "?");
+                // Get ParameterMetadata
+                // Hive JDBC Doesn't support this yet:
+                // ParameterMetaData pmd = stmt.getParameterMetaData();
+                // int paramCount = pmd.getParameterCount();
+                int paramCount = StringUtils.countMatches(hiveQL, "?");
 
-            if (paramCount > 0) {
-                loc = setParameters(loc, stmt, paramCount, flowFile.getAttributes());
-            }
+                if (paramCount > 0) {
+                    loc = setParameters(loc, stmt, paramCount, flowFile.getAttributes());
+                }
 
-            // Parse hiveQL and extract input/output tables
-            try {
-                tableNames.addAll(findTableNames(hiveQL));
-            } catch (Exception e) {
-                // If failed to parse the query, just log a warning message, but continue.
-                getLogger().warn("Failed to parse hiveQL: {} due to {}", new Object[]{hiveQL, e}, e);
-            }
+                // Parse hiveQL and extract input/output tables
+                try {
+                    tableNames.addAll(findTableNames(hiveQL));
+                } catch (Exception e) {
+                    // If failed to parse the query, just log a warning message, but continue.
+                    getLogger().warn("Failed to parse hiveQL: {} due to {}", new Object[]{hiveQL, e}, e);
+                }
 
-            // Execute the statement
-            stmt.execute();
-            fc.proceed();
+                // Execute the statement
+                stmt.execute();
+                fc.proceed();
+            }
         }
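
The commit changes only the three processor classes; no test is shown. If one wanted to guard the close() behavior against regression, a mock-based test along these lines would do it — a hypothetical sketch assuming Mockito and JUnit 5, neither of which appears in this commit, exercising a minimal stand-in for the processor's statement handling:

import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.junit.jupiter.api.Test;

class StatementClosedSketchTest {

    // Minimal stand-in for the processor's statement handling.
    private static void runStatement(final Connection conn, final String sql) throws SQLException {
        try (final PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.execute();
        }
    }

    @Test
    void closesStatementEvenWhenExecuteThrows() throws SQLException {
        final Connection conn = mock(Connection.class);
        final PreparedStatement stmt = mock(PreparedStatement.class);
        when(conn.prepareStatement(anyString())).thenReturn(stmt);
        when(stmt.execute()).thenThrow(new SQLException("simulated failure"));

        try {
            runStatement(conn, "INSERT INTO t VALUES (1)");
        } catch (SQLException expected) {
            // the failure still propagates to the caller
        }

        // with try-with-resources, close() runs on the failure path too
        verify(stmt).close();
    }
}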