NIFI-11922: Honor catalog/schema field in UpdateDatabaseTable

This closes #7585

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Matt Burgess 2023-08-08 17:14:31 -04:00 committed by exceptionfactory
parent 0643e1db96
commit 88b6b587be
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
6 changed files with 146 additions and 9 deletions

View File

@ -486,7 +486,7 @@ public class UpdateDatabaseTable extends AbstractProcessor {
getLogger().debug("Adding column " + recordFieldName + " to table " + tableName);
}
tableSchema = new TableSchema(tableName, columns, translateFieldNames, primaryKeyColumnNames, databaseAdapter.getColumnQuoteString());
tableSchema = new TableSchema(catalogName, schemaName, tableName, columns, translateFieldNames, primaryKeyColumnNames, databaseAdapter.getColumnQuoteString());
final String createTableSql = databaseAdapter.getCreateTableStatement(tableSchema, quoteTableName, quoteColumnNames);

View File

@ -178,9 +178,7 @@ public interface DatabaseAdapter {
}
createTableStatement.append("CREATE TABLE IF NOT EXISTS ")
.append(quoteTableName ? getTableQuoteString() : "")
.append(tableSchema.getTableName())
.append(quoteTableName ? getTableQuoteString() : "")
.append(generateTableName(quoteTableName, tableSchema.getCatalogName(), tableSchema.getSchemaName(), tableSchema.getTableName(),tableSchema))
.append(" (")
.append(String.join(", ", columnsAndDatatypes))
.append(") ");
@ -216,4 +214,41 @@ public interface DatabaseAdapter {
default String getSQLForDataType(int sqlType) {
return JDBCType.valueOf(sqlType).getName();
}
default String generateTableName(final boolean quoteTableName, final String catalog, final String schemaName, final String tableName, final TableSchema tableSchema) {
final StringBuilder tableNameBuilder = new StringBuilder();
if (catalog != null) {
if (quoteTableName) {
tableNameBuilder.append(tableSchema.getQuotedIdentifierString())
.append(catalog)
.append(tableSchema.getQuotedIdentifierString());
} else {
tableNameBuilder.append(catalog);
}
tableNameBuilder.append(".");
}
if (schemaName != null) {
if (quoteTableName) {
tableNameBuilder.append(tableSchema.getQuotedIdentifierString())
.append(schemaName)
.append(tableSchema.getQuotedIdentifierString());
} else {
tableNameBuilder.append(schemaName);
}
tableNameBuilder.append(".");
}
if (quoteTableName) {
tableNameBuilder.append(tableSchema.getQuotedIdentifierString())
.append(tableName)
.append(tableSchema.getQuotedIdentifierString());
} else {
tableNameBuilder.append(tableName);
}
return tableNameBuilder.toString();
}
}

View File

@ -34,10 +34,14 @@ public class TableSchema {
private final Set<String> primaryKeyColumnNames;
private final Map<String, ColumnDescription> columns;
private final String quotedIdentifierString;
private final String catalogName;
private final String schemaName;
private final String tableName;
public TableSchema(final String tableName, final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
public TableSchema(final String catalogName, final String schemaName, final String tableName, final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
final Set<String> primaryKeyColumnNames, final String quotedIdentifierString) {
this.catalogName = catalogName;
this.schemaName = schemaName;
this.tableName = tableName;
this.columns = new LinkedHashMap<>();
this.primaryKeyColumnNames = primaryKeyColumnNames;
@ -52,6 +56,14 @@ public class TableSchema {
}
}
public String getCatalogName() {
return catalogName;
}
public String getSchemaName() {
return schemaName;
}
public String getTableName() {
return tableName;
}
@ -128,7 +140,7 @@ public class TableSchema {
}
}
return new TableSchema(tableName, cols, translateColumnNames, primaryKeyColumns, dmd.getIdentifierQuoteString());
return new TableSchema(catalog, schema, tableName, cols, translateColumnNames, primaryKeyColumns, dmd.getIdentifierQuoteString());
}
}

View File

