From 5cd8a3e729cb9cbd64d97289d610c6e25fe87b9a Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Thu, 2 Nov 2017 17:13:22 -0400 Subject: [PATCH] NIFI-4473: Fixed SelectHiveQL flowfile handling during error conditions Signed-off-by: Pierre Villard This closes #2247. --- .../nifi/processors/hive/SelectHiveQL.java | 7 +++--- .../processors/hive/TestSelectHiveQL.java | 24 +++++++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java index d4d31cc1e8..b9ce1c9455 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java @@ -257,7 +257,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { } private void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final FlowFile fileToProcess = (context.hasIncomingConnection() ? session.get() : null); + FlowFile fileToProcess = (context.hasIncomingConnection() ? session.get() : null); FlowFile flowfile = null; // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. @@ -338,8 +338,9 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { try { resultSet = (flowbased ? ((PreparedStatement) st).executeQuery() : st.executeQuery(selectQuery)); } catch (SQLException se) { - // If an error occurs during the query, a flowfile is expected to be routed to failure, so create one here (the original will be removed) - flowfile = session.create(fileToProcess); + // If an error occurs during the query, a flowfile is expected to be routed to failure, so ensure one here + flowfile = (fileToProcess == null) ? session.create() : fileToProcess; + fileToProcess = null; throw se; } diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java index 8313caa267..2222dbcb8d 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java @@ -181,6 +181,30 @@ public class TestSelectHiveQL { runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_FAILURE, 1); } + @Test + public void testWithBadSQL() throws SQLException { + final String BAD_SQL = "create table TEST_NO_ROWS (id integer)"; + + // Test with incoming flow file (it should be routed to failure intact, i.e. same content and no parent) + runner.setIncomingConnection(true); + // Try a valid SQL statement that will generate an error (val1 does not exist, e.g.) + runner.enqueue(BAD_SQL); + runner.run(); + runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_FAILURE, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(SelectHiveQL.REL_FAILURE).get(0); + flowFile.assertContentEquals(BAD_SQL); + flowFile.assertAttributeEquals("parentIds", null); + runner.clearTransferState(); + + // Test with no incoming flow file (an empty flow file is transferred) + runner.setIncomingConnection(false); + // Try a valid SQL statement that will generate an error (val1 does not exist, e.g.) + runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, BAD_SQL); + runner.run(); + runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_FAILURE, 1); + flowFile = runner.getFlowFilesForRelationship(SelectHiveQL.REL_FAILURE).get(0); + flowFile.assertContentEquals(""); + } @Test public void invokeOnTriggerWithCsv()