mirror of
https://github.com/apache/nifi.git
synced 2025-02-07 10:38:33 +00:00
NIFI-12530: Support CREATE TABLE in Oracle database adapters
Signed-off-by: Joe Gresock <jgresock@gmail.com> This closes #8175.
This commit is contained in:
parent
c2bd2a8454
commit
40f54f27bf
@ -22,11 +22,13 @@ import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.processors.standard.db.ColumnDescription;
|
||||
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
|
||||
import org.apache.nifi.processors.standard.db.TableSchema;
|
||||
|
||||
import static java.sql.Types.CHAR;
|
||||
import static java.sql.Types.CLOB;
|
||||
@ -188,13 +190,7 @@ public class Oracle12DatabaseAdapter implements DatabaseAdapter {
|
||||
List<String> conflictClause = new ArrayList<>();
|
||||
|
||||
for (String columnName : columnsNames) {
|
||||
|
||||
StringBuilder statementStringBuilder = new StringBuilder();
|
||||
|
||||
statementStringBuilder.append(getColumnAssignment(table, columnName, newTableAlias));
|
||||
|
||||
conflictClause.add(statementStringBuilder.toString());
|
||||
|
||||
conflictClause.add(getColumnAssignment(table, columnName, newTableAlias));
|
||||
}
|
||||
|
||||
return conflictClause;
|
||||
@ -253,4 +249,44 @@ public class Oracle12DatabaseAdapter implements DatabaseAdapter {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean supportsCreateTableIfNotExists() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates a CREATE TABLE statement using the specified table schema
|
||||
* @param tableSchema The table schema including column information
|
||||
* @param quoteTableName Whether to quote the table name in the generated DDL
|
||||
* @param quoteColumnNames Whether to quote column names in the generated DDL
|
||||
* @return A String containing DDL to create the specified table
|
||||
*/
|
||||
@Override
|
||||
public String getCreateTableStatement(TableSchema tableSchema, boolean quoteTableName, boolean quoteColumnNames) {
|
||||
StringBuilder createTableStatement = new StringBuilder()
|
||||
.append("DECLARE\n\tsql_stmt long;\nBEGIN\n\tsql_stmt:='CREATE TABLE ")
|
||||
.append(generateTableName(quoteTableName, tableSchema.getCatalogName(), tableSchema.getSchemaName(), tableSchema.getTableName(), tableSchema))
|
||||
.append(" (");
|
||||
|
||||
List<ColumnDescription> columns = tableSchema.getColumnsAsList();
|
||||
Set<String> primaryKeyColumnNames = tableSchema.getPrimaryKeyColumnNames();
|
||||
for (int i = 0; i < columns.size(); i++) {
|
||||
ColumnDescription column = columns.get(i);
|
||||
createTableStatement
|
||||
.append((i != 0) ? ", " : "")
|
||||
.append(quoteColumnNames ? getColumnQuoteString() : "")
|
||||
.append(column.getColumnName())
|
||||
.append(quoteColumnNames ? getColumnQuoteString() : "")
|
||||
.append(" ")
|
||||
.append(getSQLForDataType(column.getDataType()))
|
||||
.append(column.isNullable() ? "" : " NOT NULL")
|
||||
.append(primaryKeyColumnNames != null && primaryKeyColumnNames.contains(column.getColumnName()) ? " PRIMARY KEY" : "");
|
||||
}
|
||||
|
||||
createTableStatement
|
||||
.append(")';\nEXECUTE IMMEDIATE sql_stmt;\nEXCEPTION\n\tWHEN OTHERS THEN\n\t\tIF SQLCODE = -955 THEN\n\t\t\t")
|
||||
.append("NULL;\n\t\tELSE\n\t\t\tRAISE;\n\t\tEND IF;\nEND;");
|
||||
|
||||
return createTableStatement.toString();
|
||||
}
|
||||
}
|
||||
|
@ -19,12 +19,14 @@ package org.apache.nifi.processors.standard.db.impl;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.processors.standard.db.ColumnDescription;
|
||||
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
|
||||
import org.apache.nifi.processors.standard.db.TableSchema;
|
||||
|
||||
import java.sql.JDBCType;
|
||||
import java.sql.Types;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import static java.sql.Types.CHAR;
|
||||
import static java.sql.Types.CLOB;
|
||||
@ -174,4 +176,45 @@ public class OracleDatabaseAdapter implements DatabaseAdapter {
|
||||
return JDBCType.valueOf(sqlType).getName();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean supportsCreateTableIfNotExists() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates a CREATE TABLE statement using the specified table schema
|
||||
* @param tableSchema The table schema including column information
|
||||
* @param quoteTableName Whether to quote the table name in the generated DDL
|
||||
* @param quoteColumnNames Whether to quote column names in the generated DDL
|
||||
* @return A String containing DDL to create the specified table
|
||||
*/
|
||||
@Override
|
||||
public String getCreateTableStatement(TableSchema tableSchema, boolean quoteTableName, boolean quoteColumnNames) {
|
||||
StringBuilder createTableStatement = new StringBuilder()
|
||||
.append("DECLARE\n\tsql_stmt long;\nBEGIN\n\tsql_stmt:='CREATE TABLE ")
|
||||
.append(generateTableName(quoteTableName, tableSchema.getCatalogName(), tableSchema.getSchemaName(), tableSchema.getTableName(), tableSchema))
|
||||
.append(" (");
|
||||
|
||||
List<ColumnDescription> columns = tableSchema.getColumnsAsList();
|
||||
Set<String> primaryKeyColumnNames = tableSchema.getPrimaryKeyColumnNames();
|
||||
for (int i = 0; i < columns.size(); i++) {
|
||||
ColumnDescription column = columns.get(i);
|
||||
createTableStatement
|
||||
.append((i != 0) ? ", " : "")
|
||||
.append(quoteColumnNames ? getColumnQuoteString() : "")
|
||||
.append(column.getColumnName())
|
||||
.append(quoteColumnNames ? getColumnQuoteString() : "")
|
||||
.append(" ")
|
||||
.append(getSQLForDataType(column.getDataType()))
|
||||
.append(column.isNullable() ? "" : " NOT NULL")
|
||||
.append(primaryKeyColumnNames != null && primaryKeyColumnNames.contains(column.getColumnName()) ? " PRIMARY KEY" : "");
|
||||
}
|
||||
|
||||
createTableStatement
|
||||
.append(")';\nEXECUTE IMMEDIATE sql_stmt;\nEXCEPTION\n\tWHEN OTHERS THEN\n\t\tIF SQLCODE = -955 THEN\n\t\t\t")
|
||||
.append("NULL;\n\t\tELSE\n\t\t\tRAISE;\n\t\tEND IF;\nEND;");
|
||||
|
||||
return createTableStatement.toString();
|
||||
}
|
||||
}
|
||||
|
@ -16,12 +16,15 @@
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.db.impl;
|
||||
|
||||
import java.sql.Types;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.nifi.processors.standard.db.ColumnDescription;
|
||||
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
|
||||
import org.apache.nifi.processors.standard.db.TableSchema;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
@ -149,6 +152,24 @@ public class TestOracle12DatabaseAdapter {
|
||||
// THEN
|
||||
testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, expected);
|
||||
}
|
||||
@Test
|
||||
public void testGetCreateTableStatement() {
|
||||
assertTrue(db.supportsCreateTableIfNotExists());
|
||||
final List<ColumnDescription> columns = Arrays.asList(
|
||||
new ColumnDescription("col1", Types.INTEGER, true, 4, false),
|
||||
new ColumnDescription("col2", Types.VARCHAR, false, 2000, true)
|
||||
);
|
||||
TableSchema tableSchema = new TableSchema("USERS", null, "TEST_TABLE", columns, true, Collections.singleton("COL1"), db.getColumnQuoteString());
|
||||
|
||||
String expectedStatement = "DECLARE\n\tsql_stmt long;\nBEGIN\n\tsql_stmt:='CREATE TABLE "
|
||||
// Strings are returned as VARCHAR2(2000) regardless of reported size and that VARCHAR2 is not in java.sql.Types
|
||||
+ "\"USERS\".\"TEST_TABLE\" (\"col1\" INTEGER NOT NULL, \"col2\" VARCHAR2(2000))';"
|
||||
+ "\nEXECUTE IMMEDIATE sql_stmt;\nEXCEPTION\n\tWHEN OTHERS THEN\n\t\tIF SQLCODE = -955 THEN\n\t\t\t"
|
||||
+ "NULL;\n\t\tELSE\n\t\t\tRAISE;\n\t\tEND IF;\nEND;";
|
||||
String actualStatement = db.getCreateTableStatement(tableSchema, true, true);
|
||||
assertEquals(expectedStatement, actualStatement);
|
||||
}
|
||||
|
||||
|
||||
private void testGetUpsertStatement(String tableName, List<String> columnNames, Collection<String> uniqueKeyColumnNames, IllegalArgumentException expected) {
|
||||
final IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () -> {
|
||||
|
@ -16,11 +16,19 @@
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.db.impl;
|
||||
|
||||
import org.apache.nifi.processors.standard.db.ColumnDescription;
|
||||
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
|
||||
import org.apache.nifi.processors.standard.db.TableSchema;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.sql.Types;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class TestOracleDatabaseAdapter {
|
||||
|
||||
@ -102,4 +110,22 @@ public class TestOracleDatabaseAdapter {
|
||||
String expected4 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
|
||||
assertEquals(expected4, sql4);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetCreateTableStatement() {
|
||||
assertTrue(db.supportsCreateTableIfNotExists());
|
||||
final List<ColumnDescription> columns = Arrays.asList(
|
||||
new ColumnDescription("col1", Types.INTEGER, true, 4, false),
|
||||
new ColumnDescription("col2", Types.VARCHAR, false, 2000, true)
|
||||
);
|
||||
TableSchema tableSchema = new TableSchema("USERS", null, "TEST_TABLE", columns, true, Collections.singleton("COL1"), db.getColumnQuoteString());
|
||||
|
||||
String expectedStatement = "DECLARE\n\tsql_stmt long;\nBEGIN\n\tsql_stmt:='CREATE TABLE "
|
||||
// Strings are returned as VARCHAR2(2000) regardless of reported size and that VARCHAR2 is not in java.sql.Types
|
||||
+ "\"USERS\".\"TEST_TABLE\" (\"col1\" INTEGER NOT NULL, \"col2\" VARCHAR2(2000))';"
|
||||
+ "\nEXECUTE IMMEDIATE sql_stmt;\nEXCEPTION\n\tWHEN OTHERS THEN\n\t\tIF SQLCODE = -955 THEN\n\t\t\t"
|
||||
+ "NULL;\n\t\tELSE\n\t\t\tRAISE;\n\t\tEND IF;\nEND;";
|
||||
String actualStatement = db.getCreateTableStatement(tableSchema, true, true);
|
||||
assertEquals(expectedStatement, actualStatement);
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user