NIFI-4473 - fixed NPE in parametrized queries

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2215
This commit is contained in:
Matthew Burgess 2017-10-16 13:38:56 -04:00
parent 4edafad6e5
commit 1a1e01c568
2 changed files with 37 additions and 1 deletions

View File

@ -329,7 +329,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
int paramCount = StringUtils.countMatches(selectQuery, "?"); int paramCount = StringUtils.countMatches(selectQuery, "?");
if (paramCount > 0) { if (paramCount > 0) {
setParameters(1, (PreparedStatement) st, paramCount, flowfile.getAttributes()); setParameters(1, (PreparedStatement) st, paramCount, fileToProcess.getAttributes());
} }
} }

View File

@ -46,6 +46,7 @@ import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.sql.Types;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -330,6 +331,41 @@ public class TestSelectHiveQL {
runner.clearTransferState(); runner.clearTransferState();
} }
@Test
public void testParametrizedQuery() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
int rowCount = 0;
//create larger row set
for (int batch = 0; batch < 100; batch++) {
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
rowCount++;
}
runner.setIncomingConnection(true);
runner.setProperty(SelectHiveQL.MAX_ROWS_PER_FLOW_FILE, "${" + MAX_ROWS_KEY + "}");
runner.setProperty(SelectHiveQL.HIVEQL_OUTPUT_FORMAT, HiveJdbcCommon.AVRO);
runner.setVariable(MAX_ROWS_KEY, "9");
Map<String, String> attributes = new HashMap<String, String>();
attributes.put("hiveql.args.1.value", "1");
attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
runner.enqueue("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id = ?", attributes );
runner.run();
runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 1);
runner.clearTransferState();
}
@Test @Test
public void testMaxRowsPerFlowFileCSV() throws ClassNotFoundException, SQLException, InitializationException, IOException { public void testMaxRowsPerFlowFileCSV() throws ClassNotFoundException, SQLException, InitializationException, IOException {