NIFI-978: Support parameterized statements in ExecuteSQL

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2433.
This commit is contained in:
Matthew Burgess 2018-01-25 10:24:17 -05:00 committed by Pierre Villard
parent b4a9f52a4e
commit b5ca7adbb9
4 changed files with 319 additions and 289 deletions

View File

@ -16,13 +16,11 @@
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import java.io.IOException; import java.nio.charset.Charset;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
@ -35,6 +33,8 @@ import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -50,8 +50,6 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.JdbcCommon; import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
@ -64,11 +62,32 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
@EventDriven @EventDriven
@InputRequirement(Requirement.INPUT_ALLOWED) @InputRequirement(Requirement.INPUT_ALLOWED)
@Tags({"sql", "select", "jdbc", "query", "database"}) @Tags({"sql", "select", "jdbc", "query", "database"})
@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format." @CapabilityDescription("Executes provided SQL select query. Query result will be converted to Avro format."
+ " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on " + " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on "
+ "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " + "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. "
+ "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the "
+ "select query. FlowFile attribute 'executesql.row.count' indicates how many rows were selected.") + "select query, and the query may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
+ "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
+ "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format. "
+ "FlowFile attribute 'executesql.row.count' indicates how many rows were selected.")
@ReadsAttributes({
@ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parametrized SQL statements. The type of each Parameter is specified as an integer "
+ "that represents the JDBC Type of the parameter."),
@ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parametrized SQL statements. The value of the Parameters are specified as "
+ "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."),
@ReadsAttribute(attribute = "sql.args.N.format", description = "This attribute is always optional, but default options may not always work for your data. "
+ "Incoming FlowFiles are expected to be parametrized SQL statements. In some cases "
+ "a format option needs to be specified, currently this is only applicable for binary data types, dates, times and timestamps. Binary Data Types (defaults to 'ascii') - "
+ "ascii: each string character in your attribute value represents a single byte. This is the format provided by Avro Processors. "
+ "base64: the string is a Base64 encoded string that can be decoded to bytes. "
+ "hex: the string is hex encoded with all letters in upper case and no '0x' at the beginning. "
+ "Dates/Times/Timestamps - "
+ "Date, Time and Timestamp formats all support both custom formats or named format ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME') "
+ "as specified according to java.time.format.DateTimeFormatter. "
+ "If not specified, a long value input is expected to be an unix epoch (milli seconds from 1970/1/1), or a string value in "
+ "'yyyy-MM-dd' format for Date, 'HH:mm:ss.SSS' for Time (some database engines e.g. Derby or MySQL do not support milliseconds and will truncate milliseconds), "
+ "'yyyy-MM-dd HH:mm:ss.SSS' for Timestamp is used.")
})
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute="executesql.row.count", description = "Contains the number of rows returned in the select query"), @WritesAttribute(attribute="executesql.row.count", description = "Contains the number of rows returned in the select query"),
@WritesAttribute(attribute="executesql.query.duration", description = "Duration of the query in milliseconds") @WritesAttribute(attribute="executesql.query.duration", description = "Duration of the query in milliseconds")
@ -187,22 +206,20 @@ public class ExecuteSQL extends AbstractProcessor {
// If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query. // If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
// If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled. // If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
final StringBuilder queryContents = new StringBuilder(); final StringBuilder queryContents = new StringBuilder();
session.read(fileToProcess, new InputStreamCallback() { session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, Charset.defaultCharset())));
@Override
public void process(InputStream in) throws IOException {
queryContents.append(IOUtils.toString(in));
}
});
selectQuery = queryContents.toString(); selectQuery = queryContents.toString();
} }
int resultCount=0; int resultCount=0;
try (final Connection con = dbcpService.getConnection(); try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) { final PreparedStatement st = con.prepareStatement(selectQuery)) {
st.setQueryTimeout(queryTimeout); // timeout in seconds st.setQueryTimeout(queryTimeout); // timeout in seconds
if (fileToProcess != null) {
JdbcCommon.setParameters(st, fileToProcess.getAttributes());
}
logger.debug("Executing query {}", new Object[]{selectQuery}); logger.debug("Executing query {}", new Object[]{selectQuery});
boolean results = st.execute(selectQuery); boolean results = st.execute();
while(results){ while(results){
@ -215,22 +232,19 @@ public class ExecuteSQL extends AbstractProcessor {
} }
final AtomicLong nrOfRows = new AtomicLong(0L); final AtomicLong nrOfRows = new AtomicLong(0L);
resultSetFF = session.write(resultSetFF, new OutputStreamCallback() { resultSetFF = session.write(resultSetFF, out -> {
@Override try {
public void process(final OutputStream out) throws IOException {
try {
final ResultSet resultSet = st.getResultSet(); final ResultSet resultSet = st.getResultSet();
final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder() final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
.convertNames(convertNamesForAvro) .convertNames(convertNamesForAvro)
.useLogicalTypes(useAvroLogicalTypes) .useLogicalTypes(useAvroLogicalTypes)
.defaultPrecision(defaultPrecision) .defaultPrecision(defaultPrecision)
.defaultScale(defaultScale) .defaultScale(defaultScale)
.build(); .build();
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null)); nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
} catch (final SQLException e) { } catch (final SQLException e) {
throw new ProcessException(e); throw new ProcessException(e);
}
} }
}); });
@ -261,12 +275,7 @@ public class ExecuteSQL extends AbstractProcessor {
if(resultCount > 0){ if(resultCount > 0){
session.remove(fileToProcess); session.remove(fileToProcess);
} else { } else {
fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { fileToProcess = session.write(fileToProcess, JdbcCommon::createEmptyAvroStream);
@Override
public void process(OutputStream out) throws IOException {
JdbcCommon.createEmptyAvroStream(out);
}
});
session.transfer(fileToProcess, REL_SUCCESS); session.transfer(fileToProcess, REL_SUCCESS);
} }

