diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java index 48da57f143..2db7bcca87 100755 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.sql.Connection; +import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -175,6 +176,14 @@ public class ConvertJSONToSQL extends AbstractProcessor { .expressionLanguageSupported(true) .build(); + static final PropertyDescriptor QUOTED_IDENTIFIERS = new PropertyDescriptor.Builder() + .name("jts-quoted-identifiers") + .displayName("Quote Identifiers") + .description("Enabling this option will cause all column names to be quoted, allowing you to " + + "use reserved words as column names in your tables.") + .allowableValues("true", "false") + .defaultValue("false") + .build(); static final Relationship REL_ORIGINAL = new Relationship.Builder() .name("original") @@ -211,6 +220,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { properties.add(UNMATCHED_FIELD_BEHAVIOR); properties.add(UNMATCHED_COLUMN_BEHAVIOR); properties.add(UPDATE_KEY); + properties.add(QUOTED_IDENTIFIERS); return properties; } @@ -254,6 +264,9 @@ public class ConvertJSONToSQL extends AbstractProcessor { final boolean failUnmappedColumns = FAIL_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue()); final boolean warningUnmappedColumns = WARNING_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue()); + //Escape column names? + final boolean escapeColumnNames = context.getProperty(QUOTED_IDENTIFIERS).asBoolean(); + // get the database schema from the cache, if one exists. We do this in a synchronized block, rather than // using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if // the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the @@ -330,9 +343,11 @@ public class ConvertJSONToSQL extends AbstractProcessor { final String fqTableName = tableNameBuilder.toString(); if (INSERT_TYPE.equals(statementType)) { - sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields, failUnmappedColumns, warningUnmappedColumns); + sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields, + failUnmappedColumns, warningUnmappedColumns, escapeColumnNames); } else { - sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields, failUnmappedColumns, warningUnmappedColumns); + sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields, + failUnmappedColumns, warningUnmappedColumns, escapeColumnNames); } } catch (final ProcessException pe) { getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure", @@ -380,8 +395,8 @@ public class ConvertJSONToSQL extends AbstractProcessor { } private String generateInsert(final JsonNode rootNode, final Map attributes, final String tableName, - final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns, - final boolean warningUnmappedColumns) { + final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns, + final boolean warningUnmappedColumns, boolean escapeColumnNames) { final Set normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames); for (final String requiredColName : schema.getRequiredColumnNames()) { @@ -418,7 +433,13 @@ public class ConvertJSONToSQL extends AbstractProcessor { sqlBuilder.append(", "); } - sqlBuilder.append(desc.getColumnName()); + if(!escapeColumnNames){ + sqlBuilder.append(desc.getColumnName()); + } else { + sqlBuilder.append(schema.getQuotedIdentifierString()); + sqlBuilder.append(desc.getColumnName()); + sqlBuilder.append(schema.getQuotedIdentifierString()); + } final int sqlType = desc.getDataType(); attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType)); @@ -454,8 +475,8 @@ public class ConvertJSONToSQL extends AbstractProcessor { } private String generateUpdate(final JsonNode rootNode, final Map attributes, final String tableName, final String updateKeys, - final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns, - final boolean warningUnmappedColumns) { + final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns, + final boolean warningUnmappedColumns, boolean escapeColumnNames) { final Set updateKeyNames; if (updateKeys == null) { @@ -522,7 +543,15 @@ public class ConvertJSONToSQL extends AbstractProcessor { sqlBuilder.append(", "); } - sqlBuilder.append(desc.getColumnName()).append(" = ?"); + if(!escapeColumnNames){ + sqlBuilder.append(desc.getColumnName()); + } else { + sqlBuilder.append(schema.getQuotedIdentifierString()) + .append(desc.getColumnName()) + .append(schema.getQuotedIdentifierString()); + } + + sqlBuilder.append(" = ?"); final int sqlType = desc.getDataType(); attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType)); @@ -563,7 +592,14 @@ public class ConvertJSONToSQL extends AbstractProcessor { } fieldCount++; - sqlBuilder.append(normalizedColName).append(" = ?"); + if(!escapeColumnNames){ + sqlBuilder.append(normalizedColName); + } else { + sqlBuilder.append(schema.getQuotedIdentifierString()) + .append(normalizedColName) + .append(schema.getQuotedIdentifierString()); + } + sqlBuilder.append(" = ?"); final int sqlType = desc.getDataType(); attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType)); @@ -586,11 +622,13 @@ public class ConvertJSONToSQL extends AbstractProcessor { private List requiredColumnNames; private Set primaryKeyColumnNames; private Map columns; + private String quotedIdentifierString; private TableSchema(final List columnDescriptions, final boolean translateColumnNames, - final Set primaryKeyColumnNames) { + final Set primaryKeyColumnNames, final String quotedIdentifierString) { this.columns = new HashMap<>(); this.primaryKeyColumnNames = primaryKeyColumnNames; + this.quotedIdentifierString = quotedIdentifierString; this.requiredColumnNames = new ArrayList<>(); for (final ColumnDescription desc : columnDescriptions) { @@ -613,10 +651,15 @@ public class ConvertJSONToSQL extends AbstractProcessor { return primaryKeyColumnNames; } + public String getQuotedIdentifierString() { + return quotedIdentifierString; + } + public static TableSchema from(final Connection conn, final String catalog, final String schema, final String tableName, final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException { - try (final ResultSet colrs = conn.getMetaData().getColumns(catalog, schema, tableName, "%")) { + final DatabaseMetaData dmd = conn.getMetaData(); + try (final ResultSet colrs = dmd.getColumns(catalog, schema, tableName, "%")) { final List cols = new ArrayList<>(); while (colrs.next()) { final ColumnDescription col = ColumnDescription.from(colrs); @@ -634,7 +677,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { } } - return new TableSchema(cols, translateColumnNames, primaryKeyColumns); + return new TableSchema(cols, translateColumnNames, primaryKeyColumns, dmd.getIdentifierQuoteString()); } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java index 60d993ef05..7e95adab97 100755 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java @@ -82,6 +82,42 @@ public class TestConvertJSONToSQL { out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); } + @Test + public void testInsertQuotedIdentifiers() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); + runner.setProperty(ConvertJSONToSQL.QUOTED_IDENTIFIERS, "true"); + + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "48"); + + out.assertContentEquals("INSERT INTO PERSONS (\"ID\", \"NAME\", \"CODE\") VALUES (?, ?, ?)"); + } + @Test public void testInsertWithNullValue() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); @@ -186,6 +222,41 @@ public class TestConvertJSONToSQL { } } + @Test + public void testMultipleInsertsQuotedIdentifiers() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); + runner.setProperty(ConvertJSONToSQL.QUOTED_IDENTIFIERS, "true"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/persons.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 5); + final List mffs = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL); + for (final MockFlowFile mff : mffs) { + mff.assertContentEquals("INSERT INTO PERSONS (\"ID\", \"NAME\", \"CODE\") VALUES (?, ?, ?)"); + + for (int i=1; i <= 3; i++) { + mff.assertAttributeExists("sql.args." + i + ".type"); + mff.assertAttributeExists("sql.args." + i + ".value"); + } + } + } + @Test public void testUpdateBasedOnPrimaryKey() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); @@ -220,6 +291,41 @@ public class TestConvertJSONToSQL { out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE ID = ?"); } + @Test + public void testUpdateBasedOnPrimaryKeyQuotedIdentifier() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.setProperty(ConvertJSONToSQL.QUOTED_IDENTIFIERS, "true"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.1.value", "Mark"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.2.value", "48"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "1"); + + out.assertContentEquals("UPDATE PERSONS SET \"NAME\" = ?, \"CODE\" = ? WHERE \"ID\" = ?"); + } + @Test public void testUnmappedFieldBehavior() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); @@ -290,6 +396,42 @@ public class TestConvertJSONToSQL { out.assertContentEquals("UPDATE PERSONS SET ID = ?, NAME = ? WHERE CODE = ?"); } + @Test + public void testUpdateBasedOnUpdateKeyQuotedIdentifier() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "code"); + runner.setProperty(ConvertJSONToSQL.QUOTED_IDENTIFIERS, "true"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "48"); + + out.assertContentEquals("UPDATE PERSONS SET \"ID\" = ?, \"NAME\" = ? WHERE \"CODE\" = ?"); + } + @Test public void testUpdateBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);