mirror of https://github.com/apache/nifi.git
NIFI-3626: Add support for DELETE in ConvertJSONToSQL
This closes #1605. Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
40acd4a6e1
commit
c4d0c0bbd1
|
@ -74,8 +74,8 @@ import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttribu
|
|||
@SupportsBatching
|
||||
@SeeAlso(PutSQL.class)
|
||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
@Tags({"json", "sql", "database", "rdbms", "insert", "update", "relational", "flat"})
|
||||
@CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE or INSERT SQL statement. The incoming FlowFile is expected to be "
|
||||
@Tags({"json", "sql", "database", "rdbms", "insert", "update", "delete", "relational", "flat"})
|
||||
@CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE, INSERT, or DELETE SQL statement. The incoming FlowFile is expected to be "
|
||||
+ "\"flat\" JSON message, meaning that it consists of a single JSON element and each field maps to a simple type. If a field maps to "
|
||||
+ "a JSON object, that JSON object will be interpreted as Text. If the input is an array of JSON elements, each element in the array is "
|
||||
+ "output as a separate FlowFile to the 'sql' relationship. Upon successful conversion, the original FlowFile is routed to the 'original' "
|
||||
|
@ -102,6 +102,7 @@ import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttribu
|
|||
public class ConvertJSONToSQL extends AbstractProcessor {
|
||||
private static final String UPDATE_TYPE = "UPDATE";
|
||||
private static final String INSERT_TYPE = "INSERT";
|
||||
private static final String DELETE_TYPE = "DELETE";
|
||||
|
||||
static final AllowableValue IGNORE_UNMATCHED_FIELD = new AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields",
|
||||
"Any field in the JSON document that cannot be mapped to a column in the database is ignored");
|
||||
|
@ -128,7 +129,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
|
|||
.name("Statement Type")
|
||||
.description("Specifies the type of SQL Statement to generate")
|
||||
.required(true)
|
||||
.allowableValues(UPDATE_TYPE, INSERT_TYPE)
|
||||
.allowableValues(UPDATE_TYPE, INSERT_TYPE, DELETE_TYPE)
|
||||
.build();
|
||||
static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
|
||||
.name("Table Name")
|
||||
|
@ -363,9 +364,12 @@ public class ConvertJSONToSQL extends AbstractProcessor {
|
|||
if (INSERT_TYPE.equals(statementType)) {
|
||||
sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields,
|
||||
failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName);
|
||||
} else {
|
||||
} else if (UPDATE_TYPE.equals(statementType)) {
|
||||
sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields,
|
||||
failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName);
|
||||
} else {
|
||||
sql = generateDelete(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields,
|
||||
failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName);
|
||||
}
|
||||
} catch (final ProcessException pe) {
|
||||
getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
|
||||
|
@ -650,6 +654,84 @@ public class ConvertJSONToSQL extends AbstractProcessor {
|
|||
return sqlBuilder.toString();
|
||||
}
|
||||
|
||||
private String generateDelete(final JsonNode rootNode, final Map<String, String> attributes, final String tableName,
|
||||
final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
|
||||
final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) {
|
||||
final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
|
||||
for (final String requiredColName : schema.getRequiredColumnNames()) {
|
||||
final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
|
||||
if (!normalizedFieldNames.contains(normalizedColName)) {
|
||||
String missingColMessage = "JSON does not have a value for the Required column '" + requiredColName + "'";
|
||||
if (failUnmappedColumns) {
|
||||
getLogger().error(missingColMessage);
|
||||
throw new ProcessException(missingColMessage);
|
||||
} else if (warningUnmappedColumns) {
|
||||
getLogger().warn(missingColMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final StringBuilder sqlBuilder = new StringBuilder();
|
||||
int fieldCount = 0;
|
||||
sqlBuilder.append("DELETE FROM ");
|
||||
if (quoteTableName) {
|
||||
sqlBuilder.append(schema.getQuotedIdentifierString())
|
||||
.append(tableName)
|
||||
.append(schema.getQuotedIdentifierString());
|
||||
} else {
|
||||
sqlBuilder.append(tableName);
|
||||
}
|
||||
|
||||
sqlBuilder.append(" WHERE ");
|
||||
|
||||
// iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
|
||||
// adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the
|
||||
// columns that we are inserting into
|
||||
final Iterator<String> fieldNames = rootNode.getFieldNames();
|
||||
while (fieldNames.hasNext()) {
|
||||
final String fieldName = fieldNames.next();
|
||||
|
||||
final ColumnDescription desc = schema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
|
||||
if (desc == null && !ignoreUnmappedFields) {
|
||||
throw new ProcessException("Cannot map JSON field '" + fieldName + "' to any column in the database");
|
||||
}
|
||||
|
||||
if (desc != null) {
|
||||
if (fieldCount++ > 0) {
|
||||
sqlBuilder.append(" AND ");
|
||||
}
|
||||
|
||||
if (escapeColumnNames) {
|
||||
sqlBuilder.append(schema.getQuotedIdentifierString())
|
||||
.append(desc.getColumnName())
|
||||
.append(schema.getQuotedIdentifierString());
|
||||
} else {
|
||||
sqlBuilder.append(desc.getColumnName());
|
||||
}
|
||||
sqlBuilder.append(" = ?");
|
||||
|
||||
final int sqlType = desc.getDataType();
|
||||
attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
|
||||
|
||||
final Integer colSize = desc.getColumnSize();
|
||||
final JsonNode fieldNode = rootNode.get(fieldName);
|
||||
if (!fieldNode.isNull()) {
|
||||
String fieldValue = fieldNode.asText();
|
||||
if (colSize != null && fieldValue.length() > colSize) {
|
||||
fieldValue = fieldValue.substring(0, colSize);
|
||||
}
|
||||
attributes.put("sql.args." + fieldCount + ".value", fieldValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (fieldCount == 0) {
|
||||
throw new ProcessException("None of the fields in the JSON map to the columns defined by the " + tableName + " table");
|
||||
}
|
||||
|
||||
return sqlBuilder.toString();
|
||||
}
|
||||
|
||||
private static String normalizeColumnName(final String colName, final boolean translateColumnNames) {
|
||||
return translateColumnNames ? colName.toUpperCase().replace("_", "") : colName;
|
||||
}
|
||||
|
|
|
@ -833,6 +833,150 @@ public class TestConvertJSONToSQL {
|
|||
} // End testUpdateWithMissingColumnIgnore()
|
||||
|
||||
|
||||
@Test
|
||||
public void testDelete() throws InitializationException, ProcessException, SQLException, IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
|
||||
final File tempDir = folder.getRoot();
|
||||
final File dbDir = new File(tempDir, "db");
|
||||
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
|
||||
runner.addControllerService("dbcp", service);
|
||||
runner.enableControllerService(service);
|
||||
|
||||
try (final Connection conn = service.getConnection()) {
|
||||
try (final Statement stmt = conn.createStatement()) {
|
||||
stmt.executeUpdate(createPersons);
|
||||
}
|
||||
}
|
||||
|
||||
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
|
||||
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
|
||||
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "DELETE");
|
||||
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
|
||||
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
|
||||
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
|
||||
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
|
||||
out.assertAttributeEquals("sql.args.1.value", "1");
|
||||
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
|
||||
out.assertAttributeEquals("sql.args.2.value", "Mark");
|
||||
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
|
||||
out.assertAttributeEquals("sql.args.3.value", "48");
|
||||
|
||||
out.assertContentEquals("DELETE FROM PERSONS WHERE ID = ? AND NAME = ? AND CODE = ?");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteQuotedIdentifiers() throws InitializationException, ProcessException, SQLException, IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
|
||||
final File tempDir = folder.getRoot();
|
||||
final File dbDir = new File(tempDir, "db");
|
||||
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
|
||||
runner.addControllerService("dbcp", service);
|
||||
runner.enableControllerService(service);
|
||||
|
||||
try (final Connection conn = service.getConnection()) {
|
||||
try (final Statement stmt = conn.createStatement()) {
|
||||
stmt.executeUpdate(createPersons);
|
||||
}
|
||||
}
|
||||
|
||||
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
|
||||
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
|
||||
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "DELETE");
|
||||
runner.setProperty(ConvertJSONToSQL.QUOTED_IDENTIFIERS, "true");
|
||||
|
||||
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
|
||||
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
|
||||
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
|
||||
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
|
||||
out.assertAttributeEquals("sql.args.1.value", "1");
|
||||
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
|
||||
out.assertAttributeEquals("sql.args.2.value", "Mark");
|
||||
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
|
||||
out.assertAttributeEquals("sql.args.3.value", "48");
|
||||
|
||||
out.assertContentEquals("DELETE FROM PERSONS WHERE \"ID\" = ? AND \"NAME\" = ? AND \"CODE\" = ?");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteQuotedTableIdentifier() throws InitializationException, ProcessException, SQLException, IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
|
||||
final File tempDir = folder.getRoot();
|
||||
final File dbDir = new File(tempDir, "db");
|
||||
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
|
||||
runner.addControllerService("dbcp", service);
|
||||
runner.enableControllerService(service);
|
||||
|
||||
try (final Connection conn = service.getConnection()) {
|
||||
try (final Statement stmt = conn.createStatement()) {
|
||||
stmt.executeUpdate(createPersons);
|
||||
}
|
||||
}
|
||||
|
||||
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
|
||||
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
|
||||
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "DELETE");
|
||||
runner.setProperty(ConvertJSONToSQL.QUOTED_TABLE_IDENTIFIER, "true");
|
||||
|
||||
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
|
||||
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
|
||||
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
|
||||
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
|
||||
out.assertAttributeEquals("sql.args.1.value", "1");
|
||||
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
|
||||
out.assertAttributeEquals("sql.args.2.value", "Mark");
|
||||
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
|
||||
out.assertAttributeEquals("sql.args.3.value", "48");
|
||||
|
||||
out.assertContentEquals("DELETE FROM \"PERSONS\" WHERE ID = ? AND NAME = ? AND CODE = ?");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteWithNullValue() throws InitializationException, ProcessException, SQLException, IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
|
||||
final File tempDir = folder.getRoot();
|
||||
final File dbDir = new File(tempDir, "db");
|
||||
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
|
||||
runner.addControllerService("dbcp", service);
|
||||
runner.enableControllerService(service);
|
||||
|
||||
try (final Connection conn = service.getConnection()) {
|
||||
try (final Statement stmt = conn.createStatement()) {
|
||||
stmt.executeUpdate(createPersons);
|
||||
}
|
||||
}
|
||||
|
||||
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
|
||||
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
|
||||
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "DELETE");
|
||||
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-null-code.json"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
|
||||
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
|
||||
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
|
||||
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
|
||||
out.assertAttributeEquals("sql.args.1.value", "1");
|
||||
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
|
||||
out.assertAttributeEquals("sql.args.2.value", "Mark");
|
||||
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
|
||||
out.assertAttributeNotExists("sql.args.3.value");
|
||||
|
||||
out.assertContentEquals("DELETE FROM PERSONS WHERE ID = ? AND NAME = ? AND CODE = ?");
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple implementation only for testing purposes
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue