Merge branch 'NIFI-1093' of https://github.com/olegz/nifi into NIFI-1093

This commit is contained in:
Mark Payne 2016-02-01 11:51:27 -05:00
commit ef80549d63
2 changed files with 31 additions and 24 deletions

View File

@ -99,13 +99,16 @@ public class ConvertJSONToSQL extends AbstractProcessor {
static final AllowableValue IGNORE_UNMATCHED_FIELD = new AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields",
"Any field in the JSON document that cannot be mapped to a column in the database is ignored");
static final AllowableValue FAIL_UNMATCHED_FIELD = new AllowableValue("Fail", "Fail",
"If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship");
static final AllowableValue IGNORE_UNMATCHED_COLUMN = new AllowableValue("Ignore Unmatched Columns", "Ignore Unmatched Columns",
"If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship");
static final AllowableValue IGNORE_UNMATCHED_COLUMN = new AllowableValue("Ignore Unmatched Columns",
"Ignore Unmatched Columns",
"Any column in the database that does not have a field in the JSON document will be assumed to not be required. No notification will be logged");
static final AllowableValue WARNING_UNMATCHED_COLUMN = new AllowableValue("Warning Unmatched Columns", "Warning Unmatched Columns",
static final AllowableValue WARNING_UNMATCHED_COLUMN = new AllowableValue("Warn on Unmatched Columns",
"Warn on Unmatched Columns",
"Any column in the database that does not have a field in the JSON document will be assumed to not be required. A warning will be logged");
static final AllowableValue FAIL_UNMATCHED_COLUMN = new AllowableValue("Fail Unmatched Columns", "Fail Unmatched Columns",
"Any column in the database that does not have a field in the JSON document will fail the flow. An error will be logged");
static final AllowableValue FAIL_UNMATCHED_COLUMN = new AllowableValue("Fail on Unmatched Columns",
"Fail on Unmatched Columns",
"A flow will fail if any column in the database that does not have a field in the JSON document. An error will be logged");
static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
.name("JDBC Connection Pool")
@ -376,17 +379,19 @@ public class ConvertJSONToSQL extends AbstractProcessor {
}
private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName,
final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields,
final boolean failUnmappedColumns, final boolean warningUnmappedColumns) {
final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
final boolean warningUnmappedColumns) {
final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
for (final String requiredColName : schema.getRequiredColumnNames()) {
final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
if (!normalizedFieldNames.contains(normalizedColName)) {
if(failUnmappedColumns) {
getLogger().error("JSON does not have a value for the Required column '" + requiredColName + "'");
throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'");
} else if(warningUnmappedColumns) {
getLogger().warn("JSON does not have a value for the Required column '" + requiredColName + "'");
String missingColMessage = "JSON does not have a value for the Required column '" + requiredColName + "'";
if (failUnmappedColumns) {
getLogger().error(missingColMessage);
throw new ProcessException(missingColMessage);
} else if (warningUnmappedColumns) {
getLogger().warn(missingColMessage);
}
}
}
@ -448,8 +453,8 @@ public class ConvertJSONToSQL extends AbstractProcessor {
}
private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields,
final boolean failUnmappedColumns, final boolean warningUnmappedColumns) {
final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
final boolean warningUnmappedColumns) {
final Set<String> updateKeyNames;
if (updateKeys == null) {
@ -477,12 +482,14 @@ public class ConvertJSONToSQL extends AbstractProcessor {
for (final String uk : updateKeyNames) {
final String normalizedUK = normalizeColumnName(uk, translateFieldNames);
normalizedUpdateNames.add(normalizedUK);
if(!normalizedFieldNames.contains(normalizedUK)) {
if(failUnmappedColumns) {
getLogger().error("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
} else if(warningUnmappedColumns) {
getLogger().warn("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
if (!normalizedFieldNames.contains(normalizedUK)) {
String missingColMessage = "JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'";
if (failUnmappedColumns) {
getLogger().error(missingColMessage);
throw new ProcessException(missingColMessage);
} else if (warningUnmappedColumns) {
getLogger().warn(missingColMessage);
}
}
}

View File

@ -418,7 +418,7 @@ public class TestConvertJSONToSQL {
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail Unmatched Columns");
runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.FAIL_UNMATCHED_COLUMN);
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
runner.run();
@ -443,7 +443,7 @@ public class TestConvertJSONToSQL {
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warning Unmatched Columns");
runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.WARNING_UNMATCHED_COLUMN);
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
runner.run();
@ -514,7 +514,7 @@ public class TestConvertJSONToSQL {
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code, extra");
runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail Unmatched Columns");
runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.FAIL_UNMATCHED_COLUMN);
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
runner.run();
@ -540,7 +540,7 @@ public class TestConvertJSONToSQL {
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code, extra");
runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warning Unmatched Columns");
runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.WARNING_UNMATCHED_COLUMN);
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
runner.run();