diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index fd267d13af..729b495337 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -220,6 +220,13 @@ language governing permissions and limitations under the License. --> src/test/resources/TestMergeContent/head src/test/resources/TestMergeContent/user.avsc src/test/resources/TestMergeContent/place.avsc + src/test/resources/TestConvertJSONToSQL/person-1.json + src/test/resources/TestConvertJSONToSQL/persons.json + src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json + src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json + src/test/resources/TestConvertJSONToSQL/person-without-code.json + src/test/resources/TestConvertJSONToSQL/person-with-null-code.json + src/test/resources/TestConvertJSONToSQL/person-without-id.json src/test/resources/TestModifyBytes/noFooter.txt src/test/resources/TestModifyBytes/noFooter_noHeader.txt src/test/resources/TestModifyBytes/noHeader.txt diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java new file mode 100644 index 0000000000..7eda59352e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java @@ -0,0 +1,679 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.ObjectHolder; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.node.ArrayNode; +import org.codehaus.jackson.node.JsonNodeFactory; + +@SideEffectFree +@SupportsBatching +@SeeAlso(PutSQL.class) +@Tags({"json", "sql", "database", "rdbms", "insert", "update", "relational", "flat"}) +@CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE or INSERT SQL statement. The incoming FlowFile is expected to be a " + "\"flat\" JSON message, meaning that it consists of a single JSON element and each field maps to a simple type. If a field maps to " + "a JSON object, that JSON object will be interpreted as Text. If the input is an array of JSON elements, each element in the array is " + "output as a separate FlowFile to the 'sql' relationship. Upon successful conversion, the original FlowFile is routed to the 'original' " + "relationship and the SQL is routed to the 'sql' relationship.") +@WritesAttributes({ + @WritesAttribute(attribute="mime.type", description="Sets mime.type of FlowFile that is routed to 'sql' to 'text/plain'."), + @WritesAttribute(attribute="sql.table", description="Sets the sql.table attribute of FlowFile that is routed to 'sql' to the name of the table that is updated by the SQL statement."), + @WritesAttribute(attribute="sql.catalog", description="If the Catalog name is set for this database, specifies the name of the catalog that the SQL statement will update. " + "If no catalog is used, this attribute will not be added."), + @WritesAttribute(attribute="fragment.identifier", description="All FlowFiles routed to the 'sql' relationship for the same incoming FlowFile (multiple will be output for the same incoming " + "FlowFile if the incoming FlowFile is a JSON Array) will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), + @WritesAttribute(attribute="fragment.count", description="The number of SQL FlowFiles that were produced for the same incoming FlowFile. This can be used in conjunction with the " + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming FlowFile."), + @WritesAttribute(attribute="fragment.index", description="The position of this FlowFile in the list of outgoing FlowFiles that were all derived from the same incoming FlowFile. This can be " + "used in conjunction with the fragment.identifier and fragment.count attributes to know which FlowFiles originated from the same incoming FlowFile and in what order the SQL " + "FlowFiles were produced."), + @WritesAttribute(attribute="sql.args.N.type", description="The output SQL statements are parameterized in order to avoid SQL Injection Attacks. The types of the Parameters " + "to use are stored in attributes named sql.args.1.type, sql.args.2.type, sql.args.3.type, and so on. The type is a number representing a JDBC Type constant. " + "Generally, this is useful only for software to read and interpret but is added so that a processor such as PutSQL can understand how to interpret the values."), + @WritesAttribute(attribute="sql.args.N.value", description="The output SQL statements are parameterized in order to avoid SQL Injection Attacks. The values of the Parameters " + "to use are stored in the attributes named sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. Each of these attributes has a corresponding " + "sql.args.N.type attribute that indicates how the value should be interpreted when inserting it into the database.") +}) +public class ConvertJSONToSQL extends AbstractProcessor { + private static final String UPDATE_TYPE = "UPDATE"; + private static final String INSERT_TYPE = "INSERT"; + + static final AllowableValue IGNORE_UNMATCHED_FIELD = new AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields", + "Any field in the JSON document that cannot be mapped to a column in the database is ignored"); + static final AllowableValue FAIL_UNMATCHED_FIELD = new AllowableValue("Fail", "Fail", + "If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship"); + + static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder() + .name("JDBC Connection Pool") + .description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. 
" + + "The Connection Pool is necessary in order to determine the appropriate database column types.") + .identifiesControllerService(DBCPService.class) + .required(true) + .build(); + static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder() + .name("Statement Type") + .description("Specifies the type of SQL Statement to generate") + .required(true) + .allowableValues(UPDATE_TYPE, INSERT_TYPE) + .build(); + static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + .name("Table Name") + .description("The name of the table that the statement should update") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + static final PropertyDescriptor CATALOG_NAME = new PropertyDescriptor.Builder() + .name("Catalog Name") + .description("The name of the catalog that the statement should update. This may not apply for the database that you are updating. In this case, leave the field empty") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new PropertyDescriptor.Builder() + .name("Translate Field Names") + .description("If true, the Processor will attempt to translate JSON field names into the appropriate column names for the table specified. " + + "If false, the JSON field names must match the column names exactly, or the column will not be updated") + .allowableValues("true", "false") + .defaultValue("true") + .build(); + static final PropertyDescriptor UNMATCHED_FIELD_BEHAVIOR = new PropertyDescriptor.Builder() + .name("Unmatched Field Behavior") + .description("If an incoming JSON element has a field that does not map to any of the database table's columns, this property specifies how to handle the situation") + .allowableValues(IGNORE_UNMATCHED_FIELD, FAIL_UNMATCHED_FIELD) + .defaultValue(IGNORE_UNMATCHED_FIELD.getValue()) + .build(); + static final PropertyDescriptor UPDATE_KEY = new PropertyDescriptor.Builder() + .name("Update Keys") + .description("A comma-separated list of column names that uniquely identifies a row in the database for UPDATE statements. " + + "If the Statement Type is UPDATE and this property is not set, the table's Primary Keys are used. " + + "In this case, if no Primary Key exists, the conversion to SQL will fail. " + + "This property is ignored if the Statement Type is INSERT") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .expressionLanguageSupported(true) + .build(); + + + static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("When a FlowFile is converted to SQL, the original JSON FlowFile is routed to this relationship") + .build(); + static final Relationship REL_SQL = new Relationship.Builder() + .name("sql") + .description("A FlowFile is routed to this relationship when its contents have successfully been converted into a SQL statement") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFile is routed to this relationship if it cannot be converted into a SQL statement. 
Common causes include invalid JSON " + "content or the JSON content missing a required field (if using an INSERT statement type).") + .build(); + + private final Map<SchemaKey, TableSchema> schemaCache = new LinkedHashMap<SchemaKey, TableSchema>(100) { + private static final long serialVersionUID = 1L; + + @Override + protected boolean removeEldestEntry(Map.Entry<SchemaKey, TableSchema> eldest) { + // evict the eldest entry once the cache reaches its capacity of 100 table schemas + return size() >= 100; + } + }; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(CONNECTION_POOL); + properties.add(STATEMENT_TYPE); + properties.add(TABLE_NAME); + properties.add(CATALOG_NAME); + properties.add(TRANSLATE_FIELD_NAMES); + properties.add(UNMATCHED_FIELD_BEHAVIOR); + properties.add(UPDATE_KEY); + return properties; + } + + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> rels = new HashSet<>(); + rels.add(REL_ORIGINAL); + rels.add(REL_SQL); + rels.add(REL_FAILURE); + return rels; + } + + + @OnScheduled + public void onScheduled(final ProcessContext context) { + synchronized (this) { + schemaCache.clear(); + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final boolean translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean(); + final boolean ignoreUnmappedFields = IGNORE_UNMATCHED_FIELD.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_FIELD_BEHAVIOR).getValue()); + final String statementType = context.getProperty(STATEMENT_TYPE).getValue(); + final String updateKeys = context.getProperty(UPDATE_KEY).evaluateAttributeExpressions(flowFile).getValue(); + + final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final SchemaKey schemaKey = new SchemaKey(catalog, tableName); + final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) && updateKeys == null; + + // get the database schema from the cache, if one exists. We do this in a synchronized block, rather than + // using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if + // the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the + // Java Heap if there are a lot of different SQL statements being generated that reference different tables. + TableSchema schema; + synchronized (this) { + schema = schemaCache.get(schemaKey); + if (schema == null) { + // No schema exists for this table yet. Query the database to determine the schema and put it into the cache. 
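+ // As an illustrative aside (a hedged sketch, not part of the patch itself): overriding removeEldestEntry is
+ // the standard way to turn a LinkedHashMap into the kind of small bounded cache that schemaCache relies on:
+ //
+ //     final Map<String, String> cache = new LinkedHashMap<String, String>(100) {
+ //         @Override
+ //         protected boolean removeEldestEntry(final Map.Entry<String, String> eldest) {
+ //             return size() >= 100; // once the map reaches the cap, each put evicts the oldest entry
+ //         }
+ //     };
+ //
+ // Because eviction is insertion-ordered, the least-recently-added schema is the one that is dropped.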
+ final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class); + try (final Connection conn = dbcpService.getConnection()) { + schema = TableSchema.from(conn, catalog, tableName, translateFieldNames, includePrimaryKeys); + schemaCache.put(schemaKey, schema); + } catch (final SQLException e) { + getLogger().error("Failed to convert {} into a SQL statement due to {}; routing to failure", new Object[] {flowFile, e.toString()}, e); + session.transfer(flowFile, REL_FAILURE); + return; + } + } + } + + // Parse the JSON document + final ObjectMapper mapper = new ObjectMapper(); + final ObjectHolder<JsonNode> rootNodeRef = new ObjectHolder<>(null); + try { + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + try (final InputStream bufferedIn = new BufferedInputStream(in)) { + rootNodeRef.set(mapper.readTree(bufferedIn)); + } + } + }); + } catch (final ProcessException pe) { + getLogger().error("Failed to parse {} as JSON due to {}; routing to failure", new Object[] {flowFile, pe.toString()}, pe); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final JsonNode rootNode = rootNodeRef.get(); + + // The node may or may not be a JSON Array. If it isn't, we will create an + // ArrayNode and add just the root node to it. We do this so that we can easily iterate + // over the array node, rather than duplicating the logic or creating another function that takes many variables + // in order to implement the logic. + final ArrayNode arrayNode; + if (rootNode.isArray()) { + arrayNode = (ArrayNode) rootNode; + } else { + final JsonNodeFactory nodeFactory = JsonNodeFactory.instance; + arrayNode = new ArrayNode(nodeFactory); + arrayNode.add(rootNode); + } + + final String fragmentIdentifier = UUID.randomUUID().toString(); + + final Set<FlowFile> created = new HashSet<>(); + for (int i=0; i < arrayNode.size(); i++) { + final JsonNode jsonNode = arrayNode.get(i); + + final String sql; + final Map<String, String> attributes = new HashMap<>(); + + try { + if (INSERT_TYPE.equals(statementType)) { + sql = generateInsert(jsonNode, attributes, tableName, schema, translateFieldNames, ignoreUnmappedFields); + } else { + sql = generateUpdate(jsonNode, attributes, tableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields); + } + } catch (final ProcessException pe) { + getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure", + new Object[] { flowFile, statementType, pe.toString() }, pe); + session.remove(created); + session.transfer(flowFile, REL_FAILURE); + return; + } + + FlowFile sqlFlowFile = session.create(flowFile); + created.add(sqlFlowFile); + + sqlFlowFile = session.write(sqlFlowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write(sql.getBytes(StandardCharsets.UTF_8)); + } + }); + + attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); + attributes.put("sql.table", tableName); + attributes.put("fragment.identifier", fragmentIdentifier); + attributes.put("fragment.count", String.valueOf(arrayNode.size())); + attributes.put("fragment.index", String.valueOf(i)); + + if (catalog != null) { + attributes.put("sql.catalog", catalog); + } + + sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributes); + session.transfer(sqlFlowFile, REL_SQL); + } + + session.transfer(flowFile, REL_ORIGINAL); + } + + private Set<String> getNormalizedColumnNames(final JsonNode node, final boolean translateFieldNames) { + final Set<String> normalizedFieldNames = new HashSet<>(); + final Iterator<String> fieldNameItr = node.getFieldNames(); + while (fieldNameItr.hasNext()) { + normalizedFieldNames.add(normalizeColumnName(fieldNameItr.next(), translateFieldNames)); + } + + return normalizedFieldNames; + } + + private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, + final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) { + + final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames); + for (final String requiredColName : schema.getRequiredColumnNames()) { + final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames); + if (!normalizedFieldNames.contains(normalizedColName)) { + throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'"); + } + } + + final StringBuilder sqlBuilder = new StringBuilder(); + int fieldCount = 0; + sqlBuilder.append("INSERT INTO ").append(tableName).append(" ("); + + // iterate over all of the elements in the JSON, building the SQL statement by adding the column names to the + // statement, adding each column value to a "sql.args.N.value" attribute, and adding its type to the + // corresponding "sql.args.N.type" attribute + final Iterator<String> fieldNames = rootNode.getFieldNames(); + while (fieldNames.hasNext()) { + final String fieldName = fieldNames.next(); + + final ColumnDescription desc = schema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames)); + if (desc == null && !ignoreUnmappedFields) { + throw new ProcessException("Cannot map JSON field '" + fieldName + "' to any column in the database"); + } + + if (desc != null) { + if (fieldCount++ > 0) { + sqlBuilder.append(", "); + } + + sqlBuilder.append(desc.getColumnName()); + + final int sqlType = desc.getDataType(); + attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType)); + + final Integer colSize = desc.getColumnSize(); + final JsonNode fieldNode = rootNode.get(fieldName); + if (!fieldNode.isNull()) { + String fieldValue = fieldNode.asText(); + if (colSize != null && fieldValue.length() > colSize) { + fieldValue = fieldValue.substring(0, colSize); + } + attributes.put("sql.args." + fieldCount + ".value", fieldValue); + } + } + } + + // complete the SQL statement by adding ?'s for all of the values to be escaped. 
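+ // To make the overall contract concrete (hedged example; the table and values are hypothetical, modeled on the
+ // TestConvertJSONToSQL person fixtures): for a PERSONS table with columns ID (INTEGER), NAME (VARCHAR) and
+ // CODE (INTEGER), the flat JSON document
+ //     { "id": 1, "name": "Mark", "code": 48 }
+ // produces the SQL text
+ //     INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)
+ // along with attributes sql.args.1.type=4 (Types.INTEGER), sql.args.1.value=1, sql.args.2.type=12
+ // (Types.VARCHAR), sql.args.2.value=Mark, sql.args.3.type=4 and sql.args.3.value=48.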
+ sqlBuilder.append(") VALUES ("); + for (int i=0; i < fieldCount; i++) { + if (i > 0) { + sqlBuilder.append(", "); + } + + sqlBuilder.append("?"); + } + sqlBuilder.append(")"); + + if (fieldCount == 0) { + throw new ProcessException("None of the fields in the JSON map to the columns defined by the " + tableName + " table"); + } + + return sqlBuilder.toString(); + } + + private String generateUpdate(final JsonNode rootNode, final Map attributes, final String tableName, final String updateKeys, + final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) { + + final Set updateKeyNames; + if (updateKeys == null) { + updateKeyNames = schema.getPrimaryKeyColumnNames(); + } else { + updateKeyNames = new HashSet<>(); + for (final String updateKey : updateKeys.split(",")) { + updateKeyNames.add(updateKey.trim()); + } + } + + if (updateKeyNames.isEmpty()) { + throw new ProcessException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified"); + } + + final StringBuilder sqlBuilder = new StringBuilder(); + int fieldCount = 0; + sqlBuilder.append("UPDATE ").append(tableName).append(" SET "); + + + // Create a Set of all normalized Update Key names, and ensure that there is a field in the JSON + // for each of the Update Key fields. + final Set normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames); + final Set normalizedUpdateNames = new HashSet<>(); + for (final String uk : updateKeyNames) { + final String normalizedUK = normalizeColumnName(uk, translateFieldNames); + normalizedUpdateNames.add(normalizedUK); + + if (!normalizedFieldNames.contains(normalizedUK)) { + throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'"); + } + } + + // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as + // adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the + // columns that we are inserting into + Iterator fieldNames = rootNode.getFieldNames(); + while (fieldNames.hasNext()) { + final String fieldName = fieldNames.next(); + + final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames); + final ColumnDescription desc = schema.getColumns().get(normalizedColName); + + if (desc == null) { + if (ignoreUnmappedFields) { + throw new ProcessException("Cannot map JSON field '" + fieldName + "' to any column in the database"); + } else { + continue; + } + } + + // Check if this column is an Update Key. If so, skip it for now. We will come + // back to it after we finish the SET clause + if (normalizedUpdateNames.contains(normalizedColName)) { + continue; + } + + if (fieldCount++ > 0) { + sqlBuilder.append(", "); + } + + sqlBuilder.append(desc.getColumnName()).append(" = ?"); + final int sqlType = desc.getDataType(); + attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType)); + + final Integer colSize = desc.getColumnSize(); + + final JsonNode fieldNode = rootNode.get(fieldName); + if (!fieldNode.isNull()) { + String fieldValue = rootNode.get(fieldName).asText(); + if (colSize != null && fieldValue.length() > colSize) { + fieldValue = fieldValue.substring(0, colSize); + } + attributes.put("sql.args." 
+ fieldCount + ".value", fieldValue); + } + } + + // Set the WHERE clause based on the Update Key values + sqlBuilder.append(" WHERE "); + + fieldNames = rootNode.getFieldNames(); + int whereFieldCount = 0; + while (fieldNames.hasNext()) { + final String fieldName = fieldNames.next(); + + final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames); + final ColumnDescription desc = schema.getColumns().get(normalizedColName); + if (desc == null) { + continue; + } + + // Check if this column is a Update Key. If so, skip it for now. We will come + // back to it after we finish the SET clause + if (!normalizedUpdateNames.contains(normalizedColName)) { + continue; + } + + if (whereFieldCount++ > 0) { + sqlBuilder.append(" AND "); + } + fieldCount++; + + sqlBuilder.append(normalizedColName).append(" = ?"); + final int sqlType = desc.getDataType(); + attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType)); + + final Integer colSize = desc.getColumnSize(); + String fieldValue = rootNode.get(fieldName).asText(); + if (colSize != null && fieldValue.length() > colSize) { + fieldValue = fieldValue.substring(0, colSize); + } + attributes.put("sql.args." + fieldCount + ".value", fieldValue); + } + + return sqlBuilder.toString(); + } + + private static String normalizeColumnName(final String colName, final boolean translateColumnNames) { + return translateColumnNames ? colName.toUpperCase().replace("_", "") : colName; + } + + private static class TableSchema { + private List requiredColumnNames; + private Set primaryKeyColumnNames; + private Map columns; + + private TableSchema(final List columnDescriptions, final boolean translateColumnNames, + final Set primaryKeyColumnNames) { + this.columns = new HashMap<>(); + this.primaryKeyColumnNames = primaryKeyColumnNames; + + this.requiredColumnNames = new ArrayList<>(); + for (final ColumnDescription desc : columnDescriptions) { + columns.put(ConvertJSONToSQL.normalizeColumnName(desc.columnName, translateColumnNames), desc); + if (desc.isRequired()) { + requiredColumnNames.add(desc.columnName); + } + } + } + + public Map getColumns() { + return columns; + } + + public List getRequiredColumnNames() { + return requiredColumnNames; + } + + public Set getPrimaryKeyColumnNames() { + return primaryKeyColumnNames; + } + + public static TableSchema from(final Connection conn, final String catalog, final String tableName, + final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException { + final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%"); + + final List cols = new ArrayList<>(); + while (colrs.next()) { + final ColumnDescription col = ColumnDescription.from(colrs); + cols.add(col); + } + + final Set primaryKeyColumns = new HashSet<>(); + if (includePrimaryKeys) { + final ResultSet pkrs = conn.getMetaData().getPrimaryKeys(catalog, null, tableName); + + while (pkrs.next()) { + final String colName = pkrs.getString("COLUMN_NAME"); + primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames)); + } + } + + return new TableSchema(cols, translateColumnNames, primaryKeyColumns); + } + } + + private static class ColumnDescription { + private final String columnName; + private final int dataType; + private final boolean required; + private final Integer columnSize; + + private ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize) { + this.columnName = columnName; + this.dataType = dataType; + 
this.required = required; + this.columnSize = columnSize; + } + + public int getDataType() { + return dataType; + } + + public Integer getColumnSize() { + return columnSize; + } + + public String getColumnName() { + return columnName; + } + + public boolean isRequired() { + return required; + } + + public static ColumnDescription from(final ResultSet resultSet) throws SQLException { + final String columnName = resultSet.getString("COLUMN_NAME"); + final int dataType = resultSet.getInt("DATA_TYPE"); + final int colSize = resultSet.getInt("COLUMN_SIZE"); + + final String nullableValue = resultSet.getString("IS_NULLABLE"); + final boolean isNullable = "YES".equalsIgnoreCase(nullableValue) || nullableValue.isEmpty(); + final String defaultValue = resultSet.getString("COLUMN_DEF"); + final String autoIncrementValue = resultSet.getString("IS_AUTOINCREMENT"); + final boolean isAutoIncrement = "YES".equalsIgnoreCase(autoIncrementValue); + final boolean required = !isNullable && !isAutoIncrement && defaultValue == null; + + return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize); + } + } + + private static class SchemaKey { + private final String catalog; + private final String tableName; + + public SchemaKey(final String catalog, final String tableName) { + this.catalog = catalog; + this.tableName = tableName; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((catalog == null) ? 0 : catalog.hashCode()); + result = prime * result + ((tableName == null) ? 0 : tableName.hashCode()); + return result; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + + final SchemaKey other = (SchemaKey) obj; + if (catalog == null) { + if (other.catalog != null) { + return false; + } + } else if (!catalog.equals(other.catalog)) { + return false; + } + + + if (tableName == null) { + if (other.tableName != null) { + return false; + } + } else if (!tableName.equals(other.tableName)) { + return false; + } + + return true; + } + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java new file mode 100644 index 0000000000..b087737c9b --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -0,0 +1,920 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.sql.BatchUpdateException; +import java.sql.Connection; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLNonTransientException; +import java.sql.Statement; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.FlowFileFilter; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; + +@SupportsBatching +@SeeAlso(ConvertJSONToSQL.class) +@Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"}) +@CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command " + "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes " + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be " + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.") +@ReadsAttributes({ + @ReadsAttribute(attribute="fragment.identifier", description="If the <Support Fragmented Transactions> property is true, this attribute is used to determine whether or " + "not two FlowFiles belong to the same transaction."), + @ReadsAttribute(attribute="fragment.count", description="If the <Support Fragmented Transactions> property is true, this attribute is used to determine how many FlowFiles " + "are needed to complete the transaction."), + @ReadsAttribute(attribute="fragment.index", description="If the <Support Fragmented Transactions> property is true, this attribute is used to determine the order that the FlowFiles " + "in a transaction should be evaluated."), + @ReadsAttribute(attribute="sql.args.N.type", description="Incoming FlowFiles are expected to be parameterized SQL statements. The type of each Parameter is specified as an integer " + "that represents the JDBC Type of the parameter."), + @ReadsAttribute(attribute="sql.args.N.value", description="Incoming FlowFiles are expected to be parameterized SQL statements. The values of the Parameters are specified as " + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute.") +}) +@WritesAttributes({ + @WritesAttribute(attribute="sql.generated.key", description="If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, " + "this attribute will be added to indicate the generated key, if possible. This feature is not supported by all database vendors.") +}) +public class PutSQL extends AbstractProcessor { + + static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder() + .name("JDBC Connection Pool") + .description("Specifies the JDBC Connection Pool to use in order to obtain a connection to the database " + "against which the SQL statements will be executed.") + .identifiesControllerService(DBCPService.class) + .required(true) + .build(); + static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder() + .name("Support Fragmented Transactions") + .description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. " + "If the fragment.count value is greater than 1, the Processor will not process any FlowFile with that fragment.identifier until all are available; " + "at that point, it will process all FlowFiles with that fragment.identifier as a single transaction, in the order specified by the FlowFiles' fragment.index attributes. " + "This provides atomicity of those SQL statements. If this value is false, these attributes will be ignored and the updates will occur independently of one another.") + .allowableValues("true", "false") + .defaultValue("true") + .build(); + static final PropertyDescriptor TRANSACTION_TIMEOUT = new PropertyDescriptor.Builder() + .name("Transaction Timeout") + .description("If the <Support Fragmented Transactions> property is set to true, specifies how long to wait for all FlowFiles for a particular fragment.identifier attribute " + "to arrive before just transferring all of the FlowFiles with that identifier to the 'failure' relationship") + .required(false) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("The preferred number of FlowFiles to put to the database in a single transaction") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("100") + .build(); + static final PropertyDescriptor OBTAIN_GENERATED_KEYS = new PropertyDescriptor.Builder() + .name("Obtain Generated Keys") + .description("If true, any key that is automatically generated by the database will be added to the FlowFile that generated it using the sql.generated.key attribute. 
" + + "This may result in slightly slower performance and is not supported by all databases.") + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("A FlowFile is routed to this relationship after the database is successfully updated") + .build(); + static final Relationship REL_RETRY = new Relationship.Builder() + .name("retry") + .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, " + + "such as an invalid query or an integrity constraint violation") + .build(); + + private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type"); + private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+"); + + private static final String FRAGMENT_ID_ATTR = "fragment.identifier"; + private static final String FRAGMENT_INDEX_ATTR = "fragment.index"; + private static final String FRAGMENT_COUNT_ATTR = "fragment.count"; + + @Override + protected List getSupportedPropertyDescriptors() { + final List properties = new ArrayList<>(); + properties.add(CONNECTION_POOL); + properties.add(SUPPORT_TRANSACTIONS); + properties.add(TRANSACTION_TIMEOUT); + properties.add(BATCH_SIZE); + properties.add(OBTAIN_GENERATED_KEYS); + return properties; + } + + @Override + public Set getRelationships() { + final Set rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_RETRY); + rels.add(REL_FAILURE); + return rels; + } + + + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final FlowFilePoll poll = pollFlowFiles(context, session); + if (poll == null) { + return; + } + + final List flowFiles = poll.getFlowFiles(); + if (flowFiles == null) { + return; + } + + final long startNanos = System.nanoTime(); + final boolean obtainKeys = context.getProperty(OBTAIN_GENERATED_KEYS).asBoolean(); + final Map statementMap = new HashMap<>(); // Map SQL to a PreparedStatement and FlowFiles + final List sentFlowFiles = new ArrayList<>(); // flowfiles that have been sent + final List processedFlowFiles = new ArrayList<>(); // all flowfiles that we have processed + final Set enclosuresToExecute = new LinkedHashSet<>(); // the enclosures that we've processed + + // Because we can have a transaction that is necessary across multiple FlowFiles, things get complicated when + // some FlowFiles have been transferred to a relationship and then there is a failure. As a result, we will just + // map all FlowFiles to their destination relationship and do the session.transfer at the end. This way, if there + // is a failure, we can route all FlowFiles to failure if we need to. + final Map destinationRelationships = new HashMap<>(); + + final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class); + try (final Connection conn = dbcpService.getConnection()) { + final boolean originalAutoCommit = conn.getAutoCommit(); + try { + conn.setAutoCommit(false); + + for (final FlowFile flowFile : flowFiles) { + processedFlowFiles.add(flowFile); + final String sql = getSQL(session, flowFile); + + // Get the appropriate PreparedStatement to use. 
+ final StatementFlowFileEnclosure enclosure; + try { + enclosure = getEnclosure(sql, conn, statementMap, obtainKeys, poll.isFragmentedTransaction()); + } catch (final SQLNonTransientException e) { + getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e}); + destinationRelationships.put(flowFile, REL_FAILURE); + continue; + } + + final PreparedStatement stmt = enclosure.getStatement(); + + // set the appropriate parameters on the statement. + try { + setParameters(stmt, flowFile.getAttributes()); + } catch (final SQLException | ProcessException pe) { + getLogger().error("Cannot update database for {} due to {}; routing to failure", new Object[] {flowFile, pe.toString()}, pe); + destinationRelationships.put(flowFile, REL_FAILURE); + continue; + } + + // If we need to obtain keys, we cannot do so in a Batch Update. So we have to execute the statement and close it. + if (obtainKeys) { + try { + // Execute the actual update. + stmt.executeUpdate(); + + // attempt to determine the key that was generated, if any. This is not supported by all + // database vendors, so if we cannot determine the generated key (or if the statement is not an INSERT), + // we will just move on without setting the attribute. + FlowFile sentFlowFile = flowFile; + final String generatedKey = determineGeneratedKey(stmt); + if (generatedKey != null) { + sentFlowFile = session.putAttribute(sentFlowFile, "sql.generated.key", generatedKey); + } + + stmt.close(); + sentFlowFiles.add(sentFlowFile); + } catch (final SQLNonTransientException e) { + getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e}); + destinationRelationships.put(flowFile, REL_FAILURE); + continue; + } catch (final SQLException e) { + getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[] {flowFile, e}); + destinationRelationships.put(flowFile, REL_RETRY); + continue; + } + } else { + // We don't need to obtain keys. Just add the statement to the batch. + stmt.addBatch(); + enclosure.addFlowFile(flowFile); + enclosuresToExecute.add(enclosure); + } + } + + // If we are not trying to obtain the generated keys, we will have + // PreparedStatement's that have batches added to them. We need to execute each batch and close + // the PreparedStatement. + for (final StatementFlowFileEnclosure enclosure : enclosuresToExecute) { + try { + final PreparedStatement stmt = enclosure.getStatement(); + stmt.executeBatch(); + sentFlowFiles.addAll(enclosure.getFlowFiles()); + } catch (final BatchUpdateException e) { + // If we get a BatchUpdateException, then we want to determine which FlowFile caused the failure, + // and route that FlowFile to failure while routing those that finished processing to success and those + // that have not yet been executed to retry. If the FlowFile was + // part of a fragmented transaction, then we must roll back all updates for this connection, because + // other statements may have been successful and been part of this transaction. + final int[] updateCounts = e.getUpdateCounts(); + final int offendingFlowFileIndex = updateCounts.length; + final List<FlowFile> batchFlowFiles = enclosure.getFlowFiles(); + + if (poll.isFragmentedTransaction()) { + // There are potentially multiple statements for this one transaction. As a result, + // we need to roll back the entire transaction and route all of the FlowFiles to failure. + conn.rollback(); + final FlowFile offendingFlowFile = batchFlowFiles.get(offendingFlowFileIndex); + getLogger().error("Failed to update database due to a failed batch update. A total of {} FlowFiles are required for this transaction, so routing all to failure. " + "Offending FlowFile was {}, which caused the following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, e}); + session.transfer(flowFiles, REL_FAILURE); + return; + } + + // In the presence of a BatchUpdateException, the driver has the option of either stopping when an error + // occurs, or continuing. If it continues, then it must account for all statements in the batch and for + // those that fail return a Statement.EXECUTE_FAILED for the number of rows updated. + // So we will iterate over all of the update counts returned. If any is equal to Statement.EXECUTE_FAILED, + // we will route the corresponding FlowFile to failure. Otherwise, the FlowFile will go to success + // unless it has not yet been processed (its index in the List >= updateCounts.length). + int failureCount = 0; + int successCount = 0; + int retryCount = 0; + for (int i=0; i < updateCounts.length; i++) { + final int updateCount = updateCounts[i]; + final FlowFile flowFile = batchFlowFiles.get(i); + if (updateCount == Statement.EXECUTE_FAILED) { + destinationRelationships.put(flowFile, REL_FAILURE); + failureCount++; + } else { + destinationRelationships.put(flowFile, REL_SUCCESS); + successCount++; + } + } + + if (failureCount == 0) { + // if no failures found, the driver decided not to execute the statements after the + // failure, so route the first unexecuted FlowFile (the one that caused the failure) to failure. + final FlowFile failedFlowFile = batchFlowFiles.get(updateCounts.length); + destinationRelationships.put(failedFlowFile, REL_FAILURE); + failureCount++; + } + + if (updateCounts.length < batchFlowFiles.size()) { + final List<FlowFile> unexecuted = batchFlowFiles.subList(updateCounts.length + 1, batchFlowFiles.size()); + for (final FlowFile flowFile : unexecuted) { + destinationRelationships.put(flowFile, REL_RETRY); + retryCount++; + } + } + + getLogger().error("Failed to update database due to a failed batch update. There were a total of {} FlowFiles that failed, {} that succeeded, " + "and {} that were not executed and will be routed to retry", new Object[] {failureCount, successCount, retryCount}); + } catch (final SQLNonTransientException e) { + getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {enclosure.getFlowFiles(), e}); + + for (final FlowFile flowFile : enclosure.getFlowFiles()) { + destinationRelationships.put(flowFile, REL_FAILURE); + } + continue; + } catch (final SQLException e) { + getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", + new Object[] {enclosure.getFlowFiles(), e}); + + for (final FlowFile flowFile : enclosure.getFlowFiles()) { + destinationRelationships.put(flowFile, REL_RETRY); + } + continue; + } finally { + enclosure.getStatement().close(); + } + } + } finally { + try { + conn.commit(); + } finally { + // make sure that we try to set the auto commit back to whatever it was. + if (originalAutoCommit) { + try { + conn.setAutoCommit(originalAutoCommit); + } catch (final SQLException se) { + // best effort: if restoring the auto-commit flag fails, there is nothing more we can do + } + } + } + } + + // Determine the database URL + String url = "jdbc://unknown-host"; + try { + url = conn.getMetaData().getURL(); + } catch (final SQLException sqle) { + // best effort: fall back to the placeholder URL for the provenance event + } + + // Emit a Provenance SEND event + final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + for (final FlowFile flowFile : sentFlowFiles) { + session.getProvenanceReporter().send(flowFile, url, transmissionMillis, true); + } + + for (final FlowFile flowFile : sentFlowFiles) { + destinationRelationships.put(flowFile, REL_SUCCESS); + } + } catch (final SQLException e) { + // Failed FlowFiles are all of them that we have processed minus those that were successfully sent + final List<FlowFile> failedFlowFiles = processedFlowFiles; + failedFlowFiles.removeAll(sentFlowFiles); + + // All FlowFiles yet to be processed is all FlowFiles minus those processed + final List<FlowFile> retry = flowFiles; + retry.removeAll(processedFlowFiles); + + final Relationship rel; + if (e instanceof SQLNonTransientException) { + getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {failedFlowFiles, e}); + rel = REL_FAILURE; + } else { + getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[] {failedFlowFiles, e}); + rel = REL_RETRY; + } + + for (final FlowFile flowFile : failedFlowFiles) { + destinationRelationships.put(flowFile, rel); + } + + for (final FlowFile flowFile : retry) { + destinationRelationships.put(flowFile, Relationship.SELF); + } + } + + for (final Map.Entry<FlowFile, Relationship> entry : destinationRelationships.entrySet()) { + session.transfer(entry.getKey(), entry.getValue()); + } + } + + + /** + * Pulls a batch of FlowFiles from the incoming queues. If no FlowFiles are available, returns null. + * Otherwise, a List of FlowFiles will be returned. + * + * If all FlowFiles pulled are not eligible to be processed, the FlowFiles will be penalized and transferred back + * to the input queue and null will be returned. + * + * Otherwise, if the Support Fragmented Transactions property is true, all FlowFiles that belong to the same + * transaction will be sorted in the order that they should be evaluated. + * + * @param context the process context for determining properties + * @param session the process session for pulling flowfiles + * @return a FlowFilePoll containing a List of FlowFiles to process, or null if there are no FlowFiles to process + */ + private FlowFilePoll pollFlowFiles(final ProcessContext context, final ProcessSession session) { + // Determine which FlowFile Filter to use in order to obtain FlowFiles. 
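+ // Example of the fragmented-transaction convention this method relies on (hedged, with illustrative values):
+ // three FlowFiles that together form one transaction would arrive with attributes
+ //     fragment.identifier = 6e3f4a2b-... (identical on all three)
+ //     fragment.count      = 3
+ //     fragment.index      = 0, 1 and 2 (one index per FlowFile)
+ // and are only processed once all three have been pulled, sorted by fragment.index.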
+ final boolean useTransactions = context.getProperty(SUPPORT_TRANSACTIONS).asBoolean(); + boolean fragmentedTransaction = false; + + final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + List<FlowFile> flowFiles; + if (useTransactions) { + final TransactionalFlowFileFilter filter = new TransactionalFlowFileFilter(); + flowFiles = session.get(filter); + fragmentedTransaction = filter.isFragmentedTransaction(); + } else { + flowFiles = session.get(batchSize); + } + + if (flowFiles.isEmpty()) { + return null; + } + + // If we are supporting fragmented transactions, verify that all FlowFiles are correct + if (fragmentedTransaction) { + final Relationship relationship = determineRelationship(flowFiles, context.getProperty(TRANSACTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS)); + if (relationship != null) { + // if transferring back to self, penalize the FlowFiles. + if (relationship == Relationship.SELF) { + // penalize all of the FlowFiles that we are going to route to SELF. + final ListIterator<FlowFile> itr = flowFiles.listIterator(); + while (itr.hasNext()) { + final FlowFile flowFile = itr.next(); + final FlowFile penalized = session.penalize(flowFile); + itr.remove(); + itr.add(penalized); + } + } + + session.transfer(flowFiles, relationship); + return null; + } + + // sort by fragment index. + Collections.sort(flowFiles, new Comparator<FlowFile>() { + @Override + public int compare(final FlowFile o1, final FlowFile o2) { + return Integer.compare(Integer.parseInt(o1.getAttribute(FRAGMENT_INDEX_ATTR)), Integer.parseInt(o2.getAttribute(FRAGMENT_INDEX_ATTR))); + } + }); + } + + return new FlowFilePoll(flowFiles, fragmentedTransaction); + } + + + /** + * Returns the key that was generated from the given statement, or null if no key + * was generated or it could not be determined. + * + * @param stmt the statement that generated a key + * @return the key that was generated from the given statement, or null if no key + * was generated or it could not be determined. + */ + private String determineGeneratedKey(final PreparedStatement stmt) { + try { + final ResultSet generatedKeys = stmt.getGeneratedKeys(); + if (generatedKeys != null && generatedKeys.next()) { + return generatedKeys.getString(1); + } + } catch (final SQLException sqle) { + // This is not supported by all vendors. This is a best-effort approach. + } + + return null; + } + + + /** + * Returns the StatementFlowFileEnclosure that should be used for executing the given SQL statement + * + * @param sql the SQL to execute + * @param conn the connection from which a PreparedStatement can be created + * @param stmtMap the existing map of SQL to PreparedStatements + * @param obtainKeys whether or not we need to obtain generated keys for INSERT statements + * @param fragmentedTransaction whether or not the SQL pertains to a fragmented transaction + * + * @return a StatementFlowFileEnclosure to use for executing the given SQL statement + * + * @throws SQLException if unable to create the appropriate PreparedStatement + */ + private StatementFlowFileEnclosure getEnclosure(final String sql, final Connection conn, final Map<String, StatementFlowFileEnclosure> stmtMap, + final boolean obtainKeys, final boolean fragmentedTransaction) throws SQLException { + StatementFlowFileEnclosure enclosure = stmtMap.get(sql); + if (enclosure != null) { + return enclosure; + } + + if (obtainKeys) { + // Create a new Prepared Statement, requesting that it return the generated keys. + PreparedStatement stmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS); + + if (stmt == null) { + // since we are passing Statement.RETURN_GENERATED_KEYS, calls to conn.prepareStatement will + // in some cases (at least for DerbyDB) return null. + // We will attempt to recompile the statement without the generated keys being returned. + stmt = conn.prepareStatement(sql); + } + + // If we need to obtain keys, then we cannot do a Batch Update. In this case, + // we don't need to store the PreparedStatement in the Map because we aren't + // doing an addBatch/executeBatch. Instead, we will use the statement once + // and close it. + return new StatementFlowFileEnclosure(stmt); + } else if (fragmentedTransaction) { + // We cannot use Batch Updates if we have a transaction that spans multiple FlowFiles. + // If we did, we could end up processing the statements out of order. It's quite possible + // that we could refactor the code some to allow for this, but as it is right now, this + // could cause problems. This is because we have a Map that is keyed only by the SQL statement text. + // If we had a transaction that needed to execute Stmt A with some parameters, then Stmt B with + // some parameters, then Stmt A with different parameters, this would become problematic because + // the executeUpdate would be evaluated first for Stmt A (the 1st and 3rd statements), and then + // the second statement would be evaluated. + final PreparedStatement stmt = conn.prepareStatement(sql); + return new StatementFlowFileEnclosure(stmt); + } + + final PreparedStatement stmt = conn.prepareStatement(sql); + enclosure = new StatementFlowFileEnclosure(stmt); + stmtMap.put(sql, enclosure); + return enclosure; + } + + + /** + * Determines the SQL statement that should be executed for the given FlowFile + * + * @param session the session that can be used to access the given FlowFile + * @param flowFile the FlowFile whose SQL statement should be executed + * + * @return the SQL that is associated with the given FlowFile + */ + private String getSQL(final ProcessSession session, final FlowFile flowFile) { + // Read the SQL from the FlowFile's content + final byte[] buffer = new byte[(int) flowFile.getSize()]; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, buffer); + } + }); + + // Interpret the content as a UTF-8 encoded SQL statement. + final String sql = new String(buffer, StandardCharsets.UTF_8); + return sql; + } + + + /** + * Sets all of the appropriate parameters on the given PreparedStatement, based on the given FlowFile attributes. + * + * @param stmt the statement to set the parameters on + * @param attributes the attributes from which to derive parameter indices, values, and types + * @throws SQLException if the PreparedStatement throws a SQLException when the appropriate setter is called + */ + private void setParameters(final PreparedStatement stmt, final Map<String, String> attributes) throws SQLException { + for (final Map.Entry<String, String> entry : attributes.entrySet()) { + final String key = entry.getKey(); + final Matcher matcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key); + if (matcher.matches()) { + final int parameterIndex = Integer.parseInt(matcher.group(1)); + + final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches(); + if (!isNumeric) { + throw new ProcessException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeric type"); + } + + final int jdbcType = Integer.parseInt(entry.getValue()); + final String valueAttrName = "sql.args." + parameterIndex + ".value"; + final String parameterValue = attributes.get(valueAttrName); + + try { + setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType); + } catch (final NumberFormatException nfe) { + throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe); + } + } + } + } + + + /** + * Determines which relationship the given FlowFiles should go to, based on a transaction timing out or + * transaction information not being present. If the FlowFiles should be processed and not transferred + * to any particular relationship yet, will return null + * + * @param flowFiles the FlowFiles whose relationship is to be determined + * @param transactionTimeoutMillis the maximum amount of time (in milliseconds) that we should wait + * for all FlowFiles in a transaction to be present before routing to failure + * @return the appropriate relationship to route the FlowFiles to, or null if the FlowFiles + * should instead be processed + */ + Relationship determineRelationship(final List<FlowFile> flowFiles, final Long transactionTimeoutMillis) { + int selectedNumFragments = 0; + final BitSet bitSet = new BitSet(); + + for (final FlowFile flowFile : flowFiles) { + final String fragmentCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTR); + if (fragmentCount == null && flowFiles.size() == 1) { + return null; + } else if (fragmentCount == null) { + getLogger().error("Cannot process {} because there are {} FlowFiles with the same fragment.identifier " + "attribute but not all FlowFiles have a fragment.count attribute; routing all to failure", new Object[] {flowFile, flowFiles.size()}); + return REL_FAILURE; + } + + final int numFragments; + try { + numFragments = Integer.parseInt(fragmentCount); + } catch (final NumberFormatException nfe) { + getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not an integer; " + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount}); + return REL_FAILURE; + } + + if (numFragments < 1) { + getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not a positive integer; " + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount}); + return REL_FAILURE; + } + + if (selectedNumFragments == 0) { + selectedNumFragments = numFragments; + } else if (numFragments != selectedNumFragments) { + 
getLogger().error("Cannot process {} because the fragment.count attribute has different values for different FlowFiles with the same fragment.identifier; " + + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); + return REL_FAILURE; + } + + final String fragmentIndex = flowFile.getAttribute(FRAGMENT_INDEX_ATTR); + if (fragmentIndex == null) { + getLogger().error("Cannot process {} because the fragment.index attribute is missing; " + + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); + return REL_FAILURE; + } + + final int idx; + try { + idx = Integer.parseInt(fragmentIndex); + } catch (final NumberFormatException nfe) { + getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not an integer; " + + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex}); + return REL_FAILURE; + } + + if (idx < 0) { + getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not a positive integer; " + + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex}); + return REL_FAILURE; + } + + if (bitSet.get(idx)) { + getLogger().error("Cannot process {} because it has the same value for the fragment.index attribute as another FlowFile with the same fragment.identifier; " + + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); + return REL_FAILURE; + } + + bitSet.set(idx); + } + + if (selectedNumFragments == flowFiles.size()) { + return null; // no relationship to route FlowFiles to yet - process the FlowFiles. + } + + long latestQueueTime = 0L; + for (final FlowFile flowFile : flowFiles) { + if (flowFile.getLastQueueDate() != null && flowFile.getLastQueueDate() > latestQueueTime) { + latestQueueTime = flowFile.getLastQueueDate(); + } + } + + if (transactionTimeoutMillis != null) { + if (latestQueueTime > 0L && System.currentTimeMillis() - latestQueueTime > transactionTimeoutMillis) { + getLogger().error("The transaction timeout has expired for the following FlowFiles; they will be routed to failure: {}", new Object[] {flowFiles}); + return REL_FAILURE; + } + } + + getLogger().debug("Not enough FlowFiles for transaction. Returning all FlowFiles to queue"); + return Relationship.SELF; // not enough FlowFiles for this transaction. Return them all to queue. 
+ } + + /** + * Determines how to map the given value to the appropriate JDBC data type and sets the parameter on the + * provided PreparedStatement + * + * @param stmt the PreparedStatement to set the parameter on + * @param attrName the name of the attribute that the parameter is coming from - for logging purposes + * @param parameterIndex the index of the SQL parameter to set + * @param parameterValue the value of the SQL parameter to set + * @param jdbcType the JDBC Type of the SQL parameter to set + * @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter + */ + private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException { + if (parameterValue == null) { + stmt.setNull(parameterIndex, jdbcType); + } else { + switch (jdbcType) { + case Types.BIT: + stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue)); + break; + case Types.TINYINT: + stmt.setByte(parameterIndex, Byte.parseByte(parameterValue)); + break; + case Types.SMALLINT: + stmt.setShort(parameterIndex, Short.parseShort(parameterValue)); + break; + case Types.INTEGER: + stmt.setInt(parameterIndex, Integer.parseInt(parameterValue)); + break; + case Types.BIGINT: + stmt.setLong(parameterIndex, Long.parseLong(parameterValue)); + break; + case Types.REAL: + stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue)); + break; + case Types.FLOAT: + case Types.DOUBLE: + stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue)); + break; + case Types.DATE: + stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue))); + break; + case Types.TIME: + stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue))); + break; + case Types.TIMESTAMP: + stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue))); + break; + case Types.CHAR: + case Types.VARCHAR: + case Types.LONGNVARCHAR: + case Types.LONGVARCHAR: + stmt.setString(parameterIndex, parameterValue); + break; + default: + throw new SQLException("The '" + attrName + "' attribute has a value of '" + parameterValue + + "' and a type of '" + jdbcType + "' but this is not a known data type"); + } + } + } + + + /** + * A FlowFileFilter that is responsible for ensuring that the FlowFiles returned either belong + * to the same "fragmented transaction" (i.e., 1 transaction whose information is fragmented + * across multiple FlowFiles) or that none of the FlowFiles belongs to a fragmented transaction + */ + static class TransactionalFlowFileFilter implements FlowFileFilter { + private String selectedId = null; + private int numSelected = 0; + private boolean ignoreFragmentIdentifiers = false; + + public boolean isFragmentedTransaction() { + return !ignoreFragmentIdentifiers; + } + + @Override + public FlowFileFilterResult filter(final FlowFile flowFile) { + final String fragmentId = flowFile.getAttribute(FRAGMENT_ID_ATTR); + final String fragCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTR); + + // if first FlowFile selected is not part of a fragmented transaction, then + // we accept any FlowFile that is also not part of a fragmented transaction. 
+            if (ignoreFragmentIdentifiers) {
+                if (fragmentId == null || "1".equals(fragCount)) {
+                    return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+                } else {
+                    return FlowFileFilterResult.REJECT_AND_CONTINUE;
+                }
+            }
+
+            if (fragmentId == null || "1".equals(fragCount)) {
+                if (selectedId == null) {
+                    // Only one FlowFile in the transaction.
+                    ignoreFragmentIdentifiers = true;
+                    return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+                } else {
+                    // we've already selected 1 FlowFile, and this one doesn't match.
+                    return FlowFileFilterResult.REJECT_AND_CONTINUE;
+                }
+            }
+
+            if (selectedId == null) {
+                // select this fragment id as the chosen one.
+                selectedId = fragmentId;
+                numSelected++;
+                return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+            }
+
+            if (selectedId.equals(fragmentId)) {
+                // fragment ids match. Find out if we have all of the necessary fragments or not.
+                final int numFragments;
+                if (NUMBER_PATTERN.matcher(fragCount).matches()) {
+                    numFragments = Integer.parseInt(fragCount);
+                } else {
+                    numFragments = Integer.MAX_VALUE;
+                }
+
+                if (numSelected >= numFragments - 1) {
+                    // We have all of the fragments we need for this transaction.
+                    return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
+                } else {
+                    // We still need more fragments for this transaction, so accept this one and continue.
+                    numSelected++;
+                    return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+                }
+            } else {
+                return FlowFileFilterResult.REJECT_AND_CONTINUE;
+            }
+        }
+    }
+
+
+    /**
+     * A simple, immutable data structure to hold a List of FlowFiles and an indicator as to whether
+     * or not those FlowFiles represent a "fragmented transaction" - that is, a collection of FlowFiles
+     * that all must be executed as a single transaction (we refer to it as a fragmented transaction
+     * because the information for that transaction, including SQL and the parameters, is fragmented
+     * across multiple FlowFiles).
+     */
+    private static class FlowFilePoll {
+        private final List<FlowFile> flowFiles;
+        private final boolean fragmentedTransaction;
+
+        public FlowFilePoll(final List<FlowFile> flowFiles, final boolean fragmentedTransaction) {
+            this.flowFiles = flowFiles;
+            this.fragmentedTransaction = fragmentedTransaction;
+        }
+
+        public List<FlowFile> getFlowFiles() {
+            return flowFiles;
+        }
+
+        public boolean isFragmentedTransaction() {
+            return fragmentedTransaction;
+        }
+    }
+
+
+    /**
+     * A simple, immutable data structure to hold a Prepared Statement and a List of FlowFiles
+     * for which that statement should be evaluated.
+     */
+    private static class StatementFlowFileEnclosure {
+        private final PreparedStatement statement;
+        private final List<FlowFile> flowFiles = new ArrayList<>();
+
+        public StatementFlowFileEnclosure(final PreparedStatement statement) {
+            this.statement = statement;
+        }
+
+        public PreparedStatement getStatement() {
+            return statement;
+        }
+
+        public List<FlowFile> getFlowFiles() {
+            return flowFiles;
+        }
+
+        public void addFlowFile(final FlowFile flowFile) {
+            this.flowFiles.add(flowFile);
+        }
+
+        @Override
+        public int hashCode() {
+            return statement.hashCode();
+        }
+
+        @Override
+        public boolean equals(final Object obj) {
+            if (obj == null) {
+                return false;
+            }
+            if (obj == this) {
+                // an object is always equal to itself
+                return true;
+            }
+            if (!(obj instanceof StatementFlowFileEnclosure)) {
+                return false;
+            }
+
+            final StatementFlowFileEnclosure other = (StatementFlowFileEnclosure) obj;
+            return statement.equals(other.getStatement());
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
new file mode 100644
index 0000000000..63073e6334
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.nifi.processors.standard; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestConvertJSONToSQL { + static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)"; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + System.setProperty("derby.stream.error.file", "target/derby.log"); + } + + @Test + public void testInsert() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "48"); + + out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); + } + + @Test + public void testInsertWithNullValue() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); + 
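+        // person-with-null-code.json (added below in this patch) contains
+        // {"id": 1, "name": "Mark", "code": null}; the null field is expected to yield a
+        // sql.args.3.type attribute with no corresponding sql.args.3.value attribute.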
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-null-code.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeNotExists("sql.args.3.value"); + + out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); + } + + + @Test + public void testUpdateWithNullValue() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-null-code.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.1.value", "Mark"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeNotExists("sql.args.2.value"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "1"); + + out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? 
WHERE ID = ?");
+    }
+
+
+    @Test
+    public void testMultipleInserts() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/persons.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 5);
+        final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL);
+        for (final MockFlowFile mff : mffs) {
+            mff.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
+
+            for (int i = 1; i <= 3; i++) {
+                mff.assertAttributeExists("sql.args." + i + ".type");
+                mff.assertAttributeExists("sql.args." + i + ".value");
+            }
+        }
+    }
+
+    @Test
+    public void testUpdateBasedOnPrimaryKey() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.1.value", "Mark");
+        out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.2.value", "48");
+        out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "1");
+
+        out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? 
WHERE ID = ?"); + } + + @Test + public void testUnmappedFieldBehavior() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); + runner.setProperty(ConvertJSONToSQL.UNMATCHED_FIELD_BEHAVIOR, ConvertJSONToSQL.IGNORE_UNMATCHED_FIELD.getValue()); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + + out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); + + runner.clearTransferState(); + runner.setProperty(ConvertJSONToSQL.UNMATCHED_FIELD_BEHAVIOR, ConvertJSONToSQL.FAIL_UNMATCHED_FIELD.getValue()); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json")); + runner.run(); + runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1); + } + + @Test + public void testUpdateBasedOnUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "code"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "48"); + + out.assertContentEquals("UPDATE PERSONS SET ID = ?, NAME = ? 
WHERE CODE = ?"); + } + + @Test + public void testUpdateBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "48"); + + out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?"); + } + + @Test + public void testUpdateWithMissingFieldBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-without-code.json")); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1); + } + + @Test + public void testUpdateWithMalformedJson() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + 
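+        // The resource name suggests malformed-person-extra-comma.json contains a trailing
+        // comma; JSON parsing should therefore fail and the FlowFile should be routed to
+        // the failure relationship, as asserted below.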
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json")); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1); + } + + @Test + public void testInsertWithMissingField() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); + runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-without-id.json")); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1); + } + + + /** + * Simple implementation only for testing purposes + */ + private static class MockDBCPService extends AbstractControllerService implements DBCPService { + private final String dbLocation; + + public MockDBCPService(final String dbLocation) { + this.dbLocation = dbLocation; + } + + @Override + public String getIdentifier() { + return "dbcp"; + } + + @Override + public Connection getConnection() throws ProcessException { + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + final Connection con = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true"); + return con; + } catch (final Exception e) { + e.printStackTrace(); + throw new ProcessException("getConnection failed: " + e); + } + } + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java new file mode 100644 index 0000000000..a348c9e60f --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java @@ -0,0 +1,664 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; + +public class TestPutSQL { + private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)"; + private static final String createPersonsAutoId = "CREATE TABLE PERSONS (id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1), name VARCHAR(100), code INTEGER check(code <= 100))"; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + System.setProperty("derby.stream.error.file", "target/derby.log"); + } + + @Test + public void testDirectStatements() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + + runner.enqueue("UPDATE PERSONS SET NAME='George' WHERE ID=1".getBytes()); + runner.run(); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("George", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + @Test + public void testInsertWithGeneratedKeys() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); 
+ runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersonsAutoId); + } + } + + runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "true"); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 84)".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + final MockFlowFile mff = runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS).get(0); + mff.assertAttributeEquals("sql.generated.key", "1"); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testFailInMiddleWithBadStatement() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersonsAutoId); + } + } + + runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 84)".getBytes()); + runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally wrong syntax + runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Tom', 3)".getBytes()); + runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Harry', 44)".getBytes()); + runner.run(); + + runner.assertTransferCount(PutSQL.REL_FAILURE, 1); + runner.assertTransferCount(PutSQL.REL_SUCCESS, 3); + } + + + @Test + public void testFailInMiddleWithBadParameterType() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersonsAutoId); + } + } + + runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final Map goodAttributes = new HashMap<>(); + goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + goodAttributes.put("sql.args.1.value", "84"); + + final Map badAttributes = new HashMap<>(); + badAttributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR)); + badAttributes.put("sql.args.1.value", "hello"); + + final byte[] data = "INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', ?)".getBytes(); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, badAttributes); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, goodAttributes); + runner.run(); + 
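+        // Only the FlowFile whose value 'hello' cannot be bound as declared should fail;
+        // the three well-formed FlowFiles should still succeed, demonstrating that a bad
+        // parameter fails a single FlowFile rather than the whole batch.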
+ runner.assertTransferCount(PutSQL.REL_FAILURE, 1); + runner.assertTransferCount(PutSQL.REL_SUCCESS, 3); + } + + + @Test + public void testFailInMiddleWithBadParameterValue() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersonsAutoId); + } + } + + runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final Map goodAttributes = new HashMap<>(); + goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + goodAttributes.put("sql.args.1.value", "84"); + + final Map badAttributes = new HashMap<>(); + badAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + badAttributes.put("sql.args.1.value", "9999"); + + final byte[] data = "INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', ?)".getBytes(); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, badAttributes); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, goodAttributes); + runner.run(); + + runner.assertTransferCount(PutSQL.REL_SUCCESS, 1); + runner.assertTransferCount(PutSQL.REL_FAILURE, 1); + runner.assertTransferCount(PutSQL.REL_RETRY, 2); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + + + @Test + public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + final Map attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.3.value", "84"); + + runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + 
assertFalse(rs.next()); + } + } + + runner.clearTransferState(); + + attributes.clear(); + attributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.1.value", "George"); + + attributes.put("sql.args.2.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.2.value", "1"); + + runner.enqueue("UPDATE PERSONS SET NAME=? WHERE ID=?".getBytes(), attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("George", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testMultipleStatementsWithinFlowFile() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + + "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; + final Map attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.3.value", "84"); + + attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.4.value", "1"); + + runner.enqueue(sql.getBytes(), attributes); + runner.run(); + + // should fail because of the semicolon + runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testWithNullParameter() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + final Map attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + + 
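+        // sql.args.3.type is set but sql.args.3.value is deliberately absent, so the
+        // parameter should be bound with stmt.setNull(3, Types.INTEGER). Derby stores SQL NULL,
+        // and rs.getInt(3) below returns 0, per the JDBC contract for NULL columns.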
runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(0, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + @Test + public void testInvalidStatement() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + + "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; "; + final Map attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.3.value", "84"); + + attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.4.value", "1"); + + runner.enqueue(sql.getBytes(), attributes); + runner.run(); + + // should fail because of the semicolon + runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testRetryableFailure() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final DBCPService service = new SQLExceptionService(null); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + + "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; + final Map attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.3.value", "84"); + + attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.4.value", "1"); + + runner.enqueue(sql.getBytes(), attributes); + runner.run(); + + // should fail because of the semicolon + runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 1); + } + + + @Test + public void testMultipleFlowFilesSuccessfulInTransaction() throws InitializationException, ProcessException, 
SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(PutSQL.BATCH_SIZE, "1"); + + final Map attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.3.value", "84"); + + attributes.put("fragment.identifier", "1"); + attributes.put("fragment.count", "2"); + attributes.put("fragment.index", "0"); + runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes); + runner.run(); + + // No FlowFiles should be transferred because there were not enough flowfiles with the same fragment identifier + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 0); + + attributes.clear(); + attributes.put("fragment.identifier", "1"); + attributes.put("fragment.count", "2"); + attributes.put("fragment.index", "1"); + + runner.clearTransferState(); + runner.enqueue("UPDATE PERSONS SET NAME='Leonard' WHERE ID=1".getBytes(), attributes); + runner.run(); + + // Both FlowFiles with fragment identifier 1 should be successful + runner.assertTransferCount(PutSQL.REL_SUCCESS, 2); + runner.assertTransferCount(PutSQL.REL_FAILURE, 0); + runner.assertTransferCount(PutSQL.REL_RETRY, 0); + for (final MockFlowFile mff : runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS)) { + mff.assertAttributeEquals("fragment.identifier", "1"); + } + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Leonard", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testTransactionTimeout() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs"); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + final Map attributes = new HashMap<>(); + attributes.put("fragment.identifier", "1"); + attributes.put("fragment.count", "2"); + attributes.put("fragment.index", "0"); + + final MockFlowFile mff = new MockFlowFile(0L) { + @Override + public Long getLastQueueDate() { + return System.currentTimeMillis() - 10000L; // return 10 seconds ago + } + + @Override + public Map getAttributes() { + return attributes; + } + + @Override + public String getAttribute(final String attrName) { + return attributes.get(attrName); + } + }; + + 
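+        // The overridden getLastQueueDate() reports the FlowFile as queued 10 seconds ago,
+        // which exceeds the 5 second Transaction Timeout configured above, so the lone
+        // fragment should be routed to failure rather than returned to the queue.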
runner.enqueue(mff); + runner.run(); + + // No FlowFiles should be transferred because there were not enough flowfiles with the same fragment identifier + runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1); + } + + /** + * Simple implementation only for testing purposes + */ + private static class MockDBCPService extends AbstractControllerService implements DBCPService { + private final String dbLocation; + + public MockDBCPService(final String dbLocation) { + this.dbLocation = dbLocation; + } + + @Override + public String getIdentifier() { + return "dbcp"; + } + + @Override + public Connection getConnection() throws ProcessException { + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + final Connection conn = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true"); + return conn; + } catch (final Exception e) { + e.printStackTrace(); + throw new ProcessException("getConnection failed: " + e); + } + } + } + + /** + * Simple implementation only for testing purposes + */ + private static class SQLExceptionService extends AbstractControllerService implements DBCPService { + private final DBCPService service; + private int allowedBeforeFailure = 0; + private int successful = 0; + + public SQLExceptionService(final DBCPService service) { + this.service = service; + } + + @Override + public String getIdentifier() { + return "dbcp"; + } + + @Override + public Connection getConnection() throws ProcessException { + try { + if (++successful > allowedBeforeFailure) { + final Connection conn = Mockito.mock(Connection.class); + Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new SQLException("Unit Test Generated SQLException")); + return conn; + } else { + return service.getConnection(); + } + } catch (final Exception e) { + e.printStackTrace(); + throw new ProcessException("getConnection failed: " + e); + } + } + } +} diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json similarity index 100% rename from nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json rename to nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-1.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-1.json similarity index 100% rename from nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-1.json rename to nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-1.json diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json new file mode 100644 index 0000000000..3ebd5874df --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json 
@@ -0,0 +1,6 @@ +{ + "id": 1, + "name": "Mark", + "code": 48, + "extra": "another" +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-with-null-code.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-with-null-code.json new file mode 100644 index 0000000000..0f491ce29d --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-with-null-code.json @@ -0,0 +1,5 @@ +{ + "id": 1, + "name": "Mark", + "code": null +} \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-without-code.json similarity index 100% rename from nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json rename to nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-without-code.json diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-without-id.json similarity index 100% rename from nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json rename to nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-without-id.json diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/persons.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/persons.json new file mode 100644 index 0000000000..573b6983bd --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/persons.json @@ -0,0 +1,21 @@ +[{ + "id": 1, + "name": "Mark", + "code": 48 +}, { + "id": 2, + "name": "George", + "code": 48 +}, { + "id": 3, + "name": "Harry", + "code": 21 +}, { + "id": 4, + "name": "Julie", + "code": 48 +}, { + "id": 82, + "name": "Frank Henry", + "code": 16 +}] \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertFlatJSONToSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertFlatJSONToSQL.java deleted file mode 100644 index 57d855f3ba..0000000000 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertFlatJSONToSQL.java +++ /dev/null @@ -1,600 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard; - -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.nifi.annotation.behavior.SideEffectFree; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.dbcp.DBCPService; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.processor.io.OutputStreamCallback; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.util.ObjectHolder; -import org.codehaus.jackson.JsonNode; -import org.codehaus.jackson.map.ObjectMapper; - - -@SideEffectFree -@SupportsBatching -@SeeAlso(PutSQL.class) -@Tags({"json", "sql", "database", "rdbms", "insert", "update", "relational", "flat"}) -@CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE or INSERT SQL statement. The incoming FlowFile is expected to be " - + "\"flat\" JSON message, meaning that it consists of a single JSON element and each field maps to a simple type. If a field maps to " - + "a JSON object, that JSON object will be interpreted as Text. Upon successful conversion, the original FlowFile is routed to the 'original' " - + "relationship and the SQL is routed to the 'sql' relationship.") -public class ConvertFlatJSONToSQL extends AbstractProcessor { - private static final String UPDATE_TYPE = "UPDATE"; - private static final String INSERT_TYPE = "INSERT"; - - static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder() - .name("JDBC Connection Pool") - .description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. 
" - + "The Connection Pool is necessary in order to determine the appropriate database column types.") - .identifiesControllerService(DBCPService.class) - .required(true) - .build(); - static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder() - .name("Statement Type") - .description("Specifies the type of SQL Statement to generate") - .required(true) - .allowableValues(UPDATE_TYPE, INSERT_TYPE) - .build(); - static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() - .name("Table Name") - .description("The name of the table that the statement should update") - .required(true) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - static final PropertyDescriptor CATALOG_NAME = new PropertyDescriptor.Builder() - .name("Catalog Name") - .description("The name of the catalog that the statement should update. This may not apply for the database that you are updating. In this case, leave the field empty") - .required(false) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new PropertyDescriptor.Builder() - .name("Translate Field Names") - .description("If true, the Processor will attempt to translate JSON field names into the appropriate column names for the table specified. " - + "If false, the JSON field names must match the column names exactly, or the column will not be updated") - .allowableValues("true", "false") - .defaultValue("true") - .build(); - static final PropertyDescriptor UPDATE_KEY = new PropertyDescriptor.Builder() - .name("Update Keys") - .description("A comma-separated list of column names that uniquely identifies a row in the database for UPDATE statements. " - + "If the Statement Type is UPDATE and this property is not set, the table's Primary Keys are used. " - + "In this case, if no Primary Key exists, the conversion to SQL will fail. " - + "This property is ignored if the Statement Type is INSERT") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .required(false) - .expressionLanguageSupported(true) - .build(); - - - static final Relationship REL_ORIGINAL = new Relationship.Builder() - .name("original") - .description("When a FlowFile is converted to SQL, the original JSON FlowFile is routed to this relationship") - .build(); - static final Relationship REL_SQL = new Relationship.Builder() - .name("sql") - .description("A FlowFile is routed to this relationship when its contents have successfully been converted into a SQL statement") - .build(); - static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("A FlowFile is routed to this relationship if it cannot be converted into a SQL statement. 
Common causes include invalid JSON " - + "content or the JSON content missing a required field (if using an INSERT statement type).") - .build(); - - private final Map<SchemaKey, TableSchema> schemaCache = new LinkedHashMap<SchemaKey, TableSchema>(100) { - private static final long serialVersionUID = 1L; - - @Override - protected boolean removeEldestEntry(Map.Entry<SchemaKey, TableSchema> eldest) { - return true; - } - }; - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(CONNECTION_POOL); - properties.add(STATEMENT_TYPE); - properties.add(TABLE_NAME); - properties.add(CATALOG_NAME); - properties.add(TRANSLATE_FIELD_NAMES); - properties.add(UPDATE_KEY); - return properties; - } - - - @Override - public Set<Relationship> getRelationships() { - final Set<Relationship> rels = new HashSet<>(); - rels.add(REL_ORIGINAL); - rels.add(REL_SQL); - rels.add(REL_FAILURE); - return rels; - } - - - @OnScheduled - public void onScheduled(final ProcessContext context) { - synchronized (this) { - schemaCache.clear(); - } - }
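A note on the schema cache above: LinkedHashMap consults removeEldestEntry after every insertion, and because this override returns true unconditionally, the freshly inserted entry is itself the eldest whenever the map was empty, so the cache never actually retains a schema between FlowFiles. The conventional size-bounded form of the same idiom looks like this (a minimal sketch, not part of this patch; the cap of 100 is an assumption echoing the constructor's initial capacity):

    private final Map<SchemaKey, TableSchema> schemaCache = new LinkedHashMap<SchemaKey, TableSchema>(100) {
        private static final long serialVersionUID = 1L;

        @Override
        protected boolean removeEldestEntry(final Map.Entry<SchemaKey, TableSchema> eldest) {
            return size() > 100; // evict the least-recently-inserted schema only beyond the cap
        }
    };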
- - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - final boolean translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean(); - final String statementType = context.getProperty(STATEMENT_TYPE).getValue(); - final String updateKeys = context.getProperty(UPDATE_KEY).evaluateAttributeExpressions(flowFile).getValue(); - - final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue(); - final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); - final SchemaKey schemaKey = new SchemaKey(catalog, tableName); - final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) && updateKeys == null; - - // get the database schema from the cache, if one exists - TableSchema schema; - synchronized (this) { - schema = schemaCache.get(schemaKey); - if (schema == null) { - // No schema exists for this table yet. Query the database to determine the schema and put it into the cache. - final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class); - try (final Connection conn = dbcpService.getConnection()) { - schema = TableSchema.from(conn, catalog, tableName, translateFieldNames, includePrimaryKeys); - schemaCache.put(schemaKey, schema); - } catch (final SQLException e) { - getLogger().error("Failed to convert {} into a SQL statement due to {}; routing to failure", new Object[] {flowFile, e.toString()}, e); - session.transfer(flowFile, REL_FAILURE); - return; - } - } - } - - // Parse the JSON document - final ObjectMapper mapper = new ObjectMapper(); - final ObjectHolder<JsonNode> rootNodeRef = new ObjectHolder<>(null); - try { - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - try (final InputStream bufferedIn = new BufferedInputStream(in)) { - rootNodeRef.set(mapper.readTree(bufferedIn)); - } - } - }); - } catch (final ProcessException pe) { - getLogger().error("Failed to parse {} as JSON due to {}; routing to failure", new Object[] {flowFile, pe.toString()}, pe); - session.transfer(flowFile, REL_FAILURE); - return; - } - - final JsonNode rootNode = rootNodeRef.get(); - final String sql; - final Map<String, String> attributes = new HashMap<>(); - if (INSERT_TYPE.equals(statementType)) { - try { - sql = generateInsert(rootNode, attributes, tableName, schema, translateFieldNames); - } catch (final ProcessException pe) { - getLogger().error("Failed to convert {} to a SQL INSERT statement due to {}; routing to failure", - new Object[] { flowFile, pe.toString() }, pe); - session.transfer(flowFile, REL_FAILURE); - return; - } - } else { - try { - sql = generateUpdate(rootNode, attributes, tableName, updateKeys, schema, translateFieldNames); - } catch (final ProcessException pe) { - getLogger().error("Failed to convert {} to a SQL UPDATE statement due to {}; routing to failure", - new Object[] { flowFile, pe.toString() }, pe); - session.transfer(flowFile, REL_FAILURE); - return; - } - } - - FlowFile sqlFlowFile = session.create(flowFile); - sqlFlowFile = session.write(sqlFlowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - out.write(sql.getBytes(StandardCharsets.UTF_8)); - } - }); - - attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); - attributes.put("sql.table", tableName); - if (catalog != null) { - attributes.put("sql.catalog", catalog); - } - - sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributes); - - session.transfer(flowFile, REL_ORIGINAL); - session.transfer(sqlFlowFile, REL_SQL); - }
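The ObjectHolder above is a mutable box that carries the parsed JsonNode out of the anonymous InputStreamCallback, since the callback cannot assign to a local variable of onTrigger. The same capture pattern written against the JDK's java.util.concurrent.atomic.AtomicReference (an illustrative alternative only; this patch uses NiFi's ObjectHolder):

    // Sketch: capture a value computed inside a session.read callback.
    final AtomicReference<JsonNode> rootNodeRef = new AtomicReference<>();
    session.read(flowFile, new InputStreamCallback() {
        @Override
        public void process(final InputStream in) throws IOException {
            try (final InputStream bufferedIn = new BufferedInputStream(in)) {
                rootNodeRef.set(new ObjectMapper().readTree(bufferedIn));
            }
        }
    });
    final JsonNode rootNode = rootNodeRef.get();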
- - private Set<String> getNormalizedColumnNames(final JsonNode node, final boolean translateFieldNames) { - final Set<String> normalizedFieldNames = new HashSet<>(); - final Iterator<String> fieldNameItr = node.getFieldNames(); - while (fieldNameItr.hasNext()) { - normalizedFieldNames.add(normalizeColumnName(fieldNameItr.next(), translateFieldNames)); - } - - return normalizedFieldNames; - } - - private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, - final TableSchema schema, final boolean translateFieldNames) { - - final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames); - for (final String requiredColName : schema.getRequiredColumnNames()) { - final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames); - if (!normalizedFieldNames.contains(normalizedColName)) { - throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'"); - } - } - - final StringBuilder sqlBuilder = new StringBuilder(); - int fieldCount = 0; - sqlBuilder.append("INSERT INTO ").append(tableName).append(" ("); - - // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as - // adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the - // columns that we are inserting into - final Iterator<String> fieldNames = rootNode.getFieldNames(); - while (fieldNames.hasNext()) { - final String fieldName = fieldNames.next(); - - final ColumnDescription desc = schema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames)); - if (desc != null) { - if (fieldCount++ > 0) { - sqlBuilder.append(", "); - } - - sqlBuilder.append(desc.getColumnName()); - - final int sqlType = desc.getDataType(); - attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType)); - - final Integer colSize = desc.getColumnSize(); - String fieldValue = rootNode.get(fieldName).asText(); - if (colSize != null && fieldValue.length() > colSize) { - fieldValue = fieldValue.substring(0, colSize); - } - attributes.put("sql.args." + fieldCount + ".value", fieldValue); - } - } - - // complete the SQL statements by adding ?'s for all of the values to be escaped. - sqlBuilder.append(") VALUES ("); - for (int i=0; i < fieldCount; i++) { - if (i > 0) { - sqlBuilder.append(", "); - } - - sqlBuilder.append("?"); - } - sqlBuilder.append(")"); - - if (fieldCount == 0) { - throw new ProcessException("None of the fields in the JSON map to the columns defined by the " + tableName + " table"); - } - - return sqlBuilder.toString(); - }
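Worked example, matching what testInsert asserts further down: for a PERSONS table defined as (id integer primary key, name varchar(100), code integer) and a flat document such as {"id": 1, "name": "Mark", "code": 48}, generateInsert emits the statement below as FlowFile content, with one type/value attribute pair per placeholder:

    INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)
    sql.args.1.type = 4  (java.sql.Types.INTEGER)    sql.args.1.value = 1
    sql.args.2.type = 12 (java.sql.Types.VARCHAR)    sql.args.2.value = Mark
    sql.args.3.type = 4  (java.sql.Types.INTEGER)    sql.args.3.value = 48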
"Primary" : "Update") + "Key column '" + uk + "'"); - } - } - - // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as - // adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the - // columns that we are inserting into - Iterator fieldNames = rootNode.getFieldNames(); - while (fieldNames.hasNext()) { - final String fieldName = fieldNames.next(); - - final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames); - final ColumnDescription desc = schema.getColumns().get(normalizedColName); - if (desc == null) { - continue; - } - - // Check if this column is an Update Key. If so, skip it for now. We will come - // back to it after we finish the SET clause - if (normalizedUpdateNames.contains(normalizedColName)) { - continue; - } - - if (fieldCount++ > 0) { - sqlBuilder.append(", "); - } - - sqlBuilder.append(desc.getColumnName()).append(" = ?"); - final int sqlType = desc.getDataType(); - attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType)); - - final Integer colSize = desc.getColumnSize(); - String fieldValue = rootNode.get(fieldName).asText(); - if (colSize != null && fieldValue.length() > colSize) { - fieldValue = fieldValue.substring(0, colSize); - } - attributes.put("sql.args." + fieldCount + ".value", fieldValue); - } - - // Set the WHERE clause based on the Update Key values - sqlBuilder.append(" WHERE "); - - fieldNames = rootNode.getFieldNames(); - int whereFieldCount = 0; - while (fieldNames.hasNext()) { - final String fieldName = fieldNames.next(); - - final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames); - final ColumnDescription desc = schema.getColumns().get(normalizedColName); - if (desc == null) { - continue; - } - - // Check if this column is a Update Key. If so, skip it for now. We will come - // back to it after we finish the SET clause - if (!normalizedUpdateNames.contains(normalizedColName)) { - continue; - } - - if (whereFieldCount++ > 0) { - sqlBuilder.append(" AND "); - } - fieldCount++; - - sqlBuilder.append(normalizedColName).append(" = ?"); - final int sqlType = desc.getDataType(); - attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType)); - - final Integer colSize = desc.getColumnSize(); - String fieldValue = rootNode.get(fieldName).asText(); - if (colSize != null && fieldValue.length() > colSize) { - fieldValue = fieldValue.substring(0, colSize); - } - attributes.put("sql.args." + fieldCount + ".value", fieldValue); - } - - return sqlBuilder.toString(); - } - - private static String normalizeColumnName(final String colName, final boolean translateColumnNames) { - return translateColumnNames ? 
- - private static String normalizeColumnName(final String colName, final boolean translateColumnNames) { - return translateColumnNames ? colName.toUpperCase().replace("_", "") : colName; - } - - private static class TableSchema { - private List<String> requiredColumnNames; - private Set<String> primaryKeyColumnNames; - private Map<String, ColumnDescription> columns; - - private TableSchema(final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames, - final Set<String> primaryKeyColumnNames) { - this.columns = new HashMap<>(); - this.primaryKeyColumnNames = primaryKeyColumnNames; - - this.requiredColumnNames = new ArrayList<>(); - for (final ColumnDescription desc : columnDescriptions) { - columns.put(ConvertFlatJSONToSQL.normalizeColumnName(desc.columnName, translateColumnNames), desc); - if (desc.isRequired()) { - requiredColumnNames.add(desc.columnName); - } - } - } - - public Map<String, ColumnDescription> getColumns() { - return columns; - } - - public List<String> getRequiredColumnNames() { - return requiredColumnNames; - } - - public Set<String> getPrimaryKeyColumnNames() { - return primaryKeyColumnNames; - } - - public static TableSchema from(final Connection conn, final String catalog, final String tableName, - final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException { - final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%"); - - final List<ColumnDescription> cols = new ArrayList<>(); - while (colrs.next()) { - final ColumnDescription col = ColumnDescription.from(colrs); - cols.add(col); - } - - final Set<String> primaryKeyColumns = new HashSet<>(); - if (includePrimaryKeys) { - final ResultSet pkrs = conn.getMetaData().getPrimaryKeys(catalog, null, tableName); - - while (pkrs.next()) { - final String colName = pkrs.getString("COLUMN_NAME"); - primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames)); - } - } - - return new TableSchema(cols, translateColumnNames, primaryKeyColumns); - } - } - - private static class ColumnDescription { - private final String columnName; - private final int dataType; - private final boolean required; - private final Integer columnSize; - - private ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize) { - this.columnName = columnName; - this.dataType = dataType; - this.required = required; - this.columnSize = columnSize; - } - - public int getDataType() { - return dataType; - } - - public Integer getColumnSize() { - return columnSize; - } - - public String getColumnName() { - return columnName; - } - - public boolean isRequired() { - return required; - } - - public static ColumnDescription from(final ResultSet resultSet) throws SQLException { - final String columnName = resultSet.getString("COLUMN_NAME"); - final int dataType = resultSet.getInt("DATA_TYPE"); - final int colSize = resultSet.getInt("COLUMN_SIZE"); - - final String nullableValue = resultSet.getString("IS_NULLABLE"); - final boolean isNullable = "YES".equalsIgnoreCase(nullableValue) || nullableValue.isEmpty(); - final String defaultValue = resultSet.getString("COLUMN_DEF"); - final String autoIncrementValue = resultSet.getString("IS_AUTOINCREMENT"); - final boolean isAutoIncrement = "YES".equalsIgnoreCase(autoIncrementValue); - final boolean required = !isNullable && !isAutoIncrement && defaultValue == null; - - return new ColumnDescription(columnName, dataType, required, colSize == 0 ? 
null : colSize); - } - } - - private static class SchemaKey { - private final String catalog; - private final String tableName; - - public SchemaKey(final String catalog, final String tableName) { - this.catalog = catalog; - this.tableName = tableName; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((catalog == null) ? 0 : catalog.hashCode()); - result = prime * result + ((tableName == null) ? 0 : tableName.hashCode()); - return result; - } - - @Override - public boolean equals(final Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - - final SchemaKey other = (SchemaKey) obj; - if (catalog == null) { - if (other.catalog != null) { - return false; - } - } else if (!catalog.equals(other.catalog)) { - return false; - } - - - if (tableName == null) { - if (other.tableName != null) { - return false; - } - } else if (!tableName.equals(other.tableName)) { - return false; - } - - return true; - } - } -} diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java deleted file mode 100644 index d349a58fac..0000000000 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.processors.standard; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.sql.Connection; -import java.sql.Date; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Time; -import java.sql.Timestamp; -import java.sql.Types; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.dbcp.DBCPService; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.stream.io.StreamUtils; - -@SupportsBatching -@SeeAlso(ConvertFlatJSONToSQL.class) -@Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"}) -@CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command " - + "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes " - + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be " - + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.") -public class PutSQL extends AbstractProcessor { - - static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder() - .name("JDBC Connection Pool") - .description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. " - + "The Connection Pool is necessary in order to determine the appropriate database column types.") - .identifiesControllerService(DBCPService.class) - .required(true) - .build(); - - - static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("A FlowFile is routed to this relationship after the database is successfully updated") - .build(); - static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("A FlowFile is routed to this relationship if the database cannot be updated for any reason") - .build(); - - private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type"); - private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+"); - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(CONNECTION_POOL); - return properties; - }
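The sql.args.N.* contract in the @CapabilityDescription pairs the Nth ? placeholder, by position, with a JDBC type attribute and a value attribute. testStatementsWithPreparedParameters further down exercises exactly this, enqueueing the content INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?) together with:

    sql.args.1.type = 4  (java.sql.Types.INTEGER)    sql.args.1.value = 1
    sql.args.2.type = 12 (java.sql.Types.VARCHAR)    sql.args.2.value = Mark
    sql.args.3.type = 4  (java.sql.Types.INTEGER)    sql.args.3.value = 84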
- - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - // TODO: Batch. Pull in 50 or 100 at a time and map content of FlowFile to Set<FlowFile> that have that - // same content. Then execute updates in batches. - - final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class); - try (final Connection conn = dbcpService.getConnection()) { - // Read the SQL from the FlowFile's content - final byte[] buffer = new byte[(int) flowFile.getSize()]; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.fillBuffer(in, buffer); - } - }); - - final String sql = new String(buffer, StandardCharsets.UTF_8); - - // Create a prepared statement and set the appropriate parameters on the statement. - try (final PreparedStatement stmt = conn.prepareStatement(sql)) { - for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) { - final String key = entry.getKey(); - final Matcher matcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key); - if (matcher.matches()) { - final int parameterIndex = Integer.parseInt(matcher.group(1)); - - final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches(); - if (!isNumeric) { - getLogger().error("Cannot update database for {} because the value of the '{}' attribute is '{}', which is not a valid JDBC numeral type; routing to failure", - new Object[] {flowFile, key, entry.getValue()}); - session.transfer(flowFile, REL_FAILURE); - return; - } - - final int jdbcType = Integer.parseInt(entry.getValue()); - final String valueAttrName = "sql.args." + parameterIndex + ".value"; - final String parameterValue = flowFile.getAttribute(valueAttrName); - if (parameterValue == null) { - getLogger().error("Cannot update database for {} because the '{}' attribute exists but the '{}' attribute does not", new Object[] {flowFile, key, valueAttrName}); - session.transfer(flowFile, REL_FAILURE); - return; - } - - try { - setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType); - } catch (final NumberFormatException nfe) { - getLogger().error("Cannot update database for {} because the '{}' attribute has a value of '{}', " - + "which cannot be converted into the necessary data type; routing to failure", new Object[] {flowFile, valueAttrName, parameterValue}); - session.transfer(flowFile, REL_FAILURE); - return; - } - } - } - - final int updatedRowCount = stmt.executeUpdate(); - flowFile = session.putAttribute(flowFile, "sql.update.count", String.valueOf(updatedRowCount)); - } - - // TODO: Need to expose Connection URL from DBCP Service and use it to emit a Provenance Event - session.transfer(flowFile, REL_SUCCESS); - } catch (final SQLException e) { - getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e}); - session.transfer(flowFile, REL_FAILURE); - return; - } - } - - private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException { - switch (jdbcType) { - case Types.BIT: - stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue)); - break; - case Types.TINYINT: - stmt.setByte(parameterIndex, Byte.parseByte(parameterValue)); - break; - case Types.SMALLINT: - stmt.setShort(parameterIndex, Short.parseShort(parameterValue)); - break; - case Types.INTEGER: - stmt.setInt(parameterIndex, Integer.parseInt(parameterValue)); - break; - case Types.BIGINT: - stmt.setLong(parameterIndex, Long.parseLong(parameterValue)); - break; - case Types.REAL: - stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue)); - break; - case Types.FLOAT: - case Types.DOUBLE: - 
stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue)); - break; - case Types.DATE: - stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue))); - break; - case Types.TIME: - stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue))); - break; - case Types.TIMESTAMP: - stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue))); - break; - case Types.CHAR: - case Types.VARCHAR: - case Types.LONGNVARCHAR: - case Types.LONGVARCHAR: - stmt.setString(parameterIndex, parameterValue); - break; - default: - throw new SQLException("The '" + attrName + "' attribute has a value of '" + parameterValue - + "' and a type of '" + jdbcType + "' but this is not a known data type"); - } - } -} diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertFlatJSONToSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertFlatJSONToSQL.java deleted file mode 100644 index b2bdbba551..0000000000 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertFlatJSONToSQL.java +++ /dev/null @@ -1,294 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.processors.standard; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Paths; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; - -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.dbcp.DBCPService; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -public class TestConvertFlatJSONToSQL { - static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)"; - - @Rule - public TemporaryFolder folder = new TemporaryFolder(); - - @BeforeClass - public static void setup() { - System.setProperty("derby.stream.error.file", "target/derby.log"); - } - - @Test - public void testInsert() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "INSERT"); - runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json")); - runner.run(); - - runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1); - runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0); - out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.1.value", "1"); - out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); - out.assertAttributeEquals("sql.args.2.value", "Mark"); - out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.3.value", "48"); - - out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); - } - - - @Test - public void testUpdateBasedOnPrimaryKey() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS"); - 
runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE"); - runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json")); - runner.run(); - - runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1); - runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0); - out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR)); - out.assertAttributeEquals("sql.args.1.value", "Mark"); - out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.2.value", "48"); - out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.3.value", "1"); - - out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE ID = ?"); - } - - - @Test - public void testUpdateBasedOnUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE"); - runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "code"); - runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json")); - runner.run(); - - runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1); - runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0); - out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.1.value", "1"); - out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); - out.assertAttributeEquals("sql.args.2.value", "Mark"); - out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.3.value", "48"); - - out.assertContentEquals("UPDATE PERSONS SET ID = ?, NAME = ? 
WHERE CODE = ?"); - } - - @Test - public void testUpdateBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE"); - runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code"); - runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json")); - runner.run(); - - runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1); - runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0); - out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.1.value", "1"); - out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); - out.assertAttributeEquals("sql.args.2.value", "Mark"); - out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.3.value", "48"); - - out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?"); - } - - @Test - public void testUpdateWithMissingFieldBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE"); - runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code"); - runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json")); - runner.run(); - - runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1); - } - - @Test - public void testUpdateWithMalformedJson() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - 
runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE"); - runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code"); - runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json")); - runner.run(); - - runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1); - } - - @Test - public void testInsertWithMissingField() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "INSERT"); - runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code"); - runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json")); - runner.run(); - - runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1); - } - - - /** - * Simple implementation only for testing purposes - */ - private static class MockDBCPService extends AbstractControllerService implements DBCPService { - private final String dbLocation; - - public MockDBCPService(final String dbLocation) { - this.dbLocation = dbLocation; - } - - @Override - public String getIdentifier() { - return "dbcp"; - } - - @Override - public Connection getConnection() throws ProcessException { - try { - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - final Connection con = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true"); - return con; - } catch (final Exception e) { - e.printStackTrace(); - throw new ProcessException("getConnection failed: " + e); - } - } - } - -} diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java deleted file mode 100644 index ccba3e176f..0000000000 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Types; -import java.util.HashMap; -import java.util.Map; - -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.dbcp.DBCPService; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -public class TestPutSQL { - static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)"; - - @Rule - public TemporaryFolder folder = new TemporaryFolder(); - - @BeforeClass - public static void setup() { - System.setProperty("derby.stream.error.file", "target/derby.log"); - } - - @Test - public void testDirectStatements() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); - runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)".getBytes()); - runner.run(); - - runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("Mark", rs.getString(2)); - assertEquals(84, rs.getInt(3)); - assertFalse(rs.next()); - } - } - - runner.enqueue("UPDATE PERSONS SET NAME='George' WHERE ID=1".getBytes()); - runner.run(); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("George", rs.getString(2)); - assertEquals(84, rs.getInt(3)); - assertFalse(rs.next()); - } - } - } - - - @Test - public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - 
stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); - final Map<String, String> attributes = new HashMap<>(); - attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.1.value", "1"); - - attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); - attributes.put("sql.args.2.value", "Mark"); - - attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.3.value", "84"); - - runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes); - runner.run(); - - runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("Mark", rs.getString(2)); - assertEquals(84, rs.getInt(3)); - assertFalse(rs.next()); - } - } - - runner.clearTransferState(); - - attributes.clear(); - attributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR)); - attributes.put("sql.args.1.value", "George"); - - attributes.put("sql.args.2.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.2.value", "1"); - - runner.enqueue("UPDATE PERSONS SET NAME=? WHERE ID=?".getBytes(), attributes); - runner.run(); - runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("George", rs.getString(2)); - assertEquals(84, rs.getInt(3)); - assertFalse(rs.next()); - } - } - } - - - @Test - public void testMultipleStatementsWithinFlowFile() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); - - final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + - "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; - final Map<String, String> attributes = new HashMap<>(); - attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.1.value", "1"); - - attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); - attributes.put("sql.args.2.value", "Mark"); - - attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.3.value", "84"); - - attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.4.value", "1"); - - runner.enqueue(sql.getBytes(), attributes); - runner.run(); - - // should fail because of the semicolon - runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertFalse(rs.next()); - } - } - } - - - /** - * Simple 
implementation only for testing purposes - */ - private static class MockDBCPService extends AbstractControllerService implements DBCPService { - private final String dbLocation; - - public MockDBCPService(final String dbLocation) { - this.dbLocation = dbLocation; - } - - @Override - public String getIdentifier() { - return "dbcp"; - } - - @Override - public Connection getConnection() throws ProcessException { - try { - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - final Connection con = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true"); - return con; - } catch (final Exception e) { - e.printStackTrace(); - throw new ProcessException("getConnection failed: " + e); - } - } - } -} diff --git a/pom.xml b/pom.xml index 7cbc76cca7..af1c8fb765 100644 --- a/pom.xml +++ b/pom.xml @@ -670,6 +670,11 @@ <artifactId>jasypt</artifactId> <version>1.9.2</version> </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + <version>1.9.13</version> + </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId>
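The jackson-mapper-asl artifact added here supplies the org.codehaus.jackson classes (ObjectMapper, JsonNode) that ConvertJSONToSQL compiles against. A self-contained sketch of the parse-and-iterate pattern the processor applies to each flat document (the class name and sample document are illustrative only):

    import java.util.Iterator;
    import org.codehaus.jackson.JsonNode;
    import org.codehaus.jackson.map.ObjectMapper;

    public class FlatJsonExample {
        public static void main(final String[] args) throws Exception {
            final JsonNode node = new ObjectMapper().readTree("{\"id\": 1, \"name\": \"Mark\", \"code\": 48}");
            final Iterator<String> fieldNames = node.getFieldNames();
            while (fieldNames.hasNext()) {
                final String fieldName = fieldNames.next();
                // Each value is read as text, just as generateInsert/generateUpdate do before truncating to the column size.
                System.out.println(fieldName + " = " + node.get(fieldName).asText());
            }
        }
    }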