NIFI-1093 - Added a setting that controls whether a JSON document that does not supply values for all of the database table's columns causes the conversion to fail, log a warning, or ignore the unmatched columns. Added tests for the new setting.

Signed-off-by: Mark Payne <markap14@hotmail.com>
Daniel Cave 2016-01-28 18:44:56 +00:00 committed by Mark Payne
parent dbe8ff3f44
commit 171b9c4e94
2 changed files with 258 additions and 37 deletions
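
For orientation, a minimal sketch (not part of the commit) of how the new "Unmatched Column Behavior" property can be exercised via the NiFi mock test framework. It mirrors the tests added in TestConvertJSONToSQL below; the class name and the DBCPService wiring are illustrative only, and the caller must supply a pool whose PERSONS table has more columns than the incoming JSON provides (the tests use a MockDBCPService with an extra "generated_key" column).

import java.nio.file.Paths;

import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processors.standard.ConvertJSONToSQL;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class UnmatchedColumnBehaviorSketch {

    // Illustrative helper: the caller supplies any DBCPService backed by a PERSONS
    // table that has at least one column the incoming JSON does not provide a value for.
    public static void demonstrate(final DBCPService service) throws Exception {
        final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
        runner.addControllerService("dbcp", service);
        runner.enableControllerService(service);

        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
        // New in this commit: "Fail Unmatched Columns", "Warning Unmatched Columns",
        // or "Ignore Unmatched Columns".
        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warning Unmatched Columns");

        runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
        runner.run();

        // With the warning (or ignore) behavior the FlowFile still converts and a warning
        // is logged; with "Fail Unmatched Columns" it would be routed to failure instead.
        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
    }
}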

ConvertJSONToSQL.java

@@ -74,32 +74,38 @@ import org.codehaus.jackson.node.JsonNodeFactory;
+ "output as a separate FlowFile to the 'sql' relationship. Upon successful conversion, the original FlowFile is routed to the 'original' "
+ "relationship and the SQL is routed to the 'sql' relationship.")
@WritesAttributes({
@WritesAttribute(attribute="mime.type", description="Sets mime.type of FlowFile that is routed to 'sql' to 'text/plain'."),
@WritesAttribute(attribute="sql.table", description="Sets the sql.table attribute of FlowFile that is routed to 'sql' to the name of the table that is updated by the SQL statement."),
@WritesAttribute(attribute="sql.catalog", description="If the Catalog name is set for this database, specifies the name of the catalog that the SQL statement will update. "
+ "If no catalog is used, this attribute will not be added."),
@WritesAttribute(attribute="fragment.identifier", description="All FlowFiles routed to the 'sql' relationship for the same incoming FlowFile (multiple will be output for the same incoming "
+ "FlowFile if the incoming FlowFile is a JSON Array) will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
@WritesAttribute(attribute="fragment.count", description="The number of SQL FlowFiles that were produced for same incoming FlowFile. This can be used in conjunction with the "
+ "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming FlowFile."),
@WritesAttribute(attribute="fragment.index", description="The position of this FlowFile in the list of outgoing FlowFiles that were all derived from the same incoming FlowFile. This can be "
+ "used in conjunction with the fragment.identifier and fragment.count attributes to know which FlowFiles originated from the same incoming FlowFile and in what order the SQL "
+ "FlowFiles were produced"),
@WritesAttribute(attribute="sql.args.N.type", description="The output SQL statements are parameterized in order to avoid SQL Injection Attacks. The types of the Parameters "
+ "to use are stored in attributes named sql.args.1.type, sql.args.2.type, sql.args.3.type, and so on. The type is a number representing a JDBC Type constant. "
+ "Generally, this is useful only for software to read and interpret but is added so that a processor such as PutSQL can understand how to interpret the values."),
@WritesAttribute(attribute="sql.args.N.value", description="The output SQL statements are parameterized in order to avoid SQL Injection Attacks. The values of the Parameters "
+ "to use are stored in the attributes named sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. Each of these attributes has a corresponding "
+ "sql.args.N.type attribute that indicates how the value should be interpreted when inserting it into the database.")
@WritesAttribute(attribute="mime.type", description="Sets mime.type of FlowFile that is routed to 'sql' to 'text/plain'."),
@WritesAttribute(attribute="sql.table", description="Sets the sql.table attribute of FlowFile that is routed to 'sql' to the name of the table that is updated by the SQL statement."),
@WritesAttribute(attribute="sql.catalog", description="If the Catalog name is set for this database, specifies the name of the catalog that the SQL statement will update. "
+ "If no catalog is used, this attribute will not be added."),
@WritesAttribute(attribute="fragment.identifier", description="All FlowFiles routed to the 'sql' relationship for the same incoming FlowFile (multiple will be output for the same incoming "
+ "FlowFile if the incoming FlowFile is a JSON Array) will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
@WritesAttribute(attribute="fragment.count", description="The number of SQL FlowFiles that were produced for same incoming FlowFile. This can be used in conjunction with the "
+ "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming FlowFile."),
@WritesAttribute(attribute="fragment.index", description="The position of this FlowFile in the list of outgoing FlowFiles that were all derived from the same incoming FlowFile. This can be "
+ "used in conjunction with the fragment.identifier and fragment.count attributes to know which FlowFiles originated from the same incoming FlowFile and in what order the SQL "
+ "FlowFiles were produced"),
@WritesAttribute(attribute="sql.args.N.type", description="The output SQL statements are parameterized in order to avoid SQL Injection Attacks. The types of the Parameters "
+ "to use are stored in attributes named sql.args.1.type, sql.args.2.type, sql.args.3.type, and so on. The type is a number representing a JDBC Type constant. "
+ "Generally, this is useful only for software to read and interpret but is added so that a processor such as PutSQL can understand how to interpret the values."),
@WritesAttribute(attribute="sql.args.N.value", description="The output SQL statements are parameterized in order to avoid SQL Injection Attacks. The values of the Parameters "
+ "to use are stored in the attributes named sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. Each of these attributes has a corresponding "
+ "sql.args.N.type attribute that indicates how the value should be interpreted when inserting it into the database.")
})
public class ConvertJSONToSQL extends AbstractProcessor {
private static final String UPDATE_TYPE = "UPDATE";
private static final String INSERT_TYPE = "INSERT";
static final AllowableValue IGNORE_UNMATCHED_FIELD = new AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields",
"Any field in the JSON document that cannot be mapped to a column in the database is ignored");
"Any field in the JSON document that cannot be mapped to a column in the database is ignored");
static final AllowableValue FAIL_UNMATCHED_FIELD = new AllowableValue("Fail", "Fail",
"If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship");
"If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship");
static final AllowableValue IGNORE_UNMATCHED_COLUMN = new AllowableValue("Ignore Unmatched Columns", "Ignore Unmatched Columns",
"Any column in the database that does not have a field in the JSON document will be assumed to not be required. No notification will be logged");
static final AllowableValue WARNING_UNMATCHED_COLUMN = new AllowableValue("Warning Unmatched Columns", "Warning Unmatched Columns",
"Any column in the database that does not have a field in the JSON document will be assumed to not be required. A warning will be logged");
static final AllowableValue FAIL_UNMATCHED_COLUMN = new AllowableValue("Fail Unmatched Columns", "Fail Unmatched Columns",
"Any column in the database that does not have a field in the JSON document will fail the flow. An error will be logged");
static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
.name("JDBC Connection Pool")
@@ -143,16 +149,22 @@ public class ConvertJSONToSQL extends AbstractProcessor {
.defaultValue("true")
.build();
static final PropertyDescriptor UNMATCHED_FIELD_BEHAVIOR = new PropertyDescriptor.Builder()
.name("Unmatched Field Behavior")
.description("If an incoming JSON element has a field that does not map to any of the database table's columns, this property specifies how to handle the situation")
.allowableValues(IGNORE_UNMATCHED_FIELD, FAIL_UNMATCHED_FIELD)
.defaultValue(IGNORE_UNMATCHED_FIELD.getValue())
.build();
.name("Unmatched Field Behavior")
.description("If an incoming JSON element has a field that does not map to any of the database table's columns, this property specifies how to handle the situation")
.allowableValues(IGNORE_UNMATCHED_FIELD, FAIL_UNMATCHED_FIELD)
.defaultValue(IGNORE_UNMATCHED_FIELD.getValue())
.build();
static final PropertyDescriptor UNMATCHED_COLUMN_BEHAVIOR = new PropertyDescriptor.Builder()
.name("Unmatched Column Behavior")
.description("If an incoming JSON element does not have a field mapping for all of the database table's columns, this property specifies how to handle the situation")
.allowableValues(IGNORE_UNMATCHED_COLUMN, WARNING_UNMATCHED_COLUMN, FAIL_UNMATCHED_COLUMN)
.defaultValue(FAIL_UNMATCHED_COLUMN.getValue())
.build();
static final PropertyDescriptor UPDATE_KEY = new PropertyDescriptor.Builder()
.name("Update Keys")
.description("A comma-separated list of column names that uniquely identifies a row in the database for UPDATE statements. "
+ "If the Statement Type is UPDATE and this property is not set, the table's Primary Keys are used. "
+ "In this case, if no Primary Key exists, the conversion to SQL will fail. "
+ "In this case, if no Primary Key exists, the conversion to SQL will fail if Unmatched Column Behaviour is set to FAIL. "
+ "This property is ignored if the Statement Type is INSERT")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
@@ -193,6 +205,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
properties.add(SCHEMA_NAME);
properties.add(TRANSLATE_FIELD_NAMES);
properties.add(UNMATCHED_FIELD_BEHAVIOR);
properties.add(UNMATCHED_COLUMN_BEHAVIOR);
properties.add(UPDATE_KEY);
return properties;
}
@@ -233,6 +246,10 @@ public class ConvertJSONToSQL extends AbstractProcessor {
final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) && updateKeys == null;
// Is the unmatched column behavior set to fail or warning?
final boolean failUnmappedColumns = FAIL_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
final boolean warningUnmappedColumns = WARNING_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
// get the database schema from the cache, if one exists. We do this in a synchronized block, rather than
// using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if
// the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the
@@ -309,9 +326,9 @@ public class ConvertJSONToSQL extends AbstractProcessor {
final String fqTableName = tableNameBuilder.toString();
if (INSERT_TYPE.equals(statementType)) {
sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields);
sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields, failUnmappedColumns, warningUnmappedColumns);
} else {
sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields);
sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields, failUnmappedColumns, warningUnmappedColumns);
}
} catch (final ProcessException pe) {
getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
@@ -359,13 +376,18 @@ public class ConvertJSONToSQL extends AbstractProcessor {
}
private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName,
final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) {
final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields,
final boolean failUnmappedColumns, final boolean warningUnmappedColumns) {
final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
for (final String requiredColName : schema.getRequiredColumnNames()) {
final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
if (!normalizedFieldNames.contains(normalizedColName)) {
throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'");
if (failUnmappedColumns) {
getLogger().error("JSON does not have a value for the Required column '" + requiredColName + "'");
throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'");
} else if (warningUnmappedColumns) {
getLogger().warn("JSON does not have a value for the Required column '" + requiredColName + "'");
}
}
}
@@ -426,7 +448,8 @@ public class ConvertJSONToSQL extends AbstractProcessor {
}
private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) {
final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields,
final boolean failUnmappedColumns, final boolean warningUnmappedColumns) {
final Set<String> updateKeyNames;
if (updateKeys == null) {
@@ -454,9 +477,13 @@ public class ConvertJSONToSQL extends AbstractProcessor {
for (final String uk : updateKeyNames) {
final String normalizedUK = normalizeColumnName(uk, translateFieldNames);
normalizedUpdateNames.add(normalizedUK);
if (!normalizedFieldNames.contains(normalizedUK)) {
throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
if (!normalizedFieldNames.contains(normalizedUK)) {
if (failUnmappedColumns) {
getLogger().error("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
} else if (warningUnmappedColumns) {
getLogger().warn("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
}
}
}
@@ -469,7 +496,6 @@ public class ConvertJSONToSQL extends AbstractProcessor {
final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
final ColumnDescription desc = schema.getColumns().get(normalizedColName);
if (desc == null) {
if (!ignoreUnmappedFields) {
throw new ProcessException("Cannot map JSON field '" + fieldName + "' to any column in the database");
@@ -554,7 +580,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
private Map<String, ColumnDescription> columns;
private TableSchema(final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
final Set<String> primaryKeyColumnNames) {
this.columns = new HashMap<>();
this.primaryKeyColumnNames = primaryKeyColumnNames;
@@ -580,7 +606,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
}
public static TableSchema from(final Connection conn, final String catalog, final String schema, final String tableName,
final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException {
try (final ResultSet colrs = conn.getMetaData().getColumns(catalog, schema, tableName, "%")) {
final List<ColumnDescription> cols = new ArrayList<>();

TestConvertJSONToSQL.java

@@ -400,6 +400,201 @@ public class TestConvertJSONToSQL {
runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
}
@Test
public void testInsertWithMissingColumnFail() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)");
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail Unmatched Columns");
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
runner.run();
runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
} // End testInsertWithMissingColumnFail()
@Test
public void testInsertWithMissingColumnWarning() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)");
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warning Unmatched Columns");
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.1.value", "1");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.2.value", "Mark");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "48");
out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
} // End testInsertWithMissingColumnWarning()
@Test
public void testInsertWithMissingColumnIgnore() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)");
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Ignore Unmatched Columns");
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.1.value", "1");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.2.value", "Mark");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "48");
out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
} // End testInsertWithMissingColumnIgnore()
@Test
public void testUpdateWithMissingColumnFail() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code, extra");
runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail Unmatched Columns");
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
runner.run();
runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
} // End testUpdateWithMissingColumnFail()
@Test
public void testUpdateWithMissingColumnWarning() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code, extra");
runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warning Unmatched Columns");
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.1.value", "1");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.2.value", "Mark");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "48");
out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?");
} // End testUpdateWithMissingColumnWarning()
@Test
public void testUpdateWithMissingColumnIgnore() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code, extra");
runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Ignore Unmatched Columns");
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.1.value", "1");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.2.value", "Mark");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "48");
out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?");
} // End testUpdateWithMissingColumnIgnore()
/**
* Simple implementation only for testing purposes