NIFI-1409: Fix missing transfer on error in ExecuteSql

Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
Matt Burgess 2016-01-27 14:01:50 -05:00 committed by jpercivall
parent c5a174ae6d
commit aa6bc5d505
2 changed files with 84 additions and 47 deletions

View File

@ -53,50 +53,49 @@ import org.apache.nifi.util.StopWatch;
@InputRequirement(Requirement.INPUT_ALLOWED)
@Tags({"sql", "select", "jdbc", "query", "database"})
@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format."
+ " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on " +
"a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " +
"If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " +
"select query. " +
"FlowFile attribute 'executesql.row.count' indicates how many rows were selected.")
+ " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on "
+ "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. "
+ "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the "
+ "select query. FlowFile attribute 'executesql.row.count' indicates how many rows were selected.")
public class ExecuteSQL extends AbstractProcessor {
public static final String RESULT_ROW_COUNT = "executesql.row.count";
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Successfully created FlowFile from SQL query result set.")
.build();
.name("success")
.description("Successfully created FlowFile from SQL query result set.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship")
.build();
.name("failure")
.description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship")
.build();
private final Set<Relationship> relationships;
public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
.name("Database Connection Pooling Service")
.description("The Controller Service that is used to obtain connection to database")
.required(true)
.identifiesControllerService(DBCPService.class)
.build();
.name("Database Connection Pooling Service")
.description("The Controller Service that is used to obtain connection to database")
.required(true)
.identifiesControllerService(DBCPService.class)
.build();
public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder()
.name("SQL select query")
.description("SQL select query")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
.name("SQL select query")
.description("SQL select query")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
.name("Max Wait Time")
.description("The maximum amount of time allowed for a running SQL select query "
+ " , zero means there is no limit. Max time less than 1 second will be equal to zero.")
.defaultValue("0 seconds")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.sensitive(false)
.build();
.name("Max Wait Time")
.description("The maximum amount of time allowed for a running SQL select query "
+ " , zero means there is no limit. Max time less than 1 second will be equal to zero.")
.defaultValue("0 seconds")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.sensitive(false)
.build();
private final List<PropertyDescriptor> propDescriptors;
@ -125,36 +124,36 @@ public class ExecuteSQL extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile incoming = null;
FlowFile fileToProcess = null;
if (context.hasIncomingConnection()) {
incoming = session.get();
fileToProcess = session.get();
// If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
// However, if we have no FlowFile and we have connections coming from other Processors, then
// we know that we should run only if we have a FlowFile.
if (incoming == null && context.hasNonLoopConnection()) {
if (fileToProcess == null && context.hasNonLoopConnection()) {
return;
}
}
final ProcessorLog logger = getLogger();
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
final String selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(incoming).getValue();
final String selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
final StopWatch stopWatch = new StopWatch(true);
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
st.setQueryTimeout(queryTimeout); // timeout in seconds
final LongHolder nrOfRows = new LongHolder(0L);
FlowFile outgoing = (incoming == null ? session.create() : incoming);
outgoing = session.write(outgoing, new OutputStreamCallback() {
if (fileToProcess == null) {
fileToProcess = session.create();
}
fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
try {
logger.debug("Executing query {}", new Object[] {selectQuery});
logger.debug("Executing query {}", new Object[]{selectQuery});
final ResultSet resultSet = st.executeQuery(selectQuery);
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out));
} catch (final SQLException e) {
@ -164,17 +163,30 @@ public class ExecuteSQL extends AbstractProcessor {
});
// set attribute how many rows were selected
outgoing = session.putAttribute(outgoing, RESULT_ROW_COUNT, nrOfRows.get().toString());
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, nrOfRows.get().toString());
logger.info("{} contains {} Avro records; transferring to 'success'", new Object[] {outgoing, nrOfRows.get()});
session.getProvenanceReporter().modifyContent(outgoing, "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(outgoing, REL_SUCCESS);
logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{fileToProcess, nrOfRows.get()});
session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(fileToProcess, REL_SUCCESS);
} catch (final ProcessException | SQLException e) {
if (incoming == null) {
logger.error("Unable to execute SQL select query {} due to {}. No incoming flow file to route to failure", new Object[] {selectQuery, e});
if (fileToProcess == null) {
// This can happen if any exceptions occur while setting up the connection, statement, etc.
logger.error("Unable to execute SQL select query {} due to {}. No FlowFile to route to failure",
new Object[]{selectQuery, e});
context.yield();
} else {
logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure", new Object[] {selectQuery, incoming, e});
session.transfer(incoming, REL_FAILURE);
if (context.hasIncomingConnection()) {
logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure",
new Object[]{selectQuery, fileToProcess, e});
fileToProcess = session.penalize(fileToProcess);
} else {
logger.error("Unable to execute SQL select query {} due to {}; routing to failure",
new Object[]{selectQuery, e});
context.yield();
}
session.transfer(fileToProcess, REL_FAILURE);
}
}
}

View File

@ -152,6 +152,31 @@ public class TestExecuteSQL {
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "2");
}
@Test
public void testWithSqlException() throws SQLException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_NO_ROWS");
} catch (final SQLException sqle) {
}
stmt.execute("create table TEST_NO_ROWS (id integer)");
runner.setIncomingConnection(false);
// Try a valid SQL statment that will generate an error (val1 does not exist, e.g.)
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT val1 FROM TEST_NO_ROWS");
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1);
}
public void invokeOnTrigger(final Integer queryTimeout, final String query, final boolean incomingFlowFile)
throws InitializationException, ClassNotFoundException, SQLException, IOException {