NIFI-853: Made updates to processors

This commit is contained in:
Mark Payne 2015-08-13 11:40:33 -04:00
parent 1a37c95f58
commit 28549c2b1e
17 changed files with 2738 additions and 1343 deletions

View File

@ -220,6 +220,13 @@ language governing permissions and limitations under the License. -->
<exclude>src/test/resources/TestMergeContent/head</exclude>
<exclude>src/test/resources/TestMergeContent/user.avsc</exclude>
<exclude>src/test/resources/TestMergeContent/place.avsc</exclude>
<exclude>src/test/resources/TestConvertJSONToSQL/person-1.json</exclude>
<exclude>src/test/resources/TestConvertJSONToSQL/persons.json</exclude>
<exclude>src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json</exclude>
<exclude>src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json</exclude>
<exclude>src/test/resources/TestConvertJSONToSQL/person-without-code.json</exclude>
<exclude>src/test/resources/TestConvertJSONToSQL/person-with-null-code.json</exclude>
<exclude>src/test/resources/TestConvertJSONToSQL/person-without-id.json</exclude>
<exclude>src/test/resources/TestModifyBytes/noFooter.txt</exclude>
<exclude>src/test/resources/TestModifyBytes/noFooter_noHeader.txt</exclude>
<exclude>src/test/resources/TestModifyBytes/noHeader.txt</exclude>

View File

