From 88b6b587be6b3dd63d61b6a2e9ae321d8bacd1e1 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Tue, 8 Aug 2023 17:14:31 -0400 Subject: [PATCH] NIFI-11922: Honor catalog/schema field in UpdateDatabaseTable This closes #7585 Signed-off-by: David Handermann --- .../standard/UpdateDatabaseTable.java | 2 +- .../standard/db/DatabaseAdapter.java | 41 ++++++++- .../processors/standard/db/TableSchema.java | 16 +++- .../standard/PutDatabaseRecordTest.java | 6 ++ .../standard/TestUpdateDatabaseTable.java | 86 +++++++++++++++++++ .../db/impl/DerbyDatabaseAdapter.java | 4 +- 6 files changed, 146 insertions(+), 9 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java index 04d07f7bae..c161c21443 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java @@ -486,7 +486,7 @@ public class UpdateDatabaseTable extends AbstractProcessor { getLogger().debug("Adding column " + recordFieldName + " to table " + tableName); } - tableSchema = new TableSchema(tableName, columns, translateFieldNames, primaryKeyColumnNames, databaseAdapter.getColumnQuoteString()); + tableSchema = new TableSchema(catalogName, schemaName, tableName, columns, translateFieldNames, primaryKeyColumnNames, databaseAdapter.getColumnQuoteString()); final String createTableSql = databaseAdapter.getCreateTableStatement(tableSchema, quoteTableName, quoteColumnNames); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java index 98803d66c7..ab661998ed 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java @@ -178,9 +178,7 @@ public interface DatabaseAdapter { } createTableStatement.append("CREATE TABLE IF NOT EXISTS ") - .append(quoteTableName ? getTableQuoteString() : "") - .append(tableSchema.getTableName()) - .append(quoteTableName ? getTableQuoteString() : "") + .append(generateTableName(quoteTableName, tableSchema.getCatalogName(), tableSchema.getSchemaName(), tableSchema.getTableName(),tableSchema)) .append(" (") .append(String.join(", ", columnsAndDatatypes)) .append(") "); @@ -216,4 +214,41 @@ public interface DatabaseAdapter { default String getSQLForDataType(int sqlType) { return JDBCType.valueOf(sqlType).getName(); } + + default String generateTableName(final boolean quoteTableName, final String catalog, final String schemaName, final String tableName, final TableSchema tableSchema) { + final StringBuilder tableNameBuilder = new StringBuilder(); + if (catalog != null) { + if (quoteTableName) { + tableNameBuilder.append(tableSchema.getQuotedIdentifierString()) + .append(catalog) + .append(tableSchema.getQuotedIdentifierString()); + } else { + tableNameBuilder.append(catalog); + } + + tableNameBuilder.append("."); + } + + if (schemaName != null) { + if (quoteTableName) { + tableNameBuilder.append(tableSchema.getQuotedIdentifierString()) + .append(schemaName) + .append(tableSchema.getQuotedIdentifierString()); + } else { + tableNameBuilder.append(schemaName); + } + + tableNameBuilder.append("."); + } + + if (quoteTableName) { + tableNameBuilder.append(tableSchema.getQuotedIdentifierString()) + .append(tableName) + .append(tableSchema.getQuotedIdentifierString()); + } else { + tableNameBuilder.append(tableName); + } + + return tableNameBuilder.toString(); + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java index b7429ca776..88e81e16be 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java @@ -34,10 +34,14 @@ public class TableSchema { private final Set primaryKeyColumnNames; private final Map columns; private final String quotedIdentifierString; + private final String catalogName; + private final String schemaName; private final String tableName; - public TableSchema(final String tableName, final List columnDescriptions, final boolean translateColumnNames, + public TableSchema(final String catalogName, final String schemaName, final String tableName, final List columnDescriptions, final boolean translateColumnNames, final Set primaryKeyColumnNames, final String quotedIdentifierString) { + this.catalogName = catalogName; + this.schemaName = schemaName; this.tableName = tableName; this.columns = new LinkedHashMap<>(); this.primaryKeyColumnNames = primaryKeyColumnNames; @@ -52,6 +56,14 @@ public class TableSchema { } } + public String getCatalogName() { + return catalogName; + } + + public String getSchemaName() { + return schemaName; + } + public String getTableName() { return tableName; } @@ -128,7 +140,7 @@ public class TableSchema { } } - return new TableSchema(tableName, cols, translateColumnNames, primaryKeyColumns, dmd.getIdentifierQuoteString()); + return new TableSchema(catalog, schema, tableName, cols, translateColumnNames, primaryKeyColumns, dmd.getIdentifierQuoteString()); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java index e9fd13b005..107db3f688 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java @@ -265,6 +265,8 @@ public class PutDatabaseRecordTest { final RecordSchema schema = new SimpleRecordSchema(fields); final TableSchema tableSchema = new TableSchema( + null, + null, "PERSONS", Arrays.asList( new ColumnDescription("id", 4, true, 2, false), @@ -301,6 +303,8 @@ public class PutDatabaseRecordTest { final RecordSchema schema = new SimpleRecordSchema(fields); final TableSchema tableSchema = new TableSchema( + null, + null, "PERSONS", Arrays.asList( new ColumnDescription("id", 4, true, 2, false), @@ -1416,6 +1420,8 @@ public class PutDatabaseRecordTest { final RecordSchema schema = new SimpleRecordSchema(fields); final TableSchema tableSchema = new TableSchema( + null, + null, "PERSONS", Arrays.asList( new ColumnDescription("id", 4, true, 2, false), diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateDatabaseTable.java index 4a8d6c129e..e47c8954b8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateDatabaseTable.java @@ -50,6 +50,9 @@ public class TestUpdateDatabaseTable { private static final String createPersons = "CREATE TABLE \"persons\" (\"id\" integer primary key, \"name\" varchar(100), \"code\" integer)"; + private static final String createSchema = "CREATE SCHEMA \"testSchema\""; + + @TempDir public static File tempDir; @@ -88,6 +91,18 @@ public class TestUpdateDatabaseTable { } catch (SQLException se) { // Ignore, table probably doesn't exist } + + try (Statement s = service.getConnection().createStatement()) { + s.execute("DROP TABLE \"newTable\""); + } catch (SQLException se) { + // Ignore, table probably doesn't exist + } + + try (Statement s = service.getConnection().createStatement()) { + s.execute("DROP SCHEMA \"testSchema\""); + } catch (SQLException se) { + // Ignore, schema probably doesn't exist + } } @Test @@ -408,6 +423,77 @@ public class TestUpdateDatabaseTable { } } + @Test + public void testCreateTableNonDefaultSchema() throws Exception { + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createSchema); + } + } + runner = TestRunners.newTestRunner(processor); + MockRecordParser readerFactory = new MockRecordParser(); + + readerFactory.addSchemaField(new RecordField("id", RecordFieldType.INT.getDataType(), false)); + readerFactory.addSchemaField(new RecordField("name", RecordFieldType.STRING.getDataType(), true)); + readerFactory.addSchemaField(new RecordField("code", RecordFieldType.INT.getDataType(), 0, true)); + readerFactory.addSchemaField(new RecordField("newField", RecordFieldType.STRING.getDataType(), 0, true)); + readerFactory.addRecord(1, "name1", 10); + + runner.addControllerService("mock-reader-factory", readerFactory); + runner.enableControllerService(readerFactory); + + runner.setProperty(UpdateDatabaseTable.RECORD_READER, "mock-reader-factory"); + runner.setProperty(UpdateDatabaseTable.SCHEMA_NAME, "testSchema"); + runner.setProperty(UpdateDatabaseTable.TABLE_NAME, "${table.name}"); + runner.setProperty(UpdateDatabaseTable.CREATE_TABLE, UpdateDatabaseTable.CREATE_IF_NOT_EXISTS); + runner.setProperty(UpdateDatabaseTable.QUOTE_TABLE_IDENTIFIER, "false"); + runner.setProperty(UpdateDatabaseTable.QUOTE_COLUMN_IDENTIFIERS, "true"); + runner.setProperty(UpdateDatabaseTable.DB_TYPE, new DerbyDatabaseAdapter().getName()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + runner.setProperty(UpdateDatabaseTable.DBCP_SERVICE, "dbcp"); + Map attrs = new HashMap<>(); + attrs.put("db.name", "default"); + attrs.put("table.name", "newTable"); + runner.enqueue(new byte[0], attrs); + runner.run(); + + runner.assertTransferCount(UpdateDatabaseTable.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateDatabaseTable.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals(UpdateDatabaseTable.ATTR_OUTPUT_TABLE, "newTable"); + // Verify the table has been created with the expected fields + try (Statement s = service.getConnection().createStatement()) { + // The Derby equivalent of DESCRIBE TABLE (using a query rather than the ij tool) + ResultSet rs = s.executeQuery("select * from sys.syscolumns where referenceid = (select tableid from sys.systables " + + "join sys.sysschemas on sys.systables.schemaid = sys.sysschemas.schemaid where tablename = 'NEWTABLE' and sys.sysschemas.schemaname = 'TESTSCHEMA') order by columnnumber"); + assertTrue(rs.next()); + // Columns 2,3,4 are Column Name, Column Index, and Column Type + assertEquals("id", rs.getString(2)); + assertEquals(1, rs.getInt(3)); + assertEquals("INTEGER NOT NULL", rs.getString(4)); + + assertTrue(rs.next()); + assertEquals("name", rs.getString(2)); + assertEquals(2, rs.getInt(3)); + assertEquals("VARCHAR(100)", rs.getString(4)); + + assertTrue(rs.next()); + assertEquals("code", rs.getString(2)); + assertEquals(3, rs.getInt(3)); + assertEquals("INTEGER", rs.getString(4)); + + assertTrue(rs.next()); + assertEquals("newField", rs.getString(2)); + assertEquals(4, rs.getInt(3)); + assertEquals("VARCHAR(100)", rs.getString(4)); + + // No more rows + assertFalse(rs.next()); + } + } + + /** * Simple implementation only for testing purposes */ diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java index cea3ee9fe9..e7c726b869 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java @@ -137,9 +137,7 @@ public class DerbyDatabaseAdapter implements DatabaseAdapter { // This will throw an exception if the table already exists, but it should only be used for tests createTableStatement.append("CREATE TABLE ") - .append(quoteTableName ? getTableQuoteString() : "") - .append(tableSchema.getTableName()) - .append(quoteTableName ? getTableQuoteString() : "") + .append(generateTableName(quoteTableName, tableSchema.getCatalogName(), tableSchema.getSchemaName(), tableSchema.getTableName(),tableSchema)) .append(" (") .append(String.join(", ", columnsAndDatatypes)) .append(") ");