From f812dfdfc0f008d372581c6e4c8770c88ad1c3e6 Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Fri, 14 May 2021 15:13:00 -0400 Subject: [PATCH] NIFI-8535: Better error message in PutDatabaseRecord when table does not exist (#5070) * NIFI-8535: Better error message in PutDatabaseRecord when table does not exist --- .../standard/PutDatabaseRecord.java | 29 +++++++++++++++++-- .../standard/TestPutDatabaseRecord.groovy | 3 ++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index 91407ea23e..1fae8c723e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -611,7 +611,7 @@ public class PutDatabaseRecord extends AbstractProcessor { final SchemaKey schemaKey = new PutDatabaseRecord.SchemaKey(catalog, schemaName, tableName); final TableSchema tableSchema = schemaCache.get(schemaKey, key -> { try { - final TableSchema schema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, includePrimaryKeys); + final TableSchema schema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, includePrimaryKeys, log); getLogger().debug("Fetched Table Schema {} for table name {}", schema, tableName); return schema; } catch (SQLException e) { @@ -1415,7 +1415,7 @@ public class PutDatabaseRecord extends AbstractProcessor { } public static TableSchema from(final Connection conn, final String catalog, final String schema, final String tableName, - final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException { + final boolean translateColumnNames, final boolean includePrimaryKeys, ComponentLog log) throws SQLException { final DatabaseMetaData dmd = conn.getMetaData(); try (final ResultSet colrs = dmd.getColumns(catalog, schema, tableName, "%")) { @@ -1424,6 +1424,31 @@ public class PutDatabaseRecord extends AbstractProcessor { final ColumnDescription col = ColumnDescription.from(colrs); cols.add(col); } + // If no columns are found, check that the table exists + if (cols.isEmpty()) { + try (final ResultSet tblrs = dmd.getTables(catalog, schema, tableName, null)) { + List qualifiedNameSegments = new ArrayList<>(); + if (catalog != null) { + qualifiedNameSegments.add(catalog); + } + if (schema != null) { + qualifiedNameSegments.add(schema); + } + if (tableName != null) { + qualifiedNameSegments.add(tableName); + } + if (!tblrs.next()) { + + throw new SQLException("Table " + + String.join(".", qualifiedNameSegments) + + " not found, ensure the Catalog, Schema, and/or Table Names match those in the database exactly"); + } else { + log.warn("Table " + + String.join(".", qualifiedNameSegments) + + " found but no columns were found, if this is not expected then check the user permissions for getting table metadata from the database"); + } + } + } final Set primaryKeyColumns = new HashSet<>(); if (includePrimaryKeys) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy index d6eebec12c..d3ad032d52 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy @@ -483,6 +483,9 @@ class TestPutDatabaseRecord { runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0) runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1) + MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDatabaseRecord.REL_FAILURE).get(0); + final String errorMessage = flowFile.getAttribute("putdatabaserecord.error") + assertTrue(errorMessage.contains("PERSONS2")) } @Test