@ -0,0 +1,679 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.ObjectHolder;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.JsonNodeFactory;
@SideEffectFree
@SupportsBatching
@SeeAlso(PutSQL.class)
@Tags({"json", "sql", "database", "rdbms", "insert", "update", "relational", "flat"})
@CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE or INSERT SQL statement. The incoming FlowFile is expected to be "
+ "\"flat\" JSON message, meaning that it consists of a single JSON element and each field maps to a simple type. If a field maps to "
+ "a JSON object, that JSON object will be interpreted as Text. If the input is an array of JSON elements, each element in the array is "
+ "output as a separate FlowFile to the 'sql' relationship. Upon successful conversion, the original FlowFile is routed to the 'original' "
+ "relationship and the SQL is routed to the 'sql' relationship.")
@WritesAttributes({
@WritesAttribute(attribute="mime.type", description="Sets mime.type of FlowFile that is routed to 'sql' to 'text/plain'."),
@WritesAttribute(attribute="sql.table", description="Sets the sql.table attribute of FlowFile that is routed to 'sql' to the name of the table that is updated by the SQL statement."),
@WritesAttribute(attribute="sql.catalog", description="If the Catalog name is set for this database, specifies the name of the catalog that the SQL statement will update. "
+ "If no catalog is used, this attribute will not be added."),
@WritesAttribute(attribute="fragment.identifier", description="All FlowFiles routed to the 'sql' relationship for the same incoming FlowFile (multiple will be output for the same incoming "
+ "FlowFile if the incoming FlowFile is a JSON Array) will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
@WritesAttribute(attribute="fragment.count", description="The number of SQL FlowFiles that were produced for same incoming FlowFile. This can be used in conjunction with the "
+ "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming FlowFile."),
@WritesAttribute(attribute="fragment.index", description="The position of this FlowFile in the list of outgoing FlowFiles that were all derived from the same incoming FlowFile. This can be "
+ "used in conjunction with the fragment.identifier and fragment.count attributes to know which FlowFiles originated from the same incoming FlowFile and in what order the SQL "
+ "FlowFiles were produced"),
@WritesAttribute(attribute="sql.args.N.type", description="The output SQL statements are parameterized in order to avoid SQL Injection Attacks. The types of the Parameters "
+ "to use are stored in attributes named sql.args.1.type, sql.args.2.type, sql.args.3.type, and so on. The type is a number representing a JDBC Type constant. "
+ "Generally, this is useful only for software to read and interpret but is added so that a processor such as PutSQL can understand how to interpret the values."),
@WritesAttribute(attribute="sql.args.N.value", description="The output SQL statements are parameterized in order to avoid SQL Injection Attacks. The values of the Parameters "
+ "to use are stored in the attributes named sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. Each of these attributes has a corresponding "
+ "sql.args.N.type attribute that indicates how the value should be interpreted when inserting it into the database.")
})
public class ConvertJSONToSQL extends AbstractProcessor {
private static final String UPDATE_TYPE = "UPDATE";
private static final String INSERT_TYPE = "INSERT";
static final AllowableValue IGNORE_UNMATCHED_FIELD = new AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields",
"Any field in the JSON document that cannot be mapped to a column in the database is ignored");
static final AllowableValue FAIL_UNMATCHED_FIELD = new AllowableValue("Fail", "Fail",
"If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship");
static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
.name("JDBC Connection Pool")
.description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. "
+ "The Connection Pool is necessary in order to determine the appropriate database column types.")
.identifiesControllerService(DBCPService.class)
.required(true)
.build();
static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder()
.name("Statement Type")
.description("Specifies the type of SQL Statement to generate")
.required(true)
.allowableValues(UPDATE_TYPE, INSERT_TYPE)
.build();
static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("Table Name")
.description("The name of the table that the statement should update")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor CATALOG_NAME = new PropertyDescriptor.Builder()
.name("Catalog Name")
.description("The name of the catalog that the statement should update. This may not apply for the database that you are updating. In this case, leave the field empty")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new PropertyDescriptor.Builder()
.name("Translate Field Names")
.description("If true, the Processor will attempt to translate JSON field names into the appropriate column names for the table specified. "
+ "If false, the JSON field names must match the column names exactly, or the column will not be updated")
.allowableValues("true", "false")
.defaultValue("true")
.build();
static final PropertyDescriptor UNMATCHED_FIELD_BEHAVIOR = new PropertyDescriptor.Builder()
.name("Unmatched Field Behavior")
.description("If an incoming JSON element has a field that does not map to any of the database table's columns, this property specifies how to handle the situation")
.allowableValues(IGNORE_UNMATCHED_FIELD, FAIL_UNMATCHED_FIELD)
.defaultValue(IGNORE_UNMATCHED_FIELD.getValue())
.build();
static final PropertyDescriptor UPDATE_KEY = new PropertyDescriptor.Builder()
.name("Update Keys")
.description("A comma-separated list of column names that uniquely identifies a row in the database for UPDATE statements. "
+ "If the Statement Type is UPDATE and this property is not set, the table's Primary Keys are used. "
+ "In this case, if no Primary Key exists, the conversion to SQL will fail. "
+ "This property is ignored if the Statement Type is INSERT")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.expressionLanguageSupported(true)
.build();
static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("When a FlowFile is converted to SQL, the original JSON FlowFile is routed to this relationship")
.build();
static final Relationship REL_SQL = new Relationship.Builder()
.name("sql")
.description("A FlowFile is routed to this relationship when its contents have successfully been converted into a SQL statement")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if it cannot be converted into a SQL statement. Common causes include invalid JSON "
+ "content or the JSON content missing a required field (if using an INSERT statement type).")
.build();
private final Map<SchemaKey, TableSchema> schemaCache = new LinkedHashMap<SchemaKey, TableSchema>(100) {
private static final long serialVersionUID = 1L;
@Override
protected boolean removeEldestEntry(Map.Entry<SchemaKey,TableSchema> eldest) {
return true;
}
};
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(CONNECTION_POOL);
properties.add(STATEMENT_TYPE);
properties.add(TABLE_NAME);
properties.add(CATALOG_NAME);
properties.add(TRANSLATE_FIELD_NAMES);
properties.add(UNMATCHED_FIELD_BEHAVIOR);
properties.add(UPDATE_KEY);
return properties;
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_ORIGINAL);
rels.add(REL_SQL);
rels.add(REL_FAILURE);
return rels;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
synchronized (this) {
schemaCache.clear();
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final boolean translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
final boolean ignoreUnmappedFields = IGNORE_UNMATCHED_FIELD.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_FIELD_BEHAVIOR).getValue());
final String statementType = context.getProperty(STATEMENT_TYPE).getValue();
final String updateKeys = context.getProperty(UPDATE_KEY).evaluateAttributeExpressions(flowFile).getValue();
final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) && updateKeys == null;
// get the database schema from the cache, if one exists. We do this in a synchronized block, rather than
// using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if
// the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the
// Java Heap if there are a lot of different SQL statements being generated that reference different tables.
TableSchema schema;
synchronized (this) {
schema = schemaCache.get(schemaKey);
if (schema == null) {
// No schema exists for this table yet. Query the database to determine the schema and put it into the cache.
final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
try (final Connection conn = dbcpService.getConnection()) {
schema = TableSchema.from(conn, catalog, tableName, translateFieldNames, includePrimaryKeys);
schemaCache.put(schemaKey, schema);
} catch (final SQLException e) {
getLogger().error("Failed to convert {} into a SQL statement due to {}; routing to failure", new Object[] {flowFile, e.toString()}, e);
session.transfer(flowFile, REL_FAILURE);
return;
}
}
}
// Parse the JSON document
final ObjectMapper mapper = new ObjectMapper();
final ObjectHolder<JsonNode> rootNodeRef = new ObjectHolder<>(null);
try {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
try (final InputStream bufferedIn = new BufferedInputStream(in)) {
rootNodeRef.set(mapper.readTree(bufferedIn));
}
}
});
} catch (final ProcessException pe) {
getLogger().error("Failed to parse {} as JSON due to {}; routing to failure", new Object[] {flowFile, pe.toString()}, pe);
session.transfer(flowFile, REL_FAILURE);
return;
}
final JsonNode rootNode = rootNodeRef.get();
// The node may or may not be a Json Array. If it isn't, we will create an
// ArrayNode and add just the root node to it. We do this so that we can easily iterate
// over the array node, rather than duplicating the logic or creating another function that takes many variables
// in order to implement the logic.
final ArrayNode arrayNode;
if (rootNode.isArray()) {
arrayNode = (ArrayNode) rootNode;
} else {
final JsonNodeFactory nodeFactory = JsonNodeFactory.instance;
arrayNode = new ArrayNode(nodeFactory);
arrayNode.add(rootNode);
}
final String fragmentIdentifier = UUID.randomUUID().toString();
final Set<FlowFile> created = new HashSet<>();
for (int i=0; i < arrayNode.size(); i++) {
final JsonNode jsonNode = arrayNode.get(i);
final String sql;
final Map<String, String> attributes = new HashMap<>();
try {
if (INSERT_TYPE.equals(statementType)) {
sql = generateInsert(jsonNode, attributes, tableName, schema, translateFieldNames, ignoreUnmappedFields);
} else {
sql = generateUpdate(jsonNode, attributes, tableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields);
}
} catch (final ProcessException pe) {
getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
new Object[] { flowFile, statementType, pe.toString() }, pe);
session.remove(created);
session.transfer(flowFile, REL_FAILURE);
return;
}
FlowFile sqlFlowFile = session.create(flowFile);
created.add(sqlFlowFile);
sqlFlowFile = session.write(sqlFlowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(sql.getBytes(StandardCharsets.UTF_8));
}
});
attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
attributes.put("sql.table", tableName);
attributes.put("fragment.identifier", fragmentIdentifier);
attributes.put("fragment.count", String.valueOf(arrayNode.size()));
attributes.put("fragment.index", String.valueOf(i));
if (catalog != null) {
attributes.put("sql.catalog", catalog);
}
sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributes);
session.transfer(sqlFlowFile, REL_SQL);
}
session.transfer(flowFile, REL_ORIGINAL);
}
private Set<String> getNormalizedColumnNames(final JsonNode node, final boolean translateFieldNames) {
final Set<String> normalizedFieldNames = new HashSet<>();
final Iterator<String> fieldNameItr = node.getFieldNames();
while (fieldNameItr.hasNext()) {
normalizedFieldNames.add(normalizeColumnName(fieldNameItr.next(), translateFieldNames));
}
return normalizedFieldNames;
}
private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName,
final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) {
final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
for (final String requiredColName : schema.getRequiredColumnNames()) {
final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
if (!normalizedFieldNames.contains(normalizedColName)) {
throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'");
}
}
final StringBuilder sqlBuilder = new StringBuilder();
int fieldCount = 0;
sqlBuilder.append("INSERT INTO ").append(tableName).append(" (");
// iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
// adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the
// columns that we are inserting into
final Iterator<String> fieldNames = rootNode.getFieldNames();
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
final ColumnDescription desc = schema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
if (desc == null && !ignoreUnmappedFields) {
throw new ProcessException("Cannot map JSON field '" + fieldName + "' to any column in the database");
}
if (desc != null) {
if (fieldCount++ > 0) {
sqlBuilder.append(", ");
}
sqlBuilder.append(desc.getColumnName());
final int sqlType = desc.getDataType();
attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
final Integer colSize = desc.getColumnSize();
final JsonNode fieldNode = rootNode.get(fieldName);
if (!fieldNode.isNull()) {
String fieldValue = fieldNode.asText();
if (colSize != null && fieldValue.length() > colSize) {
fieldValue = fieldValue.substring(0, colSize);
}
attributes.put("sql.args." + fieldCount + ".value", fieldValue);
}
}
}
// complete the SQL statements by adding ?'s for all of the values to be escaped.
sqlBuilder.append(") VALUES (");
for (int i=0; i < fieldCount; i++) {
if (i > 0) {
sqlBuilder.append(", ");
}
sqlBuilder.append("?");
}
sqlBuilder.append(")");
if (fieldCount == 0) {
throw new ProcessException("None of the fields in the JSON map to the columns defined by the " + tableName + " table");
}
return sqlBuilder.toString();
}
private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) {
final Set<String> updateKeyNames;
if (updateKeys == null) {
updateKeyNames = schema.getPrimaryKeyColumnNames();
} else {
updateKeyNames = new HashSet<>();
for (final String updateKey : updateKeys.split(",")) {
updateKeyNames.add(updateKey.trim());
}
}
if (updateKeyNames.isEmpty()) {
throw new ProcessException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified");
}
final StringBuilder sqlBuilder = new StringBuilder();
int fieldCount = 0;
sqlBuilder.append("UPDATE ").append(tableName).append(" SET ");
// Create a Set of all normalized Update Key names, and ensure that there is a field in the JSON
// for each of the Update Key fields.
final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
final Set<String> normalizedUpdateNames = new HashSet<>();
for (final String uk : updateKeyNames) {
final String normalizedUK = normalizeColumnName(uk, translateFieldNames);
normalizedUpdateNames.add(normalizedUK);
if (!normalizedFieldNames.contains(normalizedUK)) {
throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
}
}
// iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
// adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the
// columns that we are inserting into
Iterator<String> fieldNames = rootNode.getFieldNames();
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
final ColumnDescription desc = schema.getColumns().get(normalizedColName);
if (desc == null) {
if (ignoreUnmappedFields) {
throw new ProcessException("Cannot map JSON field '" + fieldName + "' to any column in the database");
} else {
continue;
}
}
// Check if this column is an Update Key. If so, skip it for now. We will come
// back to it after we finish the SET clause
if (normalizedUpdateNames.contains(normalizedColName)) {
continue;
}
if (fieldCount++ > 0) {
sqlBuilder.append(", ");
}
sqlBuilder.append(desc.getColumnName()).append(" = ?");
final int sqlType = desc.getDataType();
attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
final Integer colSize = desc.getColumnSize();
final JsonNode fieldNode = rootNode.get(fieldName);
if (!fieldNode.isNull()) {
String fieldValue = rootNode.get(fieldName).asText();
if (colSize != null && fieldValue.length() > colSize) {
fieldValue = fieldValue.substring(0, colSize);
}
attributes.put("sql.args." + fieldCount + ".value", fieldValue);
}
}
// Set the WHERE clause based on the Update Key values
sqlBuilder.append(" WHERE ");
fieldNames = rootNode.getFieldNames();
int whereFieldCount = 0;
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
final ColumnDescription desc = schema.getColumns().get(normalizedColName);
if (desc == null) {
continue;
}
// Check if this column is a Update Key. If so, skip it for now. We will come
// back to it after we finish the SET clause
if (!normalizedUpdateNames.contains(normalizedColName)) {
continue;
}
if (whereFieldCount++ > 0) {
sqlBuilder.append(" AND ");
}
fieldCount++;
sqlBuilder.append(normalizedColName).append(" = ?");
final int sqlType = desc.getDataType();
attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
final Integer colSize = desc.getColumnSize();
String fieldValue = rootNode.get(fieldName).asText();
if (colSize != null && fieldValue.length() > colSize) {
fieldValue = fieldValue.substring(0, colSize);
}
attributes.put("sql.args." + fieldCount + ".value", fieldValue);
}
return sqlBuilder.toString();
}
private static String normalizeColumnName(final String colName, final boolean translateColumnNames) {
return translateColumnNames ? colName.toUpperCase().replace("_", "") : colName;
}
private static class TableSchema {
private List<String> requiredColumnNames;
private Set<String> primaryKeyColumnNames;
private Map<String, ColumnDescription> columns;
private TableSchema(final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
final Set<String> primaryKeyColumnNames) {
this.columns = new HashMap<>();
this.primaryKeyColumnNames = primaryKeyColumnNames;
this.requiredColumnNames = new ArrayList<>();
for (final ColumnDescription desc : columnDescriptions) {
columns.put(ConvertJSONToSQL.normalizeColumnName(desc.columnName, translateColumnNames), desc);
if (desc.isRequired()) {
requiredColumnNames.add(desc.columnName);
}
}
}
public Map<String, ColumnDescription> getColumns() {
return columns;
}
public List<String> getRequiredColumnNames() {
return requiredColumnNames;
}
public Set<String> getPrimaryKeyColumnNames() {
return primaryKeyColumnNames;
}
public static TableSchema from(final Connection conn, final String catalog, final String tableName,
final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException {
final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%");
final List<ColumnDescription> cols = new ArrayList<>();
while (colrs.next()) {
final ColumnDescription col = ColumnDescription.from(colrs);
cols.add(col);
}
final Set<String> primaryKeyColumns = new HashSet<>();
if (includePrimaryKeys) {
final ResultSet pkrs = conn.getMetaData().getPrimaryKeys(catalog, null, tableName);
while (pkrs.next()) {
final String colName = pkrs.getString("COLUMN_NAME");
primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames));
}
}
return new TableSchema(cols, translateColumnNames, primaryKeyColumns);
}
}
private static class ColumnDescription {
private final String columnName;
private final int dataType;
private final boolean required;
private final Integer columnSize;
private ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize) {
this.columnName = columnName;
this.dataType = dataType;
this.required = required;
this.columnSize = columnSize;
}
public int getDataType() {
return dataType;
}
public Integer getColumnSize() {
return columnSize;
}
public String getColumnName() {
return columnName;
}
public boolean isRequired() {
return required;
}
public static ColumnDescription from(final ResultSet resultSet) throws SQLException {
final String columnName = resultSet.getString("COLUMN_NAME");
final int dataType = resultSet.getInt("DATA_TYPE");
final int colSize = resultSet.getInt("COLUMN_SIZE");
final String nullableValue = resultSet.getString("IS_NULLABLE");
final boolean isNullable = "YES".equalsIgnoreCase(nullableValue) || nullableValue.isEmpty();
final String defaultValue = resultSet.getString("COLUMN_DEF");
final String autoIncrementValue = resultSet.getString("IS_AUTOINCREMENT");
final boolean isAutoIncrement = "YES".equalsIgnoreCase(autoIncrementValue);
final boolean required = !isNullable && !isAutoIncrement && defaultValue == null;
return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize);
}
}
private static class SchemaKey {
private final String catalog;
private final String tableName;
public SchemaKey(final String catalog, final String tableName) {
this.catalog = catalog;
this.tableName = tableName;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((catalog == null) ? 0 : catalog.hashCode());
result = prime * result + ((tableName == null) ? 0 : tableName.hashCode());
return result;
}
@Override
public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final SchemaKey other = (SchemaKey) obj;
if (catalog == null) {
if (other.catalog != null) {
return false;
}
} else if (!catalog.equals(other.catalog)) {
return false;
}
if (tableName == null) {
if (other.tableName != null) {
return false;
}
} else if (!tableName.equals(other.tableName)) {
return false;
}
return true;
}
}
}

View File

