diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java index d4d31cc1e8..b9ce1c9455 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java @@ -257,7 +257,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { } private void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final FlowFile fileToProcess = (context.hasIncomingConnection() ? session.get() : null); + FlowFile fileToProcess = (context.hasIncomingConnection() ? session.get() : null); FlowFile flowfile = null; // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. @@ -338,8 +338,9 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { try { resultSet = (flowbased ? ((PreparedStatement) st).executeQuery() : st.executeQuery(selectQuery)); } catch (SQLException se) { - // If an error occurs during the query, a flowfile is expected to be routed to failure, so create one here (the original will be removed) - flowfile = session.create(fileToProcess); + // If an error occurs during the query, a flowfile is expected to be routed to failure, so ensure one here + flowfile = (fileToProcess == null) ? session.create() : fileToProcess; + fileToProcess = null; throw se; } diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java index 8313caa267..2222dbcb8d 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java @@ -181,6 +181,30 @@ public class TestSelectHiveQL { runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_FAILURE, 1); } + @Test + public void testWithBadSQL() throws SQLException { + final String BAD_SQL = "create table TEST_NO_ROWS (id integer)"; + + // Test with incoming flow file (it should be routed to failure intact, i.e. same content and no parent) + runner.setIncomingConnection(true); + // Try a valid SQL statement that will generate an error (val1 does not exist, e.g.) + runner.enqueue(BAD_SQL); + runner.run(); + runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_FAILURE, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(SelectHiveQL.REL_FAILURE).get(0); + flowFile.assertContentEquals(BAD_SQL); + flowFile.assertAttributeEquals("parentIds", null); + runner.clearTransferState(); + + // Test with no incoming flow file (an empty flow file is transferred) + runner.setIncomingConnection(false); + // Try a valid SQL statement that will generate an error (val1 does not exist, e.g.) + runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, BAD_SQL); + runner.run(); + runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_FAILURE, 1); + flowFile = runner.getFlowFilesForRelationship(SelectHiveQL.REL_FAILURE).get(0); + flowFile.assertContentEquals(""); + } @Test public void invokeOnTriggerWithCsv()