From 6b75eda9ab6fd509bd813e149f99a02bf2466943 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 2 Dec 2015 16:36:38 -0500 Subject: [PATCH] NIFI-1244: Incorporate Schema Name property --- .../processors/standard/ConvertJSONToSQL.java | 30 +++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java index c46d3e2f43..902af5123c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java @@ -128,6 +128,13 @@ public class ConvertJSONToSQL extends AbstractProcessor { .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder() + .name("Schema Name") + .description("The name of the schema that the table belongs to. This may not apply for the database that you are updating. In this case, leave the field empty") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new PropertyDescriptor.Builder() .name("Translate Field Names") .description("If true, the Processor will attempt to translate JSON field names into the appropriate column names for the table specified. " @@ -183,6 +190,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { properties.add(STATEMENT_TYPE); properties.add(TABLE_NAME); properties.add(CATALOG_NAME); + properties.add(SCHEMA_NAME); properties.add(TRANSLATE_FIELD_NAMES); properties.add(UNMATCHED_FIELD_BEHAVIOR); properties.add(UPDATE_KEY); @@ -220,6 +228,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { final String updateKeys = context.getProperty(UPDATE_KEY).evaluateAttributeExpressions(flowFile).getValue(); final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue(); final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); final SchemaKey schemaKey = new SchemaKey(catalog, tableName); final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) && updateKeys == null; @@ -235,7 +244,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { // No schema exists for this table yet. Query the database to determine the schema and put it into the cache. final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class); try (final Connection conn = dbcpService.getConnection()) { - schema = TableSchema.from(conn, catalog, tableName, translateFieldNames, includePrimaryKeys); + schema = TableSchema.from(conn, catalog, schemaName, tableName, translateFieldNames, includePrimaryKeys); schemaCache.put(schemaKey, schema); } catch (final SQLException e) { getLogger().error("Failed to convert {} into a SQL statement due to {}; routing to failure", new Object[] {flowFile, e.toString()}, e); @@ -288,10 +297,21 @@ public class ConvertJSONToSQL extends AbstractProcessor { final Map attributes = new HashMap<>(); try { + // build the fully qualified table name + final StringBuilder tableNameBuilder = new StringBuilder(); + if (catalog != null) { + tableNameBuilder.append(catalog).append("."); + } + if (schemaName != null) { + tableNameBuilder.append(schemaName).append("."); + } + tableNameBuilder.append(tableName); + final String fqTableName = tableNameBuilder.toString(); + if (INSERT_TYPE.equals(statementType)) { - sql = generateInsert(jsonNode, attributes, tableName, schema, translateFieldNames, ignoreUnmappedFields); + sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields); } else { - sql = generateUpdate(jsonNode, attributes, tableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields); + sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields); } } catch (final ProcessException pe) { getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure", @@ -559,9 +579,9 @@ public class ConvertJSONToSQL extends AbstractProcessor { return primaryKeyColumnNames; } - public static TableSchema from(final Connection conn, final String catalog, final String tableName, + public static TableSchema from(final Connection conn, final String catalog, final String schema, final String tableName, final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException { - try (final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%")) { + try (final ResultSet colrs = conn.getMetaData().getColumns(catalog, schema, tableName, "%")) { final List cols = new ArrayList<>(); while (colrs.next()) {