@ -0,0 +1,920 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
@SupportsBatching
@SeeAlso(ConvertJSONToSQL.class)
@Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"})
@CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command "
+ "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
+ "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
+ "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
@ReadsAttributes({
@ReadsAttribute(attribute="fragment.identifier", description="If the <Support Fragment Transactions> property is true, this attribute is used to determine whether or "
+ "not two FlowFiles belong to the same transaction."),
@ReadsAttribute(attribute="fragment.count", description="If the <Support Fragment Transactions> property is true, this attribute is used to determine how many FlowFiles "
+ "are needed to complete the transaction."),
@ReadsAttribute(attribute="fragment.index", description="If the <Support Fragment Transactions> property is true, this attribute is used to determine the order that the FlowFiles "
+ "in a transaction should be evaluated."),
@ReadsAttribute(attribute="sql.args.N.type", description="Incoming FlowFiles are expected to be parameterized SQL statements. The type of each Parameter is specified as an integer "
+ "that represents the JDBC Type of the parameter."),
@ReadsAttribute(attribute="sql.args.N.value", description="Incoming FlowFiles are expected to be parameterized SQL statements. The value of the Parameters are specified as "
+ "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute.")
})
@WritesAttributes({
@WritesAttribute(attribute="sql.generated.key", description="If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, "
+ "this attribute will be added to indicate the generated key, if possible. This feature is not supported by all database vendors.")
})
public class PutSQL extends AbstractProcessor {
static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
.name("JDBC Connection Pool")
.description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. "
+ "The Connection Pool is necessary in order to determine the appropriate database column types.")
.identifiesControllerService(DBCPService.class)
.required(true)
.build();
static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder()
.name("Support Fragmented Transactions")
.description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. "
+ "If the fragment.count value is greater than 1, the Processor will not process any FlowFile will that fragment.identifier until all are available; "
+ "at that point, it will process all FlowFiles with that fragment.identifier as a single transaction, in the order specified by the FlowFiles' fragment.index attributes. "
+ "This Provides atomicity of those SQL statements. If this value is false, these attributes will be ignored and the updates will occur independent of one another.")
.allowableValues("true", "false")
.defaultValue("true")
.build();
static final PropertyDescriptor TRANSACTION_TIMEOUT = new PropertyDescriptor.Builder()
.name("Transaction Timeout")
.description("If the <Support Fragmented Transactions> property is set to true, specifies how long to wait for all FlowFiles for a particular fragment.identifier attribute "
+ "to arrive before just transferring all of the FlowFiles with that identifier to the 'failure' relationship")
.required(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The preferred number of FlowFiles to put to the database in a single transaction")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("100")
.build();
static final PropertyDescriptor OBTAIN_GENERATED_KEYS = new PropertyDescriptor.Builder()
.name("Obtain Generated Keys")
.description("If true, any key that is automatically generated by the database will be added to the FlowFile that generated it using the sql.generate.key attribute. "
+ "This may result in slightly slower performance and is not supported by all databases.")
.allowableValues("true", "false")
.defaultValue("false")
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after the database is successfully updated")
.build();
static final Relationship REL_RETRY = new Relationship.Builder()
.name("retry")
.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, "
+ "such as an invalid query or an integrity constraint violation")
.build();
private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type");
private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
private static final String FRAGMENT_ID_ATTR = "fragment.identifier";
private static final String FRAGMENT_INDEX_ATTR = "fragment.index";
private static final String FRAGMENT_COUNT_ATTR = "fragment.count";
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(CONNECTION_POOL);
properties.add(SUPPORT_TRANSACTIONS);
properties.add(TRANSACTION_TIMEOUT);
properties.add(BATCH_SIZE);
properties.add(OBTAIN_GENERATED_KEYS);
return properties;
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_RETRY);
rels.add(REL_FAILURE);
return rels;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFilePoll poll = pollFlowFiles(context, session);
if (poll == null) {
return;
}
final List<FlowFile> flowFiles = poll.getFlowFiles();
if (flowFiles == null) {
return;
}
final long startNanos = System.nanoTime();
final boolean obtainKeys = context.getProperty(OBTAIN_GENERATED_KEYS).asBoolean();
final Map<String, StatementFlowFileEnclosure> statementMap = new HashMap<>(); // Map SQL to a PreparedStatement and FlowFiles
final List<FlowFile> sentFlowFiles = new ArrayList<>(); // flowfiles that have been sent
final List<FlowFile> processedFlowFiles = new ArrayList<>(); // all flowfiles that we have processed
final Set<StatementFlowFileEnclosure> enclosuresToExecute = new LinkedHashSet<>(); // the enclosures that we've processed
// Because we can have a transaction that is necessary across multiple FlowFiles, things get complicated when
// some FlowFiles have been transferred to a relationship and then there is a failure. As a result, we will just
// map all FlowFiles to their destination relationship and do the session.transfer at the end. This way, if there
// is a failure, we can route all FlowFiles to failure if we need to.
final Map<FlowFile, Relationship> destinationRelationships = new HashMap<>();
final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
try (final Connection conn = dbcpService.getConnection()) {
final boolean originalAutoCommit = conn.getAutoCommit();
try {
conn.setAutoCommit(false);
for (final FlowFile flowFile : flowFiles) {
processedFlowFiles.add(flowFile);
final String sql = getSQL(session, flowFile);
// Get the appropriate PreparedStatement to use.
final StatementFlowFileEnclosure enclosure;
try {
enclosure = getEnclosure(sql, conn, statementMap, obtainKeys, poll.isFragmentedTransaction());
} catch (final SQLNonTransientException e) {
getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e});
destinationRelationships.put(flowFile, REL_FAILURE);
continue;
}
final PreparedStatement stmt = enclosure.getStatement();
// set the appropriate parameters on the statement.
try {
setParameters(stmt, flowFile.getAttributes());
} catch (final SQLException | ProcessException pe) {
getLogger().error("Cannot update database for {} due to {}; routing to failure", new Object[] {flowFile, pe.toString()}, pe);
destinationRelationships.put(flowFile, REL_FAILURE);
continue;
}
// If we need to obtain keys, we cannot do so in a a Batch Update. So we have to execute the statement and close it.
if (obtainKeys) {
try {
// Execute the actual update.
stmt.executeUpdate();
// attempt to determine the key that was generated, if any. This is not supported by all
// database vendors, so if we cannot determine the generated key (or if the statement is not an INSERT),
// we will just move on without setting the attribute.
FlowFile sentFlowFile = flowFile;
final String generatedKey = determineGeneratedKey(stmt);
if (generatedKey != null) {
sentFlowFile = session.putAttribute(sentFlowFile, "sql.generated.key", generatedKey);
}
stmt.close();
sentFlowFiles.add(sentFlowFile);
} catch (final SQLNonTransientException e) {
getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e});
destinationRelationships.put(flowFile, REL_FAILURE);
continue;
} catch (final SQLException e) {
getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[] {flowFile, e});
destinationRelationships.put(flowFile, REL_RETRY);
continue;
}
} else {
// We don't need to obtain keys. Just add the statement to the batch.
stmt.addBatch();
enclosure.addFlowFile(flowFile);
enclosuresToExecute.add(enclosure);
}
}
// If we are not trying to obtain the generated keys, we will have
// PreparedStatement's that have batches added to them. We need to execute each batch and close
// the PreparedStatement.
for (final StatementFlowFileEnclosure enclosure : enclosuresToExecute) {
try {
final PreparedStatement stmt = enclosure.getStatement();
stmt.executeBatch();
sentFlowFiles.addAll(enclosure.getFlowFiles());
} catch (final BatchUpdateException e) {
// If we get a BatchUpdateException, then we want to determine which FlowFile caused the failure,
// and route that FlowFile to failure while routing those that finished processing to success and those
// that have not yet been executed to retry. If the FlowFile was
// part of a fragmented transaction, then we must roll back all updates for this connection, because
// other statements may have been successful and been part of this transaction.
final int[] updateCounts = e.getUpdateCounts();
final int offendingFlowFileIndex = updateCounts.length;
final List<FlowFile> batchFlowFiles = enclosure.getFlowFiles();
if (poll.isFragmentedTransaction()) {
// There are potentially multiple statements for this one transaction. As a result,
// we need to roll back the entire transaction and route all of the FlowFiles to failure.
conn.rollback();
final FlowFile offendingFlowFile = batchFlowFiles.get(offendingFlowFileIndex);
getLogger().error("Failed to update database due to a failed batch update. A total of {} FlowFiles are required for this transaction, so routing all to failure. "
+ "Offending FlowFile was {}, which caused the following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, e});
session.transfer(flowFiles, REL_FAILURE);
return;
}
// In the presence of a BatchUpdateException, the driver has the option of either stopping when an error
// occurs, or continuing. If it continues, then it must account for all statements in the batch and for
// those that fail return a Statement.EXECUTE_FAILED for the number of rows updated.
// So we will iterate over all of the update counts returned. If any is equal to Statement.EXECUTE_FAILED,
// we will route the corresponding FlowFile to failure. Otherwise, the FlowFile will go to success
// unless it has not yet been processed (its index in the List > updateCounts.length).
int failureCount = 0;
int successCount = 0;
int retryCount = 0;
for (int i=0; i < updateCounts.length; i++) {
final int updateCount = updateCounts[i];
final FlowFile flowFile = batchFlowFiles.get(i);
if (updateCount == Statement.EXECUTE_FAILED) {
destinationRelationships.put(flowFile, REL_FAILURE);
failureCount++;
} else {
destinationRelationships.put(flowFile, REL_SUCCESS);
successCount++;
}
}
if (failureCount == 0) {
// if no failures found, the driver decided not to execute the statements after the
// failure, so route the last one to failure.
final FlowFile failedFlowFile = batchFlowFiles.get(updateCounts.length);
destinationRelationships.put(failedFlowFile, REL_FAILURE);
failureCount++;
}
if (updateCounts.length < batchFlowFiles.size()) {
final List<FlowFile> unexecuted = batchFlowFiles.subList(updateCounts.length + 1, batchFlowFiles.size());
for (final FlowFile flowFile : unexecuted) {
destinationRelationships.put(flowFile, REL_RETRY);
retryCount++;
}
}
getLogger().error("Failed to update database due to a failed batch update. There were a total of {} FlowFiles that failed, {} that succeeded, "
+ "and {} that were not execute and will be routed to retry; ", new Object[] {failureCount, successCount, retryCount});
} catch (final SQLNonTransientException e) {
getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {enclosure.getFlowFiles(), e});
for (final FlowFile flowFile : enclosure.getFlowFiles()) {
destinationRelationships.put(flowFile, REL_FAILURE);
}
continue;
} catch (final SQLException e) {
getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry",
new Object[] {enclosure.getFlowFiles(), e});
for (final FlowFile flowFile : enclosure.getFlowFiles()) {
destinationRelationships.put(flowFile, REL_RETRY);
}
continue;
} finally {
enclosure.getStatement().close();
}
}
} finally {
try {
conn.commit();
} finally {
// make sure that we try to set the auto commit back to whatever it was.
if (originalAutoCommit) {
try {
conn.setAutoCommit(originalAutoCommit);
} catch (final SQLException se) {
}
}
}
}
// Determine the database URL
String url = "jdbc://unknown-host";
try {
url = conn.getMetaData().getURL();
} catch (final SQLException sqle) {
}
// Emit a Provenance SEND event
final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
for (final FlowFile flowFile : sentFlowFiles) {
session.getProvenanceReporter().send(flowFile, url, transmissionMillis, true);
}
for (final FlowFile flowFile : sentFlowFiles) {
destinationRelationships.put(flowFile, REL_SUCCESS);
}
} catch (final SQLException e) {
// Failed FlowFiles are all of them that we have processed minus those that were successfully sent
final List<FlowFile> failedFlowFiles = processedFlowFiles;
failedFlowFiles.removeAll(sentFlowFiles);
// All FlowFiles yet to be processed is all FlowFiles minus those processed
final List<FlowFile> retry = flowFiles;
retry.removeAll(processedFlowFiles);
final Relationship rel;
if (e instanceof SQLNonTransientException) {
getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {failedFlowFiles, e});
rel = REL_FAILURE;
} else {
getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[] {failedFlowFiles, e});
rel = REL_RETRY;
}
for (final FlowFile flowFile : failedFlowFiles) {
destinationRelationships.put(flowFile, rel);
}
for (final FlowFile flowFile : retry) {
destinationRelationships.put(flowFile, Relationship.SELF);
}
}
for (final Map.Entry<FlowFile, Relationship> entry : destinationRelationships.entrySet()) {
session.transfer(entry.getKey(), entry.getValue());
}
}
/**
* Pulls a batch of FlowFiles from the incoming queues. If no FlowFiles are available, returns <code>null</code>.
* Otherwise, a List of FlowFiles will be returned.
*
* If all FlowFiles pulled are not eligible to be processed, the FlowFiles will be penalized and transferred back
* to the input queue and an empty List will be returned.
*
* Otherwise, if the Support Fragmented Transactions property is true, all FlowFiles that belong to the same
* transaction will be sorted in the order that they should be evaluated.
*
* @param context the process context for determining properties
* @param session the process session for pulling flowfiles
* @return a FlowFilePoll containing a List of FlowFiles to process, or <code>null</code> if there are no FlowFiles to process
*/
private FlowFilePoll pollFlowFiles(final ProcessContext context, final ProcessSession session) {
// Determine which FlowFile Filter to use in order to obtain FlowFiles.
final boolean useTransactions = context.getProperty(SUPPORT_TRANSACTIONS).asBoolean();
boolean fragmentedTransaction = false;
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
List<FlowFile> flowFiles;
if (useTransactions) {
final TransactionalFlowFileFilter filter = new TransactionalFlowFileFilter();
flowFiles = session.get(filter);
fragmentedTransaction = filter.isFragmentedTransaction();
} else {
flowFiles = session.get(batchSize);
}
if (flowFiles.isEmpty()) {
return null;
}
// If we are supporting fragmented transactions, verify that all FlowFiles are correct
if (fragmentedTransaction) {
final Relationship relationship = determineRelationship(flowFiles, context.getProperty(TRANSACTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS));
if (relationship != null) {
// if transferring back to self, penalize the FlowFiles.
if (relationship == Relationship.SELF) {
// penalize all of the FlowFiles that we are going to route to SELF.
final ListIterator<FlowFile> itr = flowFiles.listIterator();
while (itr.hasNext()) {
final FlowFile flowFile = itr.next();
final FlowFile penalized = session.penalize(flowFile);
itr.remove();
itr.add(penalized);
}
}
session.transfer(flowFiles, relationship);
return null;
}
// sort by fragment index.
Collections.sort(flowFiles, new Comparator<FlowFile>() {
@Override
public int compare(final FlowFile o1, final FlowFile o2) {
return Integer.compare(Integer.parseInt(o1.getAttribute(FRAGMENT_INDEX_ATTR)), Integer.parseInt(o2.getAttribute(FRAGMENT_INDEX_ATTR)));
}
});
}
return new FlowFilePoll(flowFiles, fragmentedTransaction);
}
/**
* Returns the key that was generated from the given statement, or <code>null</code> if no key
* was generated or it could not be determined.
*
* @param stmt the statement that generated a key
* @return the key that was generated from the given statement, or <code>null</code> if no key
* was generated or it could not be determined.
*/
private String determineGeneratedKey(final PreparedStatement stmt) {
try {
final ResultSet generatedKeys = stmt.getGeneratedKeys();
if (generatedKeys != null && generatedKeys.next()) {
return generatedKeys.getString(1);
}
} catch (final SQLException sqle) {
// This is not supported by all vendors. This is a best-effort approach.
}
return null;
}
/**
* Returns the StatementFlowFileEnclosure that should be used for executing the given SQL statement
*
* @param sql the SQL to execute
* @param conn the connection from which a PreparedStatement can be created
* @param stmtMap the existing map of SQL to PreparedStatements
* @param obtainKeys whether or not we need to obtain generated keys for INSERT statements
* @param fragmentedTransaction whether or not the SQL pertains to a fragmented transaction
*
* @return a StatementFlowFileEnclosure to use for executing the given SQL statement
*
* @throws SQLException if unable to create the appropriate PreparedStatement
*/
private StatementFlowFileEnclosure getEnclosure(final String sql, final Connection conn, final Map<String, StatementFlowFileEnclosure> stmtMap,
final boolean obtainKeys, final boolean fragmentedTransaction) throws SQLException {
StatementFlowFileEnclosure enclosure = stmtMap.get(sql);
if (enclosure != null) {
return enclosure;
}
if (obtainKeys) {
// Create a new Prepared Statement, requesting that it return the generated keys.
PreparedStatement stmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
if (stmt == null) {
// since we are passing Statement.RETURN_GENERATED_KEYS, calls to conn.prepareStatement will
// in some cases (at least for DerbyDB) return null.
// We will attempt to recompile the statement without the generated keys being returned.
stmt = conn.prepareStatement(sql);
}
// If we need to obtain keys, then we cannot do a Batch Update. In this case,
// we don't need to store the PreparedStatement in the Map because we aren't
// doing an addBatch/executeBatch. Instead, we will use the statement once
// and close it.
return new StatementFlowFileEnclosure(stmt);
} else if (fragmentedTransaction) {
// We cannot use Batch Updates if we have a transaction that spans multiple FlowFiles.
// If we did, we could end up processing the statements out of order. It's quite possible
// that we could refactor the code some to allow for this, but as it is right now, this
// could cause problems. This is because we have a Map<String, StatementFlowFileEnclosure>.
// If we had a transaction that needed to execute Stmt A with some parameters, then Stmt B with
// some parameters, then Stmt A with different parameters, this would become problematic because
// the executeUpdate would be evaluated first for Stmt A (the 1st and 3rd statements, and then
// the second statement would be evaluated).
final PreparedStatement stmt = conn.prepareStatement(sql);
return new StatementFlowFileEnclosure(stmt);
}
final PreparedStatement stmt = conn.prepareStatement(sql);
enclosure = new StatementFlowFileEnclosure(stmt);
stmtMap.put(sql, enclosure);
return enclosure;
}
/**
* Determines the SQL statement that should be executed for the given FlowFile
*
* @param session the session that can be used to access the given FlowFile
* @param flowFile the FlowFile whose SQL statement should be executed
*
* @return the SQL that is associated with the given FlowFile
*/
private String getSQL(final ProcessSession session, final FlowFile flowFile) {
// Read the SQL from the FlowFile's content
final byte[] buffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, buffer);
}
});
// Create the PreparedStatement to use for this FlowFile.
final String sql = new String(buffer, StandardCharsets.UTF_8);
return sql;
}
/**
* Sets all of the appropriate parameters on the given PreparedStatement, based on the given FlowFile attributes.
*
* @param stmt the statement to set the parameters on
* @param attributes the attributes from which to derive parameter indices, values, and types
* @throws SQLException if the PreparedStatement throws a SQLException when the appropriate setter is called
*/
private void setParameters(final PreparedStatement stmt, final Map<String, String> attributes) throws SQLException {
for (final Map.Entry<String, String> entry : attributes.entrySet()) {
final String key = entry.getKey();
final Matcher matcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key);
if (matcher.matches()) {
final int parameterIndex = Integer.parseInt(matcher.group(1));
final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches();
if (!isNumeric) {
throw new ProcessException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral type");
}
final int jdbcType = Integer.parseInt(entry.getValue());
final String valueAttrName = "sql.args." + parameterIndex + ".value";
final String parameterValue = attributes.get(valueAttrName);
try {
setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType);
} catch (final NumberFormatException nfe) {
throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe);
}
}
}
}
/**
* Determines which relationship the given FlowFiles should go to, based on a transaction timing out or
* transaction information not being present. If the FlowFiles should be processed and not transferred
* to any particular relationship yet, will return <code>null</code>
*
* @param flowFiles the FlowFiles whose relationship is to be determined
* @param transactionTimeoutMillis the maximum amount of time (in milliseconds) that we should wait
* for all FlowFiles in a transaction to be present before routing to failure
* @return the appropriate relationship to route the FlowFiles to, or <code>null</code> if the FlowFiles
* should instead be processed
*/
Relationship determineRelationship(final List<FlowFile> flowFiles, final Long transactionTimeoutMillis) {
int selectedNumFragments = 0;
final BitSet bitSet = new BitSet();
for (final FlowFile flowFile : flowFiles) {
final String fragmentCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTR);
if (fragmentCount == null && flowFiles.size() == 1) {
return null;
} else if (fragmentCount == null) {
getLogger().error("Cannot process {} because there are {} FlowFiles with the same fragment.identifier "
+ "attribute but not all FlowFiles have a fragment.count attribute; routing all to failure", new Object[] {flowFile, flowFiles.size()});
return REL_FAILURE;
}
final int numFragments;
try {
numFragments = Integer.parseInt(fragmentCount);
} catch (final NumberFormatException nfe) {
getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not an integer; "
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount});
return REL_FAILURE;
}
if (numFragments < 1) {
getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not a positive integer; "
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount});
return REL_FAILURE;
}
if (selectedNumFragments == 0) {
selectedNumFragments = numFragments;
} else if (numFragments != selectedNumFragments) {
getLogger().error("Cannot process {} because the fragment.count attribute has different values for different FlowFiles with the same fragment.identifier; "
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
return REL_FAILURE;
}
final String fragmentIndex = flowFile.getAttribute(FRAGMENT_INDEX_ATTR);
if (fragmentIndex == null) {
getLogger().error("Cannot process {} because the fragment.index attribute is missing; "
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
return REL_FAILURE;
}
final int idx;
try {
idx = Integer.parseInt(fragmentIndex);
} catch (final NumberFormatException nfe) {
getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not an integer; "
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex});
return REL_FAILURE;
}
if (idx < 0) {
getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not a positive integer; "
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex});
return REL_FAILURE;
}
if (bitSet.get(idx)) {
getLogger().error("Cannot process {} because it has the same value for the fragment.index attribute as another FlowFile with the same fragment.identifier; "
+ "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
return REL_FAILURE;
}
bitSet.set(idx);
}
if (selectedNumFragments == flowFiles.size()) {
return null; // no relationship to route FlowFiles to yet - process the FlowFiles.
}
long latestQueueTime = 0L;
for (final FlowFile flowFile : flowFiles) {
if (flowFile.getLastQueueDate() != null && flowFile.getLastQueueDate() > latestQueueTime) {
latestQueueTime = flowFile.getLastQueueDate();
}
}
if (transactionTimeoutMillis != null) {
if (latestQueueTime > 0L && System.currentTimeMillis() - latestQueueTime > transactionTimeoutMillis) {
getLogger().error("The transaction timeout has expired for the following FlowFiles; they will be routed to failure: {}", new Object[] {flowFiles});
return REL_FAILURE;
}
}
getLogger().debug("Not enough FlowFiles for transaction. Returning all FlowFiles to queue");
return Relationship.SELF; // not enough FlowFiles for this transaction. Return them all to queue.
}
/**
* Determines how to map the given value to the appropriate JDBC data type and sets the parameter on the
* provided PreparedStatement
*
* @param stmt the PreparedStatement to set the parameter on
* @param attrName the name of the attribute that the parameter is coming from - for logging purposes
* @param parameterIndex the index of the SQL parameter to set
* @param parameterValue the value of the SQL parameter to set
* @param jdbcType the JDBC Type of the SQL parameter to set
* @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter
*/
private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException {
if (parameterValue == null) {
stmt.setNull(parameterIndex, jdbcType);
} else {
switch (jdbcType) {
case Types.BIT:
stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue));
break;
case Types.TINYINT:
stmt.setByte(parameterIndex, Byte.parseByte(parameterValue));
break;
case Types.SMALLINT:
stmt.setShort(parameterIndex, Short.parseShort(parameterValue));
break;
case Types.INTEGER:
stmt.setInt(parameterIndex, Integer.parseInt(parameterValue));
break;
case Types.BIGINT:
stmt.setLong(parameterIndex, Long.parseLong(parameterValue));
break;
case Types.REAL:
stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue));
break;
case Types.FLOAT:
case Types.DOUBLE:
stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue));
break;
case Types.DATE:
stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue)));
break;
case Types.TIME:
stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue)));
break;
case Types.TIMESTAMP:
stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue)));
break;
case Types.CHAR:
case Types.VARCHAR:
case Types.LONGNVARCHAR:
case Types.LONGVARCHAR:
stmt.setString(parameterIndex, parameterValue);
break;
default:
throw new SQLException("The '" + attrName + "' attribute has a value of '" + parameterValue
+ "' and a type of '" + jdbcType + "' but this is not a known data type");
}
}
}
/**
* A FlowFileFilter that is responsible for ensuring that the FlowFiles returned either belong
* to the same "fragmented transaction" (i.e., 1 transaction whose information is fragmented
* across multiple FlowFiles) or that none of the FlowFiles belongs to a fragmented transaction
*/
static class TransactionalFlowFileFilter implements FlowFileFilter {
private String selectedId = null;
private int numSelected = 0;
private boolean ignoreFragmentIdentifiers = false;
public boolean isFragmentedTransaction() {
return !ignoreFragmentIdentifiers;
}
@Override
public FlowFileFilterResult filter(final FlowFile flowFile) {
final String fragmentId = flowFile.getAttribute(FRAGMENT_ID_ATTR);
final String fragCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTR);
// if first FlowFile selected is not part of a fragmented transaction, then
// we accept any FlowFile that is also not part of a fragmented transaction.
if (ignoreFragmentIdentifiers) {
if (fragmentId == null || "1".equals(fragCount)) {
return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
} else {
return FlowFileFilterResult.REJECT_AND_CONTINUE;
}
}
if (fragmentId == null || "1".equals(fragCount)) {
if (selectedId == null) {
// Only one FlowFile in the transaction.
ignoreFragmentIdentifiers = true;
return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
} else {
// we've already selected 1 FlowFile, and this one doesn't match.
return FlowFileFilterResult.REJECT_AND_CONTINUE;
}
}
if (selectedId == null) {
// select this fragment id as the chosen one.
selectedId = fragmentId;
numSelected++;
return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
}
if (selectedId.equals(fragmentId)) {
// fragment id's match. Find out if we have all of the necessary fragments or not.
final int numFragments;
if (NUMBER_PATTERN.matcher(fragCount).matches()) {
numFragments = Integer.parseInt(fragCount);
} else {
numFragments = Integer.MAX_VALUE;
}
if (numSelected >= numFragments - 1) {
// We have all of the fragments we need for this transaction.
return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
} else {
// We still need more fragments for this transaction, so accept this one and continue.
numSelected++;
return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
}
} else {
return FlowFileFilterResult.REJECT_AND_CONTINUE;
}
}
}
/**
* A simple, immutable data structure to hold a List of FlowFiles and an indicator as to whether
* or not those FlowFiles represent a "fragmented transaction" - that is, a collection of FlowFiles
* that all must be executed as a single transaction (we refer to it as a fragment transaction
* because the information for that transaction, including SQL and the parameters, is fragmented
* across multiple FlowFiles).
*/
private static class FlowFilePoll {
private final List<FlowFile> flowFiles;
private final boolean fragmentedTransaction;
public FlowFilePoll(final List<FlowFile> flowFiles, final boolean fragmentedTransaction) {
this.flowFiles = flowFiles;
this.fragmentedTransaction = fragmentedTransaction;
}
public List<FlowFile> getFlowFiles() {
return flowFiles;
}
public boolean isFragmentedTransaction() {
return fragmentedTransaction;
}
}
/**
* A simple, immutable data structure to hold a Prepared Statement and a List of FlowFiles
* for which that statement should be evaluated.
*/
private static class StatementFlowFileEnclosure {
private final PreparedStatement statement;
private final List<FlowFile> flowFiles = new ArrayList<>();
public StatementFlowFileEnclosure(final PreparedStatement statement) {
this.statement = statement;
}
public PreparedStatement getStatement() {
return statement;
}
public List<FlowFile> getFlowFiles() {
return flowFiles;
}
public void addFlowFile(final FlowFile flowFile) {
this.flowFiles.add(flowFile);
}
@Override
public int hashCode() {
return statement.hashCode();
}
@Override
public boolean equals(final Object obj) {
if (obj == null) {
return false;
}
if (obj == this) {
return false;
}
if (!(obj instanceof StatementFlowFileEnclosure)) {
return false;
}
final StatementFlowFileEnclosure other = (StatementFlowFileEnclosure) obj;
return statement.equals(other.getStatement());
}
}
}

