NIFI-293: Added processors ConvertFlatJSONToSQL, PutSQL

This commit is contained in:
Mark Payne 2015-08-12 17:17:12 -04:00
parent 16dc5d5fd9
commit 1a37c95f58
11 changed files with 1389 additions and 0 deletions

View File

@ -114,6 +114,29 @@ The following binary components are provided under the Apache Software License v
Apache Tika Core
Copyright 2007-2015 The Apache Software Foundation
(ASLv2) Jackson JSON processor
The following NOTICE information applies:
# Jackson JSON processor
Jackson is a high-performance, Free/Open Source JSON processing library.
It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
been in development since 2007.
It is currently developed by a community of developers, as well as supported
commercially by FasterXML.com.
## Licensing
Jackson core and extension components may licensed under different licenses.
To find the details that apply to this artifact see the accompanying LICENSE file.
For more information, including possible other licensing options, contact
FasterXML.com (http://fasterxml.com).
## Credits
A list of contributors may be found from CREDITS file, which is included
in some artifacts (usually source distributions); but is always available
from the source code management (SCM) system project uses.
************************
Common Development and Distribution License 1.1
************************

View File

@ -180,6 +180,10 @@ language governing permissions and limitations under the License. -->
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>

View File

@ -16,6 +16,7 @@ org.apache.nifi.processors.standard.Base64EncodeContent
org.apache.nifi.processors.standard.CompressContent
org.apache.nifi.processors.standard.ControlRate
org.apache.nifi.processors.standard.ConvertCharacterSet
org.apache.nifi.processors.standard.ConvertJSONToSQL
org.apache.nifi.processors.standard.DetectDuplicate
org.apache.nifi.processors.standard.DistributeLoad
org.apache.nifi.processors.standard.DuplicateFlowFile
@ -52,6 +53,7 @@ org.apache.nifi.processors.standard.PutFile
org.apache.nifi.processors.standard.PutFTP
org.apache.nifi.processors.standard.PutJMS
org.apache.nifi.processors.standard.PutSFTP
org.apache.nifi.processors.standard.PutSQL
org.apache.nifi.processors.standard.ReplaceText
org.apache.nifi.processors.standard.ReplaceTextWithMapping
org.apache.nifi.processors.standard.RouteOnAttribute

View File

@ -0,0 +1,600 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.ObjectHolder;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
@SideEffectFree
@SupportsBatching
@SeeAlso(PutSQL.class)
@Tags({"json", "sql", "database", "rdbms", "insert", "update", "relational", "flat"})
@CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE or INSERT SQL statement. The incoming FlowFile is expected to be "
+ "\"flat\" JSON message, meaning that it consists of a single JSON element and each field maps to a simple type. If a field maps to "
+ "a JSON object, that JSON object will be interpreted as Text. Upon successful conversion, the original FlowFile is routed to the 'original' "
+ "relationship and the SQL is routed to the 'sql' relationship.")
public class ConvertFlatJSONToSQL extends AbstractProcessor {
private static final String UPDATE_TYPE = "UPDATE";
private static final String INSERT_TYPE = "INSERT";
static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
.name("JDBC Connection Pool")
.description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. "
+ "The Connection Pool is necessary in order to determine the appropriate database column types.")
.identifiesControllerService(DBCPService.class)
.required(true)
.build();
static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder()
.name("Statement Type")
.description("Specifies the type of SQL Statement to generate")
.required(true)
.allowableValues(UPDATE_TYPE, INSERT_TYPE)
.build();
static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("Table Name")
.description("The name of the table that the statement should update")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor CATALOG_NAME = new PropertyDescriptor.Builder()
.name("Catalog Name")
.description("The name of the catalog that the statement should update. This may not apply for the database that you are updating. In this case, leave the field empty")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new PropertyDescriptor.Builder()
.name("Translate Field Names")
.description("If true, the Processor will attempt to translate JSON field names into the appropriate column names for the table specified. "
+ "If false, the JSON field names must match the column names exactly, or the column will not be updated")
.allowableValues("true", "false")
.defaultValue("true")
.build();
static final PropertyDescriptor UPDATE_KEY = new PropertyDescriptor.Builder()
.name("Update Keys")
.description("A comma-separated list of column names that uniquely identifies a row in the database for UPDATE statements. "
+ "If the Statement Type is UPDATE and this property is not set, the table's Primary Keys are used. "
+ "In this case, if no Primary Key exists, the conversion to SQL will fail. "
+ "This property is ignored if the Statement Type is INSERT")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.expressionLanguageSupported(true)
.build();
static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("When a FlowFile is converted to SQL, the original JSON FlowFile is routed to this relationship")
.build();
static final Relationship REL_SQL = new Relationship.Builder()
.name("sql")
.description("A FlowFile is routed to this relationship when its contents have successfully been converted into a SQL statement")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if it cannot be converted into a SQL statement. Common causes include invalid JSON "
+ "content or the JSON content missing a required field (if using an INSERT statement type).")
.build();
private final Map<SchemaKey, TableSchema> schemaCache = new LinkedHashMap<SchemaKey, TableSchema>(100) {
private static final long serialVersionUID = 1L;
@Override
protected boolean removeEldestEntry(Map.Entry<SchemaKey,TableSchema> eldest) {
return true;
}
};
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(CONNECTION_POOL);
properties.add(STATEMENT_TYPE);
properties.add(TABLE_NAME);
properties.add(CATALOG_NAME);
properties.add(TRANSLATE_FIELD_NAMES);
properties.add(UPDATE_KEY);
return properties;
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_ORIGINAL);
rels.add(REL_SQL);
rels.add(REL_FAILURE);
return rels;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
synchronized (this) {
schemaCache.clear();
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final boolean translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
final String statementType = context.getProperty(STATEMENT_TYPE).getValue();
final String updateKeys = context.getProperty(UPDATE_KEY).evaluateAttributeExpressions(flowFile).getValue();
final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) && updateKeys == null;
// get the database schema from the cache, if one exists
TableSchema schema;
synchronized (this) {
schema = schemaCache.get(schemaKey);
if (schema == null) {
// No schema exists for this table yet. Query the database to determine the schema and put it into the cache.
final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
try (final Connection conn = dbcpService.getConnection()) {
schema = TableSchema.from(conn, catalog, tableName, translateFieldNames, includePrimaryKeys);
schemaCache.put(schemaKey, schema);
} catch (final SQLException e) {
getLogger().error("Failed to convert {} into a SQL statement due to {}; routing to failure", new Object[] {flowFile, e.toString()}, e);
session.transfer(flowFile, REL_FAILURE);
return;
}
}
}
// Parse the JSON document
final ObjectMapper mapper = new ObjectMapper();
final ObjectHolder<JsonNode> rootNodeRef = new ObjectHolder<>(null);
try {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
try (final InputStream bufferedIn = new BufferedInputStream(in)) {
rootNodeRef.set(mapper.readTree(bufferedIn));
}
}
});
} catch (final ProcessException pe) {
getLogger().error("Failed to parse {} as JSON due to {}; routing to failure", new Object[] {flowFile, pe.toString()}, pe);
session.transfer(flowFile, REL_FAILURE);
return;
}
final JsonNode rootNode = rootNodeRef.get();
final String sql;
final Map<String, String> attributes = new HashMap<>();
if (INSERT_TYPE.equals(statementType)) {
try {
sql = generateInsert(rootNode, attributes, tableName, schema, translateFieldNames);
} catch (final ProcessException pe) {
getLogger().error("Failed to convert {} to a SQL INSERT statement due to {}; routing to failure",
new Object[] { flowFile, pe.toString() }, pe);
session.transfer(flowFile, REL_FAILURE);
return;
}
} else {
try {
sql = generateUpdate(rootNode, attributes, tableName, updateKeys, schema, translateFieldNames);
} catch (final ProcessException pe) {
getLogger().error("Failed to convert {} to a SQL UPDATE statement due to {}; routing to failure",
new Object[] { flowFile, pe.toString() }, pe);
session.transfer(flowFile, REL_FAILURE);
return;
}
}
FlowFile sqlFlowFile = session.create(flowFile);
sqlFlowFile = session.write(sqlFlowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(sql.getBytes(StandardCharsets.UTF_8));
}
});
attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
attributes.put("sql.table", tableName);
if (catalog != null) {
attributes.put("sql.catalog", catalog);
}
sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributes);
session.transfer(flowFile, REL_ORIGINAL);
session.transfer(sqlFlowFile, REL_SQL);
}
private Set<String> getNormalizedColumnNames(final JsonNode node, final boolean translateFieldNames) {
final Set<String> normalizedFieldNames = new HashSet<>();
final Iterator<String> fieldNameItr = node.getFieldNames();
while (fieldNameItr.hasNext()) {
normalizedFieldNames.add(normalizeColumnName(fieldNameItr.next(), translateFieldNames));
}
return normalizedFieldNames;
}
private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName,
final TableSchema schema, final boolean translateFieldNames) {
final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
for (final String requiredColName : schema.getRequiredColumnNames()) {
final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
if (!normalizedFieldNames.contains(normalizedColName)) {
throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'");
}
}
final StringBuilder sqlBuilder = new StringBuilder();
int fieldCount = 0;
sqlBuilder.append("INSERT INTO ").append(tableName).append(" (");
// iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
// adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the
// columns that we are inserting into
final Iterator<String> fieldNames = rootNode.getFieldNames();
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
final ColumnDescription desc = schema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
if (desc != null) {
if (fieldCount++ > 0) {
sqlBuilder.append(", ");
}
sqlBuilder.append(desc.getColumnName());
final int sqlType = desc.getDataType();
attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
final Integer colSize = desc.getColumnSize();
String fieldValue = rootNode.get(fieldName).asText();
if (colSize != null && fieldValue.length() > colSize) {
fieldValue = fieldValue.substring(0, colSize);
}
attributes.put("sql.args." + fieldCount + ".value", fieldValue);
}
}
// complete the SQL statements by adding ?'s for all of the values to be escaped.
sqlBuilder.append(") VALUES (");
for (int i=0; i < fieldCount; i++) {
if (i > 0) {
sqlBuilder.append(", ");
}
sqlBuilder.append("?");
}
sqlBuilder.append(")");
if (fieldCount == 0) {
throw new ProcessException("None of the fields in the JSON map to the columns defined by the " + tableName + " table");
}
return sqlBuilder.toString();
}
private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
final TableSchema schema, final boolean translateFieldNames) {
final Set<String> updateKeyNames;
if (updateKeys == null) {
updateKeyNames = schema.getPrimaryKeyColumnNames();
} else {
updateKeyNames = new HashSet<>();
for (final String updateKey : updateKeys.split(",")) {
updateKeyNames.add(updateKey.trim());
}
}
if (updateKeyNames.isEmpty()) {
throw new ProcessException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified");
}
final StringBuilder sqlBuilder = new StringBuilder();
int fieldCount = 0;
sqlBuilder.append("UPDATE ").append(tableName).append(" SET ");
// Create a Set of all normalized Update Key names, and ensure that there is a field in the JSON
// for each of the Update Key fields.
final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
final Set<String> normalizedUpdateNames = new HashSet<>();
for (final String uk : updateKeyNames) {
final String normalizedUK = normalizeColumnName(uk, translateFieldNames);
normalizedUpdateNames.add(normalizedUK);
if (!normalizedFieldNames.contains(normalizedUK)) {
throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
}
}
// iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
// adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the
// columns that we are inserting into
Iterator<String> fieldNames = rootNode.getFieldNames();
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
final ColumnDescription desc = schema.getColumns().get(normalizedColName);
if (desc == null) {
continue;
}
// Check if this column is an Update Key. If so, skip it for now. We will come
// back to it after we finish the SET clause
if (normalizedUpdateNames.contains(normalizedColName)) {
continue;
}
if (fieldCount++ > 0) {
sqlBuilder.append(", ");
}
sqlBuilder.append(desc.getColumnName()).append(" = ?");
final int sqlType = desc.getDataType();
attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
final Integer colSize = desc.getColumnSize();
String fieldValue = rootNode.get(fieldName).asText();
if (colSize != null && fieldValue.length() > colSize) {
fieldValue = fieldValue.substring(0, colSize);
}
attributes.put("sql.args." + fieldCount + ".value", fieldValue);
}
// Set the WHERE clause based on the Update Key values
sqlBuilder.append(" WHERE ");
fieldNames = rootNode.getFieldNames();
int whereFieldCount = 0;
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
final ColumnDescription desc = schema.getColumns().get(normalizedColName);
if (desc == null) {
continue;
}
// Check if this column is a Update Key. If so, skip it for now. We will come
// back to it after we finish the SET clause
if (!normalizedUpdateNames.contains(normalizedColName)) {
continue;
}
if (whereFieldCount++ > 0) {
sqlBuilder.append(" AND ");
}
fieldCount++;
sqlBuilder.append(normalizedColName).append(" = ?");
final int sqlType = desc.getDataType();
attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
final Integer colSize = desc.getColumnSize();
String fieldValue = rootNode.get(fieldName).asText();
if (colSize != null && fieldValue.length() > colSize) {
fieldValue = fieldValue.substring(0, colSize);
}
attributes.put("sql.args." + fieldCount + ".value", fieldValue);
}
return sqlBuilder.toString();
}
private static String normalizeColumnName(final String colName, final boolean translateColumnNames) {
return translateColumnNames ? colName.toUpperCase().replace("_", "") : colName;
}
private static class TableSchema {
private List<String> requiredColumnNames;
private Set<String> primaryKeyColumnNames;
private Map<String, ColumnDescription> columns;
private TableSchema(final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
final Set<String> primaryKeyColumnNames) {
this.columns = new HashMap<>();
this.primaryKeyColumnNames = primaryKeyColumnNames;
this.requiredColumnNames = new ArrayList<>();
for (final ColumnDescription desc : columnDescriptions) {
columns.put(ConvertFlatJSONToSQL.normalizeColumnName(desc.columnName, translateColumnNames), desc);
if (desc.isRequired()) {
requiredColumnNames.add(desc.columnName);
}
}
}
public Map<String, ColumnDescription> getColumns() {
return columns;
}
public List<String> getRequiredColumnNames() {
return requiredColumnNames;
}
public Set<String> getPrimaryKeyColumnNames() {
return primaryKeyColumnNames;
}
public static TableSchema from(final Connection conn, final String catalog, final String tableName,
final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException {
final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%");
final List<ColumnDescription> cols = new ArrayList<>();
while (colrs.next()) {
final ColumnDescription col = ColumnDescription.from(colrs);
cols.add(col);
}
final Set<String> primaryKeyColumns = new HashSet<>();
if (includePrimaryKeys) {
final ResultSet pkrs = conn.getMetaData().getPrimaryKeys(catalog, null, tableName);
while (pkrs.next()) {
final String colName = pkrs.getString("COLUMN_NAME");
primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames));
}
}
return new TableSchema(cols, translateColumnNames, primaryKeyColumns);
}
}
private static class ColumnDescription {
private final String columnName;
private final int dataType;
private final boolean required;
private final Integer columnSize;
private ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize) {
this.columnName = columnName;
this.dataType = dataType;
this.required = required;
this.columnSize = columnSize;
}
public int getDataType() {
return dataType;
}
public Integer getColumnSize() {
return columnSize;
}
public String getColumnName() {
return columnName;
}
public boolean isRequired() {
return required;
}
public static ColumnDescription from(final ResultSet resultSet) throws SQLException {
final String columnName = resultSet.getString("COLUMN_NAME");
final int dataType = resultSet.getInt("DATA_TYPE");
final int colSize = resultSet.getInt("COLUMN_SIZE");
final String nullableValue = resultSet.getString("IS_NULLABLE");
final boolean isNullable = "YES".equalsIgnoreCase(nullableValue) || nullableValue.isEmpty();
final String defaultValue = resultSet.getString("COLUMN_DEF");
final String autoIncrementValue = resultSet.getString("IS_AUTOINCREMENT");
final boolean isAutoIncrement = "YES".equalsIgnoreCase(autoIncrementValue);
final boolean required = !isNullable && !isAutoIncrement && defaultValue == null;
return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize);
}
}
private static class SchemaKey {
private final String catalog;
private final String tableName;
public SchemaKey(final String catalog, final String tableName) {
this.catalog = catalog;
this.tableName = tableName;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((catalog == null) ? 0 : catalog.hashCode());
result = prime * result + ((tableName == null) ? 0 : tableName.hashCode());
return result;
}
@Override
public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final SchemaKey other = (SchemaKey) obj;
if (catalog == null) {
if (other.catalog != null) {
return false;
}
} else if (!catalog.equals(other.catalog)) {
return false;
}
if (tableName == null) {
if (other.tableName != null) {
return false;
}
} else if (!tableName.equals(other.tableName)) {
return false;
}
return true;
}
}
}

