NIFI-9800: Unwrap SQLException in PutDatabaseRecord when table does not exist

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #5871
This commit is contained in:
Matthew Burgess 2022-03-16 11:12:07 -04:00
parent 2dc23e7309
commit c84f438782
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
1 changed files with 21 additions and 10 deletions

View File

@ -604,18 +604,29 @@ public class PutDatabaseRecord extends AbstractProcessor {
} }
final SchemaKey schemaKey = new PutDatabaseRecord.SchemaKey(catalog, schemaName, tableName); final SchemaKey schemaKey = new PutDatabaseRecord.SchemaKey(catalog, schemaName, tableName);
final TableSchema tableSchema = schemaCache.get(schemaKey, key -> { final TableSchema tableSchema;
try {
tableSchema = schemaCache.get(schemaKey, key -> {
try { try {
final TableSchema schema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, updateKeys, log); final TableSchema schema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, updateKeys, log);
getLogger().debug("Fetched Table Schema {} for table name {}", schema, tableName); getLogger().debug("Fetched Table Schema {} for table name {}", schema, tableName);
return schema; return schema;
} catch (SQLException e) { } catch (SQLException e) {
// Wrap this in a runtime exception, it is unwrapped in the outer try
throw new ProcessException(e); throw new ProcessException(e);
} }
}); });
if (tableSchema == null) { if (tableSchema == null) {
throw new IllegalArgumentException("No table schema specified!"); throw new IllegalArgumentException("No table schema specified!");
} }
} catch (ProcessException pe) {
// Unwrap the SQLException if one occurred
if (pe.getCause() instanceof SQLException) {
throw (SQLException) pe.getCause();
} else {
throw pe;
}
}
// build the fully qualified table name // build the fully qualified table name
final String fqTableName = generateTableName(settings, catalog, schemaName, tableName, tableSchema); final String fqTableName = generateTableName(settings, catalog, schemaName, tableName, tableSchema);