View File

@ -48,37 +48,19 @@ import org.apache.nifi.processor.util.pattern.PartialFunctions.FlowFileGroup;
import org.apache.nifi.processor.util.pattern.PutGroup; import org.apache.nifi.processor.util.pattern.PutGroup;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure; import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.processor.util.pattern.RoutingResult; import org.apache.nifi.processor.util.pattern.RoutingResult;
import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.StreamUtils;
import javax.xml.bind.DatatypeConverter;
import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.StringReader;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.sql.BatchUpdateException; import java.sql.BatchUpdateException;
import java.sql.Connection; import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLDataException;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLNonTransientException; import java.sql.SQLNonTransientException;
import java.sql.Statement; import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.BitSet; import java.util.BitSet;
import java.util.Comparator; import java.util.Comparator;
@ -89,8 +71,6 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnError; import static org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnError;
@ -197,15 +177,10 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
+ "such as an invalid query or an integrity constraint violation") + "such as an invalid query or an integrity constraint violation")
.build(); .build();
private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type");
private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
private static final String FRAGMENT_ID_ATTR = FragmentAttributes.FRAGMENT_ID.key(); private static final String FRAGMENT_ID_ATTR = FragmentAttributes.FRAGMENT_ID.key();
private static final String FRAGMENT_INDEX_ATTR = FragmentAttributes.FRAGMENT_INDEX.key(); private static final String FRAGMENT_INDEX_ATTR = FragmentAttributes.FRAGMENT_INDEX.key();
private static final String FRAGMENT_COUNT_ATTR = FragmentAttributes.FRAGMENT_COUNT.key(); private static final String FRAGMENT_COUNT_ATTR = FragmentAttributes.FRAGMENT_COUNT.key();
private static final Pattern LONG_PATTERN = Pattern.compile("^-?\\d{1,19}$");
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(); final List<PropertyDescriptor> properties = new ArrayList<>();
@ -310,7 +285,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
if(!exceptionHandler.execute(fc, flowFile, input -> { if(!exceptionHandler.execute(fc, flowFile, input -> {
final PreparedStatement stmt = enclosure.getCachedStatement(conn); final PreparedStatement stmt = enclosure.getCachedStatement(conn);
setParameters(stmt, flowFile.getAttributes()); JdbcCommon.setParameters(stmt, flowFile.getAttributes());
stmt.addBatch(); stmt.addBatch();
}, onFlowFileError(context, session, result))) { }, onFlowFileError(context, session, result))) {
continue; continue;
@ -383,7 +358,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
try (final PreparedStatement stmt = targetEnclosure.getNewStatement(conn, fc.obtainKeys)) { try (final PreparedStatement stmt = targetEnclosure.getNewStatement(conn, fc.obtainKeys)) {
// set the appropriate parameters on the statement. // set the appropriate parameters on the statement.
setParameters(stmt, flowFile.getAttributes()); JdbcCommon.setParameters(stmt, flowFile.getAttributes());
stmt.executeUpdate(); stmt.executeUpdate();
@ -677,46 +652,6 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
return sql; return sql;
} }
/**
 * Sets all of the appropriate parameters on the given PreparedStatement, based on the given FlowFile attributes.
 * <p>
 * Every attribute whose name matches {@code sql.args.N.type} declares parameter N; its value must be the
 * integer JDBC type code. The parameter value is taken from {@code sql.args.N.value} (a missing attribute is
 * passed on as {@code null}) and an optional format hint from {@code sql.args.N.format} (empty string when absent).
 * Any failure to convert an index, a type code or a value is reported as a {@link SQLDataException}, so callers
 * only need to handle {@link SQLException}.
 *
 * @param stmt the statement to set the parameters on
 * @param attributes the attributes from which to derive parameter indices, values, and types
 * @throws SQLException if the PreparedStatement throws a SQLException when the appropriate setter is called,
 *                      or (as a SQLDataException) if an attribute cannot be converted to the required type
 */
private void setParameters(final PreparedStatement stmt, final Map<String, String> attributes) throws SQLException {
    for (final Map.Entry<String, String> entry : attributes.entrySet()) {
        final String key = entry.getKey();
        final Matcher matcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key);
        if (!matcher.matches()) {
            continue; // not a sql.args.N.type attribute
        }

        // The pattern only guarantees digits, not that they fit in an int.
        final int parameterIndex;
        try {
            parameterIndex = Integer.parseInt(matcher.group(1));
        } catch (final NumberFormatException nfe) {
            throw new SQLDataException("The parameter index of the " + key + " attribute is not a valid integer", nfe);
        }

        final String typeText = entry.getValue();
        final boolean isNumeric = NUMBER_PATTERN.matcher(typeText).matches();
        if (!isNumeric) {
            throw new SQLDataException("Value of the " + key + " attribute is '" + typeText + "', which is not a valid JDBC numeral type");
        }

        // NUMBER_PATTERN accepts arbitrarily long digit strings, so the parse itself can still overflow.
        final int jdbcType;
        try {
            jdbcType = Integer.parseInt(typeText);
        } catch (final NumberFormatException nfe) {
            throw new SQLDataException("Value of the " + key + " attribute is '" + typeText + "', which is not a valid JDBC numeral type", nfe);
        }

        final String valueAttrName = "sql.args." + parameterIndex + ".value";
        final String parameterValue = attributes.get(valueAttrName);
        final String formatAttrName = "sql.args." + parameterIndex + ".format";
        final String parameterFormat = attributes.containsKey(formatAttrName) ? attributes.get(formatAttrName) : "";

        try {
            setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType, parameterFormat);
        } catch (final NumberFormatException nfe) {
            throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe);
        } catch (final ParseException pe) {
            throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to a timestamp", pe);
        } catch (final UnsupportedEncodingException uee) {
            throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to UTF-8", uee);
        } catch (final java.time.DateTimeException dte) {
            // java.time parsing (used when a format is supplied) signals bad input with an unchecked exception.
            throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be parsed using the format '"
                    + parameterFormat + "'", dte);
        }
    }
}
/** /**
* Determines which relationship the given FlowFiles should go to, based on a transaction timing out or * Determines which relationship the given FlowFiles should go to, based on a transaction timing out or
* transaction information not being present. If the FlowFiles should be processed and not transferred * transaction information not being present. If the FlowFiles should be processed and not transferred
@ -810,183 +745,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
return false; // not enough FlowFiles for this transaction. Return them all to queue. return false; // not enough FlowFiles for this transaction. Return them all to queue.
} }
/**
 * Determines how to map the given value to the appropriate JDBC data type and sets the parameter on the
 * provided PreparedStatement. A {@code null} value is set via {@code setNull} with the given JDBC type.
 *
 * @param stmt the PreparedStatement to set the parameter on
 * @param attrName the name of the attribute that the parameter is coming from - for logging purposes
 * @param parameterIndex the index of the SQL parameter to set
 * @param parameterValue the value of the SQL parameter to set
 * @param jdbcType the JDBC Type of the SQL parameter to set
 * @param valueFormat optional format hint; the empty string selects the default handling. For DATE, TIME and
 *                    TIMESTAMP it is a DateTimeFormatter name or pattern, for binary types one of
 *                    'ascii', 'hex' or 'base64'
 * @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter
 * @throws ParseException if a date/time value does not match the default pattern, or the binary format is unknown
 * @throws UnsupportedEncodingException if the ASCII charset is not available for binary conversion
 */