View File

@ -0,0 +1,431 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class TestConvertJSONToSQL {
static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@BeforeClass
public static void setup() {
System.setProperty("derby.stream.error.file", "target/derby.log");
}
@Test
public void testInsert() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.1.value", "1");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.2.value", "Mark");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "48");
out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
}
@Test
public void testInsertWithNullValue() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-null-code.json"));
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.1.value", "1");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.2.value", "Mark");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeNotExists("sql.args.3.value");
out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
}
@Test
public void testUpdateWithNullValue() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-null-code.json"));
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.1.value", "Mark");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeNotExists("sql.args.2.value");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "1");
out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE ID = ?");
}
@Test
public void testMultipleInserts() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/persons.json"));
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 5);
final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL);
for (final MockFlowFile mff : mffs) {
mff.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
for (int i=1; i <= 3; i++) {
mff.assertAttributeExists("sql.args." + i + ".type");
mff.assertAttributeExists("sql.args." + i + ".value");
}
}
}
@Test
public void testUpdateBasedOnPrimaryKey() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.1.value", "Mark");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.2.value", "48");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "1");
out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE ID = ?");
}
@Test
public void testUnmappedFieldBehavior() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
runner.setProperty(ConvertJSONToSQL.UNMATCHED_FIELD_BEHAVIOR, ConvertJSONToSQL.IGNORE_UNMATCHED_FIELD.getValue());
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json"));
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
runner.clearTransferState();
runner.setProperty(ConvertJSONToSQL.UNMATCHED_FIELD_BEHAVIOR, ConvertJSONToSQL.FAIL_UNMATCHED_FIELD.getValue());
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json"));
runner.run();
runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
}
@Test
public void testUpdateBasedOnUpdateKey() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "code");
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.1.value", "1");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.2.value", "Mark");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "48");
out.assertContentEquals("UPDATE PERSONS SET ID = ?, NAME = ? WHERE CODE = ?");
}
@Test
public void testUpdateBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code");
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.1.value", "1");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.2.value", "Mark");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "48");
out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?");
}
@Test
public void testUpdateWithMissingFieldBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code");
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-without-code.json"));
runner.run();
runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
}
@Test
public void testUpdateWithMalformedJson() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code");
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json"));
runner.run();
runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
}
@Test
public void testInsertWithMissingField() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code");
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-without-id.json"));
runner.run();
runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
}
/**
* Simple implementation only for testing purposes
*/
private static class MockDBCPService extends AbstractControllerService implements DBCPService {
private final String dbLocation;
public MockDBCPService(final String dbLocation) {
this.dbLocation = dbLocation;
}
@Override
public String getIdentifier() {
return "dbcp";
}
@Override
public Connection getConnection() throws ProcessException {
try {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
final Connection con = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true");
return con;
} catch (final Exception e) {
e.printStackTrace();
throw new ProcessException("getConnection failed: " + e);
}
}
}
}

