NIFI-1244: Incorporate Schema Name property

This commit is contained in:
Mark Payne 2015-12-02 16:36:38 -05:00
parent ecc240b918
commit 6b75eda9ab
1 changed files with 25 additions and 5 deletions

View File

@ -128,6 +128,13 @@ public class ConvertJSONToSQL extends AbstractProcessor {
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
.name("Schema Name")
.description("The name of the schema that the table belongs to. This may not apply for the database that you are updating. In this case, leave the field empty")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new PropertyDescriptor.Builder()
.name("Translate Field Names")
.description("If true, the Processor will attempt to translate JSON field names into the appropriate column names for the table specified. "
@ -183,6 +190,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
properties.add(STATEMENT_TYPE);
properties.add(TABLE_NAME);
properties.add(CATALOG_NAME);
properties.add(SCHEMA_NAME);
properties.add(TRANSLATE_FIELD_NAMES);
properties.add(UNMATCHED_FIELD_BEHAVIOR);
properties.add(UPDATE_KEY);
@ -220,6 +228,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
final String updateKeys = context.getProperty(UPDATE_KEY).evaluateAttributeExpressions(flowFile).getValue();
final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) && updateKeys == null;
@ -235,7 +244,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
// No schema exists for this table yet. Query the database to determine the schema and put it into the cache.
final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
try (final Connection conn = dbcpService.getConnection()) {
schema = TableSchema.from(conn, catalog, tableName, translateFieldNames, includePrimaryKeys);
schema = TableSchema.from(conn, catalog, schemaName, tableName, translateFieldNames, includePrimaryKeys);
schemaCache.put(schemaKey, schema);
} catch (final SQLException e) {
getLogger().error("Failed to convert {} into a SQL statement due to {}; routing to failure", new Object[] {flowFile, e.toString()}, e);
@ -288,10 +297,21 @@ public class ConvertJSONToSQL extends AbstractProcessor {
final Map<String, String> attributes = new HashMap<>();
try {
// build the fully qualified table name
final StringBuilder tableNameBuilder = new StringBuilder();
if (catalog != null) {
tableNameBuilder.append(catalog).append(".");
}
if (schemaName != null) {
tableNameBuilder.append(schemaName).append(".");
}
tableNameBuilder.append(tableName);
final String fqTableName = tableNameBuilder.toString();
if (INSERT_TYPE.equals(statementType)) {
sql = generateInsert(jsonNode, attributes, tableName, schema, translateFieldNames, ignoreUnmappedFields);
sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields);
} else {
sql = generateUpdate(jsonNode, attributes, tableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields);
sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields);
}
} catch (final ProcessException pe) {
getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
@ -559,9 +579,9 @@ public class ConvertJSONToSQL extends AbstractProcessor {
return primaryKeyColumnNames;
}
public static TableSchema from(final Connection conn, final String catalog, final String tableName,
public static TableSchema from(final Connection conn, final String catalog, final String schema, final String tableName,
final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException {
try (final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%")) {
try (final ResultSet colrs = conn.getMetaData().getColumns(catalog, schema, tableName, "%")) {
final List<ColumnDescription> cols = new ArrayList<>();
while (colrs.next()) {