private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType,
                          final String valueFormat)
        throws SQLException, ParseException, UnsupportedEncodingException {
    if (parameterValue == null) {
        stmt.setNull(parameterIndex, jdbcType);
    } else {
        switch (jdbcType) {
            case Types.BIT:
                stmt.setBoolean(parameterIndex, "1".equals(parameterValue) || "t".equalsIgnoreCase(parameterValue) || Boolean.parseBoolean(parameterValue));
                break;
            case Types.BOOLEAN:
                stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue));
                break;
            case Types.TINYINT:
                stmt.setByte(parameterIndex, Byte.parseByte(parameterValue));
                break;
            case Types.SMALLINT:
                stmt.setShort(parameterIndex, Short.parseShort(parameterValue));
                break;
            case Types.INTEGER:
                stmt.setInt(parameterIndex, Integer.parseInt(parameterValue));
                break;
            case Types.BIGINT:
                stmt.setLong(parameterIndex, Long.parseLong(parameterValue));
                break;
            case Types.REAL:
                stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue));
                break;
            case Types.FLOAT:
            case Types.DOUBLE:
                stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue));
                break;
            case Types.DECIMAL:
            case Types.NUMERIC:
                stmt.setBigDecimal(parameterIndex, new BigDecimal(parameterValue));
                break;
            case Types.DATE:
                Date date;
                if (valueFormat.equals("")) {
                    // No format: a bare long is epoch milliseconds, anything else must be yyyy-MM-dd.
                    if (LONG_PATTERN.matcher(parameterValue).matches()) {
                        date = new Date(Long.parseLong(parameterValue));
                    } else {
                        String dateFormatString = "yyyy-MM-dd";
                        SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatString);
                        java.util.Date parsedDate = dateFormat.parse(parameterValue);
                        date = new Date(parsedDate.getTime());
                    }
                } else {
                    final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat);
                    LocalDate parsedDate = LocalDate.parse(parameterValue, dtFormatter);
                    // Midnight of the parsed day in the JVM's default time zone.
                    date = new Date(Date.from(parsedDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()).getTime());
                }

                stmt.setDate(parameterIndex, date);
                break;
            case Types.TIME:
                Time time;
                if (valueFormat.equals("")) {
                    if (LONG_PATTERN.matcher(parameterValue).matches()) {
                        time = new Time(Long.parseLong(parameterValue));
                    } else {
                        String timeFormatString = "HH:mm:ss.SSS";
                        SimpleDateFormat dateFormat = new SimpleDateFormat(timeFormatString);
                        java.util.Date parsedDate = dateFormat.parse(parameterValue);
                        time = new Time(parsedDate.getTime());
                    }
                } else {
                    final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat);
                    LocalTime parsedTime = LocalTime.parse(parameterValue, dtFormatter);
                    // Anchor the time of day to the epoch date in the JVM's default time zone.
                    LocalDateTime localDateTime = parsedTime.atDate(LocalDate.ofEpochDay(0));
                    Instant instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant();
                    time = new Time(instant.toEpochMilli());
                }

                stmt.setTime(parameterIndex, time);
                break;
            case Types.TIMESTAMP:
                long lTimestamp = 0L;

                // Backwards compatibility note: Format was unsupported for a timestamp field.
                if (valueFormat.equals("")) {
                    if (LONG_PATTERN.matcher(parameterValue).matches()) {
                        lTimestamp = Long.parseLong(parameterValue);
                    } else {
                        final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        java.util.Date parsedDate = dateFormat.parse(parameterValue);
                        lTimestamp = parsedDate.getTime();
                    }
                } else {
                    final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat);
                    TemporalAccessor accessor = dtFormatter.parse(parameterValue);
                    java.util.Date parsedDate = java.util.Date.from(Instant.from(accessor));
                    lTimestamp = parsedDate.getTime();
                }

                stmt.setTimestamp(parameterIndex, new Timestamp(lTimestamp));
                break;
            case Types.BINARY:
            case Types.VARBINARY:
            case Types.LONGVARBINARY:
                byte[] bValue;

                switch (valueFormat) {
                    case "":
                    case "ascii":
                        bValue = parameterValue.getBytes("ASCII");
                        break;
                    case "hex":
                        bValue = DatatypeConverter.parseHexBinary(parameterValue);
                        break;
                    case "base64":
                        bValue = DatatypeConverter.parseBase64Binary(parameterValue);
                        break;
                    default:
                        throw new ParseException("Unable to parse binary data using the formatter `" + valueFormat + "`.", 0);
                }

                stmt.setBinaryStream(parameterIndex, new ByteArrayInputStream(bValue), bValue.length);
                break;
            case Types.CHAR:
            case Types.VARCHAR:
            case Types.LONGNVARCHAR:
            case Types.LONGVARCHAR:
                stmt.setString(parameterIndex, parameterValue);
                break;
            case Types.CLOB:
                // Do not close the reader here: a driver may defer reading the stream until the statement
                // is executed, and a closed StringReader fails every read. StringReader holds no system
                // resource, so leaving it open leaks nothing.
                stmt.setCharacterStream(parameterIndex, new StringReader(parameterValue));
                break;
            case Types.NCLOB:
                // Same reasoning as CLOB: the reader must stay open until the driver has consumed it.
                stmt.setNCharacterStream(parameterIndex, new StringReader(parameterValue));
                break;
            default:
                stmt.setObject(parameterIndex, parameterValue, jdbcType);
                break;
        }
    }
}
/**
 * Resolves a format specification to a DateTimeFormatter.
 * The names of the predefined DateTimeFormatter constants handled below map to those constants;
 * any other text is treated as a custom pattern and passed to {@link DateTimeFormatter#ofPattern(String)}.
 *
 * @param pattern the name of a predefined formatter, or a custom pattern string
 * @return the matching formatter
 */