View File

@ -0,0 +1,664 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
public class TestPutSQL {
private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
private static final String createPersonsAutoId = "CREATE TABLE PERSONS (id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1), name VARCHAR(100), code INTEGER check(code <= 100))";
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@BeforeClass
public static void setup() {
System.setProperty("derby.stream.error.file", "target/derby.log");
}
@Test
public void testDirectStatements() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)".getBytes());
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("Mark", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
runner.enqueue("UPDATE PERSONS SET NAME='George' WHERE ID=1".getBytes());
runner.run();
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("George", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
}
@Test
public void testInsertWithGeneratedKeys() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersonsAutoId);
}
}
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "true");
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 84)".getBytes());
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
final MockFlowFile mff = runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS).get(0);
mff.assertAttributeEquals("sql.generated.key", "1");
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("Mark", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
}
@Test
public void testFailInMiddleWithBadStatement() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersonsAutoId);
}
}
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 84)".getBytes());
runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally wrong syntax
runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Tom', 3)".getBytes());
runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Harry', 44)".getBytes());
runner.run();
runner.assertTransferCount(PutSQL.REL_FAILURE, 1);
runner.assertTransferCount(PutSQL.REL_SUCCESS, 3);
}
@Test
public void testFailInMiddleWithBadParameterType() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersonsAutoId);
}
}
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final Map<String, String> goodAttributes = new HashMap<>();
goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
goodAttributes.put("sql.args.1.value", "84");
final Map<String, String> badAttributes = new HashMap<>();
badAttributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR));
badAttributes.put("sql.args.1.value", "hello");
final byte[] data = "INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', ?)".getBytes();
runner.enqueue(data, goodAttributes);
runner.enqueue(data, badAttributes);
runner.enqueue(data, goodAttributes);
runner.enqueue(data, goodAttributes);
runner.run();
runner.assertTransferCount(PutSQL.REL_FAILURE, 1);
runner.assertTransferCount(PutSQL.REL_SUCCESS, 3);
}
@Test
public void testFailInMiddleWithBadParameterValue() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersonsAutoId);
}
}
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final Map<String, String> goodAttributes = new HashMap<>();
goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
goodAttributes.put("sql.args.1.value", "84");
final Map<String, String> badAttributes = new HashMap<>();
badAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
badAttributes.put("sql.args.1.value", "9999");
final byte[] data = "INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', ?)".getBytes();
runner.enqueue(data, goodAttributes);
runner.enqueue(data, badAttributes);
runner.enqueue(data, goodAttributes);
runner.enqueue(data, goodAttributes);
runner.run();
runner.assertTransferCount(PutSQL.REL_SUCCESS, 1);
runner.assertTransferCount(PutSQL.REL_FAILURE, 1);
runner.assertTransferCount(PutSQL.REL_RETRY, 2);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("Mark", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
}
@Test
public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.2.value", "Mark");
attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.3.value", "84");
runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("Mark", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
runner.clearTransferState();
attributes.clear();
attributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.1.value", "George");
attributes.put("sql.args.2.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.2.value", "1");
runner.enqueue("UPDATE PERSONS SET NAME=? WHERE ID=?".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("George", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
}
@Test
public void testMultipleStatementsWithinFlowFile() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
"UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.2.value", "Mark");
attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.3.value", "84");
attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.4.value", "1");
runner.enqueue(sql.getBytes(), attributes);
runner.run();
// should fail because of the semicolon
runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertFalse(rs.next());
}
}
}
@Test
public void testWithNullParameter() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.2.value", "Mark");
attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("Mark", rs.getString(2));
assertEquals(0, rs.getInt(3));
assertFalse(rs.next());
}
}
}
@Test
public void testInvalidStatement() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
"UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.2.value", "Mark");
attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.3.value", "84");
attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.4.value", "1");
runner.enqueue(sql.getBytes(), attributes);
runner.run();
// should fail because of the semicolon
runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertFalse(rs.next());
}
}
}
@Test
public void testRetryableFailure() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final DBCPService service = new SQLExceptionService(null);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
"UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.2.value", "Mark");
attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.3.value", "84");
attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.4.value", "1");
runner.enqueue(sql.getBytes(), attributes);
runner.run();
// should fail because of the semicolon
runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 1);
}
@Test
public void testMultipleFlowFilesSuccessfulInTransaction() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(PutSQL.BATCH_SIZE, "1");
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.2.value", "Mark");
attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.3.value", "84");
attributes.put("fragment.identifier", "1");
attributes.put("fragment.count", "2");
attributes.put("fragment.index", "0");
runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes);
runner.run();
// No FlowFiles should be transferred because there were not enough flowfiles with the same fragment identifier
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 0);
attributes.clear();
attributes.put("fragment.identifier", "1");
attributes.put("fragment.count", "2");
attributes.put("fragment.index", "1");
runner.clearTransferState();
runner.enqueue("UPDATE PERSONS SET NAME='Leonard' WHERE ID=1".getBytes(), attributes);
runner.run();
// Both FlowFiles with fragment identifier 1 should be successful
runner.assertTransferCount(PutSQL.REL_SUCCESS, 2);
runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
runner.assertTransferCount(PutSQL.REL_RETRY, 0);
for (final MockFlowFile mff : runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS)) {
mff.assertAttributeEquals("fragment.identifier", "1");
}
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("Leonard", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
}
@Test
public void testTransactionTimeout() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs");
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final Map<String, String> attributes = new HashMap<>();
attributes.put("fragment.identifier", "1");
attributes.put("fragment.count", "2");
attributes.put("fragment.index", "0");
final MockFlowFile mff = new MockFlowFile(0L) {
@Override
public Long getLastQueueDate() {
return System.currentTimeMillis() - 10000L; // return 10 seconds ago
}
@Override
public Map<String, String> getAttributes() {
return attributes;
}
@Override
public String getAttribute(final String attrName) {
return attributes.get(attrName);
}
};
runner.enqueue(mff);
runner.run();
// No FlowFiles should be transferred because there were not enough flowfiles with the same fragment identifier
runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);
}
/**
* Simple implementation only for testing purposes
*/
private static class MockDBCPService extends AbstractControllerService implements DBCPService {
private final String dbLocation;
public MockDBCPService(final String dbLocation) {
this.dbLocation = dbLocation;
}
@Override
public String getIdentifier() {
return "dbcp";
}
@Override
public Connection getConnection() throws ProcessException {
try {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
final Connection conn = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true");
return conn;
} catch (final Exception e) {
e.printStackTrace();
throw new ProcessException("getConnection failed: " + e);
}
}
}
/**
* Simple implementation only for testing purposes
*/
private static class SQLExceptionService extends AbstractControllerService implements DBCPService {
private final DBCPService service;
private int allowedBeforeFailure = 0;
private int successful = 0;
public SQLExceptionService(final DBCPService service) {
this.service = service;
}
@Override
public String getIdentifier() {
return "dbcp";
}
@Override
public Connection getConnection() throws ProcessException {
try {
if (++successful > allowedBeforeFailure) {
final Connection conn = Mockito.mock(Connection.class);
Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new SQLException("Unit Test Generated SQLException"));
return conn;
} else {
return service.getConnection();
}
} catch (final Exception e) {
e.printStackTrace();
throw new ProcessException("getConnection failed: " + e);
}
}
}
}

