diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java index 00db3b56f8..638ec9dc5c 100755 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java @@ -83,9 +83,10 @@ import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttribu + "relationship and the SQL is routed to the 'sql' relationship.") @WritesAttributes({ @WritesAttribute(attribute="mime.type", description="Sets mime.type of FlowFile that is routed to 'sql' to 'text/plain'."), - @WritesAttribute(attribute="sql.table", description="Sets the sql.table attribute of FlowFile that is routed to 'sql' to the name of the table that is updated by the SQL statement."), - @WritesAttribute(attribute="sql.catalog", description="If the Catalog name is set for this database, specifies the name of the catalog that the SQL statement will update. " - + "If no catalog is used, this attribute will not be added."), + @WritesAttribute(attribute = ".table", description = "Sets the .table attribute of FlowFile that is routed to 'sql' to the name of the table that is updated by the SQL statement. " + + "The prefix for this attribute ('sql', e.g.) is determined by the SQL Parameter Attribute Prefix property."), + @WritesAttribute(attribute=".catalog", description="If the Catalog name is set for this database, specifies the name of the catalog that the SQL statement will update. " + + "If no catalog is used, this attribute will not be added. The prefix for this attribute ('sql', e.g.) is determined by the SQL Parameter Attribute Prefix property."), @WritesAttribute(attribute="fragment.identifier", description="All FlowFiles routed to the 'sql' relationship for the same incoming FlowFile (multiple will be output for the same incoming " + "FlowFile if the incoming FlowFile is a JSON Array) will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), @WritesAttribute(attribute="fragment.count", description="The number of SQL FlowFiles that were produced for same incoming FlowFile. This can be used in conjunction with the " @@ -93,12 +94,14 @@ import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttribu @WritesAttribute(attribute="fragment.index", description="The position of this FlowFile in the list of outgoing FlowFiles that were all derived from the same incoming FlowFile. This can be " + "used in conjunction with the fragment.identifier and fragment.count attributes to know which FlowFiles originated from the same incoming FlowFile and in what order the SQL " + "FlowFiles were produced"), - @WritesAttribute(attribute="sql.args.N.type", description="The output SQL statements are parametrized in order to avoid SQL Injection Attacks. The types of the Parameters " - + "to use are stored in attributes named sql.args.1.type, sql.args.2.type, sql.args.3.type, and so on. The type is a number representing a JDBC Type constant. " - + "Generally, this is useful only for software to read and interpret but is added so that a processor such as PutSQL can understand how to interpret the values."), - @WritesAttribute(attribute="sql.args.N.value", description="The output SQL statements are parametrized in order to avoid SQL Injection Attacks. The values of the Parameters " + @WritesAttribute(attribute=".args.N.type", description="The output SQL statements are parametrized in order to avoid SQL Injection Attacks. The types of the Parameters " + + "to use are stored in attributes named .args.1.type, .args.2.type, .args.3.type, and so on. The type is a number representing a JDBC Type constant. " + + "Generally, this is useful only for software to read and interpret but is added so that a processor such as PutSQL can understand how to interpret the values. " + + "The prefix for this attribute ('sql', e.g.) is determined by the SQL Parameter Attribute Prefix property."), + @WritesAttribute(attribute=".args.N.value", description="The output SQL statements are parametrized in order to avoid SQL Injection Attacks. The values of the Parameters " + "to use are stored in the attributes named sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. Each of these attributes has a corresponding " - + "sql.args.N.type attribute that indicates how the value should be interpreted when inserting it into the database.") + + ".args.N.type attribute that indicates how the value should be interpreted when inserting it into the database." + + "The prefix for this attribute ('sql', e.g.) is determined by the SQL Parameter Attribute Prefix property.") }) public class ConvertJSONToSQL extends AbstractProcessor { private static final String UPDATE_TYPE = "UPDATE"; @@ -201,6 +204,16 @@ public class ConvertJSONToSQL extends AbstractProcessor { .defaultValue("false") .build(); + static final PropertyDescriptor SQL_PARAM_ATTR_PREFIX = new PropertyDescriptor.Builder() + .name("jts-sql-param-attr-prefix") + .displayName("SQL Parameter Attribute Prefix") + .description("The string to be prepended to the outgoing flow file attributes, such as .args.1.value, where is replaced with the specified value") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .required(true) + .defaultValue("sql") + .build(); + static final Relationship REL_ORIGINAL = new Relationship.Builder() .name("original") .description("When a FlowFile is converted to SQL, the original JSON FlowFile is routed to this relationship") @@ -238,6 +251,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { properties.add(UPDATE_KEY); properties.add(QUOTED_IDENTIFIERS); properties.add(QUOTED_TABLE_IDENTIFIER); + properties.add(SQL_PARAM_ATTR_PREFIX); return properties; } @@ -287,6 +301,9 @@ public class ConvertJSONToSQL extends AbstractProcessor { // Quote table name? final boolean quoteTableName = context.getProperty(QUOTED_TABLE_IDENTIFIER).asBoolean(); + // Attribute prefix + final String attributePrefix = context.getProperty(SQL_PARAM_ATTR_PREFIX).evaluateAttributeExpressions(flowFile).getValue(); + // get the database schema from the cache, if one exists. We do this in a synchronized block, rather than // using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if // the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the @@ -364,13 +381,13 @@ public class ConvertJSONToSQL extends AbstractProcessor { if (INSERT_TYPE.equals(statementType)) { sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields, - failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName); + failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName, attributePrefix); } else if (UPDATE_TYPE.equals(statementType)) { sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields, - failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName); + failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName, attributePrefix); } else { sql = generateDelete(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields, - failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName); + failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName, attributePrefix); } } catch (final ProcessException pe) { getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure", @@ -391,13 +408,13 @@ public class ConvertJSONToSQL extends AbstractProcessor { }); attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); - attributes.put("sql.table", tableName); + attributes.put(attributePrefix + ".table", tableName); attributes.put(FRAGMENT_ID.key(), fragmentIdentifier); attributes.put(FRAGMENT_COUNT.key(), String.valueOf(arrayNode.size())); attributes.put(FRAGMENT_INDEX.key(), String.valueOf(i)); if (catalog != null) { - attributes.put("sql.catalog", catalog); + attributes.put(attributePrefix + ".catalog", catalog); } sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributes); @@ -420,7 +437,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { private String generateInsert(final JsonNode rootNode, final Map attributes, final String tableName, final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns, - final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) { + final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName, final String attributePrefix) { final Set normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames); for (final String requiredColName : schema.getRequiredColumnNames()) { @@ -449,7 +466,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { sqlBuilder.append(" ("); // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as - // adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the + // adding the column value to a ".args.N.value" attribute and the type of a ".args.N.type" attribute add the // columns that we are inserting into final Iterator fieldNames = rootNode.getFieldNames(); while (fieldNames.hasNext()) { @@ -474,13 +491,13 @@ public class ConvertJSONToSQL extends AbstractProcessor { } final int sqlType = desc.getDataType(); - attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType)); + attributes.put(attributePrefix + ".args." + fieldCount + ".type", String.valueOf(sqlType)); final Integer colSize = desc.getColumnSize(); final JsonNode fieldNode = rootNode.get(fieldName); if (!fieldNode.isNull()) { String fieldValue = createSqlStringValue(fieldNode, colSize, sqlType); - attributes.put("sql.args." + fieldCount + ".value", fieldValue); + attributes.put(attributePrefix + ".args." + fieldCount + ".value", fieldValue); } } } @@ -562,7 +579,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { private String generateUpdate(final JsonNode rootNode, final Map attributes, final String tableName, final String updateKeys, final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns, - final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) { + final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName, final String attributePrefix) { final Set updateKeyNames; if (updateKeys == null) { @@ -612,7 +629,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { } // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as - // adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the + // adding the column value to a ".args.N.value" attribute and the type of a ".args.N.type" attribute add the // columns that we are inserting into Iterator fieldNames = rootNode.getFieldNames(); while (fieldNames.hasNext()) { @@ -648,14 +665,14 @@ public class ConvertJSONToSQL extends AbstractProcessor { sqlBuilder.append(" = ?"); final int sqlType = desc.getDataType(); - attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType)); + attributes.put(attributePrefix + ".args." + fieldCount + ".type", String.valueOf(sqlType)); final Integer colSize = desc.getColumnSize(); final JsonNode fieldNode = rootNode.get(fieldName); if (!fieldNode.isNull()) { String fieldValue = createSqlStringValue(fieldNode, colSize, sqlType); - attributes.put("sql.args." + fieldCount + ".value", fieldValue); + attributes.put(attributePrefix + ".args." + fieldCount + ".value", fieldValue); } } @@ -693,14 +710,14 @@ public class ConvertJSONToSQL extends AbstractProcessor { } sqlBuilder.append(" = ?"); final int sqlType = desc.getDataType(); - attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType)); + attributes.put(attributePrefix + ".args." + fieldCount + ".type", String.valueOf(sqlType)); final Integer colSize = desc.getColumnSize(); String fieldValue = rootNode.get(fieldName).asText(); if (colSize != null && fieldValue.length() > colSize) { fieldValue = fieldValue.substring(0, colSize); } - attributes.put("sql.args." + fieldCount + ".value", fieldValue); + attributes.put(attributePrefix + ".args." + fieldCount + ".value", fieldValue); } return sqlBuilder.toString(); @@ -708,7 +725,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { private String generateDelete(final JsonNode rootNode, final Map attributes, final String tableName, final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns, - final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) { + final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName, final String attributePrefix) { final Set normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames); for (final String requiredColName : schema.getRequiredColumnNames()) { final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames); @@ -737,7 +754,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { sqlBuilder.append(" WHERE "); // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as - // adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the + // adding the column value to a ".args.N.value" attribute and the type of a ".args.N.type" attribute add the // columns that we are inserting into final Iterator fieldNames = rootNode.getFieldNames(); while (fieldNames.hasNext()) { @@ -763,7 +780,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { sqlBuilder.append(" = ?"); final int sqlType = desc.getDataType(); - attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType)); + attributes.put(attributePrefix + ".args." + fieldCount + ".type", String.valueOf(sqlType)); final Integer colSize = desc.getColumnSize(); final JsonNode fieldNode = rootNode.get(fieldName); @@ -772,7 +789,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { if (colSize != null && fieldValue.length() > colSize) { fieldValue = fieldValue.substring(0, colSize); } - attributes.put("sql.args." + fieldCount + ".value", fieldValue); + attributes.put(attributePrefix + ".args." + fieldCount + ".value", fieldValue); } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java index 2332e28b18..8a03a81078 100755 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java @@ -874,6 +874,35 @@ public class TestConvertJSONToSQL { out.assertContentEquals("DELETE FROM PERSONS WHERE ID = ? AND NAME = ? AND CODE = ?"); } + @Test + public void testAttributePrefix() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); + runner.setProperty(ConvertJSONToSQL.QUOTED_IDENTIFIERS, "true"); + runner.setProperty(ConvertJSONToSQL.SQL_PARAM_ATTR_PREFIX, "hiveql"); + + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("hiveql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("hiveql.args.1.value", "1"); + out.assertAttributeEquals("hiveql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("hiveql.args.2.value", "Mark"); + out.assertAttributeEquals("hiveql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("hiveql.args.3.value", "48"); + + out.assertContentEquals("INSERT INTO PERSONS (\"ID\", \"NAME\", \"CODE\") VALUES (?, ?, ?)"); + } + /** * Simple implementation only for testing purposes */