private DateTimeFormatter getDateTimeFormatter(String pattern) {
    final DateTimeFormatter resolved;
    switch (pattern) {
        case "BASIC_ISO_DATE":
            resolved = DateTimeFormatter.BASIC_ISO_DATE;
            break;
        case "ISO_LOCAL_DATE":
            resolved = DateTimeFormatter.ISO_LOCAL_DATE;
            break;
        case "ISO_OFFSET_DATE":
            resolved = DateTimeFormatter.ISO_OFFSET_DATE;
            break;
        case "ISO_DATE":
            resolved = DateTimeFormatter.ISO_DATE;
            break;
        case "ISO_LOCAL_TIME":
            resolved = DateTimeFormatter.ISO_LOCAL_TIME;
            break;
        case "ISO_OFFSET_TIME":
            resolved = DateTimeFormatter.ISO_OFFSET_TIME;
            break;
        case "ISO_TIME":
            resolved = DateTimeFormatter.ISO_TIME;
            break;
        case "ISO_LOCAL_DATE_TIME":
            resolved = DateTimeFormatter.ISO_LOCAL_DATE_TIME;
            break;
        case "ISO_OFFSET_DATE_TIME":
            resolved = DateTimeFormatter.ISO_OFFSET_DATE_TIME;
            break;
        case "ISO_ZONED_DATE_TIME":
            resolved = DateTimeFormatter.ISO_ZONED_DATE_TIME;
            break;
        case "ISO_DATE_TIME":
            resolved = DateTimeFormatter.ISO_DATE_TIME;
            break;
        case "ISO_ORDINAL_DATE":
            resolved = DateTimeFormatter.ISO_ORDINAL_DATE;
            break;
        case "ISO_WEEK_DATE":
            resolved = DateTimeFormatter.ISO_WEEK_DATE;
            break;
        case "ISO_INSTANT":
            resolved = DateTimeFormatter.ISO_INSTANT;
            break;
        case "RFC_1123_DATE_TIME":
            resolved = DateTimeFormatter.RFC_1123_DATE_TIME;
            break;
        default:
            // Not a predefined name: interpret the text as a formatter pattern.
            resolved = DateTimeFormatter.ofPattern(pattern);
            break;
    }
    return resolved;
}
/** /**
* A FlowFileFilter that is responsible for ensuring that the FlowFiles returned either belong * A FlowFileFilter that is responsible for ensuring that the FlowFiles returned either belong
@ -1038,7 +797,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
if (selectedId.equals(fragmentId)) { if (selectedId.equals(fragmentId)) {
// fragment id's match. Find out if we have all of the necessary fragments or not. // fragment id's match. Find out if we have all of the necessary fragments or not.
final int numFragments; final int numFragments;
if (fragCount != null && NUMBER_PATTERN.matcher(fragCount).matches()) { if (fragCount != null && JdbcCommon.NUMBER_PATTERN.matcher(fragCount).matches()) {
numFragments = Integer.parseInt(fragCount); numFragments = Integer.parseInt(fragCount);
} else { } else {
numFragments = Integer.MAX_VALUE; numFragments = Integer.MAX_VALUE;

View File

@ -45,10 +45,13 @@ import static java.sql.Types.TINYINT;
import static java.sql.Types.VARBINARY; import static java.sql.Types.VARBINARY;
import static java.sql.Types.VARCHAR; import static java.sql.Types.VARCHAR;
import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.Reader; import java.io.Reader;
import java.io.StringReader;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.BigInteger; import java.math.BigInteger;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -56,11 +59,28 @@ import java.nio.CharBuffer;
import java.sql.Blob; import java.sql.Blob;
import java.sql.Clob; import java.sql.Clob;
import java.sql.NClob; import java.sql.NClob;
import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.ResultSetMetaData; import java.sql.ResultSetMetaData;
import java.sql.SQLDataException;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
import java.util.Date; import java.util.Date;
import java.util.Map;
import java.util.function.Function; import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.avro.LogicalTypes; import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema; import org.apache.avro.Schema;
@ -79,6 +99,8 @@ import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import javax.xml.bind.DatatypeConverter;
/** /**
* JDBC / SQL common functions. * JDBC / SQL common functions.
*/ */
@ -90,6 +112,10 @@ public class JdbcCommon {
private static final int DEFAULT_PRECISION_VALUE = 10; private static final int DEFAULT_PRECISION_VALUE = 10;
private static final int DEFAULT_SCALE_VALUE = 0; private static final int DEFAULT_SCALE_VALUE = 0;
public static final Pattern LONG_PATTERN = Pattern.compile("^-?\\d{1,19}$");
public static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type");
public static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary"; public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary";
public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder() public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
@ -608,6 +634,222 @@ public class JdbcCommon {
return normalizedName; return normalizedName;
} }
/**
 * Sets all of the appropriate parameters on the given PreparedStatement, based on the given FlowFile attributes.
 * <p>
 * Every attribute whose name matches {@code sql.args.N.type} declares parameter N; its value must be the
 * integer JDBC type code. The parameter value is taken from {@code sql.args.N.value} (a missing attribute is
 * passed on as {@code null}) and an optional format hint from {@code sql.args.N.format} (empty string when absent).
 * Any failure to convert an index, a type code or a value is reported as a {@link SQLDataException}, so callers
 * only need to handle {@link SQLException}.
 *
 * @param stmt the statement to set the parameters on
 * @param attributes the attributes from which to derive parameter indices, values, and types
 * @throws SQLException if the PreparedStatement throws a SQLException when the appropriate setter is called,
 *                      or (as a SQLDataException) if an attribute cannot be converted to the required type
 */