View File

@ -0,0 +1,6 @@
{
"id": 1,
"name": "Mark",
"code": 48,
"extra": "another"
}

View File

@ -0,0 +1,21 @@
[{
"id": 1,
"name": "Mark",
"code": 48
}, {
"id": 2,
"name": "George",
"code": 48
}, {
"id": 3,
"name": "Harry",
"code": 21
}, {
"id": 4,
"name": "Julie",
"code": 48
}, {
"id": 82,
"name": "Frank Henry",
"code": 16
}]

View File

@ -1,600 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.ObjectHolder;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
@SideEffectFree
@SupportsBatching
@SeeAlso(PutSQL.class)
@Tags({"json", "sql", "database", "rdbms", "insert", "update", "relational", "flat"})
@CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE or INSERT SQL statement. The incoming FlowFile is expected to be "
+ "\"flat\" JSON message, meaning that it consists of a single JSON element and each field maps to a simple type. If a field maps to "
+ "a JSON object, that JSON object will be interpreted as Text. Upon successful conversion, the original FlowFile is routed to the 'original' "
+ "relationship and the SQL is routed to the 'sql' relationship.")
public class ConvertFlatJSONToSQL extends AbstractProcessor {
private static final String UPDATE_TYPE = "UPDATE";
private static final String INSERT_TYPE = "INSERT";
static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
.name("JDBC Connection Pool")
.description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. "
+ "The Connection Pool is necessary in order to determine the appropriate database column types.")
.identifiesControllerService(DBCPService.class)
.required(true)
.build();
static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder()
.name("Statement Type")
.description("Specifies the type of SQL Statement to generate")
.required(true)
.allowableValues(UPDATE_TYPE, INSERT_TYPE)
.build();
static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("Table Name")
.description("The name of the table that the statement should update")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor CATALOG_NAME = new PropertyDescriptor.Builder()
.name("Catalog Name")
.description("The name of the catalog that the statement should update. This may not apply for the database that you are updating. In this case, leave the field empty")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new PropertyDescriptor.Builder()
.name("Translate Field Names")
.description("If true, the Processor will attempt to translate JSON field names into the appropriate column names for the table specified. "
+ "If false, the JSON field names must match the column names exactly, or the column will not be updated")
.allowableValues("true", "false")
.defaultValue("true")
.build();
static final PropertyDescriptor UPDATE_KEY = new PropertyDescriptor.Builder()
.name("Update Keys")
.description("A comma-separated list of column names that uniquely identifies a row in the database for UPDATE statements. "
+ "If the Statement Type is UPDATE and this property is not set, the table's Primary Keys are used. "
+ "In this case, if no Primary Key exists, the conversion to SQL will fail. "
+ "This property is ignored if the Statement Type is INSERT")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.expressionLanguageSupported(true)
.build();
static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("When a FlowFile is converted to SQL, the original JSON FlowFile is routed to this relationship")
.build();
static final Relationship REL_SQL = new Relationship.Builder()
.name("sql")
.description("A FlowFile is routed to this relationship when its contents have successfully been converted into a SQL statement")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if it cannot be converted into a SQL statement. Common causes include invalid JSON "
+ "content or the JSON content missing a required field (if using an INSERT statement type).")
.build();
private final Map<SchemaKey, TableSchema> schemaCache = new LinkedHashMap<SchemaKey, TableSchema>(100) {
private static final long serialVersionUID = 1L;
@Override
protected boolean removeEldestEntry(Map.Entry<SchemaKey,TableSchema> eldest) {
return true;
}
};
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(CONNECTION_POOL);
properties.add(STATEMENT_TYPE);
properties.add(TABLE_NAME);
properties.add(CATALOG_NAME);
properties.add(TRANSLATE_FIELD_NAMES);
properties.add(UPDATE_KEY);
return properties;
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_ORIGINAL);
rels.add(REL_SQL);
rels.add(REL_FAILURE);
return rels;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
synchronized (this) {
schemaCache.clear();
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final boolean translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
final String statementType = context.getProperty(STATEMENT_TYPE).getValue();
final String updateKeys = context.getProperty(UPDATE_KEY).evaluateAttributeExpressions(flowFile).getValue();
final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) && updateKeys == null;
// get the database schema from the cache, if one exists
TableSchema schema;
synchronized (this) {
schema = schemaCache.get(schemaKey);
if (schema == null) {
// No schema exists for this table yet. Query the database to determine the schema and put it into the cache.
final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
try (final Connection conn = dbcpService.getConnection()) {
schema = TableSchema.from(conn, catalog, tableName, translateFieldNames, includePrimaryKeys);
schemaCache.put(schemaKey, schema);
} catch (final SQLException e) {
getLogger().error("Failed to convert {} into a SQL statement due to {}; routing to failure", new Object[] {flowFile, e.toString()}, e);
session.transfer(flowFile, REL_FAILURE);
return;
}
}
}
// Parse the JSON document
final ObjectMapper mapper = new ObjectMapper();
final ObjectHolder<JsonNode> rootNodeRef = new ObjectHolder<>(null);
try {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
try (final InputStream bufferedIn = new BufferedInputStream(in)) {
rootNodeRef.set(mapper.readTree(bufferedIn));
}
}
});
} catch (final ProcessException pe) {
getLogger().error("Failed to parse {} as JSON due to {}; routing to failure", new Object[] {flowFile, pe.toString()}, pe);
session.transfer(flowFile, REL_FAILURE);
return;
}
final JsonNode rootNode = rootNodeRef.get();
final String sql;
final Map<String, String> attributes = new HashMap<>();
if (INSERT_TYPE.equals(statementType)) {
try {
sql = generateInsert(rootNode, attributes, tableName, schema, translateFieldNames);
} catch (final ProcessException pe) {
getLogger().error("Failed to convert {} to a SQL INSERT statement due to {}; routing to failure",
new Object[] { flowFile, pe.toString() }, pe);
session.transfer(flowFile, REL_FAILURE);
return;
}
} else {
try {
sql = generateUpdate(rootNode, attributes, tableName, updateKeys, schema, translateFieldNames);
} catch (final ProcessException pe) {
getLogger().error("Failed to convert {} to a SQL UPDATE statement due to {}; routing to failure",
new Object[] { flowFile, pe.toString() }, pe);
session.transfer(flowFile, REL_FAILURE);
return;
}
}
FlowFile sqlFlowFile = session.create(flowFile);
sqlFlowFile = session.write(sqlFlowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(sql.getBytes(StandardCharsets.UTF_8));
}
});
attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
attributes.put("sql.table", tableName);
if (catalog != null) {
attributes.put("sql.catalog", catalog);
}
sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributes);
session.transfer(flowFile, REL_ORIGINAL);
session.transfer(sqlFlowFile, REL_SQL);
}
private Set<String> getNormalizedColumnNames(final JsonNode node, final boolean translateFieldNames) {
final Set<String> normalizedFieldNames = new HashSet<>();
final Iterator<String> fieldNameItr = node.getFieldNames();
while (fieldNameItr.hasNext()) {
normalizedFieldNames.add(normalizeColumnName(fieldNameItr.next(), translateFieldNames));
}
return normalizedFieldNames;
}
private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName,
final TableSchema schema, final boolean translateFieldNames) {
final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
for (final String requiredColName : schema.getRequiredColumnNames()) {
final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
if (!normalizedFieldNames.contains(normalizedColName)) {
throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'");
}
}
final StringBuilder sqlBuilder = new StringBuilder();
int fieldCount = 0;
sqlBuilder.append("INSERT INTO ").append(tableName).append(" (");
// iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
// adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the
// columns that we are inserting into
final Iterator<String> fieldNames = rootNode.getFieldNames();
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
final ColumnDescription desc = schema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
if (desc != null) {
if (fieldCount++ > 0) {
sqlBuilder.append(", ");
}
sqlBuilder.append(desc.getColumnName());
final int sqlType = desc.getDataType();
attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
final Integer colSize = desc.getColumnSize();
String fieldValue = rootNode.get(fieldName).asText();
if (colSize != null && fieldValue.length() > colSize) {
fieldValue = fieldValue.substring(0, colSize);
}
attributes.put("sql.args." + fieldCount + ".value", fieldValue);
}
}
// complete the SQL statements by adding ?'s for all of the values to be escaped.
sqlBuilder.append(") VALUES (");
for (int i=0; i < fieldCount; i++) {
if (i > 0) {
sqlBuilder.append(", ");
}
sqlBuilder.append("?");
}
sqlBuilder.append(")");
if (fieldCount == 0) {
throw new ProcessException("None of the fields in the JSON map to the columns defined by the " + tableName + " table");
}
return sqlBuilder.toString();
}
private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
final TableSchema schema, final boolean translateFieldNames) {
final Set<String> updateKeyNames;
if (updateKeys == null) {
updateKeyNames = schema.getPrimaryKeyColumnNames();
} else {
updateKeyNames = new HashSet<>();
for (final String updateKey : updateKeys.split(",")) {
updateKeyNames.add(updateKey.trim());
}
}
if (updateKeyNames.isEmpty()) {
throw new ProcessException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified");
}
final StringBuilder sqlBuilder = new StringBuilder();
int fieldCount = 0;
sqlBuilder.append("UPDATE ").append(tableName).append(" SET ");
// Create a Set of all normalized Update Key names, and ensure that there is a field in the JSON
// for each of the Update Key fields.
final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
final Set<String> normalizedUpdateNames = new HashSet<>();
for (final String uk : updateKeyNames) {
final String normalizedUK = normalizeColumnName(uk, translateFieldNames);
normalizedUpdateNames.add(normalizedUK);
if (!normalizedFieldNames.contains(normalizedUK)) {
throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
}
}
// iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
// adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the
// columns that we are inserting into
Iterator<String> fieldNames = rootNode.getFieldNames();
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
final ColumnDescription desc = schema.getColumns().get(normalizedColName);
if (desc == null) {
continue;
}
// Check if this column is an Update Key. If so, skip it for now. We will come
// back to it after we finish the SET clause
if (normalizedUpdateNames.contains(normalizedColName)) {
continue;
}
if (fieldCount++ > 0) {
sqlBuilder.append(", ");
}
sqlBuilder.append(desc.getColumnName()).append(" = ?");
final int sqlType = desc.getDataType();
attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
final Integer colSize = desc.getColumnSize();
String fieldValue = rootNode.get(fieldName).asText();
if (colSize != null && fieldValue.length() > colSize) {
fieldValue = fieldValue.substring(0, colSize);
}
attributes.put("sql.args." + fieldCount + ".value", fieldValue);
}
// Set the WHERE clause based on the Update Key values
sqlBuilder.append(" WHERE ");
fieldNames = rootNode.getFieldNames();
int whereFieldCount = 0;
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
final ColumnDescription desc = schema.getColumns().get(normalizedColName);
if (desc == null) {
continue;
}
// Check if this column is a Update Key. If so, skip it for now. We will come
// back to it after we finish the SET clause
if (!normalizedUpdateNames.contains(normalizedColName)) {
continue;
}
if (whereFieldCount++ > 0) {
sqlBuilder.append(" AND ");
}
fieldCount++;
sqlBuilder.append(normalizedColName).append(" = ?");
final int sqlType = desc.getDataType();
attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
final Integer colSize = desc.getColumnSize();
String fieldValue = rootNode.get(fieldName).asText();
if (colSize != null && fieldValue.length() > colSize) {
fieldValue = fieldValue.substring(0, colSize);
}
attributes.put("sql.args." + fieldCount + ".value", fieldValue);
}
return sqlBuilder.toString();
}
private static String normalizeColumnName(final String colName, final boolean translateColumnNames) {
return translateColumnNames ? colName.toUpperCase().replace("_", "") : colName;
}
private static class TableSchema {
private List<String> requiredColumnNames;
private Set<String> primaryKeyColumnNames;
private Map<String, ColumnDescription> columns;
private TableSchema(final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
final Set<String> primaryKeyColumnNames) {
this.columns = new HashMap<>();
this.primaryKeyColumnNames = primaryKeyColumnNames;
this.requiredColumnNames = new ArrayList<>();
for (final ColumnDescription desc : columnDescriptions) {
columns.put(ConvertFlatJSONToSQL.normalizeColumnName(desc.columnName, translateColumnNames), desc);
if (desc.isRequired()) {
requiredColumnNames.add(desc.columnName);
}
}
}
public Map<String, ColumnDescription> getColumns() {
return columns;
}
public List<String> getRequiredColumnNames() {
return requiredColumnNames;
}
public Set<String> getPrimaryKeyColumnNames() {
return primaryKeyColumnNames;
}
public static TableSchema from(final Connection conn, final String catalog, final String tableName,
final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException {
final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%");
final List<ColumnDescription> cols = new ArrayList<>();
while (colrs.next()) {
final ColumnDescription col = ColumnDescription.from(colrs);
cols.add(col);
}
final Set<String> primaryKeyColumns = new HashSet<>();
if (includePrimaryKeys) {
final ResultSet pkrs = conn.getMetaData().getPrimaryKeys(catalog, null, tableName);
while (pkrs.next()) {
final String colName = pkrs.getString("COLUMN_NAME");
primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames));
}
}
return new TableSchema(cols, translateColumnNames, primaryKeyColumns);
}
}
private static class ColumnDescription {
private final String columnName;
private final int dataType;
private final boolean required;
private final Integer columnSize;
private ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize) {
this.columnName = columnName;
this.dataType = dataType;
this.required = required;
this.columnSize = columnSize;
}
public int getDataType() {
return dataType;
}
public Integer getColumnSize() {
return columnSize;
}
public String getColumnName() {
return columnName;
}
public boolean isRequired() {
return required;
}
public static ColumnDescription from(final ResultSet resultSet) throws SQLException {
final String columnName = resultSet.getString("COLUMN_NAME");
final int dataType = resultSet.getInt("DATA_TYPE");
final int colSize = resultSet.getInt("COLUMN_SIZE");
final String nullableValue = resultSet.getString("IS_NULLABLE");
final boolean isNullable = "YES".equalsIgnoreCase(nullableValue) || nullableValue.isEmpty();
final String defaultValue = resultSet.getString("COLUMN_DEF");
final String autoIncrementValue = resultSet.getString("IS_AUTOINCREMENT");
final boolean isAutoIncrement = "YES".equalsIgnoreCase(autoIncrementValue);
final boolean required = !isNullable && !isAutoIncrement && defaultValue == null;
return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize);
}
}
private static class SchemaKey {
private final String catalog;
private final String tableName;
public SchemaKey(final String catalog, final String tableName) {
this.catalog = catalog;
this.tableName = tableName;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((catalog == null) ? 0 : catalog.hashCode());
result = prime * result + ((tableName == null) ? 0 : tableName.hashCode());
return result;
}
@Override
public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final SchemaKey other = (SchemaKey) obj;
if (catalog == null) {
if (other.catalog != null) {
return false;
}
} else if (!catalog.equals(other.catalog)) {
return false;
}
if (tableName == null) {
if (other.tableName != null) {
return false;
}
} else if (!tableName.equals(other.tableName)) {
return false;
}
return true;
}
}
}

