diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java index e7226d115b..3eb44cb420 100755 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java @@ -99,13 +99,16 @@ public class ConvertJSONToSQL extends AbstractProcessor { static final AllowableValue IGNORE_UNMATCHED_FIELD = new AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields", "Any field in the JSON document that cannot be mapped to a column in the database is ignored"); static final AllowableValue FAIL_UNMATCHED_FIELD = new AllowableValue("Fail", "Fail", - "If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship"); - static final AllowableValue IGNORE_UNMATCHED_COLUMN = new AllowableValue("Ignore Unmatched Columns", "Ignore Unmatched Columns", + "If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship"); + static final AllowableValue IGNORE_UNMATCHED_COLUMN = new AllowableValue("Ignore Unmatched Columns", + "Ignore Unmatched Columns", "Any column in the database that does not have a field in the JSON document will be assumed to not be required. No notification will be logged"); - static final AllowableValue WARNING_UNMATCHED_COLUMN = new AllowableValue("Warning Unmatched Columns", "Warning Unmatched Columns", + static final AllowableValue WARNING_UNMATCHED_COLUMN = new AllowableValue("Warn on Unmatched Columns", + "Warn on Unmatched Columns", "Any column in the database that does not have a field in the JSON document will be assumed to not be required. A warning will be logged"); - static final AllowableValue FAIL_UNMATCHED_COLUMN = new AllowableValue("Fail Unmatched Columns", "Fail Unmatched Columns", - "Any column in the database that does not have a field in the JSON document will fail the flow. An error will be logged"); + static final AllowableValue FAIL_UNMATCHED_COLUMN = new AllowableValue("Fail on Unmatched Columns", + "Fail on Unmatched Columns", + "A flow will fail if any column in the database that does not have a field in the JSON document. An error will be logged"); static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder() .name("JDBC Connection Pool") @@ -376,17 +379,19 @@ public class ConvertJSONToSQL extends AbstractProcessor { } private String generateInsert(final JsonNode rootNode, final Map attributes, final String tableName, - final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, - final boolean failUnmappedColumns, final boolean warningUnmappedColumns) { + final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns, + final boolean warningUnmappedColumns) { + final Set normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames); for (final String requiredColName : schema.getRequiredColumnNames()) { final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames); if (!normalizedFieldNames.contains(normalizedColName)) { - if(failUnmappedColumns) { - getLogger().error("JSON does not have a value for the Required column '" + requiredColName + "'"); - throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'"); - } else if(warningUnmappedColumns) { - getLogger().warn("JSON does not have a value for the Required column '" + requiredColName + "'"); + String missingColMessage = "JSON does not have a value for the Required column '" + requiredColName + "'"; + if (failUnmappedColumns) { + getLogger().error(missingColMessage); + throw new ProcessException(missingColMessage); + } else if (warningUnmappedColumns) { + getLogger().warn(missingColMessage); } } } @@ -448,8 +453,8 @@ public class ConvertJSONToSQL extends AbstractProcessor { } private String generateUpdate(final JsonNode rootNode, final Map attributes, final String tableName, final String updateKeys, - final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, - final boolean failUnmappedColumns, final boolean warningUnmappedColumns) { + final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns, + final boolean warningUnmappedColumns) { final Set updateKeyNames; if (updateKeys == null) { @@ -477,12 +482,14 @@ public class ConvertJSONToSQL extends AbstractProcessor { for (final String uk : updateKeyNames) { final String normalizedUK = normalizeColumnName(uk, translateFieldNames); normalizedUpdateNames.add(normalizedUK); - if(!normalizedFieldNames.contains(normalizedUK)) { - if(failUnmappedColumns) { - getLogger().error("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'"); - throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'"); - } else if(warningUnmappedColumns) { - getLogger().warn("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'"); + + if (!normalizedFieldNames.contains(normalizedUK)) { + String missingColMessage = "JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'"; + if (failUnmappedColumns) { + getLogger().error(missingColMessage); + throw new ProcessException(missingColMessage); + } else if (warningUnmappedColumns) { + getLogger().warn(missingColMessage); } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java index dd89e96920..60d993ef05 100755 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java @@ -418,7 +418,7 @@ public class TestConvertJSONToSQL { runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); - runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail Unmatched Columns"); + runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.FAIL_UNMATCHED_COLUMN); runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); runner.run(); @@ -443,7 +443,7 @@ public class TestConvertJSONToSQL { runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); - runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warning Unmatched Columns"); + runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.WARNING_UNMATCHED_COLUMN); runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); runner.run(); @@ -514,7 +514,7 @@ public class TestConvertJSONToSQL { runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code, extra"); - runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail Unmatched Columns"); + runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.FAIL_UNMATCHED_COLUMN); runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); runner.run(); @@ -540,7 +540,7 @@ public class TestConvertJSONToSQL { runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code, extra"); - runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warning Unmatched Columns"); + runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.WARNING_UNMATCHED_COLUMN); runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); runner.run();