diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java index 1ce6fb5586..4d8f462548 100755 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java @@ -183,13 +183,22 @@ public class ConvertJSONToSQL extends AbstractProcessor { static final PropertyDescriptor QUOTED_IDENTIFIERS = new PropertyDescriptor.Builder() .name("jts-quoted-identifiers") - .displayName("Quote Identifiers") + .displayName("Quote Column Identifiers") .description("Enabling this option will cause all column names to be quoted, allowing you to " + "use reserved words as column names in your tables.") .allowableValues("true", "false") .defaultValue("false") .build(); + static final PropertyDescriptor QUOTED_TABLE_IDENTIFIER = new PropertyDescriptor.Builder() + .name("jts-quoted-table-identifiers") + .displayName("Quote Table Identifiers") + .description("Enabling this option will cause the table name to be quoted to support the " + + "use of special characters in the table name") + .allowableValues("true", "false") + .defaultValue("false") + .build(); + static final Relationship REL_ORIGINAL = new Relationship.Builder() .name("original") .description("When a FlowFile is converted to SQL, the original JSON FlowFile is routed to this relationship") @@ -226,6 +235,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { properties.add(UNMATCHED_COLUMN_BEHAVIOR); properties.add(UPDATE_KEY); properties.add(QUOTED_IDENTIFIERS); + properties.add(QUOTED_TABLE_IDENTIFIER); return properties; } @@ -272,6 +282,9 @@ public class ConvertJSONToSQL extends AbstractProcessor { //Escape column names? final boolean escapeColumnNames = context.getProperty(QUOTED_IDENTIFIERS).asBoolean(); + // Quote table name? + final boolean quoteTableName = context.getProperty(QUOTED_TABLE_IDENTIFIER).asBoolean(); + // get the database schema from the cache, if one exists. We do this in a synchronized block, rather than // using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if // the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the @@ -349,10 +362,10 @@ public class ConvertJSONToSQL extends AbstractProcessor { if (INSERT_TYPE.equals(statementType)) { sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields, - failUnmappedColumns, warningUnmappedColumns, escapeColumnNames); + failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName); } else { sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields, - failUnmappedColumns, warningUnmappedColumns, escapeColumnNames); + failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName); } } catch (final ProcessException pe) { getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure", @@ -402,7 +415,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { private String generateInsert(final JsonNode rootNode, final Map attributes, final String tableName, final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns, - final boolean warningUnmappedColumns, boolean escapeColumnNames) { + final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) { final Set normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames); for (final String requiredColName : schema.getRequiredColumnNames()) { @@ -420,7 +433,15 @@ public class ConvertJSONToSQL extends AbstractProcessor { final StringBuilder sqlBuilder = new StringBuilder(); int fieldCount = 0; - sqlBuilder.append("INSERT INTO ").append(tableName).append(" ("); + sqlBuilder.append("INSERT INTO "); + if (quoteTableName) { + sqlBuilder.append(schema.getQuotedIdentifierString()) + .append(tableName) + .append(schema.getQuotedIdentifierString()); + } else { + sqlBuilder.append(tableName); + } + sqlBuilder.append(" ("); // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as // adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the @@ -439,12 +460,12 @@ public class ConvertJSONToSQL extends AbstractProcessor { sqlBuilder.append(", "); } - if(!escapeColumnNames){ - sqlBuilder.append(desc.getColumnName()); + if(escapeColumnNames){ + sqlBuilder.append(schema.getQuotedIdentifierString()) + .append(desc.getColumnName()) + .append(schema.getQuotedIdentifierString()); } else { - sqlBuilder.append(schema.getQuotedIdentifierString()); sqlBuilder.append(desc.getColumnName()); - sqlBuilder.append(schema.getQuotedIdentifierString()); } final int sqlType = desc.getDataType(); @@ -482,7 +503,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { private String generateUpdate(final JsonNode rootNode, final Map attributes, final String tableName, final String updateKeys, final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns, - final boolean warningUnmappedColumns, boolean escapeColumnNames) { + final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) { final Set updateKeyNames; if (updateKeys == null) { @@ -500,7 +521,16 @@ public class ConvertJSONToSQL extends AbstractProcessor { final StringBuilder sqlBuilder = new StringBuilder(); int fieldCount = 0; - sqlBuilder.append("UPDATE ").append(tableName).append(" SET "); + sqlBuilder.append("UPDATE "); + if (quoteTableName) { + sqlBuilder.append(schema.getQuotedIdentifierString()) + .append(tableName) + .append(schema.getQuotedIdentifierString()); + } else { + sqlBuilder.append(tableName); + } + + sqlBuilder.append(" SET "); // Create a Set of all normalized Update Key names, and ensure that there is a field in the JSON @@ -549,12 +579,12 @@ public class ConvertJSONToSQL extends AbstractProcessor { sqlBuilder.append(", "); } - if(!escapeColumnNames){ - sqlBuilder.append(desc.getColumnName()); - } else { + if(escapeColumnNames){ sqlBuilder.append(schema.getQuotedIdentifierString()) .append(desc.getColumnName()) .append(schema.getQuotedIdentifierString()); + } else { + sqlBuilder.append(desc.getColumnName()); } sqlBuilder.append(" = ?"); @@ -598,12 +628,12 @@ public class ConvertJSONToSQL extends AbstractProcessor { } fieldCount++; - if(!escapeColumnNames){ - sqlBuilder.append(normalizedColName); - } else { + if(escapeColumnNames){ sqlBuilder.append(schema.getQuotedIdentifierString()) .append(normalizedColName) .append(schema.getQuotedIdentifierString()); + } else { + sqlBuilder.append(normalizedColName); } sqlBuilder.append(" = ?"); final int sqlType = desc.getDataType(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java index f808072a04..cf1459cac6 100755 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java @@ -123,6 +123,43 @@ public class TestConvertJSONToSQL { out.assertContentEquals("INSERT INTO PERSONS (\"ID\", \"NAME\", \"CODE\") VALUES (?, ?, ?)"); } + @Test + public void testInsertQuotedTableIdentifier() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); + runner.setProperty(ConvertJSONToSQL.QUOTED_TABLE_IDENTIFIER, "true"); + + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "48"); + + out.assertContentEquals("INSERT INTO \"PERSONS\" (ID, NAME, CODE) VALUES (?, ?, ?)"); + } + @Test public void testInsertWithNullValue() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); @@ -195,6 +232,43 @@ public class TestConvertJSONToSQL { } + @Test + public void testUpdateQuotedTableIdentifier() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.setProperty(ConvertJSONToSQL.QUOTED_TABLE_IDENTIFIER, "true"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-null-code.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.1.value", "Mark"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeNotExists("sql.args.2.value"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "1"); + + out.assertContentEquals("UPDATE \"PERSONS\" SET NAME = ?, CODE = ? WHERE ID = ?"); + } + + @Test public void testMultipleInserts() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);