View File

@ -1,203 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.stream.io.StreamUtils;
@SupportsBatching
@SeeAlso(ConvertFlatJSONToSQL.class)
@Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"})
@CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command "
+ "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
+ "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
+ "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
public class PutSQL extends AbstractProcessor {
static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
.name("JDBC Connection Pool")
.description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. "
+ "The Connection Pool is necessary in order to determine the appropriate database column types.")
.identifiesControllerService(DBCPService.class)
.required(true)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after the database is successfully updated")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if the database cannot be updated for any reason")
.build();
private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type");
private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(CONNECTION_POOL);
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
// TODO: Batch. Pull in 50 or 100 at a time and map content of FlowFile to Set<FlowFile> that have that
// same content. Then execute updates in batches.
final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
try (final Connection conn = dbcpService.getConnection()) {
// Read the SQL from the FlowFile's content
final byte[] buffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, buffer);
}
});
final String sql = new String(buffer, StandardCharsets.UTF_8);
// Create a prepared statement and set the appropriate parameters on the statement.
try (final PreparedStatement stmt = conn.prepareStatement(sql)) {
for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) {
final String key = entry.getKey();
final Matcher matcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key);
if (matcher.matches()) {
final int parameterIndex = Integer.parseInt(matcher.group(1));
final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches();
if (!isNumeric) {
getLogger().error("Cannot update database for {} because the value of the '{}' attribute is '{}', which is not a valid JDBC numeral type; routing to failure",
new Object[] {flowFile, key, entry.getValue()});
session.transfer(flowFile, REL_FAILURE);
return;
}
final int jdbcType = Integer.parseInt(entry.getValue());
final String valueAttrName = "sql.args." + parameterIndex + ".value";
final String parameterValue = flowFile.getAttribute(valueAttrName);
if (parameterValue == null) {
getLogger().error("Cannot update database for {} because the '{}' attribute exists but the '{}' attribute does not", new Object[] {flowFile, key, valueAttrName});
session.transfer(flowFile, REL_FAILURE);
return;
}
try {
setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType);
} catch (final NumberFormatException nfe) {
getLogger().error("Cannot update database for {} because the '{}' attribute has a value of '{}', "
+ "which cannot be converted into the necessary data type; routing to failure", new Object[] {flowFile, valueAttrName, parameterValue});
session.transfer(flowFile, REL_FAILURE);
return;
}
}
}
final int updatedRowCount = stmt.executeUpdate();
flowFile = session.putAttribute(flowFile, "sql.update.count", String.valueOf(updatedRowCount));
}
// TODO: Need to expose Connection URL from DBCP Service and use it to emit a Provenance Event
session.transfer(flowFile, REL_SUCCESS);
} catch (final SQLException e) {
getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e});
session.transfer(flowFile, REL_FAILURE);
return;
}
}
private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException {
switch (jdbcType) {
case Types.BIT:
stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue));
break;
case Types.TINYINT:
stmt.setByte(parameterIndex, Byte.parseByte(parameterValue));
break;
case Types.SMALLINT:
stmt.setShort(parameterIndex, Short.parseShort(parameterValue));
break;
case Types.INTEGER:
stmt.setInt(parameterIndex, Integer.parseInt(parameterValue));
break;
case Types.BIGINT:
stmt.setLong(parameterIndex, Long.parseLong(parameterValue));
break;
case Types.REAL:
stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue));
break;
case Types.FLOAT:
case Types.DOUBLE:
stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue));
break;
case Types.DATE:
stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue)));
break;
case Types.TIME:
stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue)));
break;
case Types.TIMESTAMP:
stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue)));
break;
case Types.CHAR:
case Types.VARCHAR:
case Types.LONGNVARCHAR:
case Types.LONGVARCHAR:
stmt.setString(parameterIndex, parameterValue);
break;
default:
throw new SQLException("The '" + attrName + "' attribute has a value of '" + parameterValue
+ "' and a type of '" + jdbcType + "' but this is not a known data type");
}
}
}

