From 1a37c95f5883cdca858c175a637c67d5f012908c Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 12 Aug 2015 17:17:12 -0400 Subject: [PATCH] NIFI-293: Added processors ConvertFlatJSONToSQL, PutSQL --- .../src/main/resources/META-INF/NOTICE | 23 + .../nifi-standard-processors/pom.xml | 4 + .../org.apache.nifi.processor.Processor | 2 + .../standard/ConvertFlatJSONToSQL.java | 600 ++++++++++++++++++ .../nifi/processors/standard/PutSQL.java | 203 ++++++ .../standard/TestConvertFlatJSONToSQL.java | 294 +++++++++ .../nifi/processors/standard/TestPutSQL.java | 246 +++++++ .../malformed-person-extra-comma.json | 4 + .../TestConvertFlatJSONToSQL/person-1.json | 5 + .../person-without-code.json | 4 + .../person-without-id.json | 4 + 11 files changed, 1389 insertions(+) create mode 100644 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertFlatJSONToSQL.java create mode 100644 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java create mode 100644 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertFlatJSONToSQL.java create mode 100644 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java create mode 100644 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json create mode 100644 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-1.json create mode 100644 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json create mode 100644 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE index 4a81d74d01..152e3bcd64 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE @@ -114,6 +114,29 @@ The following binary components are provided under the Apache Software License v Apache Tika Core Copyright 2007-2015 The Apache Software Foundation + (ASLv2) Jackson JSON processor + The following NOTICE information applies: + # Jackson JSON processor + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has + been in development since 2007. + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. + + ## Licensing + + Jackson core and extension components may licensed under different licenses. + To find the details that apply to this artifact see the accompanying LICENSE file. + For more information, including possible other licensing options, contact + FasterXML.com (http://fasterxml.com). + + ## Credits + + A list of contributors may be found from CREDITS file, which is included + in some artifacts (usually source distributions); but is always available + from the source code management (SCM) system project uses. + ************************ Common Development and Distribution License 1.1 ************************ diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 994612255c..fd267d13af 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -180,6 +180,10 @@ language governing permissions and limitations under the License. --> org.apache.nifi nifi-dbcp-service-api + + org.codehaus.jackson + jackson-mapper-asl + org.apache.derby diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index e62b57ff1f..0ce1456009 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -16,6 +16,7 @@ org.apache.nifi.processors.standard.Base64EncodeContent org.apache.nifi.processors.standard.CompressContent org.apache.nifi.processors.standard.ControlRate org.apache.nifi.processors.standard.ConvertCharacterSet +org.apache.nifi.processors.standard.ConvertJSONToSQL org.apache.nifi.processors.standard.DetectDuplicate org.apache.nifi.processors.standard.DistributeLoad org.apache.nifi.processors.standard.DuplicateFlowFile @@ -52,6 +53,7 @@ org.apache.nifi.processors.standard.PutFile org.apache.nifi.processors.standard.PutFTP org.apache.nifi.processors.standard.PutJMS org.apache.nifi.processors.standard.PutSFTP +org.apache.nifi.processors.standard.PutSQL org.apache.nifi.processors.standard.ReplaceText org.apache.nifi.processors.standard.ReplaceTextWithMapping org.apache.nifi.processors.standard.RouteOnAttribute diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertFlatJSONToSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertFlatJSONToSQL.java new file mode 100644 index 0000000000..57d855f3ba --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertFlatJSONToSQL.java @@ -0,0 +1,600 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.ObjectHolder; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.ObjectMapper; + + +@SideEffectFree +@SupportsBatching +@SeeAlso(PutSQL.class) +@Tags({"json", "sql", "database", "rdbms", "insert", "update", "relational", "flat"}) +@CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE or INSERT SQL statement. The incoming FlowFile is expected to be " + + "\"flat\" JSON message, meaning that it consists of a single JSON element and each field maps to a simple type. If a field maps to " + + "a JSON object, that JSON object will be interpreted as Text. Upon successful conversion, the original FlowFile is routed to the 'original' " + + "relationship and the SQL is routed to the 'sql' relationship.") +public class ConvertFlatJSONToSQL extends AbstractProcessor { + private static final String UPDATE_TYPE = "UPDATE"; + private static final String INSERT_TYPE = "INSERT"; + + static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder() + .name("JDBC Connection Pool") + .description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. " + + "The Connection Pool is necessary in order to determine the appropriate database column types.") + .identifiesControllerService(DBCPService.class) + .required(true) + .build(); + static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder() + .name("Statement Type") + .description("Specifies the type of SQL Statement to generate") + .required(true) + .allowableValues(UPDATE_TYPE, INSERT_TYPE) + .build(); + static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + .name("Table Name") + .description("The name of the table that the statement should update") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + static final PropertyDescriptor CATALOG_NAME = new PropertyDescriptor.Builder() + .name("Catalog Name") + .description("The name of the catalog that the statement should update. This may not apply for the database that you are updating. In this case, leave the field empty") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new PropertyDescriptor.Builder() + .name("Translate Field Names") + .description("If true, the Processor will attempt to translate JSON field names into the appropriate column names for the table specified. " + + "If false, the JSON field names must match the column names exactly, or the column will not be updated") + .allowableValues("true", "false") + .defaultValue("true") + .build(); + static final PropertyDescriptor UPDATE_KEY = new PropertyDescriptor.Builder() + .name("Update Keys") + .description("A comma-separated list of column names that uniquely identifies a row in the database for UPDATE statements. " + + "If the Statement Type is UPDATE and this property is not set, the table's Primary Keys are used. " + + "In this case, if no Primary Key exists, the conversion to SQL will fail. " + + "This property is ignored if the Statement Type is INSERT") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .expressionLanguageSupported(true) + .build(); + + + static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("When a FlowFile is converted to SQL, the original JSON FlowFile is routed to this relationship") + .build(); + static final Relationship REL_SQL = new Relationship.Builder() + .name("sql") + .description("A FlowFile is routed to this relationship when its contents have successfully been converted into a SQL statement") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFile is routed to this relationship if it cannot be converted into a SQL statement. Common causes include invalid JSON " + + "content or the JSON content missing a required field (if using an INSERT statement type).") + .build(); + + private final Map schemaCache = new LinkedHashMap(100) { + private static final long serialVersionUID = 1L; + + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return true; + } + }; + + @Override + protected List getSupportedPropertyDescriptors() { + final List properties = new ArrayList<>(); + properties.add(CONNECTION_POOL); + properties.add(STATEMENT_TYPE); + properties.add(TABLE_NAME); + properties.add(CATALOG_NAME); + properties.add(TRANSLATE_FIELD_NAMES); + properties.add(UPDATE_KEY); + return properties; + } + + + @Override + public Set getRelationships() { + final Set rels = new HashSet<>(); + rels.add(REL_ORIGINAL); + rels.add(REL_SQL); + rels.add(REL_FAILURE); + return rels; + } + + + @OnScheduled + public void onScheduled(final ProcessContext context) { + synchronized (this) { + schemaCache.clear(); + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final boolean translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean(); + final String statementType = context.getProperty(STATEMENT_TYPE).getValue(); + final String updateKeys = context.getProperty(UPDATE_KEY).evaluateAttributeExpressions(flowFile).getValue(); + + final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final SchemaKey schemaKey = new SchemaKey(catalog, tableName); + final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) && updateKeys == null; + + // get the database schema from the cache, if one exists + TableSchema schema; + synchronized (this) { + schema = schemaCache.get(schemaKey); + if (schema == null) { + // No schema exists for this table yet. Query the database to determine the schema and put it into the cache. + final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class); + try (final Connection conn = dbcpService.getConnection()) { + schema = TableSchema.from(conn, catalog, tableName, translateFieldNames, includePrimaryKeys); + schemaCache.put(schemaKey, schema); + } catch (final SQLException e) { + getLogger().error("Failed to convert {} into a SQL statement due to {}; routing to failure", new Object[] {flowFile, e.toString()}, e); + session.transfer(flowFile, REL_FAILURE); + return; + } + } + } + + // Parse the JSON document + final ObjectMapper mapper = new ObjectMapper(); + final ObjectHolder rootNodeRef = new ObjectHolder<>(null); + try { + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + try (final InputStream bufferedIn = new BufferedInputStream(in)) { + rootNodeRef.set(mapper.readTree(bufferedIn)); + } + } + }); + } catch (final ProcessException pe) { + getLogger().error("Failed to parse {} as JSON due to {}; routing to failure", new Object[] {flowFile, pe.toString()}, pe); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final JsonNode rootNode = rootNodeRef.get(); + final String sql; + final Map attributes = new HashMap<>(); + if (INSERT_TYPE.equals(statementType)) { + try { + sql = generateInsert(rootNode, attributes, tableName, schema, translateFieldNames); + } catch (final ProcessException pe) { + getLogger().error("Failed to convert {} to a SQL INSERT statement due to {}; routing to failure", + new Object[] { flowFile, pe.toString() }, pe); + session.transfer(flowFile, REL_FAILURE); + return; + } + } else { + try { + sql = generateUpdate(rootNode, attributes, tableName, updateKeys, schema, translateFieldNames); + } catch (final ProcessException pe) { + getLogger().error("Failed to convert {} to a SQL UPDATE statement due to {}; routing to failure", + new Object[] { flowFile, pe.toString() }, pe); + session.transfer(flowFile, REL_FAILURE); + return; + } + } + + FlowFile sqlFlowFile = session.create(flowFile); + sqlFlowFile = session.write(sqlFlowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write(sql.getBytes(StandardCharsets.UTF_8)); + } + }); + + attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); + attributes.put("sql.table", tableName); + if (catalog != null) { + attributes.put("sql.catalog", catalog); + } + + sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributes); + + session.transfer(flowFile, REL_ORIGINAL); + session.transfer(sqlFlowFile, REL_SQL); + } + + private Set getNormalizedColumnNames(final JsonNode node, final boolean translateFieldNames) { + final Set normalizedFieldNames = new HashSet<>(); + final Iterator fieldNameItr = node.getFieldNames(); + while (fieldNameItr.hasNext()) { + normalizedFieldNames.add(normalizeColumnName(fieldNameItr.next(), translateFieldNames)); + } + + return normalizedFieldNames; + } + + private String generateInsert(final JsonNode rootNode, final Map attributes, final String tableName, + final TableSchema schema, final boolean translateFieldNames) { + + final Set normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames); + for (final String requiredColName : schema.getRequiredColumnNames()) { + final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames); + if (!normalizedFieldNames.contains(normalizedColName)) { + throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'"); + } + } + + final StringBuilder sqlBuilder = new StringBuilder(); + int fieldCount = 0; + sqlBuilder.append("INSERT INTO ").append(tableName).append(" ("); + + // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as + // adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the + // columns that we are inserting into + final Iterator fieldNames = rootNode.getFieldNames(); + while (fieldNames.hasNext()) { + final String fieldName = fieldNames.next(); + + final ColumnDescription desc = schema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames)); + if (desc != null) { + if (fieldCount++ > 0) { + sqlBuilder.append(", "); + } + + sqlBuilder.append(desc.getColumnName()); + + final int sqlType = desc.getDataType(); + attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType)); + + final Integer colSize = desc.getColumnSize(); + String fieldValue = rootNode.get(fieldName).asText(); + if (colSize != null && fieldValue.length() > colSize) { + fieldValue = fieldValue.substring(0, colSize); + } + attributes.put("sql.args." + fieldCount + ".value", fieldValue); + } + } + + // complete the SQL statements by adding ?'s for all of the values to be escaped. + sqlBuilder.append(") VALUES ("); + for (int i=0; i < fieldCount; i++) { + if (i > 0) { + sqlBuilder.append(", "); + } + + sqlBuilder.append("?"); + } + sqlBuilder.append(")"); + + if (fieldCount == 0) { + throw new ProcessException("None of the fields in the JSON map to the columns defined by the " + tableName + " table"); + } + + return sqlBuilder.toString(); + } + + private String generateUpdate(final JsonNode rootNode, final Map attributes, final String tableName, final String updateKeys, + final TableSchema schema, final boolean translateFieldNames) { + + final Set updateKeyNames; + if (updateKeys == null) { + updateKeyNames = schema.getPrimaryKeyColumnNames(); + } else { + updateKeyNames = new HashSet<>(); + for (final String updateKey : updateKeys.split(",")) { + updateKeyNames.add(updateKey.trim()); + } + } + + if (updateKeyNames.isEmpty()) { + throw new ProcessException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified"); + } + + final StringBuilder sqlBuilder = new StringBuilder(); + int fieldCount = 0; + sqlBuilder.append("UPDATE ").append(tableName).append(" SET "); + + + // Create a Set of all normalized Update Key names, and ensure that there is a field in the JSON + // for each of the Update Key fields. + final Set normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames); + final Set normalizedUpdateNames = new HashSet<>(); + for (final String uk : updateKeyNames) { + final String normalizedUK = normalizeColumnName(uk, translateFieldNames); + normalizedUpdateNames.add(normalizedUK); + + if (!normalizedFieldNames.contains(normalizedUK)) { + throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'"); + } + } + + // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as + // adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the + // columns that we are inserting into + Iterator fieldNames = rootNode.getFieldNames(); + while (fieldNames.hasNext()) { + final String fieldName = fieldNames.next(); + + final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames); + final ColumnDescription desc = schema.getColumns().get(normalizedColName); + if (desc == null) { + continue; + } + + // Check if this column is an Update Key. If so, skip it for now. We will come + // back to it after we finish the SET clause + if (normalizedUpdateNames.contains(normalizedColName)) { + continue; + } + + if (fieldCount++ > 0) { + sqlBuilder.append(", "); + } + + sqlBuilder.append(desc.getColumnName()).append(" = ?"); + final int sqlType = desc.getDataType(); + attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType)); + + final Integer colSize = desc.getColumnSize(); + String fieldValue = rootNode.get(fieldName).asText(); + if (colSize != null && fieldValue.length() > colSize) { + fieldValue = fieldValue.substring(0, colSize); + } + attributes.put("sql.args." + fieldCount + ".value", fieldValue); + } + + // Set the WHERE clause based on the Update Key values + sqlBuilder.append(" WHERE "); + + fieldNames = rootNode.getFieldNames(); + int whereFieldCount = 0; + while (fieldNames.hasNext()) { + final String fieldName = fieldNames.next(); + + final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames); + final ColumnDescription desc = schema.getColumns().get(normalizedColName); + if (desc == null) { + continue; + } + + // Check if this column is a Update Key. If so, skip it for now. We will come + // back to it after we finish the SET clause + if (!normalizedUpdateNames.contains(normalizedColName)) { + continue; + } + + if (whereFieldCount++ > 0) { + sqlBuilder.append(" AND "); + } + fieldCount++; + + sqlBuilder.append(normalizedColName).append(" = ?"); + final int sqlType = desc.getDataType(); + attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType)); + + final Integer colSize = desc.getColumnSize(); + String fieldValue = rootNode.get(fieldName).asText(); + if (colSize != null && fieldValue.length() > colSize) { + fieldValue = fieldValue.substring(0, colSize); + } + attributes.put("sql.args." + fieldCount + ".value", fieldValue); + } + + return sqlBuilder.toString(); + } + + private static String normalizeColumnName(final String colName, final boolean translateColumnNames) { + return translateColumnNames ? colName.toUpperCase().replace("_", "") : colName; + } + + private static class TableSchema { + private List requiredColumnNames; + private Set primaryKeyColumnNames; + private Map columns; + + private TableSchema(final List columnDescriptions, final boolean translateColumnNames, + final Set primaryKeyColumnNames) { + this.columns = new HashMap<>(); + this.primaryKeyColumnNames = primaryKeyColumnNames; + + this.requiredColumnNames = new ArrayList<>(); + for (final ColumnDescription desc : columnDescriptions) { + columns.put(ConvertFlatJSONToSQL.normalizeColumnName(desc.columnName, translateColumnNames), desc); + if (desc.isRequired()) { + requiredColumnNames.add(desc.columnName); + } + } + } + + public Map getColumns() { + return columns; + } + + public List getRequiredColumnNames() { + return requiredColumnNames; + } + + public Set getPrimaryKeyColumnNames() { + return primaryKeyColumnNames; + } + + public static TableSchema from(final Connection conn, final String catalog, final String tableName, + final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException { + final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%"); + + final List cols = new ArrayList<>(); + while (colrs.next()) { + final ColumnDescription col = ColumnDescription.from(colrs); + cols.add(col); + } + + final Set primaryKeyColumns = new HashSet<>(); + if (includePrimaryKeys) { + final ResultSet pkrs = conn.getMetaData().getPrimaryKeys(catalog, null, tableName); + + while (pkrs.next()) { + final String colName = pkrs.getString("COLUMN_NAME"); + primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames)); + } + } + + return new TableSchema(cols, translateColumnNames, primaryKeyColumns); + } + } + + private static class ColumnDescription { + private final String columnName; + private final int dataType; + private final boolean required; + private final Integer columnSize; + + private ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize) { + this.columnName = columnName; + this.dataType = dataType; + this.required = required; + this.columnSize = columnSize; + } + + public int getDataType() { + return dataType; + } + + public Integer getColumnSize() { + return columnSize; + } + + public String getColumnName() { + return columnName; + } + + public boolean isRequired() { + return required; + } + + public static ColumnDescription from(final ResultSet resultSet) throws SQLException { + final String columnName = resultSet.getString("COLUMN_NAME"); + final int dataType = resultSet.getInt("DATA_TYPE"); + final int colSize = resultSet.getInt("COLUMN_SIZE"); + + final String nullableValue = resultSet.getString("IS_NULLABLE"); + final boolean isNullable = "YES".equalsIgnoreCase(nullableValue) || nullableValue.isEmpty(); + final String defaultValue = resultSet.getString("COLUMN_DEF"); + final String autoIncrementValue = resultSet.getString("IS_AUTOINCREMENT"); + final boolean isAutoIncrement = "YES".equalsIgnoreCase(autoIncrementValue); + final boolean required = !isNullable && !isAutoIncrement && defaultValue == null; + + return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize); + } + } + + private static class SchemaKey { + private final String catalog; + private final String tableName; + + public SchemaKey(final String catalog, final String tableName) { + this.catalog = catalog; + this.tableName = tableName; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((catalog == null) ? 0 : catalog.hashCode()); + result = prime * result + ((tableName == null) ? 0 : tableName.hashCode()); + return result; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + + final SchemaKey other = (SchemaKey) obj; + if (catalog == null) { + if (other.catalog != null) { + return false; + } + } else if (!catalog.equals(other.catalog)) { + return false; + } + + + if (tableName == null) { + if (other.tableName != null) { + return false; + } + } else if (!tableName.equals(other.tableName)) { + return false; + } + + return true; + } + } +} diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java new file mode 100644 index 0000000000..d349a58fac --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.stream.io.StreamUtils; + +@SupportsBatching +@SeeAlso(ConvertFlatJSONToSQL.class) +@Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"}) +@CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command " + + "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes " + + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be " + + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.") +public class PutSQL extends AbstractProcessor { + + static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder() + .name("JDBC Connection Pool") + .description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. " + + "The Connection Pool is necessary in order to determine the appropriate database column types.") + .identifiesControllerService(DBCPService.class) + .required(true) + .build(); + + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("A FlowFile is routed to this relationship after the database is successfully updated") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFile is routed to this relationship if the database cannot be updated for any reason") + .build(); + + private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type"); + private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+"); + + @Override + protected List getSupportedPropertyDescriptors() { + final List properties = new ArrayList<>(); + properties.add(CONNECTION_POOL); + return properties; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + // TODO: Batch. Pull in 50 or 100 at a time and map content of FlowFile to Set that have that + // same content. Then execute updates in batches. + + final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class); + try (final Connection conn = dbcpService.getConnection()) { + // Read the SQL from the FlowFile's content + final byte[] buffer = new byte[(int) flowFile.getSize()]; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, buffer); + } + }); + + final String sql = new String(buffer, StandardCharsets.UTF_8); + + // Create a prepared statement and set the appropriate parameters on the statement. + try (final PreparedStatement stmt = conn.prepareStatement(sql)) { + for (final Map.Entry entry : flowFile.getAttributes().entrySet()) { + final String key = entry.getKey(); + final Matcher matcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key); + if (matcher.matches()) { + final int parameterIndex = Integer.parseInt(matcher.group(1)); + + final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches(); + if (!isNumeric) { + getLogger().error("Cannot update database for {} because the value of the '{}' attribute is '{}', which is not a valid JDBC numeral type; routing to failure", + new Object[] {flowFile, key, entry.getValue()}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final int jdbcType = Integer.parseInt(entry.getValue()); + final String valueAttrName = "sql.args." + parameterIndex + ".value"; + final String parameterValue = flowFile.getAttribute(valueAttrName); + if (parameterValue == null) { + getLogger().error("Cannot update database for {} because the '{}' attribute exists but the '{}' attribute does not", new Object[] {flowFile, key, valueAttrName}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + try { + setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType); + } catch (final NumberFormatException nfe) { + getLogger().error("Cannot update database for {} because the '{}' attribute has a value of '{}', " + + "which cannot be converted into the necessary data type; routing to failure", new Object[] {flowFile, valueAttrName, parameterValue}); + session.transfer(flowFile, REL_FAILURE); + return; + } + } + } + + final int updatedRowCount = stmt.executeUpdate(); + flowFile = session.putAttribute(flowFile, "sql.update.count", String.valueOf(updatedRowCount)); + } + + // TODO: Need to expose Connection URL from DBCP Service and use it to emit a Provenance Event + session.transfer(flowFile, REL_SUCCESS); + } catch (final SQLException e) { + getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + return; + } + } + + private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException { + switch (jdbcType) { + case Types.BIT: + stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue)); + break; + case Types.TINYINT: + stmt.setByte(parameterIndex, Byte.parseByte(parameterValue)); + break; + case Types.SMALLINT: + stmt.setShort(parameterIndex, Short.parseShort(parameterValue)); + break; + case Types.INTEGER: + stmt.setInt(parameterIndex, Integer.parseInt(parameterValue)); + break; + case Types.BIGINT: + stmt.setLong(parameterIndex, Long.parseLong(parameterValue)); + break; + case Types.REAL: + stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue)); + break; + case Types.FLOAT: + case Types.DOUBLE: + stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue)); + break; + case Types.DATE: + stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue))); + break; + case Types.TIME: + stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue))); + break; + case Types.TIMESTAMP: + stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue))); + break; + case Types.CHAR: + case Types.VARCHAR: + case Types.LONGNVARCHAR: + case Types.LONGVARCHAR: + stmt.setString(parameterIndex, parameterValue); + break; + default: + throw new SQLException("The '" + attrName + "' attribute has a value of '" + parameterValue + + "' and a type of '" + jdbcType + "' but this is not a known data type"); + } + } +} diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertFlatJSONToSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertFlatJSONToSQL.java new file mode 100644 index 0000000000..b2bdbba551 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertFlatJSONToSQL.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestConvertFlatJSONToSQL { + static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)"; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + System.setProperty("derby.stream.error.file", "target/derby.log"); + } + + @Test + public void testInsert() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "INSERT"); + runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "48"); + + out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); + } + + + @Test + public void testUpdateBasedOnPrimaryKey() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.1.value", "Mark"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.2.value", "48"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "1"); + + out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE ID = ?"); + } + + + @Test + public void testUpdateBasedOnUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "code"); + runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "48"); + + out.assertContentEquals("UPDATE PERSONS SET ID = ?, NAME = ? WHERE CODE = ?"); + } + + @Test + public void testUpdateBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code"); + runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "48"); + + out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?"); + } + + @Test + public void testUpdateWithMissingFieldBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code"); + runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json")); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1); + } + + @Test + public void testUpdateWithMalformedJson() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code"); + runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json")); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1); + } + + @Test + public void testInsertWithMissingField() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "INSERT"); + runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code"); + runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json")); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1); + } + + + /** + * Simple implementation only for testing purposes + */ + private static class MockDBCPService extends AbstractControllerService implements DBCPService { + private final String dbLocation; + + public MockDBCPService(final String dbLocation) { + this.dbLocation = dbLocation; + } + + @Override + public String getIdentifier() { + return "dbcp"; + } + + @Override + public Connection getConnection() throws ProcessException { + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + final Connection con = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true"); + return con; + } catch (final Exception e) { + e.printStackTrace(); + throw new ProcessException("getConnection failed: " + e); + } + } + } + +} diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java new file mode 100644 index 0000000000..ccba3e176f --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestPutSQL { + static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)"; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + System.setProperty("derby.stream.error.file", "target/derby.log"); + } + + @Test + public void testDirectStatements() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + + runner.enqueue("UPDATE PERSONS SET NAME='George' WHERE ID=1".getBytes()); + runner.run(); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("George", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + final Map attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.3.value", "84"); + + runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + + runner.clearTransferState(); + + attributes.clear(); + attributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.1.value", "George"); + + attributes.put("sql.args.2.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.2.value", "1"); + + runner.enqueue("UPDATE PERSONS SET NAME=? WHERE ID=?".getBytes(), attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("George", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testMultipleStatementsWithinFlowFile() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + + "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; + final Map attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.3.value", "84"); + + attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.4.value", "1"); + + runner.enqueue(sql.getBytes(), attributes); + runner.run(); + + // should fail because of the semicolon + runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertFalse(rs.next()); + } + } + } + + + /** + * Simple implementation only for testing purposes + */ + private static class MockDBCPService extends AbstractControllerService implements DBCPService { + private final String dbLocation; + + public MockDBCPService(final String dbLocation) { + this.dbLocation = dbLocation; + } + + @Override + public String getIdentifier() { + return "dbcp"; + } + + @Override + public Connection getConnection() throws ProcessException { + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + final Connection con = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true"); + return con; + } catch (final Exception e) { + e.printStackTrace(); + throw new ProcessException("getConnection failed: " + e); + } + } + } +} diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json new file mode 100644 index 0000000000..2f82a94919 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json @@ -0,0 +1,4 @@ +{ + "id": 1, + "name": "Mark", +} \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-1.json b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-1.json new file mode 100644 index 0000000000..8f9844770e --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-1.json @@ -0,0 +1,5 @@ +{ + "id": 1, + "name": "Mark", + "code": 48 +} \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json new file mode 100644 index 0000000000..3ff074ae9f --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json @@ -0,0 +1,4 @@ +{ + "id": 1, + "name": "Mark" +} \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json new file mode 100644 index 0000000000..347cc9ebad --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json @@ -0,0 +1,4 @@ +{ + "name": "Mark", + "code": 48 +} \ No newline at end of file