diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java index e18e464ba3..3835ff7c2a 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java @@ -18,13 +18,38 @@ package org.apache.nifi.processors.hive; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.dbcp.hive.HiveDBCPService; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.math.BigDecimal; +import java.nio.charset.Charset; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; + +import java.util.Map; +import java.util.TreeMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * An abstract base class for HiveQL processors to share common data, methods, etc. */ public abstract class AbstractHiveQLProcessor extends AbstractProcessor { + protected static final Pattern HIVEQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("hiveql\\.args\\.(\\d+)\\.type"); + protected static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+"); + public static final PropertyDescriptor HIVE_DBCP_SERVICE = new PropertyDescriptor.Builder() .name("Hive Database Connection Pooling Service") .description("The Hive Controller Service that is used to obtain connection(s) to the Hive database") @@ -32,4 +57,163 @@ public abstract class AbstractHiveQLProcessor extends AbstractProcessor { .identifiesControllerService(HiveDBCPService.class) .build(); + public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("hive-charset") + .displayName("Character Set") + .description("Specifies the character set of the record data.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + + /** + * Determines the HiveQL statement that should be executed for the given FlowFile + * + * @param session the session that can be used to access the given FlowFile + * @param flowFile the FlowFile whose HiveQL statement should be executed + * @return the HiveQL that is associated with the given FlowFile + */ + protected String getHiveQL(final ProcessSession session, final FlowFile flowFile, final Charset charset) { + // Read the HiveQL from the FlowFile's content + final byte[] buffer = new byte[(int) flowFile.getSize()]; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, buffer); + } + }); + + // Create the PreparedStatement to use for this FlowFile. 
+        return new String(buffer, charset);
+    }
+
+    private class ParameterHolder {
+        String attributeName;
+        int jdbcType;
+        String value;
+    }
+
+    /**
+     * Sets all of the appropriate parameters on the given PreparedStatement, based on the given FlowFile attributes.
+     *
+     * @param base the index of the first hiveql.args.N attribute that belongs to this statement (statements in a multi-statement script share one continuous numbering)
+     * @param stmt the statement to set the parameters on
+     * @param paramCount the number of parameters expected by this statement
+     * @param attributes the attributes from which to derive parameter indices, values, and types
+     * @return the attribute index to use as the base for the next statement (base + paramCount)
+     * @throws SQLException if the PreparedStatement throws a SQLException when the appropriate setter is called
+     */
+    protected int setParameters(int base, final PreparedStatement stmt, int paramCount, final Map<String, String> attributes) throws SQLException {
+
+        Map<Integer, ParameterHolder> parmMap = new TreeMap<>();
+
+        for (final Map.Entry<String, String> entry : attributes.entrySet()) {
+            final String key = entry.getKey();
+            final Matcher matcher = HIVEQL_TYPE_ATTRIBUTE_PATTERN.matcher(key);
+            if (matcher.matches()) {
+                final int parameterIndex = Integer.parseInt(matcher.group(1));
+                if (parameterIndex >= base && parameterIndex < base + paramCount) {
+                    final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches();
+                    if (!isNumeric) {
+                        throw new ProcessException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral type");
+                    }
+
+                    final String valueAttrName = "hiveql.args." + parameterIndex + ".value";
+
+                    ParameterHolder ph = new ParameterHolder();
+                    int realIndexLoc = parameterIndex - base + 1;
+
+                    ph.jdbcType = Integer.parseInt(entry.getValue());
+                    ph.value = attributes.get(valueAttrName);
+                    ph.attributeName = valueAttrName;
+
+                    parmMap.put(realIndexLoc, ph);
+                }
+            }
+        }
+
+        // Now that we've retrieved the correct number of parameters and they are sorted by index, set them on the statement.
+        for (final Map.Entry<Integer, ParameterHolder> entry : parmMap.entrySet()) {
+            final Integer index = entry.getKey();
+            final ParameterHolder ph = entry.getValue();
+
+            try {
+                setParameter(stmt, ph.attributeName, index, ph.value, ph.jdbcType);
+            } catch (final NumberFormatException nfe) {
+                throw new ProcessException("The value of the " + ph.attributeName + " is '" + ph.value + "', which cannot be converted into the necessary data type", nfe);
+            }
+        }
+        return base + paramCount;
+    }
+
+    /**
+     * Determines how to map the given value to the appropriate JDBC data type and sets the parameter on the
+     * provided PreparedStatement
+     *
+     * @param stmt the PreparedStatement to set the parameter on
+     * @param attrName the name of the attribute that the parameter is coming from - for logging purposes
+     * @param parameterIndex the index of the HiveQL parameter to set
+     * @param parameterValue the value of the HiveQL parameter to set
+     * @param jdbcType the JDBC Type of the HiveQL parameter to set
+     * @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter
+     */
+    protected void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException {
+        if (parameterValue == null) {
+            stmt.setNull(parameterIndex, jdbcType);
+        } else {
+            try {
+                switch (jdbcType) {
+                    case Types.BIT:
+                    case Types.BOOLEAN:
+                        stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue));
+                        break;
+                    case Types.TINYINT:
+                        stmt.setByte(parameterIndex, Byte.parseByte(parameterValue));
+                        break;
+                    case Types.SMALLINT:
+                        stmt.setShort(parameterIndex, Short.parseShort(parameterValue));
+                        break;
+                    case Types.INTEGER:
+                        stmt.setInt(parameterIndex, Integer.parseInt(parameterValue));
+                        break;
+                    case Types.BIGINT:
stmt.setLong(parameterIndex, Long.parseLong(parameterValue)); + break; + case Types.REAL: + stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue)); + break; + case Types.FLOAT: + case Types.DOUBLE: + stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue)); + break; + case Types.DECIMAL: + case Types.NUMERIC: + stmt.setBigDecimal(parameterIndex, new BigDecimal(parameterValue)); + break; + case Types.DATE: + stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue))); + break; + case Types.TIME: + stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue))); + break; + case Types.TIMESTAMP: + stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue))); + break; + case Types.CHAR: + case Types.VARCHAR: + case Types.LONGNVARCHAR: + case Types.LONGVARCHAR: + stmt.setString(parameterIndex, parameterValue); + break; + default: + stmt.setObject(parameterIndex, parameterValue, jdbcType); + break; + } + } catch (SQLException e) { + // Log which attribute/parameter had an error, then rethrow to be handled at the top level + getLogger().error("Error setting parameter {} to value from {} ({})", new Object[]{parameterIndex, attrName, parameterValue}, e); + throw e; + } + } + } + } diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java index 830031224d..5eabfe8a15 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.hive; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.ReadsAttribute; @@ -30,30 +31,19 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.stream.io.StreamUtils; -import java.io.IOException; -import java.io.InputStream; -import java.math.BigDecimal; import java.nio.charset.Charset; import java.sql.Connection; -import java.sql.Date; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.SQLNonTransientException; -import java.sql.Time; -import java.sql.Timestamp; -import java.sql.Types; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; import java.util.regex.Pattern; @SeeAlso(SelectHiveQL.class) @@ -80,13 +70,14 @@ public class PutHiveQL extends AbstractHiveQLProcessor { .defaultValue("100") .build(); - public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() - .name("hive-charset") - .displayName("Character Set") - .description("Specifies the character set of the record data.") + public static final PropertyDescriptor STATEMENT_DELIMITER = new PropertyDescriptor.Builder() + .name("statement-delimiter") + 
.displayName("Statement Delimiter") + .description("Statement Delimiter used to separate SQL statements in a multiple statement script") .required(true) - .defaultValue("UTF-8") - .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue(";") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() @@ -103,14 +94,12 @@ public class PutHiveQL extends AbstractHiveQLProcessor { + "such as an invalid query or an integrity constraint violation") .build(); - private static final Pattern HIVEQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("hiveql\\.args\\.(\\d+)\\.type"); - private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+"); private final static List propertyDescriptors; private final static Set relationships; /* - * Will ensure that the list of property descriptors is build only once. + * Will ensure that the list of property descriptors is built only once. * Will also create a Set of relationships */ static { @@ -118,6 +107,7 @@ public class PutHiveQL extends AbstractHiveQLProcessor { _propertyDescriptors.add(HIVE_DBCP_SERVICE); _propertyDescriptors.add(BATCH_SIZE); _propertyDescriptors.add(CHARSET); + _propertyDescriptors.add(STATEMENT_DELIMITER); propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); Set _relationships = new HashSet<>(); @@ -149,19 +139,42 @@ public class PutHiveQL extends AbstractHiveQLProcessor { final long startNanos = System.nanoTime(); final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class); + final String statementDelimiter = context.getProperty(STATEMENT_DELIMITER).getValue(); + try (final Connection conn = dbcpService.getConnection()) { for (FlowFile flowFile : flowFiles) { try { - final String hiveQL = getHiveQL(session, flowFile, charset); - final PreparedStatement stmt = conn.prepareStatement(hiveQL); - setParameters(stmt, flowFile.getAttributes()); + final String script = getHiveQL(session, flowFile, charset); + String regex = "(? 0) { + loc = setParameters(loc, stmt, paramCount, flowFile.getAttributes()); + } + + // Execute the statement + stmt.execute(); + } + } // Emit a Provenance SEND event final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().send(flowFile, dbcpService.getConnectionURL(), transmissionMillis, true); session.transfer(flowFile, REL_SUCCESS); @@ -185,130 +198,4 @@ public class PutHiveQL extends AbstractHiveQLProcessor { context.yield(); } } - - /** - * Determines the HiveQL statement that should be executed for the given FlowFile - * - * @param session the session that can be used to access the given FlowFile - * @param flowFile the FlowFile whose HiveQL statement should be executed - * @return the HiveQL that is associated with the given FlowFile - */ - private String getHiveQL(final ProcessSession session, final FlowFile flowFile, final Charset charset) { - // Read the HiveQL from the FlowFile's content - final byte[] buffer = new byte[(int) flowFile.getSize()]; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.fillBuffer(in, buffer); - } - }); - - // Create the PreparedStatement to use for this FlowFile. 
- return new String(buffer, charset); - } - - - /** - * Sets all of the appropriate parameters on the given PreparedStatement, based on the given FlowFile attributes. - * - * @param stmt the statement to set the parameters on - * @param attributes the attributes from which to derive parameter indices, values, and types - * @throws SQLException if the PreparedStatement throws a SQLException when the appropriate setter is called - */ - private void setParameters(final PreparedStatement stmt, final Map attributes) throws SQLException { - for (final Map.Entry entry : attributes.entrySet()) { - final String key = entry.getKey(); - final Matcher matcher = HIVEQL_TYPE_ATTRIBUTE_PATTERN.matcher(key); - if (matcher.matches()) { - final int parameterIndex = Integer.parseInt(matcher.group(1)); - - final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches(); - if (!isNumeric) { - throw new ProcessException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral type"); - } - - final int jdbcType = Integer.parseInt(entry.getValue()); - final String valueAttrName = "hiveql.args." + parameterIndex + ".value"; - final String parameterValue = attributes.get(valueAttrName); - - try { - setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType); - } catch (final NumberFormatException nfe) { - throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe); - } - } - } - } - - /** - * Determines how to map the given value to the appropriate JDBC data type and sets the parameter on the - * provided PreparedStatement - * - * @param stmt the PreparedStatement to set the parameter on - * @param attrName the name of the attribute that the parameter is coming from - for logging purposes - * @param parameterIndex the index of the HiveQL parameter to set - * @param parameterValue the value of the HiveQL parameter to set - * @param jdbcType the JDBC Type of the HiveQL parameter to set - * @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter - */ - private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException { - if (parameterValue == null) { - stmt.setNull(parameterIndex, jdbcType); - } else { - try { - switch (jdbcType) { - case Types.BIT: - case Types.BOOLEAN: - stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue)); - break; - case Types.TINYINT: - stmt.setByte(parameterIndex, Byte.parseByte(parameterValue)); - break; - case Types.SMALLINT: - stmt.setShort(parameterIndex, Short.parseShort(parameterValue)); - break; - case Types.INTEGER: - stmt.setInt(parameterIndex, Integer.parseInt(parameterValue)); - break; - case Types.BIGINT: - stmt.setLong(parameterIndex, Long.parseLong(parameterValue)); - break; - case Types.REAL: - stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue)); - break; - case Types.FLOAT: - case Types.DOUBLE: - stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue)); - break; - case Types.DECIMAL: - case Types.NUMERIC: - stmt.setBigDecimal(parameterIndex, new BigDecimal(parameterValue)); - break; - case Types.DATE: - stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue))); - break; - case Types.TIME: - stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue))); - break; - case Types.TIMESTAMP: - 
stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue))); - break; - case Types.CHAR: - case Types.VARCHAR: - case Types.LONGNVARCHAR: - case Types.LONGVARCHAR: - stmt.setString(parameterIndex, parameterValue); - break; - default: - stmt.setObject(parameterIndex, parameterValue, jdbcType); - break; - } - } catch (SQLException e) { - // Log which attribute/parameter had an error, then rethrow to be handled at the top level - getLogger().error("Error setting parameter {} to value from {} ({})", new Object[]{parameterIndex, attrName, parameterValue}, e); - throw e; - } - } - } - -} +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java index 9c0ebef394..342fadaa5e 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java @@ -17,11 +17,14 @@ package org.apache.nifi.processors.hive; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; +import java.nio.charset.Charset; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.SQLException; import java.sql.Statement; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -30,6 +33,8 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -37,6 +42,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.dbcp.hive.HiveDBCPService; import org.apache.nifi.flowfile.FlowFile; @@ -46,9 +52,11 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.hive.CsvOutputOptions; import org.apache.nifi.util.hive.HiveJdbcCommon; @EventDriven @@ -90,11 +98,59 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { .name("hive-query") .displayName("HiveQL Select Query") .description("HiveQL SELECT query to execute") - .required(true) + .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) .build(); + public static final PropertyDescriptor HIVEQL_CSV_HEADER = new PropertyDescriptor.Builder() + .name("csv-header") + .displayName("CSV Header") + .description("Include Header in Output") + 
.required(true) + .allowableValues("true", "false") + .defaultValue("true") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + public static final PropertyDescriptor HIVEQL_CSV_ALT_HEADER = new PropertyDescriptor.Builder() + .name("csv-alt-header") + .displayName("Alternate CSV Header") + .description("Comma separated list of header fields") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor HIVEQL_CSV_DELIMITER = new PropertyDescriptor.Builder() + .name("csv-delimiter") + .displayName("CSV Delimiter") + .description("CSV Delimiter used to separate fields") + .required(true) + .defaultValue(",") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor HIVEQL_CSV_QUOTE = new PropertyDescriptor.Builder() + .name("csv-quote") + .displayName("CSV Quote") + .description("Whether to force quoting of CSV fields. Note that this might conflict with the setting for CSV Escape.") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final PropertyDescriptor HIVEQL_CSV_ESCAPE = new PropertyDescriptor.Builder() + .name("csv-escape") + .displayName("CSV Escape") + .description("Whether to escape CSV strings in output. Note that this might conflict with the setting for CSV Quote.") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final PropertyDescriptor HIVEQL_OUTPUT_FORMAT = new PropertyDescriptor.Builder() .name("hive-output-format") .displayName("Output Format") @@ -117,6 +173,12 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { _propertyDescriptors.add(HIVE_DBCP_SERVICE); _propertyDescriptors.add(HIVEQL_SELECT_QUERY); _propertyDescriptors.add(HIVEQL_OUTPUT_FORMAT); + _propertyDescriptors.add(HIVEQL_CSV_HEADER); + _propertyDescriptors.add(HIVEQL_CSV_ALT_HEADER); + _propertyDescriptors.add(HIVEQL_CSV_DELIMITER); + _propertyDescriptors.add(HIVEQL_CSV_QUOTE); + _propertyDescriptors.add(HIVEQL_CSV_ESCAPE); + _propertyDescriptors.add(CHARSET); propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); Set _relationships = new HashSet<>(); @@ -135,15 +197,26 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { return relationships; } + @OnScheduled + public void setup(ProcessContext context) { + // If the query is not set, then an incoming flow file is needed. Otherwise fail the initialization + if (!context.getProperty(HIVEQL_SELECT_QUERY).isSet() && !context.hasIncomingConnection()) { + final String errorString = "Either the Select Query must be specified or there must be an incoming connection " + + "providing flowfile(s) containing a SQL select query"; + getLogger().error(errorString); + throw new ProcessException(errorString); + } + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile fileToProcess = null; - if (context.hasIncomingConnection()) { - fileToProcess = session.get(); + final FlowFile fileToProcess = (context.hasIncomingConnection()? session.get():null); + FlowFile flowfile = null; - // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. 
-            // However, if we have no FlowFile and we have connections coming from other Processors, then
-            // we know that we should run only if we have a FlowFile.
+        // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
+        // However, if we have no FlowFile and we have connections coming from other Processors, then
+        // we know that we should run only if we have a FlowFile.
+        if (context.hasIncomingConnection()) {
             if (fileToProcess == null && context.hasNonLoopConnection()) {
                 return;
             }
@@ -151,26 +224,73 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
 
         final ComponentLog logger = getLogger();
         final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
-        final String selectQuery = context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
+        final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
+
+        final boolean flowbased = !(context.getProperty(HIVEQL_SELECT_QUERY).isSet());
+
+        // Source the SQL
+        final String selectQuery;
+
+        if (context.getProperty(HIVEQL_SELECT_QUERY).isSet()) {
+            selectQuery = context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
+        } else {
+            // If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
+            // If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
+            final StringBuilder queryContents = new StringBuilder();
+            session.read(fileToProcess, new InputStreamCallback() {
+                @Override
+                public void process(InputStream in) throws IOException {
+                    queryContents.append(IOUtils.toString(in, charset));
+                }
+            });
+            selectQuery = queryContents.toString();
+        }
+
         final String outputFormat = context.getProperty(HIVEQL_OUTPUT_FORMAT).getValue();
         final StopWatch stopWatch = new StopWatch(true);
+        final boolean header = context.getProperty(HIVEQL_CSV_HEADER).asBoolean();
+        final String altHeader = context.getProperty(HIVEQL_CSV_ALT_HEADER).evaluateAttributeExpressions(fileToProcess).getValue();
+        final String delimiter = context.getProperty(HIVEQL_CSV_DELIMITER).evaluateAttributeExpressions(fileToProcess).getValue();
+        final boolean quote = context.getProperty(HIVEQL_CSV_QUOTE).asBoolean();
+        final boolean escape = context.getProperty(HIVEQL_CSV_ESCAPE).asBoolean();
 
         try (final Connection con = dbcpService.getConnection();
-            final Statement st = con.createStatement()) {
+             final Statement st = (flowbased ? con.prepareStatement(selectQuery) : con.createStatement())
+        ) {
+
             final AtomicLong nrOfRows = new AtomicLong(0L);
             if (fileToProcess == null) {
-                fileToProcess = session.create();
+                flowfile = session.create();
+            } else {
+                flowfile = fileToProcess;
             }
-            fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
+
+            flowfile = session.write(flowfile, new OutputStreamCallback() {
                 @Override
                 public void process(final OutputStream out) throws IOException {
                     try {
                         logger.debug("Executing query {}", new Object[]{selectQuery});
-                        final ResultSet resultSet = st.executeQuery(selectQuery);
+                        if (flowbased) {
+                            // Hive JDBC doesn't support this yet:
+                            // ParameterMetaData pmd = ((PreparedStatement) st).getParameterMetaData();
+                            // int paramCount = pmd.getParameterCount();
+
+                            // Alternate way to determine number of params in SQL.
+ int paramCount = StringUtils.countMatches(selectQuery, "?"); + + if (paramCount > 0) { + setParameters(1, (PreparedStatement) st, paramCount, fileToProcess.getAttributes()); + } + } + + final ResultSet resultSet = (flowbased ? ((PreparedStatement)st).executeQuery(): st.executeQuery(selectQuery)); + if (AVRO.equals(outputFormat)) { nrOfRows.set(HiveJdbcCommon.convertToAvroStream(resultSet, out)); } else if (CSV.equals(outputFormat)) { - nrOfRows.set(HiveJdbcCommon.convertToCsvStream(resultSet, out)); + CsvOutputOptions options = new CsvOutputOptions(header, altHeader, delimiter, quote, escape); + nrOfRows.set(HiveJdbcCommon.convertToCsvStream(resultSet, out,options)); } else { nrOfRows.set(0L); throw new ProcessException("Unsupported output format: " + outputFormat); @@ -181,33 +301,34 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { } }); - // set attribute how many rows were selected - fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); + // Set attribute for how many rows were selected + flowfile = session.putAttribute(flowfile, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); - // Set MIME type on output document and add extension + // Set MIME type on output document and add extension to filename if (AVRO.equals(outputFormat)) { - fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), AVRO_MIME_TYPE); - fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.FILENAME.key(), fileToProcess.getAttribute(CoreAttributes.FILENAME.key()) + ".avro"); + flowfile = session.putAttribute(flowfile, CoreAttributes.MIME_TYPE.key(), AVRO_MIME_TYPE); + flowfile = session.putAttribute(flowfile, CoreAttributes.FILENAME.key(), flowfile.getAttribute(CoreAttributes.FILENAME.key()) + ".avro"); } else if (CSV.equals(outputFormat)) { - fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), CSV_MIME_TYPE); - fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.FILENAME.key(), fileToProcess.getAttribute(CoreAttributes.FILENAME.key()) + ".csv"); + flowfile = session.putAttribute(flowfile, CoreAttributes.MIME_TYPE.key(), CSV_MIME_TYPE); + flowfile = session.putAttribute(flowfile, CoreAttributes.FILENAME.key(), flowfile.getAttribute(CoreAttributes.FILENAME.key()) + ".csv"); } logger.info("{} contains {} Avro records; transferring to 'success'", - new Object[]{fileToProcess, nrOfRows.get()}); + new Object[]{flowfile, nrOfRows.get()}); if (context.hasIncomingConnection()) { // If the flow file came from an incoming connection, issue a Modify Content provenance event - session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows", + session.getProvenanceReporter().modifyContent(flowfile, "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS)); } else { // If we created a flow file from rows received from Hive, issue a Receive provenance event - session.getProvenanceReporter().receive(fileToProcess, dbcpService.getConnectionURL(), stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.getProvenanceReporter().receive(flowfile, dbcpService.getConnectionURL(), stopWatch.getElapsed(TimeUnit.MILLISECONDS)); } - session.transfer(fileToProcess, REL_SUCCESS); + session.transfer(flowfile, REL_SUCCESS); } catch (final ProcessException | SQLException e) { - if (fileToProcess == null) { + logger.error("Issue processing SQL {} due to {}.", new Object[]{selectQuery, e}); + if (flowfile == null) { // This can happen 
if any exceptions occur while setting up the connection, statement, etc. logger.error("Unable to execute HiveQL select query {} due to {}. No FlowFile to route to failure", new Object[]{selectQuery, e}); @@ -215,15 +336,17 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { } else { if (context.hasIncomingConnection()) { logger.error("Unable to execute HiveQL select query {} for {} due to {}; routing to failure", - new Object[]{selectQuery, fileToProcess, e}); - fileToProcess = session.penalize(fileToProcess); + new Object[]{selectQuery, flowfile, e}); + flowfile = session.penalize(flowfile); } else { logger.error("Unable to execute HiveQL select query {} due to {}; routing to failure", new Object[]{selectQuery, e}); context.yield(); } - session.transfer(fileToProcess, REL_FAILURE); + session.transfer(flowfile, REL_FAILURE); } + } finally { + } } } diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/CsvOutputOptions.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/CsvOutputOptions.java new file mode 100644 index 0000000000..bad6926c17 --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/CsvOutputOptions.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.util.hive; + +public class CsvOutputOptions { + + private boolean header = true; + private String altHeader = null; + private String delimiter = ","; + private boolean quote = false; + private boolean escape = true; + + public boolean isHeader() { + return header; + } + + public String getAltHeader() { + return altHeader; + } + + + public String getDelimiter() { + return delimiter; + } + + + public boolean isQuote() { + return quote; + } + + public boolean isEscape() { + return escape; + } + + public CsvOutputOptions(boolean header, String altHeader, String delimiter, boolean quote, boolean escape) { + this.header = header; + this.altHeader = altHeader; + this.delimiter = delimiter; + this.quote = quote; + this.escape = escape; + } +} diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java index 689baf941d..83d4e22431 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java @@ -40,6 +40,7 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static java.sql.Types.ARRAY; @@ -292,27 +293,36 @@ public class HiveJdbcCommon { return builder.endRecord(); } - public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream) throws SQLException, IOException { - return convertToCsvStream(rs, outStream, null, null); + public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream, CsvOutputOptions outputOptions) throws SQLException, IOException { + return convertToCsvStream(rs, outStream, null, null, outputOptions); } - public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback) + public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback, CsvOutputOptions outputOptions) throws SQLException, IOException { final ResultSetMetaData meta = rs.getMetaData(); final int nrOfColumns = meta.getColumnCount(); List columnNames = new ArrayList<>(nrOfColumns); - for (int i = 1; i <= nrOfColumns; i++) { - String columnNameFromMeta = meta.getColumnName(i); - // Hive returns table.column for column name. Grab the column name as the string after the last period - int columnNameDelimiter = columnNameFromMeta.lastIndexOf("."); - columnNames.add(columnNameFromMeta.substring(columnNameDelimiter + 1)); + if (outputOptions.isHeader()) { + if (outputOptions.getAltHeader() == null) { + for (int i = 1; i <= nrOfColumns; i++) { + String columnNameFromMeta = meta.getColumnName(i); + // Hive returns table.column for column name. 
Grab the column name as the string after the last period + int columnNameDelimiter = columnNameFromMeta.lastIndexOf("."); + columnNames.add(columnNameFromMeta.substring(columnNameDelimiter + 1)); + } + } else { + String[] altHeaderNames = outputOptions.getAltHeader().split(","); + columnNames = Arrays.asList(altHeaderNames); + } } // Write column names as header row - outStream.write(StringUtils.join(columnNames, ",").getBytes(StandardCharsets.UTF_8)); - outStream.write("\n".getBytes(StandardCharsets.UTF_8)); + outStream.write(StringUtils.join(columnNames, outputOptions.getDelimiter()).getBytes(StandardCharsets.UTF_8)); + if (outputOptions.isHeader()) { + outStream.write("\n".getBytes(StandardCharsets.UTF_8)); + } // Iterate over the rows long nrOfRows = 0; @@ -334,7 +344,24 @@ public class HiveJdbcCommon { case VARCHAR: String valueString = rs.getString(i); if (valueString != null) { - rowValues.add("\"" + StringEscapeUtils.escapeCsv(valueString) + "\""); + // Removed extra quotes as those are a part of the escapeCsv when required. + StringBuilder sb = new StringBuilder(); + if (outputOptions.isQuote()) { + sb.append("\""); + if (outputOptions.isEscape()) { + sb.append(StringEscapeUtils.escapeCsv(valueString)); + } else { + sb.append(valueString); + } + sb.append("\""); + rowValues.add(sb.toString()); + } else { + if (outputOptions.isEscape()) { + rowValues.add(StringEscapeUtils.escapeCsv(valueString)); + } else { + rowValues.add(valueString); + } + } } else { rowValues.add(""); } @@ -358,7 +385,7 @@ public class HiveJdbcCommon { } } // Write row values - outStream.write(StringUtils.join(rowValues, ",").getBytes(StandardCharsets.UTF_8)); + outStream.write(StringUtils.join(rowValues, outputOptions.getDelimiter()).getBytes(StandardCharsets.UTF_8)); outStream.write("\n".getBytes(StandardCharsets.UTF_8)); nrOfRows++; } diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java index b46b8478c3..c7498f9f0f 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java @@ -356,12 +356,64 @@ public class TestPutHiveQL { runner.run(); // should fail because of the semicolon - runner.assertAllFlowFilesTransferred(PutHiveQL.REL_FAILURE, 1); + runner.assertAllFlowFilesTransferred(PutHiveQL.REL_SUCCESS, 1); + // Now we can check that the values were inserted by the multi-statement script. 
        try (final Connection conn = service.getConnection()) {
            try (final Statement stmt = conn.createStatement()) {
                final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
-                assertFalse(rs.next());
+                assertTrue(rs.next());
+                assertEquals("Record ID mismatch", 1, rs.getInt(1));
+                assertEquals("Record NAME mismatch", "George", rs.getString(2));
+            }
+        }
+    }
+
+    @Test
+    public void testMultipleStatementsWithinFlowFilePlusEmbeddedDelimiter() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+
+        final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
+                "UPDATE PERSONS SET NAME='George\\;' WHERE ID=?; ";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.1.value", "1");
+
+        attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR));
+        attributes.put("hiveql.args.2.value", "Mark");
+
+        attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.3.value", "84");
+
+        attributes.put("hiveql.args.4.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.4.value", "1");
+
+        runner.enqueue(sql.getBytes(), attributes);
+        runner.run();
+
+        // should succeed; the escaped semicolon is not treated as a statement delimiter
+        runner.assertAllFlowFilesTransferred(PutHiveQL.REL_SUCCESS, 1);
+
+        // Now we can check that the values were inserted by the multi-statement script.
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+                assertTrue(rs.next());
+                assertEquals("Record ID mismatch", 1, rs.getInt(1));
+                assertEquals("Record NAME mismatch", "George\\;", rs.getString(2));
+            }
+        }
+    }
@@ -444,13 +496,13 @@ public class TestPutHiveQL {
         runner.enqueue(sql.getBytes(), attributes);
         runner.run();
 
-        // should fail because of the semicolon
+        // should fail because the table is invalid
         runner.assertAllFlowFilesTransferred(PutHiveQL.REL_FAILURE, 1);
 
         try (final Connection conn = service.getConnection()) {
             try (final Statement stmt = conn.createStatement()) {
                 final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
-                assertFalse(rs.next());
+                assertTrue(rs.next());
             }
         }
     }
@@ -467,6 +519,7 @@ public class TestPutHiveQL {
         final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); "
                 + "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
         attributes.put("hiveql.args.1.value", "1");
@@ -483,7 +536,7 @@ public class TestPutHiveQL {
         runner.enqueue(sql.getBytes(), attributes);
         runner.run();
 
-        // should fail because of the semicolon
+        // should fail because there isn't a valid connection and the tables don't exist
         runner.assertAllFlowFilesTransferred(PutHiveQL.REL_RETRY, 1);
     }
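
A minimal sketch of how the positional-parameter convention in this patch plays out across a multi-statement script (illustrative values only; it mirrors the TestPutHiveQL cases above and assumes a TestRunner already configured with a MockDBCPService, as in those tests). PutHiveQL splits the script on the Statement Delimiter, counts each statement's '?' placeholders with StringUtils.countMatches, and calls setParameters(base, ...) so that hiveql.args.N numbering runs continuously across the whole script: the first statement below consumes attributes 1-2, setParameters returns 3, and the second statement's single placeholder binds to attribute 3.

    // Sketch only: two statements, three parameters numbered continuously across the script.
    final String script = "INSERT INTO PERSONS (ID, NAME) VALUES (?, ?); "
            + "UPDATE PERSONS SET NAME = ? WHERE ID = 1;";

    final Map<String, String> attributes = new HashMap<>();
    attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));  // statement 1, parameter 1
    attributes.put("hiveql.args.1.value", "1");
    attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR));  // statement 1, parameter 2
    attributes.put("hiveql.args.2.value", "Mark");
    attributes.put("hiveql.args.3.type", String.valueOf(Types.VARCHAR));  // statement 2, parameter 1
    attributes.put("hiveql.args.3.value", "George");

    runner.enqueue(script.getBytes(), attributes);
    runner.run();
    runner.assertAllFlowFilesTransferred(PutHiveQL.REL_SUCCESS, 1);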