View File

@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.stream.io.StreamUtils;
@SupportsBatching
@SeeAlso(ConvertFlatJSONToSQL.class)
@Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"})
@CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command "
+ "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
+ "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
+ "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
public class PutSQL extends AbstractProcessor {
static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
.name("JDBC Connection Pool")
.description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. "
+ "The Connection Pool is necessary in order to determine the appropriate database column types.")
.identifiesControllerService(DBCPService.class)
.required(true)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after the database is successfully updated")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if the database cannot be updated for any reason")
.build();
private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type");
private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(CONNECTION_POOL);
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
// TODO: Batch. Pull in 50 or 100 at a time and map content of FlowFile to Set<FlowFile> that have that
// same content. Then execute updates in batches.
final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
try (final Connection conn = dbcpService.getConnection()) {
// Read the SQL from the FlowFile's content
final byte[] buffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, buffer);
}
});
final String sql = new String(buffer, StandardCharsets.UTF_8);
// Create a prepared statement and set the appropriate parameters on the statement.
try (final PreparedStatement stmt = conn.prepareStatement(sql)) {
for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) {
final String key = entry.getKey();
final Matcher matcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key);
if (matcher.matches()) {
final int parameterIndex = Integer.parseInt(matcher.group(1));
final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches();
if (!isNumeric) {
getLogger().error("Cannot update database for {} because the value of the '{}' attribute is '{}', which is not a valid JDBC numeral type; routing to failure",
new Object[] {flowFile, key, entry.getValue()});
session.transfer(flowFile, REL_FAILURE);
return;
}
final int jdbcType = Integer.parseInt(entry.getValue());
final String valueAttrName = "sql.args." + parameterIndex + ".value";
final String parameterValue = flowFile.getAttribute(valueAttrName);
if (parameterValue == null) {
getLogger().error("Cannot update database for {} because the '{}' attribute exists but the '{}' attribute does not", new Object[] {flowFile, key, valueAttrName});
session.transfer(flowFile, REL_FAILURE);
return;
}
try {
setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType);
} catch (final NumberFormatException nfe) {
getLogger().error("Cannot update database for {} because the '{}' attribute has a value of '{}', "
+ "which cannot be converted into the necessary data type; routing to failure", new Object[] {flowFile, valueAttrName, parameterValue});
session.transfer(flowFile, REL_FAILURE);
return;
}
}
}
final int updatedRowCount = stmt.executeUpdate();
flowFile = session.putAttribute(flowFile, "sql.update.count", String.valueOf(updatedRowCount));
}
// TODO: Need to expose Connection URL from DBCP Service and use it to emit a Provenance Event
session.transfer(flowFile, REL_SUCCESS);
} catch (final SQLException e) {
getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e});
session.transfer(flowFile, REL_FAILURE);
return;
}
}
private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException {
switch (jdbcType) {
case Types.BIT:
stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue));
break;
case Types.TINYINT:
stmt.setByte(parameterIndex, Byte.parseByte(parameterValue));
break;
case Types.SMALLINT:
stmt.setShort(parameterIndex, Short.parseShort(parameterValue));
break;
case Types.INTEGER:
stmt.setInt(parameterIndex, Integer.parseInt(parameterValue));
break;
case Types.BIGINT:
stmt.setLong(parameterIndex, Long.parseLong(parameterValue));
break;
case Types.REAL:
stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue));
break;
case Types.FLOAT:
case Types.DOUBLE:
stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue));
break;
case Types.DATE:
stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue)));
break;
case Types.TIME:
stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue)));
break;
case Types.TIMESTAMP:
stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue)));
break;
case Types.CHAR:
case Types.VARCHAR:
case Types.LONGNVARCHAR:
case Types.LONGVARCHAR:
stmt.setString(parameterIndex, parameterValue);
break;
default:
throw new SQLException("The '" + attrName + "' attribute has a value of '" + parameterValue
+ "' and a type of '" + jdbcType + "' but this is not a known data type");
}
}
}