public static void setParameters(final PreparedStatement stmt, final Map<String, String> attributes) throws SQLException {
    for (final Map.Entry<String, String> entry : attributes.entrySet()) {
        final String key = entry.getKey();
        final Matcher matcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key);
        if (!matcher.matches()) {
            continue; // not a sql.args.N.type attribute
        }

        // The pattern only guarantees digits, not that they fit in an int.
        final int parameterIndex;
        try {
            parameterIndex = Integer.parseInt(matcher.group(1));
        } catch (final NumberFormatException nfe) {
            throw new SQLDataException("The parameter index of the " + key + " attribute is not a valid integer", nfe);
        }

        final String typeText = entry.getValue();
        final boolean isNumeric = NUMBER_PATTERN.matcher(typeText).matches();
        if (!isNumeric) {
            throw new SQLDataException("Value of the " + key + " attribute is '" + typeText + "', which is not a valid JDBC numeral type");
        }

        // NUMBER_PATTERN accepts arbitrarily long digit strings, so the parse itself can still overflow.
        final int jdbcType;
        try {
            jdbcType = Integer.parseInt(typeText);
        } catch (final NumberFormatException nfe) {
            throw new SQLDataException("Value of the " + key + " attribute is '" + typeText + "', which is not a valid JDBC numeral type", nfe);
        }

        final String valueAttrName = "sql.args." + parameterIndex + ".value";
        final String parameterValue = attributes.get(valueAttrName);
        final String formatAttrName = "sql.args." + parameterIndex + ".format";
        final String parameterFormat = attributes.containsKey(formatAttrName) ? attributes.get(formatAttrName) : "";

        try {
            JdbcCommon.setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType, parameterFormat);
        } catch (final NumberFormatException nfe) {
            throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe);
        } catch (final ParseException pe) {
            throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to a timestamp", pe);
        } catch (final UnsupportedEncodingException uee) {
            throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to UTF-8", uee);
        } catch (final java.time.DateTimeException dte) {
            // java.time parsing (used when a format is supplied) signals bad input with an unchecked exception.
            throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be parsed using the format '"
                    + parameterFormat + "'", dte);
        }
    }
}
/**
 * Determines how to map the given value to the appropriate JDBC data type and sets the parameter on the
 * provided PreparedStatement.
 * <p>
 * A {@code null} value is bound with {@link PreparedStatement#setNull(int, int)}. For DATE, TIME and TIMESTAMP
 * an empty format means the value is either an epoch-millisecond number or text in the default pattern
 * ({@code yyyy-MM-dd}, {@code HH:mm:ss.SSS} and {@code yyyy-MM-dd HH:mm:ss.SSS} respectively); a non-empty
 * format is resolved through {@link #getDateTimeFormatter(String)}. For binary types the format selects the
 * text encoding of the value: {@code ascii} (also used when the format is empty), {@code hex} or {@code base64}.
 *
 * @param stmt the PreparedStatement to set the parameter on
 * @param attrName the name of the attribute that the parameter is coming from - for logging purposes
 *                 (not referenced by this method itself)
 * @param parameterIndex the index of the SQL parameter to set
 * @param parameterValue the value of the SQL parameter to set; may be {@code null}
 * @param jdbcType the JDBC Type of the SQL parameter to set
 * @param valueFormat the format used to interpret date/time/binary values; {@code null} is treated as empty
 * @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter
 * @throws ParseException if a date/time value cannot be parsed, or the binary format name is not recognized
 * @throws UnsupportedEncodingException retained in the signature for existing callers
 * @throws NumberFormatException if a numeric value cannot be parsed into the requested type
 */