View File

@ -1,294 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class TestConvertFlatJSONToSQL {
static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@BeforeClass
public static void setup() {
System.setProperty("derby.stream.error.file", "target/derby.log");
}
@Test
public void testInsert() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "INSERT");
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json"));
runner.run();
runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.1.value", "1");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.2.value", "Mark");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "48");
out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
}
@Test
public void testUpdateBasedOnPrimaryKey() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json"));
runner.run();
runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.1.value", "Mark");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.2.value", "48");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "1");
out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE ID = ?");
}
@Test
public void testUpdateBasedOnUpdateKey() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "code");
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json"));
runner.run();
runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.1.value", "1");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.2.value", "Mark");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "48");
out.assertContentEquals("UPDATE PERSONS SET ID = ?, NAME = ? WHERE CODE = ?");
}
@Test
public void testUpdateBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code");
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json"));
runner.run();
runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.1.value", "1");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.2.value", "Mark");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "48");
out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?");
}
@Test
public void testUpdateWithMissingFieldBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code");
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json"));
runner.run();
runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1);
}
@Test
public void testUpdateWithMalformedJson() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code");
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json"));
runner.run();
runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1);
}
@Test
public void testInsertWithMissingField() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "INSERT");
runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code");
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json"));
runner.run();
runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1);
}
/**
* Simple implementation only for testing purposes
*/
private static class MockDBCPService extends AbstractControllerService implements DBCPService {
private final String dbLocation;
public MockDBCPService(final String dbLocation) {
this.dbLocation = dbLocation;
}
@Override
public String getIdentifier() {
return "dbcp";
}
@Override
public Connection getConnection() throws ProcessException {
try {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
final Connection con = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true");
return con;
} catch (final Exception e) {
e.printStackTrace();
throw new ProcessException("getConnection failed: " + e);
}
}
}
}

View File

@ -1,246 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class TestPutSQL {
static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@BeforeClass
public static void setup() {
System.setProperty("derby.stream.error.file", "target/derby.log");
}
@Test
public void testDirectStatements() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)".getBytes());
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("Mark", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
runner.enqueue("UPDATE PERSONS SET NAME='George' WHERE ID=1".getBytes());
runner.run();
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("George", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
}
@Test
public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.2.value", "Mark");
attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.3.value", "84");
runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("Mark", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
runner.clearTransferState();
attributes.clear();
attributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.1.value", "George");
attributes.put("sql.args.2.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.2.value", "1");
runner.enqueue("UPDATE PERSONS SET NAME=? WHERE ID=?".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("George", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
}
@Test
public void testMultipleStatementsWithinFlowFile() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
"UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.2.value", "Mark");
attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.3.value", "84");
attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.4.value", "1");
runner.enqueue(sql.getBytes(), attributes);
runner.run();
// should fail because of the semicolon
runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertFalse(rs.next());
}
}
}
/**
* Simple implementation only for testing purposes
*/
private static class MockDBCPService extends AbstractControllerService implements DBCPService {
private final String dbLocation;
public MockDBCPService(final String dbLocation) {
this.dbLocation = dbLocation;
}
@Override
public String getIdentifier() {
return "dbcp";
}
@Override
public Connection getConnection() throws ProcessException {
try {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
final Connection con = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true");
return con;
} catch (final Exception e) {
e.printStackTrace();
throw new ProcessException("getConnection failed: " + e);
}
}
}
}

View File

@ -670,6 +670,11 @@
<artifactId>jasypt</artifactId>
<version>1.9.2</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>