diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index 210412634f..cb9388f4d4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -16,13 +16,11 @@ */ package org.apache.nifi.processors.standard; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import java.nio.charset.Charset; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -35,6 +33,8 @@ import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -50,8 +50,6 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.util.JdbcCommon; import org.apache.nifi.util.StopWatch; @@ -64,11 +62,32 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC @EventDriven @InputRequirement(Requirement.INPUT_ALLOWED) @Tags({"sql", "select", "jdbc", "query", "database"}) -@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format." +@CapabilityDescription("Executes provided SQL select query. Query result will be converted to Avro format." + " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on " + "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " - + "select query. FlowFile attribute 'executesql.row.count' indicates how many rows were selected.") + + "select query, and the query may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes " + + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be " + + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format. " + + "FlowFile attribute 'executesql.row.count' indicates how many rows were selected.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parametrized SQL statements. The type of each Parameter is specified as an integer " + + "that represents the JDBC Type of the parameter."), + @ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parametrized SQL statements. The value of the Parameters are specified as " + + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."), + @ReadsAttribute(attribute = "sql.args.N.format", description = "This attribute is always optional, but default options may not always work for your data. " + + "Incoming FlowFiles are expected to be parametrized SQL statements. In some cases " + + "a format option needs to be specified, currently this is only applicable for binary data types, dates, times and timestamps. Binary Data Types (defaults to 'ascii') - " + + "ascii: each string character in your attribute value represents a single byte. This is the format provided by Avro Processors. " + + "base64: the string is a Base64 encoded string that can be decoded to bytes. " + + "hex: the string is hex encoded with all letters in upper case and no '0x' at the beginning. " + + "Dates/Times/Timestamps - " + + "Date, Time and Timestamp formats all support both custom formats or named format ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME') " + + "as specified according to java.time.format.DateTimeFormatter. " + + "If not specified, a long value input is expected to be an unix epoch (milli seconds from 1970/1/1), or a string value in " + + "'yyyy-MM-dd' format for Date, 'HH:mm:ss.SSS' for Time (some database engines e.g. Derby or MySQL do not support milliseconds and will truncate milliseconds), " + + "'yyyy-MM-dd HH:mm:ss.SSS' for Timestamp is used.") +}) @WritesAttributes({ @WritesAttribute(attribute="executesql.row.count", description = "Contains the number of rows returned in the select query"), @WritesAttribute(attribute="executesql.query.duration", description = "Duration of the query in milliseconds") @@ -187,22 +206,20 @@ public class ExecuteSQL extends AbstractProcessor { // If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query. // If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled. final StringBuilder queryContents = new StringBuilder(); - session.read(fileToProcess, new InputStreamCallback() { - @Override - public void process(InputStream in) throws IOException { - queryContents.append(IOUtils.toString(in)); - } - }); + session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, Charset.defaultCharset()))); selectQuery = queryContents.toString(); } int resultCount=0; try (final Connection con = dbcpService.getConnection(); - final Statement st = con.createStatement()) { + final PreparedStatement st = con.prepareStatement(selectQuery)) { st.setQueryTimeout(queryTimeout); // timeout in seconds + if (fileToProcess != null) { + JdbcCommon.setParameters(st, fileToProcess.getAttributes()); + } logger.debug("Executing query {}", new Object[]{selectQuery}); - boolean results = st.execute(selectQuery); + boolean results = st.execute(); while(results){ @@ -215,22 +232,19 @@ public class ExecuteSQL extends AbstractProcessor { } final AtomicLong nrOfRows = new AtomicLong(0L); - resultSetFF = session.write(resultSetFF, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - try { + resultSetFF = session.write(resultSetFF, out -> { + try { - final ResultSet resultSet = st.getResultSet(); - final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder() - .convertNames(convertNamesForAvro) - .useLogicalTypes(useAvroLogicalTypes) - .defaultPrecision(defaultPrecision) - .defaultScale(defaultScale) - .build(); - nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null)); - } catch (final SQLException e) { - throw new ProcessException(e); - } + final ResultSet resultSet = st.getResultSet(); + final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder() + .convertNames(convertNamesForAvro) + .useLogicalTypes(useAvroLogicalTypes) + .defaultPrecision(defaultPrecision) + .defaultScale(defaultScale) + .build(); + nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null)); + } catch (final SQLException e) { + throw new ProcessException(e); } }); @@ -261,12 +275,7 @@ public class ExecuteSQL extends AbstractProcessor { if(resultCount > 0){ session.remove(fileToProcess); } else { - fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { - @Override - public void process(OutputStream out) throws IOException { - JdbcCommon.createEmptyAvroStream(out); - } - }); + fileToProcess = session.write(fileToProcess, JdbcCommon::createEmptyAvroStream); session.transfer(fileToProcess, REL_SUCCESS); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java index b50dcd0671..fd9501b5ab 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -48,37 +48,19 @@ import org.apache.nifi.processor.util.pattern.PartialFunctions.FlowFileGroup; import org.apache.nifi.processor.util.pattern.PutGroup; import org.apache.nifi.processor.util.pattern.RollbackOnFailure; import org.apache.nifi.processor.util.pattern.RoutingResult; +import org.apache.nifi.processors.standard.util.JdbcCommon; import org.apache.nifi.stream.io.StreamUtils; -import javax.xml.bind.DatatypeConverter; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; -import java.io.StringReader; -import java.io.UnsupportedEncodingException; -import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.sql.BatchUpdateException; import java.sql.Connection; -import java.sql.Date; import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.SQLDataException; import java.sql.SQLException; import java.sql.SQLNonTransientException; import java.sql.Statement; -import java.sql.Time; -import java.sql.Timestamp; -import java.sql.Types; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.time.Instant; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; -import java.time.temporal.TemporalAccessor; import java.util.ArrayList; import java.util.BitSet; import java.util.Comparator; @@ -89,8 +71,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import static org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnError; @@ -197,15 +177,10 @@ public class PutSQL extends AbstractSessionFactoryProcessor { + "such as an invalid query or an integrity constraint violation") .build(); - private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type"); - private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+"); - private static final String FRAGMENT_ID_ATTR = FragmentAttributes.FRAGMENT_ID.key(); private static final String FRAGMENT_INDEX_ATTR = FragmentAttributes.FRAGMENT_INDEX.key(); private static final String FRAGMENT_COUNT_ATTR = FragmentAttributes.FRAGMENT_COUNT.key(); - private static final Pattern LONG_PATTERN = Pattern.compile("^-?\\d{1,19}$"); - @Override protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(); @@ -310,7 +285,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor { if(!exceptionHandler.execute(fc, flowFile, input -> { final PreparedStatement stmt = enclosure.getCachedStatement(conn); - setParameters(stmt, flowFile.getAttributes()); + JdbcCommon.setParameters(stmt, flowFile.getAttributes()); stmt.addBatch(); }, onFlowFileError(context, session, result))) { continue; @@ -383,7 +358,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor { try (final PreparedStatement stmt = targetEnclosure.getNewStatement(conn, fc.obtainKeys)) { // set the appropriate parameters on the statement. - setParameters(stmt, flowFile.getAttributes()); + JdbcCommon.setParameters(stmt, flowFile.getAttributes()); stmt.executeUpdate(); @@ -677,46 +652,6 @@ public class PutSQL extends AbstractSessionFactoryProcessor { return sql; } - - /** - * Sets all of the appropriate parameters on the given PreparedStatement, based on the given FlowFile attributes. - * - * @param stmt the statement to set the parameters on - * @param attributes the attributes from which to derive parameter indices, values, and types - * @throws SQLException if the PreparedStatement throws a SQLException when the appropriate setter is called - */ - private void setParameters(final PreparedStatement stmt, final Map attributes) throws SQLException { - for (final Map.Entry entry : attributes.entrySet()) { - final String key = entry.getKey(); - final Matcher matcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key); - if (matcher.matches()) { - final int parameterIndex = Integer.parseInt(matcher.group(1)); - - final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches(); - if (!isNumeric) { - throw new SQLDataException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral type"); - } - - final int jdbcType = Integer.parseInt(entry.getValue()); - final String valueAttrName = "sql.args." + parameterIndex + ".value"; - final String parameterValue = attributes.get(valueAttrName); - final String formatAttrName = "sql.args." + parameterIndex + ".format"; - final String parameterFormat = attributes.containsKey(formatAttrName)? attributes.get(formatAttrName):""; - - try { - setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType, parameterFormat); - } catch (final NumberFormatException nfe) { - throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe); - } catch (ParseException pe) { - throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to a timestamp", pe); - } catch (UnsupportedEncodingException uee) { - throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to UTF-8", uee); - } - } - } - } - - /** * Determines which relationship the given FlowFiles should go to, based on a transaction timing out or * transaction information not being present. If the FlowFiles should be processed and not transferred @@ -810,183 +745,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor { return false; // not enough FlowFiles for this transaction. Return them all to queue. } - /** - * Determines how to map the given value to the appropriate JDBC data type and sets the parameter on the - * provided PreparedStatement - * - * @param stmt the PreparedStatement to set the parameter on - * @param attrName the name of the attribute that the parameter is coming from - for logging purposes - * @param parameterIndex the index of the SQL parameter to set - * @param parameterValue the value of the SQL parameter to set - * @param jdbcType the JDBC Type of the SQL parameter to set - * @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter - */ - private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType, - final String valueFormat) - throws SQLException, ParseException, UnsupportedEncodingException { - if (parameterValue == null) { - stmt.setNull(parameterIndex, jdbcType); - } else { - switch (jdbcType) { - case Types.BIT: - stmt.setBoolean(parameterIndex, "1".equals(parameterValue) || "t".equalsIgnoreCase(parameterValue) || Boolean.parseBoolean(parameterValue)); - break; - case Types.BOOLEAN: - stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue)); - break; - case Types.TINYINT: - stmt.setByte(parameterIndex, Byte.parseByte(parameterValue)); - break; - case Types.SMALLINT: - stmt.setShort(parameterIndex, Short.parseShort(parameterValue)); - break; - case Types.INTEGER: - stmt.setInt(parameterIndex, Integer.parseInt(parameterValue)); - break; - case Types.BIGINT: - stmt.setLong(parameterIndex, Long.parseLong(parameterValue)); - break; - case Types.REAL: - stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue)); - break; - case Types.FLOAT: - case Types.DOUBLE: - stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue)); - break; - case Types.DECIMAL: - case Types.NUMERIC: - stmt.setBigDecimal(parameterIndex, new BigDecimal(parameterValue)); - break; - case Types.DATE: - Date date; - if (valueFormat.equals("")) { - if(LONG_PATTERN.matcher(parameterValue).matches()){ - date = new Date(Long.parseLong(parameterValue)); - }else { - String dateFormatString = "yyyy-MM-dd"; - SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatString); - java.util.Date parsedDate = dateFormat.parse(parameterValue); - date = new Date(parsedDate.getTime()); - } - } else { - final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat); - LocalDate parsedDate = LocalDate.parse(parameterValue, dtFormatter); - date = new Date(Date.from(parsedDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()).getTime()); - } - - stmt.setDate(parameterIndex, date); - break; - case Types.TIME: - Time time; - - if (valueFormat.equals("")) { - if (LONG_PATTERN.matcher(parameterValue).matches()) { - time = new Time(Long.parseLong(parameterValue)); - } else { - String timeFormatString = "HH:mm:ss.SSS"; - SimpleDateFormat dateFormat = new SimpleDateFormat(timeFormatString); - java.util.Date parsedDate = dateFormat.parse(parameterValue); - time = new Time(parsedDate.getTime()); - } - } else { - final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat); - LocalTime parsedTime = LocalTime.parse(parameterValue, dtFormatter); - LocalDateTime localDateTime = parsedTime.atDate(LocalDate.ofEpochDay(0)); - Instant instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant(); - time = new Time(instant.toEpochMilli()); - } - - stmt.setTime(parameterIndex, time); - break; - case Types.TIMESTAMP: - long lTimestamp=0L; - - // Backwards compatibility note: Format was unsupported for a timestamp field. - if (valueFormat.equals("")) { - if(LONG_PATTERN.matcher(parameterValue).matches()){ - lTimestamp = Long.parseLong(parameterValue); - } else { - final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); - java.util.Date parsedDate = dateFormat.parse(parameterValue); - lTimestamp = parsedDate.getTime(); - } - } else { - final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat); - TemporalAccessor accessor = dtFormatter.parse(parameterValue); - java.util.Date parsedDate = java.util.Date.from(Instant.from(accessor)); - lTimestamp = parsedDate.getTime(); - } - - stmt.setTimestamp(parameterIndex, new Timestamp(lTimestamp)); - - break; - case Types.BINARY: - case Types.VARBINARY: - case Types.LONGVARBINARY: - byte[] bValue; - - switch(valueFormat){ - case "": - case "ascii": - bValue = parameterValue.getBytes("ASCII"); - break; - case "hex": - bValue = DatatypeConverter.parseHexBinary(parameterValue); - break; - case "base64": - bValue = DatatypeConverter.parseBase64Binary(parameterValue); - break; - default: - throw new ParseException("Unable to parse binary data using the formatter `" + valueFormat + "`.",0); - } - - stmt.setBinaryStream(parameterIndex, new ByteArrayInputStream(bValue), bValue.length); - - break; - case Types.CHAR: - case Types.VARCHAR: - case Types.LONGNVARCHAR: - case Types.LONGVARCHAR: - stmt.setString(parameterIndex, parameterValue); - break; - case Types.CLOB: - try (final StringReader reader = new StringReader(parameterValue)) { - stmt.setCharacterStream(parameterIndex, reader); - } - break; - case Types.NCLOB: - try (final StringReader reader = new StringReader(parameterValue)) { - stmt.setNCharacterStream(parameterIndex, reader); - } - break; - default: - stmt.setObject(parameterIndex, parameterValue, jdbcType); - break; - } - } - } - - private DateTimeFormatter getDateTimeFormatter(String pattern) { - switch(pattern) { - case "BASIC_ISO_DATE": return DateTimeFormatter.BASIC_ISO_DATE; - case "ISO_LOCAL_DATE": return DateTimeFormatter.ISO_LOCAL_DATE; - case "ISO_OFFSET_DATE": return DateTimeFormatter.ISO_OFFSET_DATE; - case "ISO_DATE": return DateTimeFormatter.ISO_DATE; - case "ISO_LOCAL_TIME": return DateTimeFormatter.ISO_LOCAL_TIME; - case "ISO_OFFSET_TIME": return DateTimeFormatter.ISO_OFFSET_TIME; - case "ISO_TIME": return DateTimeFormatter.ISO_TIME; - case "ISO_LOCAL_DATE_TIME": return DateTimeFormatter.ISO_LOCAL_DATE_TIME; - case "ISO_OFFSET_DATE_TIME": return DateTimeFormatter.ISO_OFFSET_DATE_TIME; - case "ISO_ZONED_DATE_TIME": return DateTimeFormatter.ISO_ZONED_DATE_TIME; - case "ISO_DATE_TIME": return DateTimeFormatter.ISO_DATE_TIME; - case "ISO_ORDINAL_DATE": return DateTimeFormatter.ISO_ORDINAL_DATE; - case "ISO_WEEK_DATE": return DateTimeFormatter.ISO_WEEK_DATE; - case "ISO_INSTANT": return DateTimeFormatter.ISO_INSTANT; - case "RFC_1123_DATE_TIME": return DateTimeFormatter.RFC_1123_DATE_TIME; - default: return DateTimeFormatter.ofPattern(pattern); - } - } /** * A FlowFileFilter that is responsible for ensuring that the FlowFiles returned either belong @@ -1038,7 +797,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor { if (selectedId.equals(fragmentId)) { // fragment id's match. Find out if we have all of the necessary fragments or not. final int numFragments; - if (fragCount != null && NUMBER_PATTERN.matcher(fragCount).matches()) { + if (fragCount != null && JdbcCommon.NUMBER_PATTERN.matcher(fragCount).matches()) { numFragments = Integer.parseInt(fragCount); } else { numFragments = Integer.MAX_VALUE; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java index f8e88efabc..1cee441e69 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java @@ -45,10 +45,13 @@ import static java.sql.Types.TINYINT; import static java.sql.Types.VARBINARY; import static java.sql.Types.VARCHAR; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Reader; +import java.io.StringReader; +import java.io.UnsupportedEncodingException; import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; @@ -56,11 +59,28 @@ import java.nio.CharBuffer; import java.sql.Blob; import java.sql.Clob; import java.sql.NClob; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; +import java.sql.SQLDataException; import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.temporal.TemporalAccessor; import java.util.Date; +import java.util.Map; import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; @@ -79,6 +99,8 @@ import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.processor.util.StandardValidators; +import javax.xml.bind.DatatypeConverter; + /** * JDBC / SQL common functions. */ @@ -90,6 +112,10 @@ public class JdbcCommon { private static final int DEFAULT_PRECISION_VALUE = 10; private static final int DEFAULT_SCALE_VALUE = 0; + public static final Pattern LONG_PATTERN = Pattern.compile("^-?\\d{1,19}$"); + public static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type"); + public static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+"); + public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary"; public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder() @@ -608,6 +634,222 @@ public class JdbcCommon { return normalizedName; } + /** + * Sets all of the appropriate parameters on the given PreparedStatement, based on the given FlowFile attributes. + * + * @param stmt the statement to set the parameters on + * @param attributes the attributes from which to derive parameter indices, values, and types + * @throws SQLException if the PreparedStatement throws a SQLException when the appropriate setter is called + */ + public static void setParameters(final PreparedStatement stmt, final Map attributes) throws SQLException { + for (final Map.Entry entry : attributes.entrySet()) { + final String key = entry.getKey(); + final Matcher matcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key); + if (matcher.matches()) { + final int parameterIndex = Integer.parseInt(matcher.group(1)); + + final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches(); + if (!isNumeric) { + throw new SQLDataException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral type"); + } + + final int jdbcType = Integer.parseInt(entry.getValue()); + final String valueAttrName = "sql.args." + parameterIndex + ".value"; + final String parameterValue = attributes.get(valueAttrName); + final String formatAttrName = "sql.args." + parameterIndex + ".format"; + final String parameterFormat = attributes.containsKey(formatAttrName)? attributes.get(formatAttrName):""; + + try { + JdbcCommon.setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType, parameterFormat); + } catch (final NumberFormatException nfe) { + throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe); + } catch (ParseException pe) { + throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to a timestamp", pe); + } catch (UnsupportedEncodingException uee) { + throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to UTF-8", uee); + } + } + } + } + + /** + * Determines how to map the given value to the appropriate JDBC data type and sets the parameter on the + * provided PreparedStatement + * + * @param stmt the PreparedStatement to set the parameter on + * @param attrName the name of the attribute that the parameter is coming from - for logging purposes + * @param parameterIndex the index of the SQL parameter to set + * @param parameterValue the value of the SQL parameter to set + * @param jdbcType the JDBC Type of the SQL parameter to set + * @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter + */ + public static void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType, + final String valueFormat) + throws SQLException, ParseException, UnsupportedEncodingException { + if (parameterValue == null) { + stmt.setNull(parameterIndex, jdbcType); + } else { + switch (jdbcType) { + case Types.BIT: + stmt.setBoolean(parameterIndex, "1".equals(parameterValue) || "t".equalsIgnoreCase(parameterValue) || Boolean.parseBoolean(parameterValue)); + break; + case Types.BOOLEAN: + stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue)); + break; + case Types.TINYINT: + stmt.setByte(parameterIndex, Byte.parseByte(parameterValue)); + break; + case Types.SMALLINT: + stmt.setShort(parameterIndex, Short.parseShort(parameterValue)); + break; + case Types.INTEGER: + stmt.setInt(parameterIndex, Integer.parseInt(parameterValue)); + break; + case Types.BIGINT: + stmt.setLong(parameterIndex, Long.parseLong(parameterValue)); + break; + case Types.REAL: + stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue)); + break; + case Types.FLOAT: + case Types.DOUBLE: + stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue)); + break; + case Types.DECIMAL: + case Types.NUMERIC: + stmt.setBigDecimal(parameterIndex, new BigDecimal(parameterValue)); + break; + case Types.DATE: + java.sql.Date date; + + if (valueFormat.equals("")) { + if(LONG_PATTERN.matcher(parameterValue).matches()){ + date = new java.sql.Date(Long.parseLong(parameterValue)); + }else { + String dateFormatString = "yyyy-MM-dd"; + SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatString); + java.util.Date parsedDate = dateFormat.parse(parameterValue); + date = new java.sql.Date(parsedDate.getTime()); + } + } else { + final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat); + LocalDate parsedDate = LocalDate.parse(parameterValue, dtFormatter); + date = new java.sql.Date(java.sql.Date.from(parsedDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()).getTime()); + } + + stmt.setDate(parameterIndex, date); + break; + case Types.TIME: + Time time; + + if (valueFormat.equals("")) { + if (LONG_PATTERN.matcher(parameterValue).matches()) { + time = new Time(Long.parseLong(parameterValue)); + } else { + String timeFormatString = "HH:mm:ss.SSS"; + SimpleDateFormat dateFormat = new SimpleDateFormat(timeFormatString); + java.util.Date parsedDate = dateFormat.parse(parameterValue); + time = new Time(parsedDate.getTime()); + } + } else { + final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat); + LocalTime parsedTime = LocalTime.parse(parameterValue, dtFormatter); + LocalDateTime localDateTime = parsedTime.atDate(LocalDate.ofEpochDay(0)); + Instant instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant(); + time = new Time(instant.toEpochMilli()); + } + + stmt.setTime(parameterIndex, time); + break; + case Types.TIMESTAMP: + long lTimestamp=0L; + + // Backwards compatibility note: Format was unsupported for a timestamp field. + if (valueFormat.equals("")) { + if(LONG_PATTERN.matcher(parameterValue).matches()){ + lTimestamp = Long.parseLong(parameterValue); + } else { + final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + java.util.Date parsedDate = dateFormat.parse(parameterValue); + lTimestamp = parsedDate.getTime(); + } + } else { + final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat); + TemporalAccessor accessor = dtFormatter.parse(parameterValue); + java.util.Date parsedDate = java.util.Date.from(Instant.from(accessor)); + lTimestamp = parsedDate.getTime(); + } + + stmt.setTimestamp(parameterIndex, new Timestamp(lTimestamp)); + + break; + case Types.BINARY: + case Types.VARBINARY: + case Types.LONGVARBINARY: + byte[] bValue; + + switch(valueFormat){ + case "": + case "ascii": + bValue = parameterValue.getBytes("ASCII"); + break; + case "hex": + bValue = DatatypeConverter.parseHexBinary(parameterValue); + break; + case "base64": + bValue = DatatypeConverter.parseBase64Binary(parameterValue); + break; + default: + throw new ParseException("Unable to parse binary data using the formatter `" + valueFormat + "`.",0); + } + + stmt.setBinaryStream(parameterIndex, new ByteArrayInputStream(bValue), bValue.length); + + break; + case Types.CHAR: + case Types.VARCHAR: + case Types.LONGNVARCHAR: + case Types.LONGVARCHAR: + stmt.setString(parameterIndex, parameterValue); + break; + case Types.CLOB: + try (final StringReader reader = new StringReader(parameterValue)) { + stmt.setCharacterStream(parameterIndex, reader); + } + break; + case Types.NCLOB: + try (final StringReader reader = new StringReader(parameterValue)) { + stmt.setNCharacterStream(parameterIndex, reader); + } + break; + default: + stmt.setObject(parameterIndex, parameterValue, jdbcType); + break; + } + } + } + + public static DateTimeFormatter getDateTimeFormatter(String pattern) { + switch(pattern) { + case "BASIC_ISO_DATE": return DateTimeFormatter.BASIC_ISO_DATE; + case "ISO_LOCAL_DATE": return DateTimeFormatter.ISO_LOCAL_DATE; + case "ISO_OFFSET_DATE": return DateTimeFormatter.ISO_OFFSET_DATE; + case "ISO_DATE": return DateTimeFormatter.ISO_DATE; + case "ISO_LOCAL_TIME": return DateTimeFormatter.ISO_LOCAL_TIME; + case "ISO_OFFSET_TIME": return DateTimeFormatter.ISO_OFFSET_TIME; + case "ISO_TIME": return DateTimeFormatter.ISO_TIME; + case "ISO_LOCAL_DATE_TIME": return DateTimeFormatter.ISO_LOCAL_DATE_TIME; + case "ISO_OFFSET_DATE_TIME": return DateTimeFormatter.ISO_OFFSET_DATE_TIME; + case "ISO_ZONED_DATE_TIME": return DateTimeFormatter.ISO_ZONED_DATE_TIME; + case "ISO_DATE_TIME": return DateTimeFormatter.ISO_DATE_TIME; + case "ISO_ORDINAL_DATE": return DateTimeFormatter.ISO_ORDINAL_DATE; + case "ISO_WEEK_DATE": return DateTimeFormatter.ISO_WEEK_DATE; + case "ISO_INSTANT": return DateTimeFormatter.ISO_INSTANT; + case "RFC_1123_DATE_TIME": return DateTimeFormatter.RFC_1123_DATE_TIME; + default: return DateTimeFormatter.ofPattern(pattern); + } + } + /** * An interface for callback methods which allows processing of a row during the convertToAvroStream() processing. * IMPORTANT: This method should only work on the row pointed at by the current ResultSet reference. diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java index 69b7ae5d91..3a0b773912 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java @@ -79,6 +79,14 @@ public class TestExecuteSQL { + " from persons PER, products PRD, relationships REL" + " where PER.ID = 10"; + final static String QUERY_WITHOUT_EL_WITH_PARAMS = "select " + + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" + + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode" + + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode" + + ", ROW_NUMBER() OVER () as rownr " + + " from persons PER, products PRD, relationships REL" + + " where PER.ID < ? AND REL.ID < ?"; + @BeforeClass public static void setupClass() { @@ -124,23 +132,35 @@ public class TestExecuteSQL { @Test public void testNoIncomingConnection() throws ClassNotFoundException, SQLException, InitializationException, IOException { runner.setIncomingConnection(false); - invokeOnTrigger(null, QUERY_WITHOUT_EL, false, true); + invokeOnTrigger(null, QUERY_WITHOUT_EL, false, null, true); } @Test public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException { - invokeOnTrigger(null, QUERY_WITH_EL, true, true); + invokeOnTrigger(null, QUERY_WITH_EL, true, null, true); } @Test public void testSelectQueryInFlowFile() throws InitializationException, ClassNotFoundException, SQLException, IOException { - invokeOnTrigger(null, QUERY_WITHOUT_EL, true, false); + invokeOnTrigger(null, QUERY_WITHOUT_EL, true, null, false); + } + + @Test + public void testSelectQueryInFlowFileWithParameters() throws InitializationException, ClassNotFoundException, SQLException, IOException { + Map sqlParams = new HashMap() {{ + put("sql.args.1.type", "4"); + put("sql.args.1.value", "20"); + put("sql.args.2.type", "4"); + put("sql.args.2.value", "5"); + }}; + + invokeOnTrigger(null, QUERY_WITHOUT_EL_WITH_PARAMS, true, sqlParams, false); } @Test public void testQueryTimeout() throws InitializationException, ClassNotFoundException, SQLException, IOException { // Does to seem to have any effect when using embedded Derby - invokeOnTrigger(1, QUERY_WITH_EL, true, true); // 1 second max time + invokeOnTrigger(1, QUERY_WITH_EL, true, null, true); // 1 second max time } @Test @@ -172,7 +192,7 @@ public class TestExecuteSQL { } @Test - public void testWithduplicateColumns() throws SQLException { + public void testWithDuplicateColumns() throws SQLException { // remove previous test database, if any final File dbLocation = new File(DB_LOCATION); dbLocation.delete(); @@ -228,7 +248,7 @@ public class TestExecuteSQL { runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 0); } - public void invokeOnTrigger(final Integer queryTimeout, final String query, final boolean incomingFlowFile, final boolean setQueryProperty) + public void invokeOnTrigger(final Integer queryTimeout, final String query, final boolean incomingFlowFile, final Map attrs, final boolean setQueryProperty) throws InitializationException, ClassNotFoundException, SQLException, IOException { if (queryTimeout != null) { @@ -250,7 +270,7 @@ public class TestExecuteSQL { if (incomingFlowFile) { // incoming FlowFile content is not used, but attributes are used - final Map attributes = new HashMap<>(); + final Map attributes = (attrs == null) ? new HashMap<>() : attrs; attributes.put("person.id", "10"); if (!setQueryProperty) { runner.enqueue(query.getBytes(), attributes);