View File

@ -0,0 +1,294 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class TestConvertFlatJSONToSQL {
static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@BeforeClass
public static void setup() {
System.setProperty("derby.stream.error.file", "target/derby.log");
}
@Test
public void testInsert() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "INSERT");
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json"));
runner.run();
runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.1.value", "1");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.2.value", "Mark");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "48");
out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
}
@Test
public void testUpdateBasedOnPrimaryKey() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json"));
runner.run();
runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.1.value", "Mark");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.2.value", "48");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "1");
out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE ID = ?");
}
@Test
public void testUpdateBasedOnUpdateKey() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "code");
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json"));
runner.run();
runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.1.value", "1");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.2.value", "Mark");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "48");
out.assertContentEquals("UPDATE PERSONS SET ID = ?, NAME = ? WHERE CODE = ?");
}
@Test
public void testUpdateBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code");
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json"));
runner.run();
runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.1.value", "1");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.2.value", "Mark");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "48");
out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?");
}
@Test
public void testUpdateWithMissingFieldBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code");
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json"));
runner.run();
runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1);
}
@Test
public void testUpdateWithMalformedJson() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code");
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json"));
runner.run();
runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1);
}
@Test
public void testInsertWithMissingField() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "INSERT");
runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code");
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json"));
runner.run();
runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1);
}
/**
* Simple implementation only for testing purposes
*/
private static class MockDBCPService extends AbstractControllerService implements DBCPService {
private final String dbLocation;
public MockDBCPService(final String dbLocation) {
this.dbLocation = dbLocation;
}
@Override
public String getIdentifier() {
return "dbcp";
}
@Override
public Connection getConnection() throws ProcessException {
try {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
final Connection con = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true");
return con;
} catch (final Exception e) {
e.printStackTrace();
throw new ProcessException("getConnection failed: " + e);
}
}
}
}

