diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
index 4a81d74d01..152e3bcd64 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
@@ -114,6 +114,29 @@ The following binary components are provided under the Apache Software License v
Apache Tika Core
Copyright 2007-2015 The Apache Software Foundation
+ (ASLv2) Jackson JSON processor
+ The following NOTICE information applies:
+ # Jackson JSON processor
+
+ Jackson is a high-performance, Free/Open Source JSON processing library.
+ It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+ been in development since 2007.
+ It is currently developed by a community of developers, as well as supported
+ commercially by FasterXML.com.
+
+ ## Licensing
+
+ Jackson core and extension components may be licensed under different licenses.
+ To find the details that apply to this artifact see the accompanying LICENSE file.
+ For more information, including possible other licensing options, contact
+ FasterXML.com (http://fasterxml.com).
+
+ ## Credits
+
+ A list of contributors may be found in the CREDITS file, which is included
+ in some artifacts (usually source distributions); it is always available
+ from the source code management (SCM) system that the project uses.
+
************************
Common Development and Distribution License 1.1
************************
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 994612255c..fd267d13af 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -180,6 +180,10 @@ language governing permissions and limitations under the License. -->
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.derby</groupId>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index e62b57ff1f..0ce1456009 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -16,6 +16,7 @@ org.apache.nifi.processors.standard.Base64EncodeContent
org.apache.nifi.processors.standard.CompressContent
org.apache.nifi.processors.standard.ControlRate
org.apache.nifi.processors.standard.ConvertCharacterSet
+org.apache.nifi.processors.standard.ConvertFlatJSONToSQL
org.apache.nifi.processors.standard.DetectDuplicate
org.apache.nifi.processors.standard.DistributeLoad
org.apache.nifi.processors.standard.DuplicateFlowFile
@@ -52,6 +53,7 @@ org.apache.nifi.processors.standard.PutFile
org.apache.nifi.processors.standard.PutFTP
org.apache.nifi.processors.standard.PutJMS
org.apache.nifi.processors.standard.PutSFTP
+org.apache.nifi.processors.standard.PutSQL
org.apache.nifi.processors.standard.ReplaceText
org.apache.nifi.processors.standard.ReplaceTextWithMapping
org.apache.nifi.processors.standard.RouteOnAttribute
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertFlatJSONToSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertFlatJSONToSQL.java
new file mode 100644
index 0000000000..57d855f3ba
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertFlatJSONToSQL.java
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.ObjectHolder;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
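+/*
+ * Illustrative example (mirroring the unit tests below): given a PERSONS table with columns
+ * ID, NAME and CODE, a FlowFile containing the flat JSON document
+ *   { "id": 1, "name": "Mark", "code": 48 }
+ * and a Statement Type of INSERT yields a 'sql' FlowFile whose content is
+ *   INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)
+ * with the parameter values carried in the sql.args.1.value through sql.args.3.value attributes
+ * and the corresponding JDBC type codes in the sql.args.N.type attributes.
+ */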
+@SideEffectFree
+@SupportsBatching
+@SeeAlso(PutSQL.class)
+@Tags({"json", "sql", "database", "rdbms", "insert", "update", "relational", "flat"})
+@CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE or INSERT SQL statement. The incoming FlowFile is expected to be "
+ + "\"flat\" JSON message, meaning that it consists of a single JSON element and each field maps to a simple type. If a field maps to "
+ + "a JSON object, that JSON object will be interpreted as Text. Upon successful conversion, the original FlowFile is routed to the 'original' "
+ + "relationship and the SQL is routed to the 'sql' relationship.")
+public class ConvertFlatJSONToSQL extends AbstractProcessor {
+ private static final String UPDATE_TYPE = "UPDATE";
+ private static final String INSERT_TYPE = "INSERT";
+
+ static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
+ .name("JDBC Connection Pool")
+ .description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. "
+ + "The Connection Pool is necessary in order to determine the appropriate database column types.")
+ .identifiesControllerService(DBCPService.class)
+ .required(true)
+ .build();
+ static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder()
+ .name("Statement Type")
+ .description("Specifies the type of SQL Statement to generate")
+ .required(true)
+ .allowableValues(UPDATE_TYPE, INSERT_TYPE)
+ .build();
+ static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+ .name("Table Name")
+ .description("The name of the table that the statement should update")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ static final PropertyDescriptor CATALOG_NAME = new PropertyDescriptor.Builder()
+ .name("Catalog Name")
+ .description("The name of the catalog that the statement should update. This may not apply for the database that you are updating. In this case, leave the field empty")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new PropertyDescriptor.Builder()
+ .name("Translate Field Names")
+ .description("If true, the Processor will attempt to translate JSON field names into the appropriate column names for the table specified. "
+ + "If false, the JSON field names must match the column names exactly, or the column will not be updated")
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
+ static final PropertyDescriptor UPDATE_KEY = new PropertyDescriptor.Builder()
+ .name("Update Keys")
+ .description("A comma-separated list of column names that uniquely identifies a row in the database for UPDATE statements. "
+ + "If the Statement Type is UPDATE and this property is not set, the table's Primary Keys are used. "
+ + "In this case, if no Primary Key exists, the conversion to SQL will fail. "
+ + "This property is ignored if the Statement Type is INSERT")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(false)
+ .expressionLanguageSupported(true)
+ .build();
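+ // For example (mirroring the unit tests below), Update Keys of "name, code" against the
+ // PERSONS table yields: UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?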
+
+
+ static final Relationship REL_ORIGINAL = new Relationship.Builder()
+ .name("original")
+ .description("When a FlowFile is converted to SQL, the original JSON FlowFile is routed to this relationship")
+ .build();
+ static final Relationship REL_SQL = new Relationship.Builder()
+ .name("sql")
+ .description("A FlowFile is routed to this relationship when its contents have successfully been converted into a SQL statement")
+ .build();
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("A FlowFile is routed to this relationship if it cannot be converted into a SQL statement. Common causes include invalid JSON "
+ + "content or the JSON content missing a required field (if using an INSERT statement type).")
+ .build();
+
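+ // Cache of table schemas, keyed by catalog and table name, so that the database metadata does
+ // not have to be fetched for every FlowFile; bounded to 100 entries, evicting the eldest first.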
+ private final Map<SchemaKey, TableSchema> schemaCache = new LinkedHashMap<SchemaKey, TableSchema>(100) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<SchemaKey, TableSchema> eldest) {
+ return size() >= 100;
+ }
+ };
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(CONNECTION_POOL);
+ properties.add(STATEMENT_TYPE);
+ properties.add(TABLE_NAME);
+ properties.add(CATALOG_NAME);
+ properties.add(TRANSLATE_FIELD_NAMES);
+ properties.add(UPDATE_KEY);
+ return properties;
+ }
+
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> rels = new HashSet<>();
+ rels.add(REL_ORIGINAL);
+ rels.add(REL_SQL);
+ rels.add(REL_FAILURE);
+ return rels;
+ }
+
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ synchronized (this) {
+ schemaCache.clear();
+ }
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final boolean translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
+ final String statementType = context.getProperty(STATEMENT_TYPE).getValue();
+ final String updateKeys = context.getProperty(UPDATE_KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+ final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
+ final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) && updateKeys == null;
+
+ // get the database schema from the cache, if one exists
+ TableSchema schema;
+ synchronized (this) {
+ schema = schemaCache.get(schemaKey);
+ if (schema == null) {
+ // No schema exists for this table yet. Query the database to determine the schema and put it into the cache.
+ final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
+ try (final Connection conn = dbcpService.getConnection()) {
+ schema = TableSchema.from(conn, catalog, tableName, translateFieldNames, includePrimaryKeys);
+ schemaCache.put(schemaKey, schema);
+ } catch (final SQLException e) {
+ getLogger().error("Failed to convert {} into a SQL statement due to {}; routing to failure", new Object[] {flowFile, e.toString()}, e);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ }
+ }
+
+ // Parse the JSON document
+ final ObjectMapper mapper = new ObjectMapper();
+ final ObjectHolder<JsonNode> rootNodeRef = new ObjectHolder<>(null);
+ try {
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ try (final InputStream bufferedIn = new BufferedInputStream(in)) {
+ rootNodeRef.set(mapper.readTree(bufferedIn));
+ }
+ }
+ });
+ } catch (final ProcessException pe) {
+ getLogger().error("Failed to parse {} as JSON due to {}; routing to failure", new Object[] {flowFile, pe.toString()}, pe);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ final JsonNode rootNode = rootNodeRef.get();
+ final String sql;
+ final Map<String, String> attributes = new HashMap<>();
+ if (INSERT_TYPE.equals(statementType)) {
+ try {
+ sql = generateInsert(rootNode, attributes, tableName, schema, translateFieldNames);
+ } catch (final ProcessException pe) {
+ getLogger().error("Failed to convert {} to a SQL INSERT statement due to {}; routing to failure",
+ new Object[] { flowFile, pe.toString() }, pe);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ } else {
+ try {
+ sql = generateUpdate(rootNode, attributes, tableName, updateKeys, schema, translateFieldNames);
+ } catch (final ProcessException pe) {
+ getLogger().error("Failed to convert {} to a SQL UPDATE statement due to {}; routing to failure",
+ new Object[] { flowFile, pe.toString() }, pe);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ }
+
+ FlowFile sqlFlowFile = session.create(flowFile);
+ sqlFlowFile = session.write(sqlFlowFile, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws IOException {
+ out.write(sql.getBytes(StandardCharsets.UTF_8));
+ }
+ });
+
+ attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+ attributes.put("sql.table", tableName);
+ if (catalog != null) {
+ attributes.put("sql.catalog", catalog);
+ }
+
+ sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributes);
+
+ session.transfer(flowFile, REL_ORIGINAL);
+ session.transfer(sqlFlowFile, REL_SQL);
+ }
+
+ private Set<String> getNormalizedColumnNames(final JsonNode node, final boolean translateFieldNames) {
+ final Set<String> normalizedFieldNames = new HashSet<>();
+ final Iterator<String> fieldNameItr = node.getFieldNames();
+ while (fieldNameItr.hasNext()) {
+ normalizedFieldNames.add(normalizeColumnName(fieldNameItr.next(), translateFieldNames));
+ }
+
+ return normalizedFieldNames;
+ }
+
+ private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName,
+ final TableSchema schema, final boolean translateFieldNames) {
+
+ final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
+ for (final String requiredColName : schema.getRequiredColumnNames()) {
+ final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
+ if (!normalizedFieldNames.contains(normalizedColName)) {
+ throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'");
+ }
+ }
+
+ final StringBuilder sqlBuilder = new StringBuilder();
+ int fieldCount = 0;
+ sqlBuilder.append("INSERT INTO ").append(tableName).append(" (");
+
+ // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
+ // adding the column value to a "sql.args.N.value" attribute and its type to a "sql.args.N.type" attribute for each
+ // of the columns that we are inserting into
+ final Iterator<String> fieldNames = rootNode.getFieldNames();
+ while (fieldNames.hasNext()) {
+ final String fieldName = fieldNames.next();
+
+ final ColumnDescription desc = schema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
+ if (desc != null) {
+ if (fieldCount++ > 0) {
+ sqlBuilder.append(", ");
+ }
+
+ sqlBuilder.append(desc.getColumnName());
+
+ final int sqlType = desc.getDataType();
+ attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
+
+ final Integer colSize = desc.getColumnSize();
+ String fieldValue = rootNode.get(fieldName).asText();
+ if (colSize != null && fieldValue.length() > colSize) {
+ fieldValue = fieldValue.substring(0, colSize);
+ }
+ attributes.put("sql.args." + fieldCount + ".value", fieldValue);
+ }
+ }
+
+ // complete the SQL statements by adding ?'s for all of the values to be escaped.
+ sqlBuilder.append(") VALUES (");
+ for (int i=0; i < fieldCount; i++) {
+ if (i > 0) {
+ sqlBuilder.append(", ");
+ }
+
+ sqlBuilder.append("?");
+ }
+ sqlBuilder.append(")");
+
+ if (fieldCount == 0) {
+ throw new ProcessException("None of the fields in the JSON map to the columns defined by the " + tableName + " table");
+ }
+
+ return sqlBuilder.toString();
+ }
+
+ private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
+ final TableSchema schema, final boolean translateFieldNames) {
+
+ final Set<String> updateKeyNames;
+ if (updateKeys == null) {
+ updateKeyNames = schema.getPrimaryKeyColumnNames();
+ } else {
+ updateKeyNames = new HashSet<>();
+ for (final String updateKey : updateKeys.split(",")) {
+ updateKeyNames.add(updateKey.trim());
+ }
+ }
+
+ if (updateKeyNames.isEmpty()) {
+ throw new ProcessException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified");
+ }
+
+ final StringBuilder sqlBuilder = new StringBuilder();
+ int fieldCount = 0;
+ sqlBuilder.append("UPDATE ").append(tableName).append(" SET ");
+
+
+ // Create a Set of all normalized Update Key names, and ensure that there is a field in the JSON
+ // for each of the Update Key fields.
+ final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
+ final Set<String> normalizedUpdateNames = new HashSet<>();
+ for (final String uk : updateKeyNames) {
+ final String normalizedUK = normalizeColumnName(uk, translateFieldNames);
+ normalizedUpdateNames.add(normalizedUK);
+
+ if (!normalizedFieldNames.contains(normalizedUK)) {
+ throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
+ }
+ }
+
+ // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
+ // adding the column value to a "sql.args.N.value" attribute and its type to a "sql.args.N.type" attribute for each
+ // of the columns that we are updating
+ Iterator<String> fieldNames = rootNode.getFieldNames();
+ while (fieldNames.hasNext()) {
+ final String fieldName = fieldNames.next();
+
+ final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
+ final ColumnDescription desc = schema.getColumns().get(normalizedColName);
+ if (desc == null) {
+ continue;
+ }
+
+ // Check if this column is an Update Key. If so, skip it for now. We will come
+ // back to it after we finish the SET clause
+ if (normalizedUpdateNames.contains(normalizedColName)) {
+ continue;
+ }
+
+ if (fieldCount++ > 0) {
+ sqlBuilder.append(", ");
+ }
+
+ sqlBuilder.append(desc.getColumnName()).append(" = ?");
+ final int sqlType = desc.getDataType();
+ attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
+
+ final Integer colSize = desc.getColumnSize();
+ String fieldValue = rootNode.get(fieldName).asText();
+ if (colSize != null && fieldValue.length() > colSize) {
+ fieldValue = fieldValue.substring(0, colSize);
+ }
+ attributes.put("sql.args." + fieldCount + ".value", fieldValue);
+ }
+
+ // Set the WHERE clause based on the Update Key values
+ sqlBuilder.append(" WHERE ");
+
+ fieldNames = rootNode.getFieldNames();
+ int whereFieldCount = 0;
+ while (fieldNames.hasNext()) {
+ final String fieldName = fieldNames.next();
+
+ final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
+ final ColumnDescription desc = schema.getColumns().get(normalizedColName);
+ if (desc == null) {
+ continue;
+ }
+
+ // Check if this column is an Update Key. If it is not, skip it because
+ // it was already handled above, in the SET clause
+ if (!normalizedUpdateNames.contains(normalizedColName)) {
+ continue;
+ }
+
+ if (whereFieldCount++ > 0) {
+ sqlBuilder.append(" AND ");
+ }
+ fieldCount++;
+
+ sqlBuilder.append(normalizedColName).append(" = ?");
+ final int sqlType = desc.getDataType();
+ attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
+
+ final Integer colSize = desc.getColumnSize();
+ String fieldValue = rootNode.get(fieldName).asText();
+ if (colSize != null && fieldValue.length() > colSize) {
+ fieldValue = fieldValue.substring(0, colSize);
+ }
+ attributes.put("sql.args." + fieldCount + ".value", fieldValue);
+ }
+
+ return sqlBuilder.toString();
+ }
+
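+ // When Translate Field Names is enabled, JSON field names and database column names are compared
+ // in this normalized form: e.g. "user_name", "User_Name" and "USERNAME" all normalize to "USERNAME".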
+ private static String normalizeColumnName(final String colName, final boolean translateColumnNames) {
+ return translateColumnNames ? colName.toUpperCase().replace("_", "") : colName;
+ }
+
+ private static class TableSchema {
+ private List<String> requiredColumnNames;
+ private Set<String> primaryKeyColumnNames;
+ private Map<String, ColumnDescription> columns;
+
+ private TableSchema(final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
+ final Set<String> primaryKeyColumnNames) {
+ this.columns = new HashMap<>();
+ this.primaryKeyColumnNames = primaryKeyColumnNames;
+
+ this.requiredColumnNames = new ArrayList<>();
+ for (final ColumnDescription desc : columnDescriptions) {
+ columns.put(ConvertFlatJSONToSQL.normalizeColumnName(desc.columnName, translateColumnNames), desc);
+ if (desc.isRequired()) {
+ requiredColumnNames.add(desc.columnName);
+ }
+ }
+ }
+
+ public Map<String, ColumnDescription> getColumns() {
+ return columns;
+ }
+
+ public List<String> getRequiredColumnNames() {
+ return requiredColumnNames;
+ }
+
+ public Set<String> getPrimaryKeyColumnNames() {
+ return primaryKeyColumnNames;
+ }
+
+ public static TableSchema from(final Connection conn, final String catalog, final String tableName,
+ final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException {
+ final List<ColumnDescription> cols = new ArrayList<>();
+ try (final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%")) {
+ while (colrs.next()) {
+ final ColumnDescription col = ColumnDescription.from(colrs);
+ cols.add(col);
+ }
+ }
+
+ final Set<String> primaryKeyColumns = new HashSet<>();
+ if (includePrimaryKeys) {
+ try (final ResultSet pkrs = conn.getMetaData().getPrimaryKeys(catalog, null, tableName)) {
+ while (pkrs.next()) {
+ final String colName = pkrs.getString("COLUMN_NAME");
+ primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames));
+ }
+ }
+ }
+
+ return new TableSchema(cols, translateColumnNames, primaryKeyColumns);
+ }
+ }
+
+ private static class ColumnDescription {
+ private final String columnName;
+ private final int dataType;
+ private final boolean required;
+ private final Integer columnSize;
+
+ private ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize) {
+ this.columnName = columnName;
+ this.dataType = dataType;
+ this.required = required;
+ this.columnSize = columnSize;
+ }
+
+ public int getDataType() {
+ return dataType;
+ }
+
+ public Integer getColumnSize() {
+ return columnSize;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ public boolean isRequired() {
+ return required;
+ }
+
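+ // A column is considered "required" for an INSERT when it is not nullable, is not
+ // auto-incremented by the database, and has no default value.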
+ public static ColumnDescription from(final ResultSet resultSet) throws SQLException {
+ final String columnName = resultSet.getString("COLUMN_NAME");
+ final int dataType = resultSet.getInt("DATA_TYPE");
+ final int colSize = resultSet.getInt("COLUMN_SIZE");
+
+ final String nullableValue = resultSet.getString("IS_NULLABLE");
+ final boolean isNullable = "YES".equalsIgnoreCase(nullableValue) || nullableValue.isEmpty();
+ final String defaultValue = resultSet.getString("COLUMN_DEF");
+ final String autoIncrementValue = resultSet.getString("IS_AUTOINCREMENT");
+ final boolean isAutoIncrement = "YES".equalsIgnoreCase(autoIncrementValue);
+ final boolean required = !isNullable && !isAutoIncrement && defaultValue == null;
+
+ return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize);
+ }
+ }
+
+ private static class SchemaKey {
+ private final String catalog;
+ private final String tableName;
+
+ public SchemaKey(final String catalog, final String tableName) {
+ this.catalog = catalog;
+ this.tableName = tableName;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((catalog == null) ? 0 : catalog.hashCode());
+ result = prime * result + ((tableName == null) ? 0 : tableName.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+
+ final SchemaKey other = (SchemaKey) obj;
+ if (catalog == null) {
+ if (other.catalog != null) {
+ return false;
+ }
+ } else if (!catalog.equals(other.catalog)) {
+ return false;
+ }
+
+
+ if (tableName == null) {
+ if (other.tableName != null) {
+ return false;
+ }
+ } else if (!tableName.equals(other.tableName)) {
+ return false;
+ }
+
+ return true;
+ }
+ }
+}
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
new file mode 100644
index 0000000000..d349a58fac
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.stream.io.StreamUtils;
+
+@SupportsBatching
+@SeeAlso(ConvertFlatJSONToSQL.class)
+@Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"})
+@CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command "
+ + "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
+ + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
+ + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
+public class PutSQL extends AbstractProcessor {
+
+ static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
+ .name("JDBC Connection Pool")
+ .description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. "
+ + "The Connection Pool is necessary in order to determine the appropriate database column types.")
+ .identifiesControllerService(DBCPService.class)
+ .required(true)
+ .build();
+
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("A FlowFile is routed to this relationship after the database is successfully updated")
+ .build();
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("A FlowFile is routed to this relationship if the database cannot be updated for any reason")
+ .build();
+
+ private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type");
+ private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(CONNECTION_POOL);
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> rels = new HashSet<>();
+ rels.add(REL_SUCCESS);
+ rels.add(REL_FAILURE);
+ return rels;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ // TODO: Batch. Pull in 50 or 100 FlowFiles at a time, mapping each distinct FlowFile content
+ // to the set of FlowFiles that share that content. Then execute the updates in batches.
+
+ final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
+ try (final Connection conn = dbcpService.getConnection()) {
+ // Read the SQL from the FlowFile's content
+ final byte[] buffer = new byte[(int) flowFile.getSize()];
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ StreamUtils.fillBuffer(in, buffer);
+ }
+ });
+
+ final String sql = new String(buffer, StandardCharsets.UTF_8);
+
+ // Create a prepared statement and set the appropriate parameters on the statement.
+ try (final PreparedStatement stmt = conn.prepareStatement(sql)) {
+ for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) {
+ final String key = entry.getKey();
+ final Matcher matcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key);
+ if (matcher.matches()) {
+ final int parameterIndex = Integer.parseInt(matcher.group(1));
+
+ final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches();
+ if (!isNumeric) {
+ getLogger().error("Cannot update database for {} because the value of the '{}' attribute is '{}', which is not a valid JDBC numeral type; routing to failure",
+ new Object[] {flowFile, key, entry.getValue()});
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ final int jdbcType = Integer.parseInt(entry.getValue());
+ final String valueAttrName = "sql.args." + parameterIndex + ".value";
+ final String parameterValue = flowFile.getAttribute(valueAttrName);
+ if (parameterValue == null) {
+ getLogger().error("Cannot update database for {} because the '{}' attribute exists but the '{}' attribute does not", new Object[] {flowFile, key, valueAttrName});
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ try {
+ setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType);
+ } catch (final NumberFormatException nfe) {
+ getLogger().error("Cannot update database for {} because the '{}' attribute has a value of '{}', "
+ + "which cannot be converted into the necessary data type; routing to failure", new Object[] {flowFile, valueAttrName, parameterValue});
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ }
+ }
+
+ final int updatedRowCount = stmt.executeUpdate();
+ flowFile = session.putAttribute(flowFile, "sql.update.count", String.valueOf(updatedRowCount));
+ }
+
+ // TODO: Need to expose Connection URL from DBCP Service and use it to emit a Provenance Event
+ session.transfer(flowFile, REL_SUCCESS);
+ } catch (final SQLException e) {
+ getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e});
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ }
+
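+ // Binds a single ? parameter on the PreparedStatement. DATE, TIME and TIMESTAMP parameters are
+ // expected as milliseconds since the epoch, since the value is parsed with Long.parseLong before
+ // the corresponding java.sql temporal type is constructed.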
+ private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException {
+ switch (jdbcType) {
+ case Types.BIT:
+ stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue));
+ break;
+ case Types.TINYINT:
+ stmt.setByte(parameterIndex, Byte.parseByte(parameterValue));
+ break;
+ case Types.SMALLINT:
+ stmt.setShort(parameterIndex, Short.parseShort(parameterValue));
+ break;
+ case Types.INTEGER:
+ stmt.setInt(parameterIndex, Integer.parseInt(parameterValue));
+ break;
+ case Types.BIGINT:
+ stmt.setLong(parameterIndex, Long.parseLong(parameterValue));
+ break;
+ case Types.REAL:
+ stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue));
+ break;
+ case Types.FLOAT:
+ case Types.DOUBLE:
+ stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue));
+ break;
+ case Types.DATE:
+ stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue)));
+ break;
+ case Types.TIME:
+ stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue)));
+ break;
+ case Types.TIMESTAMP:
+ stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue)));
+ break;
+ case Types.CHAR:
+ case Types.VARCHAR:
+ case Types.LONGNVARCHAR:
+ case Types.LONGVARCHAR:
+ stmt.setString(parameterIndex, parameterValue);
+ break;
+ default:
+ throw new SQLException("The '" + attrName + "' attribute has a value of '" + parameterValue
+ + "' and a type of '" + jdbcType + "' but this is not a known data type");
+ }
+ }
+}
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertFlatJSONToSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertFlatJSONToSQL.java
new file mode 100644
index 0000000000..b2bdbba551
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertFlatJSONToSQL.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestConvertFlatJSONToSQL {
+ static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @BeforeClass
+ public static void setup() {
+ System.setProperty("derby.stream.error.file", "target/derby.log");
+ }
+
+ @Test
+ public void testInsert() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(createPersons);
+ }
+ }
+
+ runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
+ runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "INSERT");
+ runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json"));
+ runner.run();
+
+ runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1);
+ runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0);
+ out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.1.value", "1");
+ out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+ out.assertAttributeEquals("sql.args.2.value", "Mark");
+ out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.3.value", "48");
+
+ out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
+ }
+
+
+ @Test
+ public void testUpdateBasedOnPrimaryKey() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(createPersons);
+ }
+ }
+
+ runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
+ runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE");
+ runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json"));
+ runner.run();
+
+ runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1);
+ runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0);
+ out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR));
+ out.assertAttributeEquals("sql.args.1.value", "Mark");
+ out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.2.value", "48");
+ out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.3.value", "1");
+
+ out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE ID = ?");
+ }
+
+
+ @Test
+ public void testUpdateBasedOnUpdateKey() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(createPersons);
+ }
+ }
+
+ runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
+ runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE");
+ runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "code");
+ runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json"));
+ runner.run();
+
+ runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1);
+ runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0);
+ out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.1.value", "1");
+ out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+ out.assertAttributeEquals("sql.args.2.value", "Mark");
+ out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.3.value", "48");
+
+ out.assertContentEquals("UPDATE PERSONS SET ID = ?, NAME = ? WHERE CODE = ?");
+ }
+
+ @Test
+ public void testUpdateBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(createPersons);
+ }
+ }
+
+ runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
+ runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE");
+ runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code");
+ runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json"));
+ runner.run();
+
+ runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1);
+ runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0);
+ out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.1.value", "1");
+ out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+ out.assertAttributeEquals("sql.args.2.value", "Mark");
+ out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.3.value", "48");
+
+ out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?");
+ }
+
+ @Test
+ public void testUpdateWithMissingFieldBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(createPersons);
+ }
+ }
+
+ runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
+ runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE");
+ runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code");
+ runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1);
+ }
+
+ @Test
+ public void testUpdateWithMalformedJson() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(createPersons);
+ }
+ }
+
+ runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
+ runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE");
+ runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code");
+ runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1);
+ }
+
+ @Test
+ public void testInsertWithMissingField() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(createPersons);
+ }
+ }
+
+ runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
+ runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "INSERT");
+ runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code");
+ runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1);
+ }
+
+
+ /**
+ * Simple implementation only for testing purposes
+ */
+ private static class MockDBCPService extends AbstractControllerService implements DBCPService {
+ private final String dbLocation;
+
+ public MockDBCPService(final String dbLocation) {
+ this.dbLocation = dbLocation;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return "dbcp";
+ }
+
+ @Override
+ public Connection getConnection() throws ProcessException {
+ try {
+ Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+ final Connection con = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true");
+ return con;
+ } catch (final Exception e) {
+ e.printStackTrace();
+ throw new ProcessException("getConnection failed: " + e);
+ }
+ }
+ }
+
+}
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
new file mode 100644
index 0000000000..ccba3e176f
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestPutSQL {
+ static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @BeforeClass
+ public static void setup() {
+ System.setProperty("derby.stream.error.file", "target/derby.log");
+ }
+
+ @Test
+ public void testDirectStatements() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(createPersons);
+ }
+ }
+
+ runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+ runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)".getBytes());
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertEquals("Mark", rs.getString(2));
+ assertEquals(84, rs.getInt(3));
+ assertFalse(rs.next());
+ }
+ }
+
+ runner.enqueue("UPDATE PERSONS SET NAME='George' WHERE ID=1".getBytes());
+ runner.run();
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertEquals("George", rs.getString(2));
+ assertEquals(84, rs.getInt(3));
+ assertFalse(rs.next());
+ }
+ }
+ }
+
+
+ @Test
+ public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(createPersons);
+ }
+ }
+
+ runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+ attributes.put("sql.args.1.value", "1");
+
+ attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
+ attributes.put("sql.args.2.value", "Mark");
+
+ attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
+ attributes.put("sql.args.3.value", "84");
+
+ runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertEquals("Mark", rs.getString(2));
+ assertEquals(84, rs.getInt(3));
+ assertFalse(rs.next());
+ }
+ }
+
+ runner.clearTransferState();
+
+ attributes.clear();
+ attributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR));
+ attributes.put("sql.args.1.value", "George");
+
+ attributes.put("sql.args.2.type", String.valueOf(Types.INTEGER));
+ attributes.put("sql.args.2.value", "1");
+
+ runner.enqueue("UPDATE PERSONS SET NAME=? WHERE ID=?".getBytes(), attributes);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertEquals("George", rs.getString(2));
+ assertEquals(84, rs.getInt(3));
+ assertFalse(rs.next());
+ }
+ }
+ }
+
+
+ @Test
+ public void testMultipleStatementsWithinFlowFile() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(createPersons);
+ }
+ }
+
+ runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+
+ final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
+ "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+ attributes.put("sql.args.1.value", "1");
+
+ attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
+ attributes.put("sql.args.2.value", "Mark");
+
+ attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
+ attributes.put("sql.args.3.value", "84");
+
+ attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
+ attributes.put("sql.args.4.value", "1");
+
+ runner.enqueue(sql.getBytes(), attributes);
+ runner.run();
+
+ // should fail because of the semicolon
+ runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+ assertFalse(rs.next());
+ }
+ }
+ }
+
+
+ /**
+ * Simple implementation only for testing purposes
+ */
+ private static class MockDBCPService extends AbstractControllerService implements DBCPService {
+ private final String dbLocation;
+
+ public MockDBCPService(final String dbLocation) {
+ this.dbLocation = dbLocation;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return "dbcp";
+ }
+
+ @Override
+ public Connection getConnection() throws ProcessException {
+ try {
+ Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+ final Connection con = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true");
+ return con;
+ } catch (final Exception e) {
+ e.printStackTrace();
+ throw new ProcessException("getConnection failed: " + e);
+ }
+ }
+ }
+}
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json
new file mode 100644
index 0000000000..2f82a94919
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json
@@ -0,0 +1,4 @@
+{
+ "id": 1,
+ "name": "Mark",
+}
\ No newline at end of file
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-1.json b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-1.json
new file mode 100644
index 0000000000..8f9844770e
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-1.json
@@ -0,0 +1,5 @@
+{
+ "id": 1,
+ "name": "Mark",
+ "code": 48
+}
\ No newline at end of file
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json
new file mode 100644
index 0000000000..3ff074ae9f
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json
@@ -0,0 +1,4 @@
+{
+ "id": 1,
+ "name": "Mark"
+}
\ No newline at end of file
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json
new file mode 100644
index 0000000000..347cc9ebad
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json
@@ -0,0 +1,4 @@
+{
+ "name": "Mark",
+ "code": 48
+}
\ No newline at end of file