public static void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType,
                                final String valueFormat)
        throws SQLException, ParseException, UnsupportedEncodingException {
    if (parameterValue == null) {
        stmt.setNull(parameterIndex, jdbcType);
        return;
    }

    // A missing format behaves like an empty one instead of failing with a NullPointerException.
    final String format = (valueFormat == null) ? "" : valueFormat;

    switch (jdbcType) {
        case Types.BIT:
            stmt.setBoolean(parameterIndex, "1".equals(parameterValue) || "t".equalsIgnoreCase(parameterValue) || Boolean.parseBoolean(parameterValue));
            break;
        case Types.BOOLEAN:
            stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue));
            break;
        case Types.TINYINT:
            stmt.setByte(parameterIndex, Byte.parseByte(parameterValue));
            break;
        case Types.SMALLINT:
            stmt.setShort(parameterIndex, Short.parseShort(parameterValue));
            break;
        case Types.INTEGER:
            stmt.setInt(parameterIndex, Integer.parseInt(parameterValue));
            break;
        case Types.BIGINT:
            stmt.setLong(parameterIndex, Long.parseLong(parameterValue));
            break;
        case Types.REAL:
            stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue));
            break;
        case Types.FLOAT:
        case Types.DOUBLE:
            stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue));
            break;
        case Types.DECIMAL:
        case Types.NUMERIC:
            stmt.setBigDecimal(parameterIndex, new BigDecimal(parameterValue));
            break;
        case Types.DATE:
            stmt.setDate(parameterIndex, parseDateParameter(parameterValue, format));
            break;
        case Types.TIME:
            stmt.setTime(parameterIndex, parseTimeParameter(parameterValue, format));
            break;
        case Types.TIMESTAMP:
            stmt.setTimestamp(parameterIndex, new Timestamp(parseTimestampParameter(parameterValue, format)));
            break;
        case Types.BINARY:
        case Types.VARBINARY:
        case Types.LONGVARBINARY: {
            final byte[] bValue = parseBinaryParameter(parameterValue, format);
            stmt.setBinaryStream(parameterIndex, new ByteArrayInputStream(bValue), bValue.length);
            break;
        }
        case Types.CHAR:
        case Types.VARCHAR:
        case Types.LONGNVARCHAR:
        case Types.LONGVARCHAR:
            stmt.setString(parameterIndex, parameterValue);
            break;
        case Types.CLOB:
            // The reader is deliberately left open: JDBC reads stream data "as needed", which may be as late
            // as statement execution. A StringReader holds no system resources, so nothing leaks.
            stmt.setCharacterStream(parameterIndex, new StringReader(parameterValue));
            break;
        case Types.NCLOB:
            // Same reasoning as CLOB: closing the reader here could hand the driver a closed stream.
            stmt.setNCharacterStream(parameterIndex, new StringReader(parameterValue));
            break;
        default:
            stmt.setObject(parameterIndex, parameterValue, jdbcType);
            break;
    }
}

/** Converts a DATE parameter value (epoch millis, {@code yyyy-MM-dd} text, or text in the given format) to a java.sql.Date. */
private static java.sql.Date parseDateParameter(final String value, final String format) throws ParseException {
    if (format.isEmpty()) {
        if (LONG_PATTERN.matcher(value).matches()) {
            return new java.sql.Date(Long.parseLong(value));
        }
        final java.util.Date parsed = new SimpleDateFormat("yyyy-MM-dd").parse(value);
        return new java.sql.Date(parsed.getTime());
    }
    try {
        final LocalDate parsed = LocalDate.parse(value, getDateTimeFormatter(format));
        final Instant startOfDay = parsed.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant();
        return new java.sql.Date(java.util.Date.from(startOfDay).getTime());
    } catch (final java.time.DateTimeException dte) {
        throw dateTimeParseFailure(value, format, dte);
    }
}