@ -265,6 +265,8 @@ public class PutDatabaseRecordTest {
final RecordSchema schema = new SimpleRecordSchema(fields);
final TableSchema tableSchema = new TableSchema(
null,
null,
"PERSONS",
Arrays.asList(
new ColumnDescription("id", 4, true, 2, false),
@ -301,6 +303,8 @@ public class PutDatabaseRecordTest {
final RecordSchema schema = new SimpleRecordSchema(fields);
final TableSchema tableSchema = new TableSchema(
null,
null,
"PERSONS",
Arrays.asList(
new ColumnDescription("id", 4, true, 2, false),
@ -1416,6 +1420,8 @@ public class PutDatabaseRecordTest {
final RecordSchema schema = new SimpleRecordSchema(fields);
final TableSchema tableSchema = new TableSchema(
null,
null,
"PERSONS",
Arrays.asList(
new ColumnDescription("id", 4, true, 2, false),

View File

@ -50,6 +50,9 @@ public class TestUpdateDatabaseTable {
private static final String createPersons = "CREATE TABLE \"persons\" (\"id\" integer primary key, \"name\" varchar(100), \"code\" integer)";
private static final String createSchema = "CREATE SCHEMA \"testSchema\"";
@TempDir
public static File tempDir;
@ -88,6 +91,18 @@ public class TestUpdateDatabaseTable {
} catch (SQLException se) {
// Ignore, table probably doesn't exist
}
try (Statement s = service.getConnection().createStatement()) {
s.execute("DROP TABLE \"newTable\"");
} catch (SQLException se) {
// Ignore, table probably doesn't exist
}
try (Statement s = service.getConnection().createStatement()) {
s.execute("DROP SCHEMA \"testSchema\"");
} catch (SQLException se) {
// Ignore, schema probably doesn't exist
}
}
@Test
@ -408,6 +423,77 @@ public class TestUpdateDatabaseTable {
}
}
@Test
public void testCreateTableNonDefaultSchema() throws Exception {
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createSchema);
}
}
runner = TestRunners.newTestRunner(processor);
MockRecordParser readerFactory = new MockRecordParser();
readerFactory.addSchemaField(new RecordField("id", RecordFieldType.INT.getDataType(), false));
readerFactory.addSchemaField(new RecordField("name", RecordFieldType.STRING.getDataType(), true));
readerFactory.addSchemaField(new RecordField("code", RecordFieldType.INT.getDataType(), 0, true));
readerFactory.addSchemaField(new RecordField("newField", RecordFieldType.STRING.getDataType(), 0, true));
readerFactory.addRecord(1, "name1", 10);
runner.addControllerService("mock-reader-factory", readerFactory);
runner.enableControllerService(readerFactory);
runner.setProperty(UpdateDatabaseTable.RECORD_READER, "mock-reader-factory");
runner.setProperty(UpdateDatabaseTable.SCHEMA_NAME, "testSchema");
runner.setProperty(UpdateDatabaseTable.TABLE_NAME, "${table.name}");
runner.setProperty(UpdateDatabaseTable.CREATE_TABLE, UpdateDatabaseTable.CREATE_IF_NOT_EXISTS);
runner.setProperty(UpdateDatabaseTable.QUOTE_TABLE_IDENTIFIER, "false");
runner.setProperty(UpdateDatabaseTable.QUOTE_COLUMN_IDENTIFIERS, "true");
runner.setProperty(UpdateDatabaseTable.DB_TYPE, new DerbyDatabaseAdapter().getName());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateDatabaseTable.DBCP_SERVICE, "dbcp");
Map<String, String> attrs = new HashMap<>();
attrs.put("db.name", "default");
attrs.put("table.name", "newTable");
runner.enqueue(new byte[0], attrs);
runner.run();
runner.assertTransferCount(UpdateDatabaseTable.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateDatabaseTable.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateDatabaseTable.ATTR_OUTPUT_TABLE, "newTable");
// Verify the table has been created with the expected fields
try (Statement s = service.getConnection().createStatement()) {
// The Derby equivalent of DESCRIBE TABLE (using a query rather than the ij tool)
ResultSet rs = s.executeQuery("select * from sys.syscolumns where referenceid = (select tableid from sys.systables "
+ "join sys.sysschemas on sys.systables.schemaid = sys.sysschemas.schemaid where tablename = 'NEWTABLE' and sys.sysschemas.schemaname = 'TESTSCHEMA') order by columnnumber");
assertTrue(rs.next());
// Columns 2,3,4 are Column Name, Column Index, and Column Type
assertEquals("id", rs.getString(2));
assertEquals(1, rs.getInt(3));
assertEquals("INTEGER NOT NULL", rs.getString(4));
assertTrue(rs.next());
assertEquals("name", rs.getString(2));
assertEquals(2, rs.getInt(3));
assertEquals("VARCHAR(100)", rs.getString(4));
assertTrue(rs.next());
assertEquals("code", rs.getString(2));
assertEquals(3, rs.getInt(3));
assertEquals("INTEGER", rs.getString(4));
assertTrue(rs.next());
assertEquals("newField", rs.getString(2));
assertEquals(4, rs.getInt(3));
assertEquals("VARCHAR(100)", rs.getString(4));
// No more rows
assertFalse(rs.next());
}
}
/**
* Simple implementation only for testing purposes
*/

View File

@ -137,9 +137,7 @@ public class DerbyDatabaseAdapter implements DatabaseAdapter {
// This will throw an exception if the table already exists, but it should only be used for tests
createTableStatement.append("CREATE TABLE ")
.append(quoteTableName ? getTableQuoteString() : "")
.append(tableSchema.getTableName())
.append(quoteTableName ? getTableQuoteString() : "")
.append(generateTableName(quoteTableName, tableSchema.getCatalogName(), tableSchema.getSchemaName(), tableSchema.getTableName(),tableSchema))
.append(" (")
.append(String.join(", ", columnsAndDatatypes))
.append(") ");