diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java index fb059147d8..d4d31cc1e8 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java @@ -329,7 +329,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { int paramCount = StringUtils.countMatches(selectQuery, "?"); if (paramCount > 0) { - setParameters(1, (PreparedStatement) st, paramCount, flowfile.getAttributes()); + setParameters(1, (PreparedStatement) st, paramCount, fileToProcess.getAttributes()); } } diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java index 34384ac286..8313caa267 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java @@ -46,6 +46,7 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Types; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -330,6 +331,41 @@ public class TestSelectHiveQL { runner.clearTransferState(); } + @Test + public void testParametrizedQuery() throws ClassNotFoundException, SQLException, InitializationException, IOException { + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + int rowCount = 0; + //create larger row set + for (int batch = 0; batch < 100; batch++) { + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + rowCount++; + } + + runner.setIncomingConnection(true); + runner.setProperty(SelectHiveQL.MAX_ROWS_PER_FLOW_FILE, "${" + MAX_ROWS_KEY + "}"); + runner.setProperty(SelectHiveQL.HIVEQL_OUTPUT_FORMAT, HiveJdbcCommon.AVRO); + runner.setVariable(MAX_ROWS_KEY, "9"); + + Map attributes = new HashMap(); + attributes.put("hiveql.args.1.value", "1"); + attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER)); + runner.enqueue("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id = ?", attributes ); + + runner.run(); + runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 1); + runner.clearTransferState(); + } + @Test public void testMaxRowsPerFlowFileCSV() throws ClassNotFoundException, SQLException, InitializationException, IOException {