NIFI-10705 - Primary keys were not being included in ConvertJSONToSQL UPDATE queries when 'User statement.type Attribute' was used.

This closes #6652

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Nathan Gough 2022-11-10 23:47:20 -05:00 committed by exceptionfactory
parent 7de74ad3f0
commit 23e8a19417
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 61 additions and 36 deletions

View File

@ -292,6 +292,9 @@ public class ConvertJSONToSQL extends AbstractProcessor {
final boolean translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
final boolean ignoreUnmappedFields = IGNORE_UNMATCHED_FIELD.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_FIELD_BEHAVIOR).getValue());
String statementType = context.getProperty(STATEMENT_TYPE).getValue();
if (USE_ATTR_TYPE.equals(statementType)) {
statementType = flowFile.getAttribute(STATEMENT_TYPE_ATTRIBUTE);
}
final String updateKeys = context.getProperty(UPDATE_KEY).evaluateAttributeExpressions(flowFile).getValue();
final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
@ -383,10 +386,6 @@ public class ConvertJSONToSQL extends AbstractProcessor {
tableNameBuilder.append(tableName);
final String fqTableName = tableNameBuilder.toString();
if (USE_ATTR_TYPE.equals(statementType)) {
statementType = flowFile.getAttribute(STATEMENT_TYPE_ATTRIBUTE);
}
if (INSERT_TYPE.equals(statementType)) {
sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields,
failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName, attributePrefix);

View File

@ -16,8 +16,6 @@
*/
package org.apache.nifi.processors.standard;
import com.fasterxml.jackson.core.JsonGenerationException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.controller.AbstractControllerService;
@ -87,7 +85,7 @@ public class TestConvertJSONToSQL {
}
@Test
public void testInsert() throws InitializationException, ProcessException, SQLException, IOException {
public void testInsert() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
@ -113,7 +111,7 @@ public class TestConvertJSONToSQL {
}
@Test
public void testInsertStatementType() throws InitializationException, ProcessException, SQLException, IOException {
public void testInsertStatementType() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
@ -141,7 +139,7 @@ public class TestConvertJSONToSQL {
}
@Test
public void testInsertQuotedIdentifiers() throws InitializationException, ProcessException, SQLException, IOException {
public void testInsertQuotedIdentifiers() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
@ -169,7 +167,7 @@ public class TestConvertJSONToSQL {
}
@Test
public void testInsertQuotedTableIdentifier() throws InitializationException, ProcessException, SQLException, IOException {
public void testInsertQuotedTableIdentifier() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
@ -197,7 +195,7 @@ public class TestConvertJSONToSQL {
}
@Test
public void testInsertWithNullValue() throws InitializationException, ProcessException, SQLException, IOException {
public void testInsertWithNullValue() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
@ -223,7 +221,7 @@ public class TestConvertJSONToSQL {
}
@Test
public void testInsertBoolToInteger() throws InitializationException, ProcessException, SQLException, IOException {
public void testInsertBoolToInteger() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
@ -250,7 +248,7 @@ public class TestConvertJSONToSQL {
@Test
public void testUpdateWithNullValue() throws InitializationException, ProcessException, SQLException, IOException {
public void testUpdateWithNullValue() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
@ -277,7 +275,7 @@ public class TestConvertJSONToSQL {
@Test
public void testUpdateQuotedTableIdentifier() throws InitializationException, ProcessException, SQLException, IOException {
public void testUpdateQuotedTableIdentifier() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
@ -305,7 +303,7 @@ public class TestConvertJSONToSQL {
@Test
public void testMultipleInserts() throws InitializationException, ProcessException, SQLException, IOException {
public void testMultipleInserts() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
@ -331,7 +329,7 @@ public class TestConvertJSONToSQL {
}
@Test
public void testMultipleInsertsQuotedIdentifiers() throws InitializationException, ProcessException, SQLException, IOException {
public void testMultipleInsertsQuotedIdentifiers() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
@ -358,7 +356,7 @@ public class TestConvertJSONToSQL {
}
@Test
public void testUpdateBasedOnPrimaryKey() throws InitializationException, ProcessException, SQLException, IOException {
public void testUpdateBasedOnPrimaryKey() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
@ -384,7 +382,7 @@ public class TestConvertJSONToSQL {
}
@Test
public void testUpdateBasedOnPrimaryKeyQuotedIdentifier() throws InitializationException, ProcessException, SQLException, IOException {
public void testUpdateBasedOnPrimaryKeyQuotedIdentifier() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
@ -411,7 +409,7 @@ public class TestConvertJSONToSQL {
}
@Test
public void testUnmappedFieldBehavior() throws InitializationException, ProcessException, SQLException, IOException {
public void testUnmappedFieldBehavior() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
@ -438,7 +436,7 @@ public class TestConvertJSONToSQL {
}
@Test
public void testUpdateBasedOnUpdateKey() throws InitializationException, ProcessException, SQLException, IOException {
public void testUpdateBasedOnUpdateKey() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
@ -465,7 +463,7 @@ public class TestConvertJSONToSQL {
}
@Test
public void testUpdateBasedOnUpdateKeyQuotedIdentifier() throws InitializationException, ProcessException, SQLException, IOException {
public void testUpdateBasedOnUpdateKeyQuotedIdentifier() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
@ -493,7 +491,7 @@ public class TestConvertJSONToSQL {
}
@Test
public void testUpdateBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException {
public void testUpdateBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
@ -522,7 +520,7 @@ public class TestConvertJSONToSQL {
}
@Test
public void testUpdateWithMissingFieldBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException {
public void testUpdateWithMissingFieldBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
@ -538,7 +536,7 @@ public class TestConvertJSONToSQL {
}
@Test
public void testUpdateWithMalformedJson() throws InitializationException, ProcessException, SQLException, IOException {
public void testUpdateWithMalformedJson() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
@ -554,7 +552,7 @@ public class TestConvertJSONToSQL {
}
@Test
public void testInsertWithMissingField() throws InitializationException, ProcessException, SQLException, IOException {
public void testInsertWithMissingField() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
@ -625,7 +623,7 @@ public class TestConvertJSONToSQL {
} // End testInsertWithMissingColumnWarning()
@Test
public void testInsertWithMissingColumnIgnore() throws InitializationException, ProcessException, SQLException, IOException {
public void testInsertWithMissingColumnIgnore() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
@ -652,7 +650,7 @@ public class TestConvertJSONToSQL {
} // End testInsertWithMissingColumnIgnore()
@Test
public void testUpdateWithMissingColumnFail() throws InitializationException, ProcessException, SQLException, IOException {
public void testUpdateWithMissingColumnFail() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
@ -669,7 +667,7 @@ public class TestConvertJSONToSQL {
} // End testUpdateWithMissingColumnFail()
@Test
public void testUpdateWithMissingColumnWarning() throws InitializationException, ProcessException, SQLException, IOException {
public void testUpdateWithMissingColumnWarning() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
@ -698,7 +696,7 @@ public class TestConvertJSONToSQL {
} // End testUpdateWithMissingColumnWarning()
@Test
public void testUpdateWithMissingColumnIgnore() throws InitializationException, ProcessException, SQLException, IOException {
public void testUpdateWithMissingColumnIgnore() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
@ -731,7 +729,7 @@ public class TestConvertJSONToSQL {
* Use PutSQL processor to verify converted value can be used and don't fail.
*/
@Test
public void testCreateSqlStringValue() throws ProcessException, SQLException, JsonGenerationException, JsonMappingException, IOException, InitializationException {
public void testCreateSqlStringValue() throws ProcessException, SQLException, IOException, InitializationException {
final TestRunner putSqlRunner = TestRunners.newTestRunner(PutSQL.class);
final AtomicInteger id = new AtomicInteger(20);
@ -743,7 +741,7 @@ public class TestConvertJSONToSQL {
String tableName = "DIFTYPES";
ObjectMapper mapper = new ObjectMapper();
ResultSet colrs = null;
ResultSet colrs;
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
String createDifferentTypes = "CREATE TABLE DIFTYPES (id integer primary key, b boolean, f float, dbl double, dcml decimal, d date)";
@ -804,7 +802,7 @@ public class TestConvertJSONToSQL {
}
@Test
public void testDelete() throws InitializationException, ProcessException, SQLException, IOException {
public void testDelete() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
@ -830,7 +828,7 @@ public class TestConvertJSONToSQL {
}
@Test
public void testDeleteQuotedIdentifiers() throws InitializationException, ProcessException, SQLException, IOException {
public void testDeleteQuotedIdentifiers() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
@ -858,7 +856,7 @@ public class TestConvertJSONToSQL {
}
@Test
public void testDeleteQuotedTableIdentifier() throws InitializationException, ProcessException, SQLException, IOException {
public void testDeleteQuotedTableIdentifier() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
@ -886,7 +884,7 @@ public class TestConvertJSONToSQL {
}
@Test
public void testDeleteWithNullValue() throws InitializationException, ProcessException, SQLException, IOException {
public void testDeleteWithNullValue() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
@ -912,7 +910,7 @@ public class TestConvertJSONToSQL {
}
@Test
public void testAttributePrefix() throws InitializationException, ProcessException, SQLException, IOException {
public void testAttributePrefix() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
@ -940,6 +938,34 @@ public class TestConvertJSONToSQL {
out.assertContentEquals("INSERT INTO PERSONS (\"ID\", \"NAME\", \"CODE\") VALUES (?, ?, ?)");
}
@Test
public void testUpdateStatementTypeWithStatementTypeAttribute() throws InitializationException, ProcessException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, ConvertJSONToSQL.USE_ATTR_TYPE);
Map<String, String> attrs = new HashMap<>();
attrs.put(ConvertJSONToSQL.STATEMENT_TYPE_ATTRIBUTE, "UPDATE");
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"), attrs);
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.1.value", "Mark");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(Types.INTEGER));
out.assertAttributeEquals("sql.args.2.value", "48");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "1");
out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE ID = ?");
}
/**
* Simple implementation only for testing purposes
*/