/** Converts a TIME parameter value (epoch millis, {@code HH:mm:ss.SSS} text, or text in the given format) to a java.sql.Time. */
private static Time parseTimeParameter(final String value, final String format) throws ParseException {
    if (format.isEmpty()) {
        if (LONG_PATTERN.matcher(value).matches()) {
            return new Time(Long.parseLong(value));
        }
        final java.util.Date parsed = new SimpleDateFormat("HH:mm:ss.SSS").parse(value);
        return new Time(parsed.getTime());
    }
    try {
        final LocalTime parsed = LocalTime.parse(value, getDateTimeFormatter(format));
        // Anchor the time of day on the epoch date, interpreted in the system default zone.
        final LocalDateTime onEpochDay = parsed.atDate(LocalDate.ofEpochDay(0));
        return new Time(onEpochDay.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
    } catch (final java.time.DateTimeException dte) {
        throw dateTimeParseFailure(value, format, dte);
    }
}

/** Converts a TIMESTAMP parameter value (epoch millis, default-pattern text, or text in the given format) to epoch milliseconds. */
private static long parseTimestampParameter(final String value, final String format) throws ParseException {
    // Backwards compatibility note: Format was unsupported for a timestamp field.
    if (format.isEmpty()) {
        if (LONG_PATTERN.matcher(value).matches()) {
            return Long.parseLong(value);
        }
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").parse(value).getTime();
    }
    try {
        final TemporalAccessor accessor = getDateTimeFormatter(format).parse(value);
        final Instant instant;
        if (accessor.isSupported(java.time.temporal.ChronoField.INSTANT_SECONDS)) {
            // The text carried a zone or offset, so it identifies an instant on its own.
            instant = Instant.from(accessor);
        } else {
            // Zone-less formats (e.g. "yyyy-MM-dd HH:mm:ss") are interpreted in the system default zone,
            // matching how the DATE and TIME conversions treat local values.
            instant = LocalDateTime.from(accessor).atZone(ZoneId.systemDefault()).toInstant();
        }
        return java.util.Date.from(instant).getTime();
    } catch (final java.time.DateTimeException dte) {
        throw dateTimeParseFailure(value, format, dte);
    }
}

/** Decodes a binary parameter value according to the named text encoding ({@code ascii}/empty, {@code hex}, {@code base64}). */
private static byte[] parseBinaryParameter(final String value, final String format) throws ParseException {
    switch (format) {
        case "":
        case "ascii":
            return value.getBytes(java.nio.charset.StandardCharsets.US_ASCII);
        case "hex":
            return DatatypeConverter.parseHexBinary(value);
        case "base64":
            return DatatypeConverter.parseBase64Binary(value);
        default:
            throw new ParseException("Unable to parse binary data using the formatter `" + format + "`.", 0);
    }
}

/** Wraps an unchecked java.time failure in the checked ParseException that setParameter declares, keeping the cause. */
private static ParseException dateTimeParseFailure(final String value, final String format, final java.time.DateTimeException cause) {
    final int errorIndex = (cause instanceof java.time.format.DateTimeParseException)
            ? ((java.time.format.DateTimeParseException) cause).getErrorIndex()
            : 0;
    final ParseException failure = new ParseException(
            "Unable to parse '" + value + "' using the formatter `" + format + "`: " + cause.getMessage(), errorIndex);
    failure.initCause(cause);
    return failure;
}
/**
 * Resolves a format name or pattern to a {@link DateTimeFormatter}.
 * <p>
 * If the argument equals the name of one of the predefined {@code DateTimeFormatter} constants listed
 * below, that constant is returned; any other string is handed to {@link DateTimeFormatter#ofPattern(String)}.
 *
 * @param pattern the name of a predefined formatter, or a date/time pattern string
 * @return the matching predefined formatter, or a formatter built from the pattern
 */
public static DateTimeFormatter getDateTimeFormatter(String pattern) {
    final DateTimeFormatter resolved;
    switch (pattern) {
        case "BASIC_ISO_DATE":
            resolved = DateTimeFormatter.BASIC_ISO_DATE;
            break;
        case "ISO_LOCAL_DATE":
            resolved = DateTimeFormatter.ISO_LOCAL_DATE;
            break;
        case "ISO_OFFSET_DATE":
            resolved = DateTimeFormatter.ISO_OFFSET_DATE;
            break;
        case "ISO_DATE":
            resolved = DateTimeFormatter.ISO_DATE;
            break;
        case "ISO_LOCAL_TIME":
            resolved = DateTimeFormatter.ISO_LOCAL_TIME;
            break;
        case "ISO_OFFSET_TIME":
            resolved = DateTimeFormatter.ISO_OFFSET_TIME;
            break;
        case "ISO_TIME":
            resolved = DateTimeFormatter.ISO_TIME;
            break;
        case "ISO_LOCAL_DATE_TIME":
            resolved = DateTimeFormatter.ISO_LOCAL_DATE_TIME;
            break;
        case "ISO_OFFSET_DATE_TIME":
            resolved = DateTimeFormatter.ISO_OFFSET_DATE_TIME;
            break;
        case "ISO_ZONED_DATE_TIME":
            resolved = DateTimeFormatter.ISO_ZONED_DATE_TIME;
            break;
        case "ISO_DATE_TIME":
            resolved = DateTimeFormatter.ISO_DATE_TIME;
            break;
        case "ISO_ORDINAL_DATE":
            resolved = DateTimeFormatter.ISO_ORDINAL_DATE;
            break;
        case "ISO_WEEK_DATE":
            resolved = DateTimeFormatter.ISO_WEEK_DATE;
            break;
        case "ISO_INSTANT":
            resolved = DateTimeFormatter.ISO_INSTANT;
            break;
        case "RFC_1123_DATE_TIME":
            resolved = DateTimeFormatter.RFC_1123_DATE_TIME;
            break;
        default:
            // Not a known constant name: treat the text as a pattern.
            resolved = DateTimeFormatter.ofPattern(pattern);
            break;
    }
    return resolved;
}
/** /**
* An interface for callback methods which allows processing of a row during the convertToAvroStream() processing. * An interface for callback methods which allows processing of a row during the convertToAvroStream() processing.
* <b>IMPORTANT:</b> This method should only work on the row pointed at by the current ResultSet reference. * <b>IMPORTANT:</b> This method should only work on the row pointed at by the current ResultSet reference.

View File

@ -79,6 +79,14 @@ public class TestExecuteSQL {
+ " from persons PER, products PRD, relationships REL" + " from persons PER, products PRD, relationships REL"
+ " where PER.ID = 10"; + " where PER.ID = 10";
// Join query over persons, products and relationships whose WHERE clause uses two positional
// '?' placeholders (an upper bound for PER.ID, then one for REL.ID) instead of literal values.
// testSelectQueryInFlowFileWithParameters supplies them through sql.args.N.type / sql.args.N.value attributes.
final static String QUERY_WITHOUT_EL_WITH_PARAMS = "select "
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+ ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+ ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
+ ", ROW_NUMBER() OVER () as rownr "
+ " from persons PER, products PRD, relationships REL"
+ " where PER.ID < ? AND REL.ID < ?";
@BeforeClass @BeforeClass
public static void setupClass() { public static void setupClass() {
@ -124,23 +132,35 @@ public class TestExecuteSQL {
@Test @Test
public void testNoIncomingConnection() throws ClassNotFoundException, SQLException, InitializationException, IOException { public void testNoIncomingConnection() throws ClassNotFoundException, SQLException, InitializationException, IOException {
runner.setIncomingConnection(false); runner.setIncomingConnection(false);
invokeOnTrigger(null, QUERY_WITHOUT_EL, false, true); invokeOnTrigger(null, QUERY_WITHOUT_EL, false, null, true);
} }
@Test @Test
public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException { public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException {
invokeOnTrigger(null, QUERY_WITH_EL, true, true); invokeOnTrigger(null, QUERY_WITH_EL, true, null, true);
} }
@Test @Test
public void testSelectQueryInFlowFile() throws InitializationException, ClassNotFoundException, SQLException, IOException { public void testSelectQueryInFlowFile() throws InitializationException, ClassNotFoundException, SQLException, IOException {
invokeOnTrigger(null, QUERY_WITHOUT_EL, true, false); invokeOnTrigger(null, QUERY_WITHOUT_EL, true, null, false);
}
@Test
public void testSelectQueryInFlowFileWithParameters() throws InitializationException, ClassNotFoundException, SQLException, IOException {
Map<String, String> sqlParams = new HashMap<String, String>() {{
put("sql.args.1.type", "4");
put("sql.args.1.value", "20");
put("sql.args.2.type", "4");
put("sql.args.2.value", "5");
}};
invokeOnTrigger(null, QUERY_WITHOUT_EL_WITH_PARAMS, true, sqlParams, false);
} }
@Test @Test
public void testQueryTimeout() throws InitializationException, ClassNotFoundException, SQLException, IOException { public void testQueryTimeout() throws InitializationException, ClassNotFoundException, SQLException, IOException {
// Does not seem to have any effect when using embedded Derby // Does not seem to have any effect when using embedded Derby
invokeOnTrigger(1, QUERY_WITH_EL, true, true); // 1 second max time invokeOnTrigger(1, QUERY_WITH_EL, true, null, true); // 1 second max time
} }
@Test @Test
@ -172,7 +192,7 @@ public class TestExecuteSQL {
} }
@Test @Test
public void testWithduplicateColumns() throws SQLException { public void testWithDuplicateColumns() throws SQLException {
// remove previous test database, if any // remove previous test database, if any
final File dbLocation = new File(DB_LOCATION); final File dbLocation = new File(DB_LOCATION);
dbLocation.delete(); dbLocation.delete();
@ -228,7 +248,7 @@ public class TestExecuteSQL {
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 0); runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 0);
} }
public void invokeOnTrigger(final Integer queryTimeout, final String query, final boolean incomingFlowFile, final boolean setQueryProperty) public void invokeOnTrigger(final Integer queryTimeout, final String query, final boolean incomingFlowFile, final Map<String,String> attrs, final boolean setQueryProperty)
throws InitializationException, ClassNotFoundException, SQLException, IOException { throws InitializationException, ClassNotFoundException, SQLException, IOException {
if (queryTimeout != null) { if (queryTimeout != null) {
@ -250,7 +270,7 @@ public class TestExecuteSQL {
if (incomingFlowFile) { if (incomingFlowFile) {
// incoming FlowFile content is not used, but attributes are used // incoming FlowFile content is not used, but attributes are used
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = (attrs == null) ? new HashMap<>() : attrs;
attributes.put("person.id", "10"); attributes.put("person.id", "10");
if (!setQueryProperty) { if (!setQueryProperty) {
runner.enqueue(query.getBytes(), attributes); runner.enqueue(query.getBytes(), attributes);