NIFI-8535: Better error message in PutDatabaseRecord when table does not exist (#5070)

* NIFI-8535: Better error message in PutDatabaseRecord when table does not exist
This commit is contained in:
Matthew Burgess 2021-05-14 15:13:00 -04:00 committed by GitHub
parent ed591e0f22
commit f812dfdfc0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 2 deletions

View File

@ -611,7 +611,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final SchemaKey schemaKey = new PutDatabaseRecord.SchemaKey(catalog, schemaName, tableName);
final TableSchema tableSchema = schemaCache.get(schemaKey, key -> {
try {
final TableSchema schema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, includePrimaryKeys);
final TableSchema schema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, includePrimaryKeys, log);
getLogger().debug("Fetched Table Schema {} for table name {}", schema, tableName);
return schema;
} catch (SQLException e) {
@ -1415,7 +1415,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
public static TableSchema from(final Connection conn, final String catalog, final String schema, final String tableName,
final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException {
final boolean translateColumnNames, final boolean includePrimaryKeys, ComponentLog log) throws SQLException {
final DatabaseMetaData dmd = conn.getMetaData();
try (final ResultSet colrs = dmd.getColumns(catalog, schema, tableName, "%")) {
@ -1424,6 +1424,31 @@ public class PutDatabaseRecord extends AbstractProcessor {
final ColumnDescription col = ColumnDescription.from(colrs);
cols.add(col);
}
// If no columns are found, check that the table exists
if (cols.isEmpty()) {
try (final ResultSet tblrs = dmd.getTables(catalog, schema, tableName, null)) {
List<String> qualifiedNameSegments = new ArrayList<>();
if (catalog != null) {
qualifiedNameSegments.add(catalog);
}
if (schema != null) {
qualifiedNameSegments.add(schema);
}
if (tableName != null) {
qualifiedNameSegments.add(tableName);
}
if (!tblrs.next()) {
throw new SQLException("Table "
+ String.join(".", qualifiedNameSegments)
+ " not found, ensure the Catalog, Schema, and/or Table Names match those in the database exactly");
} else {
log.warn("Table "
+ String.join(".", qualifiedNameSegments)
+ " found but no columns were found, if this is not expected then check the user permissions for getting table metadata from the database");
}
}
}
final Set<String> primaryKeyColumns = new HashSet<>();
if (includePrimaryKeys) {

View File

@ -483,6 +483,9 @@ class TestPutDatabaseRecord {
runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1)
MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDatabaseRecord.REL_FAILURE).get(0);
final String errorMessage = flowFile.getAttribute("putdatabaserecord.error")
assertTrue(errorMessage.contains("PERSONS2"))
}
@Test