mirror of https://github.com/apache/nifi.git
NIFI-4473: Fixed SelectHiveQL flowfile handling during error conditions
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #2247.
This commit is contained in:
parent
b5d7aecfac
commit
5cd8a3e729
|
@ -257,7 +257,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
private void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||||
final FlowFile fileToProcess = (context.hasIncomingConnection() ? session.get() : null);
|
FlowFile fileToProcess = (context.hasIncomingConnection() ? session.get() : null);
|
||||||
FlowFile flowfile = null;
|
FlowFile flowfile = null;
|
||||||
|
|
||||||
// If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
|
// If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
|
||||||
|
@ -338,8 +338,9 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
|
||||||
try {
|
try {
|
||||||
resultSet = (flowbased ? ((PreparedStatement) st).executeQuery() : st.executeQuery(selectQuery));
|
resultSet = (flowbased ? ((PreparedStatement) st).executeQuery() : st.executeQuery(selectQuery));
|
||||||
} catch (SQLException se) {
|
} catch (SQLException se) {
|
||||||
// If an error occurs during the query, a flowfile is expected to be routed to failure, so create one here (the original will be removed)
|
// If an error occurs during the query, a flowfile is expected to be routed to failure, so ensure one here
|
||||||
flowfile = session.create(fileToProcess);
|
flowfile = (fileToProcess == null) ? session.create() : fileToProcess;
|
||||||
|
fileToProcess = null;
|
||||||
throw se;
|
throw se;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -181,6 +181,30 @@ public class TestSelectHiveQL {
|
||||||
runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_FAILURE, 1);
|
runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_FAILURE, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithBadSQL() throws SQLException {
|
||||||
|
final String BAD_SQL = "create table TEST_NO_ROWS (id integer)";
|
||||||
|
|
||||||
|
// Test with incoming flow file (it should be routed to failure intact, i.e. same content and no parent)
|
||||||
|
runner.setIncomingConnection(true);
|
||||||
|
// Try a valid SQL statement that will generate an error (val1 does not exist, e.g.)
|
||||||
|
runner.enqueue(BAD_SQL);
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_FAILURE, 1);
|
||||||
|
MockFlowFile flowFile = runner.getFlowFilesForRelationship(SelectHiveQL.REL_FAILURE).get(0);
|
||||||
|
flowFile.assertContentEquals(BAD_SQL);
|
||||||
|
flowFile.assertAttributeEquals("parentIds", null);
|
||||||
|
runner.clearTransferState();
|
||||||
|
|
||||||
|
// Test with no incoming flow file (an empty flow file is transferred)
|
||||||
|
runner.setIncomingConnection(false);
|
||||||
|
// Try a valid SQL statement that will generate an error (val1 does not exist, e.g.)
|
||||||
|
runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, BAD_SQL);
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_FAILURE, 1);
|
||||||
|
flowFile = runner.getFlowFilesForRelationship(SelectHiveQL.REL_FAILURE).get(0);
|
||||||
|
flowFile.assertContentEquals("");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void invokeOnTriggerWithCsv()
|
public void invokeOnTriggerWithCsv()
|
||||||
|
|
Loading…
Reference in New Issue