View File

@ -0,0 +1,246 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class TestPutSQL {
static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@BeforeClass
public static void setup() {
System.setProperty("derby.stream.error.file", "target/derby.log");
}
@Test
public void testDirectStatements() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)".getBytes());
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("Mark", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
runner.enqueue("UPDATE PERSONS SET NAME='George' WHERE ID=1".getBytes());
runner.run();
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("George", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
}
@Test
public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.2.value", "Mark");
attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.3.value", "84");
runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("Mark", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
runner.clearTransferState();
attributes.clear();
attributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.1.value", "George");
attributes.put("sql.args.2.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.2.value", "1");
runner.enqueue("UPDATE PERSONS SET NAME=? WHERE ID=?".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("George", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
}
@Test
public void testMultipleStatementsWithinFlowFile() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
"UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.2.value", "Mark");
attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.3.value", "84");
attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.4.value", "1");
runner.enqueue(sql.getBytes(), attributes);
runner.run();
// should fail because of the semicolon
runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertFalse(rs.next());
}
}
}
/**
* Simple implementation only for testing purposes
*/
private static class MockDBCPService extends AbstractControllerService implements DBCPService {
private final String dbLocation;
public MockDBCPService(final String dbLocation) {
this.dbLocation = dbLocation;
}
@Override
public String getIdentifier() {
return "dbcp";
}
@Override
public Connection getConnection() throws ProcessException {
try {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
final Connection con = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true");
return con;
} catch (final Exception e) {
e.printStackTrace();
throw new ProcessException("getConnection failed: " + e);
}
}
}
}