NIFI-4684 - Added SQL Parameter Attribute Prefix property to ConvertJSONToSQL

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2333.
Matthew Burgess 2017-12-08 22:01:25 -05:00 committed by Pierre Villard
parent 17718940d7
commit 612675e428
2 changed files with 73 additions and 27 deletions
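
In short: attribute names that were previously hard-wired to the 'sql' prefix (sql.table, sql.catalog, sql.args.N.type, sql.args.N.value) are now built from the configurable SQL Parameter Attribute Prefix property, which defaults to 'sql'. A minimal, self-contained sketch of that naming rule (plain Java, not the NiFi API; the class and method names here are illustrative only):

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of the attribute-naming rule introduced by this commit:
// parameter attributes become "<prefix>.args.<n>.type" / "<prefix>.args.<n>.value"
// instead of always starting with "sql".
class SqlParamAttrPrefixSketch {
    static Map<String, String> parameterAttributes(final String prefix, final int n, final int jdbcType, final String value) {
        final Map<String, String> attrs = new LinkedHashMap<>();
        attrs.put(prefix + ".args." + n + ".type", String.valueOf(jdbcType));
        attrs.put(prefix + ".args." + n + ".value", value);
        return attrs;
    }

    public static void main(String[] args) {
        // Default prefix "sql" keeps the old names: sql.args.1.type / sql.args.1.value
        System.out.println(parameterAttributes("sql", 1, java.sql.Types.INTEGER, "1"));
        // A prefix of "hiveql" yields hiveql.args.1.type / hiveql.args.1.value (see the new test below)
        System.out.println(parameterAttributes("hiveql", 1, java.sql.Types.INTEGER, "1"));
    }
}

Because the default value is 'sql', existing flows and the pre-existing tests keep their behaviour unchanged.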

ConvertJSONToSQL.java

@@ -83,9 +83,10 @@ import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttribu
         + "relationship and the SQL is routed to the 'sql' relationship.")
 @WritesAttributes({
         @WritesAttribute(attribute="mime.type", description="Sets mime.type of FlowFile that is routed to 'sql' to 'text/plain'."),
-        @WritesAttribute(attribute="sql.table", description="Sets the sql.table attribute of FlowFile that is routed to 'sql' to the name of the table that is updated by the SQL statement."),
-        @WritesAttribute(attribute="sql.catalog", description="If the Catalog name is set for this database, specifies the name of the catalog that the SQL statement will update. "
-                + "If no catalog is used, this attribute will not be added."),
+        @WritesAttribute(attribute = "<sql>.table", description = "Sets the <sql>.table attribute of FlowFile that is routed to 'sql' to the name of the table that is updated by the SQL statement. "
+                + "The prefix for this attribute ('sql', e.g.) is determined by the SQL Parameter Attribute Prefix property."),
+        @WritesAttribute(attribute="<sql>.catalog", description="If the Catalog name is set for this database, specifies the name of the catalog that the SQL statement will update. "
+                + "If no catalog is used, this attribute will not be added. The prefix for this attribute ('sql', e.g.) is determined by the SQL Parameter Attribute Prefix property."),
         @WritesAttribute(attribute="fragment.identifier", description="All FlowFiles routed to the 'sql' relationship for the same incoming FlowFile (multiple will be output for the same incoming "
                 + "FlowFile if the incoming FlowFile is a JSON Array) will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
         @WritesAttribute(attribute="fragment.count", description="The number of SQL FlowFiles that were produced for same incoming FlowFile. This can be used in conjunction with the "
@@ -93,12 +94,14 @@ import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttribu
         @WritesAttribute(attribute="fragment.index", description="The position of this FlowFile in the list of outgoing FlowFiles that were all derived from the same incoming FlowFile. This can be "
                 + "used in conjunction with the fragment.identifier and fragment.count attributes to know which FlowFiles originated from the same incoming FlowFile and in what order the SQL "
                 + "FlowFiles were produced"),
-        @WritesAttribute(attribute="sql.args.N.type", description="The output SQL statements are parametrized in order to avoid SQL Injection Attacks. The types of the Parameters "
-                + "to use are stored in attributes named sql.args.1.type, sql.args.2.type, sql.args.3.type, and so on. The type is a number representing a JDBC Type constant. "
-                + "Generally, this is useful only for software to read and interpret but is added so that a processor such as PutSQL can understand how to interpret the values."),
-        @WritesAttribute(attribute="sql.args.N.value", description="The output SQL statements are parametrized in order to avoid SQL Injection Attacks. The values of the Parameters "
-                + "to use are stored in the attributes named sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. Each of these attributes has a corresponding "
-                + "sql.args.N.type attribute that indicates how the value should be interpreted when inserting it into the database.")
+        @WritesAttribute(attribute="<sql>.args.N.type", description="The output SQL statements are parametrized in order to avoid SQL Injection Attacks. The types of the Parameters "
+                + "to use are stored in attributes named <sql>.args.1.type, <sql>.args.2.type, <sql>.args.3.type, and so on. The type is a number representing a JDBC Type constant. "
+                + "Generally, this is useful only for software to read and interpret but is added so that a processor such as PutSQL can understand how to interpret the values. "
+                + "The prefix for this attribute ('sql', e.g.) is determined by the SQL Parameter Attribute Prefix property."),
+        @WritesAttribute(attribute="<sql>.args.N.value", description="The output SQL statements are parametrized in order to avoid SQL Injection Attacks. The values of the Parameters "
+                + "to use are stored in the attributes named sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. Each of these attributes has a corresponding "
+                + "<sql>.args.N.type attribute that indicates how the value should be interpreted when inserting it into the database."
+                + "The prefix for this attribute ('sql', e.g.) is determined by the SQL Parameter Attribute Prefix property.")
 })
 public class ConvertJSONToSQL extends AbstractProcessor {
     private static final String UPDATE_TYPE = "UPDATE";
@@ -201,6 +204,16 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             .defaultValue("false")
             .build();
 
+    static final PropertyDescriptor SQL_PARAM_ATTR_PREFIX = new PropertyDescriptor.Builder()
+            .name("jts-sql-param-attr-prefix")
+            .displayName("SQL Parameter Attribute Prefix")
+            .description("The string to be prepended to the outgoing flow file attributes, such as <sql>.args.1.value, where <sql> is replaced with the specified value")
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .required(true)
+            .defaultValue("sql")
+            .build();
+
     static final Relationship REL_ORIGINAL = new Relationship.Builder()
             .name("original")
             .description("When a FlowFile is converted to SQL, the original JSON FlowFile is routed to this relationship")
@@ -238,6 +251,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         properties.add(UPDATE_KEY);
         properties.add(QUOTED_IDENTIFIERS);
         properties.add(QUOTED_TABLE_IDENTIFIER);
+        properties.add(SQL_PARAM_ATTR_PREFIX);
         return properties;
     }
@@ -287,6 +301,9 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         // Quote table name?
         final boolean quoteTableName = context.getProperty(QUOTED_TABLE_IDENTIFIER).asBoolean();
 
+        // Attribute prefix
+        final String attributePrefix = context.getProperty(SQL_PARAM_ATTR_PREFIX).evaluateAttributeExpressions(flowFile).getValue();
+
         // get the database schema from the cache, if one exists. We do this in a synchronized block, rather than
         // using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if
         // the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the
@@ -364,13 +381,13 @@ public class ConvertJSONToSQL extends AbstractProcessor {
                     if (INSERT_TYPE.equals(statementType)) {
                         sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields,
-                                failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName);
+                                failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName, attributePrefix);
                     } else if (UPDATE_TYPE.equals(statementType)) {
                         sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields,
-                                failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName);
+                                failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName, attributePrefix);
                     } else {
                         sql = generateDelete(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields,
-                                failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName);
+                                failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName, attributePrefix);
                     }
                 } catch (final ProcessException pe) {
                     getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
@@ -391,13 +408,13 @@ public class ConvertJSONToSQL extends AbstractProcessor {
                 });
 
                 attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
-                attributes.put("sql.table", tableName);
+                attributes.put(attributePrefix + ".table", tableName);
                 attributes.put(FRAGMENT_ID.key(), fragmentIdentifier);
                 attributes.put(FRAGMENT_COUNT.key(), String.valueOf(arrayNode.size()));
                 attributes.put(FRAGMENT_INDEX.key(), String.valueOf(i));
                 if (catalog != null) {
-                    attributes.put("sql.catalog", catalog);
+                    attributes.put(attributePrefix + ".catalog", catalog);
                 }
 
                 sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributes);
@@ -420,7 +437,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
     private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName,
                                   final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
-                                  final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) {
+                                  final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName, final String attributePrefix) {
 
         final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
         for (final String requiredColName : schema.getRequiredColumnNames()) {
@@ -449,7 +466,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         sqlBuilder.append(" (");
 
         // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
-        // adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the
+        // adding the column value to a "<sql>.args.N.value" attribute and the type of a "<sql>.args.N.type" attribute add the
         // columns that we are inserting into
         final Iterator<String> fieldNames = rootNode.getFieldNames();
         while (fieldNames.hasNext()) {
@@ -474,13 +491,13 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             }
 
             final int sqlType = desc.getDataType();
-            attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
+            attributes.put(attributePrefix + ".args." + fieldCount + ".type", String.valueOf(sqlType));
 
             final Integer colSize = desc.getColumnSize();
             final JsonNode fieldNode = rootNode.get(fieldName);
             if (!fieldNode.isNull()) {
                 String fieldValue = createSqlStringValue(fieldNode, colSize, sqlType);
-                attributes.put("sql.args." + fieldCount + ".value", fieldValue);
+                attributes.put(attributePrefix + ".args." + fieldCount + ".value", fieldValue);
             }
         }
     }
@@ -562,7 +579,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
     private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
                                   final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
-                                  final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) {
+                                  final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName, final String attributePrefix) {
 
         final Set<String> updateKeyNames;
         if (updateKeys == null) {
@@ -612,7 +629,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         }
 
         // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
-        // adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the
+        // adding the column value to a "<sql>.args.N.value" attribute and the type of a "<sql>.args.N.type" attribute add the
         // columns that we are inserting into
         Iterator<String> fieldNames = rootNode.getFieldNames();
         while (fieldNames.hasNext()) {
@@ -648,14 +665,14 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             sqlBuilder.append(" = ?");
 
             final int sqlType = desc.getDataType();
-            attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
+            attributes.put(attributePrefix + ".args." + fieldCount + ".type", String.valueOf(sqlType));
 
             final Integer colSize = desc.getColumnSize();
             final JsonNode fieldNode = rootNode.get(fieldName);
             if (!fieldNode.isNull()) {
                 String fieldValue = createSqlStringValue(fieldNode, colSize, sqlType);
-                attributes.put("sql.args." + fieldCount + ".value", fieldValue);
+                attributes.put(attributePrefix + ".args." + fieldCount + ".value", fieldValue);
             }
         }
@@ -693,14 +710,14 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             }
             sqlBuilder.append(" = ?");
             final int sqlType = desc.getDataType();
-            attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
+            attributes.put(attributePrefix + ".args." + fieldCount + ".type", String.valueOf(sqlType));
 
             final Integer colSize = desc.getColumnSize();
             String fieldValue = rootNode.get(fieldName).asText();
             if (colSize != null && fieldValue.length() > colSize) {
                 fieldValue = fieldValue.substring(0, colSize);
             }
-            attributes.put("sql.args." + fieldCount + ".value", fieldValue);
+            attributes.put(attributePrefix + ".args." + fieldCount + ".value", fieldValue);
         }
 
         return sqlBuilder.toString();
@@ -708,7 +725,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
     private String generateDelete(final JsonNode rootNode, final Map<String, String> attributes, final String tableName,
                                   final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
-                                  final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) {
+                                  final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName, final String attributePrefix) {
 
         final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
         for (final String requiredColName : schema.getRequiredColumnNames()) {
             final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
@@ -737,7 +754,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         sqlBuilder.append(" WHERE ");
 
         // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
-        // adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the
+        // adding the column value to a "<sql>.args.N.value" attribute and the type of a "<sql>.args.N.type" attribute add the
         // columns that we are inserting into
         final Iterator<String> fieldNames = rootNode.getFieldNames();
         while (fieldNames.hasNext()) {
@@ -763,7 +780,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             sqlBuilder.append(" = ?");
 
             final int sqlType = desc.getDataType();
-            attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
+            attributes.put(attributePrefix + ".args." + fieldCount + ".type", String.valueOf(sqlType));
 
             final Integer colSize = desc.getColumnSize();
             final JsonNode fieldNode = rootNode.get(fieldName);
@@ -772,7 +789,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
                 if (colSize != null && fieldValue.length() > colSize) {
                     fieldValue = fieldValue.substring(0, colSize);
                 }
-                attributes.put("sql.args." + fieldCount + ".value", fieldValue);
+                attributes.put(attributePrefix + ".args." + fieldCount + ".value", fieldValue);
             }
         }
     }

TestConvertJSONToSQL.java

@@ -874,6 +874,35 @@ public class TestConvertJSONToSQL {
         out.assertContentEquals("DELETE FROM PERSONS WHERE ID = ? AND NAME = ? AND CODE = ?");
     }
 
+    @Test
+    public void testAttributePrefix() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        runner.setProperty(ConvertJSONToSQL.QUOTED_IDENTIFIERS, "true");
+        runner.setProperty(ConvertJSONToSQL.SQL_PARAM_ATTR_PREFIX, "hiveql");
+
+        runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("hiveql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("hiveql.args.1.value", "1");
+        out.assertAttributeEquals("hiveql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("hiveql.args.2.value", "Mark");
+        out.assertAttributeEquals("hiveql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("hiveql.args.3.value", "48");
+
+        out.assertContentEquals("INSERT INTO PERSONS (\"ID\", \"NAME\", \"CODE\") VALUES (?, ?, ?)");
+    }
+
     /**
      * Simple implementation only for testing purposes
      */
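
For context on how these prefixed attributes are meant to be consumed (the annotation text above mentions a processor such as PutSQL), here is a hedged, plain-JDBC sketch, not NiFi code, of binding "<prefix>.args.N.type"/"<prefix>.args.N.value" pairs to the parameterized statement carried in the FlowFile content; the class name and the simple setObject coercion are assumptions for illustration:

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;

// Hedged sketch of a downstream consumer (plain JDBC, not the NiFi PutSQL implementation):
// read the "<prefix>.args.N.type" / "<prefix>.args.N.value" pairs emitted by ConvertJSONToSQL
// and bind them, in order, to the parameterized SQL held in the FlowFile content.
final class PrefixedArgsBinderSketch {
    static void bind(final PreparedStatement stmt, final Map<String, String> attributes, final String prefix) throws SQLException {
        for (int i = 1; attributes.containsKey(prefix + ".args." + i + ".type"); i++) {
            final int jdbcType = Integer.parseInt(attributes.get(prefix + ".args." + i + ".type"));
            final String value = attributes.get(prefix + ".args." + i + ".value");
            if (value == null) {
                stmt.setNull(i, jdbcType);          // a missing ".value" attribute means a NULL parameter
            } else {
                stmt.setObject(i, value, jdbcType); // simplistic coercion; real consumers convert by JDBC type
            }
        }
    }
}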