mirror of https://github.com/apache/nifi.git
NIFI-5780 Add pre and post statements to ExecuteSQL and ExecuteSQLRecord
Signed-off-by: Peter Wicks <patricker@gmail.com> This closes #3156.
This commit is contained in:
parent
be0949570a
commit
75906226a6
|
@ -109,7 +109,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
|
|||
public static final PropertyDescriptor HIVEQL_PRE_QUERY = new PropertyDescriptor.Builder()
|
||||
.name("hive-pre-query")
|
||||
.displayName("HiveQL Pre-Query")
|
||||
.description("HiveQL pre-query to execute. Semicolon-delimited list of queries. "
|
||||
.description("A semicolon-delimited list of queries executed before the main SQL query is executed. "
|
||||
+ "Example: 'set tez.queue.name=queue1; set hive.exec.orc.split.strategy=ETL; set hive.exec.reducers.bytes.per.reducer=1073741824'. "
|
||||
+ "Note, the results/outputs of these queries will be suppressed if successfully executed.")
|
||||
.required(false)
|
||||
|
@ -129,7 +129,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
|
|||
public static final PropertyDescriptor HIVEQL_POST_QUERY = new PropertyDescriptor.Builder()
|
||||
.name("hive-post-query")
|
||||
.displayName("HiveQL Post-Query")
|
||||
.description("HiveQL post-query to execute. Semicolon-delimited list of queries. "
|
||||
.description("A semicolon-delimited list of queries executed after the main SQL query is executed. "
|
||||
+ "Note, the results/outputs of these queries will be suppressed if successfully executed.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.dbcp.DBCPService;
|
||||
|
@ -44,6 +45,7 @@ import java.sql.Statement;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -82,6 +84,17 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
|
|||
.identifiesControllerService(DBCPService.class)
|
||||
.build();
|
||||
|
||||
// A semicolon-delimited list of statements executed on the same connection
// before the main select query runs (e.g. to set session properties).
// Outputs of these statements are suppressed when they succeed.
public static final PropertyDescriptor SQL_PRE_QUERY = new PropertyDescriptor.Builder()
        .name("sql-pre-query")
        .displayName("SQL Pre-Query")
        .description("A semicolon-delimited list of queries executed before the main SQL query is executed. " +
                "For example, set session properties before main query. " +
                "Results/outputs from these queries will be suppressed if there are no errors.")
        .required(false)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
        .build();
|
||||
|
||||
public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder()
|
||||
.name("SQL select query")
|
||||
.description("The SQL select query to execute. The query can be empty, a constant value, or built from attributes "
|
||||
|
@ -94,6 +107,17 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
|
|||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor SQL_POST_QUERY = new PropertyDescriptor.Builder()
|
||||
.name("sql-post-query")
|
||||
.displayName("SQL Post-Query")
|
||||
.description("A semicolon-delimited list of queries executed after the main SQL query is executed. " +
|
||||
"Example like setting session properties after main query. " +
|
||||
"Results/outputs from these queries will be suppressed if there are no errors.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
|
||||
.name("Max Wait Time")
|
||||
.description("The maximum amount of time allowed for a running SQL select query "
|
||||
|
@ -177,10 +201,12 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
|
|||
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
|
||||
final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
|
||||
final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
|
||||
List<String> preQueries = getQueries(context.getProperty(SQL_PRE_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
|
||||
List<String> postQueries = getQueries(context.getProperty(SQL_POST_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
|
||||
|
||||
SqlWriter sqlWriter = configureSqlWriter(session, context, fileToProcess);
|
||||
|
||||
final String selectQuery;
|
||||
String selectQuery;
|
||||
if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
|
||||
selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
|
||||
} else {
|
||||
|
@ -196,6 +222,14 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
|
|||
final PreparedStatement st = con.prepareStatement(selectQuery)) {
|
||||
st.setQueryTimeout(queryTimeout); // timeout in seconds
|
||||
|
||||
// Execute pre-query, throw exception and cleanup Flow Files if fail
|
||||
Pair<String,SQLException> failure = executeConfigStatements(con, preQueries);
|
||||
if (failure != null) {
|
||||
// In case of failure, assigning config query to "selectQuery" to follow current error handling
|
||||
selectQuery = failure.getLeft();
|
||||
throw failure.getRight();
|
||||
}
|
||||
|
||||
if (fileToProcess != null) {
|
||||
JdbcCommon.setParameters(st, fileToProcess.getAttributes());
|
||||
}
|
||||
|
@ -317,6 +351,14 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
// Execute post-query, throw exception and cleanup Flow Files if fail
|
||||
failure = executeConfigStatements(con, postQueries);
|
||||
if (failure != null) {
|
||||
selectQuery = failure.getLeft();
|
||||
resultSetFlowFiles.forEach(ff -> session.remove(ff));
|
||||
throw failure.getRight();
|
||||
}
|
||||
|
||||
// Transfer any remaining files to SUCCESS
|
||||
session.transfer(resultSetFlowFiles, REL_SUCCESS);
|
||||
resultSetFlowFiles.clear();
|
||||
|
@ -365,5 +407,40 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
/*
 * Executes the given configuration statements (pre-/post-query lists) using
 * the pre-defined connection, one fresh Statement per query.
 *
 * Returns null on success (or when there is nothing to run); otherwise a Pair
 * of the failing query string and the SQLException it raised. Execution stops
 * at the first failure. The connection itself is not closed here.
 */
protected Pair<String,SQLException> executeConfigStatements(final Connection con, final List<String> configQueries){
    if (configQueries == null || configQueries.isEmpty()) {
        return null;
    }

    for (String confSQL : configQueries) {
        try(final Statement st = con.createStatement()){
            st.execute(confSQL);
        } catch (SQLException e) {
            // Report which statement failed so the caller can surface it in error handling
            return Pair.of(confSQL, e);
        }
    }
    return null;
}
|
||||
|
||||
/*
|
||||
* Extract list of queries from config property
|
||||
*/
|
||||
protected List<String> getQueries(final String value) {
|
||||
if (value == null || value.length() == 0 || value.trim().length() == 0) {
|
||||
return null;
|
||||
}
|
||||
final List<String> queries = new LinkedList<>();
|
||||
for (String query : value.split(";")) {
|
||||
if (query.trim().length() > 0) {
|
||||
queries.add(query.trim());
|
||||
}
|
||||
}
|
||||
return queries;
|
||||
}
|
||||
|
||||
protected abstract SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess);
|
||||
}
|
||||
|
|
|
@ -114,7 +114,9 @@ public class ExecuteSQL extends AbstractExecuteSQL {
|
|||
|
||||
final List<PropertyDescriptor> pds = new ArrayList<>();
|
||||
pds.add(DBCP_SERVICE);
|
||||
pds.add(SQL_PRE_QUERY);
|
||||
pds.add(SQL_SELECT_QUERY);
|
||||
pds.add(SQL_POST_QUERY);
|
||||
pds.add(QUERY_TIMEOUT);
|
||||
pds.add(NORMALIZE_NAMES_FOR_AVRO);
|
||||
pds.add(USE_AVRO_LOGICAL_TYPES);
|
||||
|
|
|
@ -121,7 +121,9 @@ public class ExecuteSQLRecord extends AbstractExecuteSQL {
|
|||
|
||||
final List<PropertyDescriptor> pds = new ArrayList<>();
|
||||
pds.add(DBCP_SERVICE);
|
||||
pds.add(SQL_PRE_QUERY);
|
||||
pds.add(SQL_SELECT_QUERY);
|
||||
pds.add(SQL_POST_QUERY);
|
||||
pds.add(QUERY_TIMEOUT);
|
||||
pds.add(RECORD_WRITER_FACTORY);
|
||||
pds.add(NORMALIZE_NAMES);
|
||||
|
|
|
@ -544,7 +544,153 @@ public class TestExecuteSQL {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
public void testPreQuery() throws Exception {
    // Verifies that a semicolon-delimited SQL Pre-Query list executes before the
    // main select and that the main query still produces its normal Avro output.

    // remove previous test database, if any
    final File dbLocation = new File(DB_LOCATION);
    dbLocation.delete();

    // load test data to database
    final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
    Statement stmt = con.createStatement();

    try {
        stmt.execute("drop table TEST_NULL_INT");
    } catch (final SQLException sqle) {
        // ignored: the table may not exist on the first run
    }

    stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
    stmt.execute("insert into TEST_NULL_INT values(1,2,3)");

    runner.setIncomingConnection(true);
    // Two system procedures, semicolon-delimited (Derby SYSCS_UTIL — presumably; verify against the test DB)
    runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
    runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
    runner.enqueue("test".getBytes());
    runner.run();

    runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
    MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
    firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "1");

    // Read the Avro output back and confirm exactly one record was emitted
    final InputStream in = new ByteArrayInputStream(firstFlowFile.toByteArray());
    final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
    try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
        GenericRecord record = null;
        long recordsFromStream = 0;
        while (dataFileReader.hasNext()) {
            // Reuse record object by passing it to next(). This saves us from
            // allocating and garbage collecting many objects for files with
            // many items.
            record = dataFileReader.next(record);
            recordsFromStream += 1;
        }

        assertEquals(1, recordsFromStream);
    }
}
|
||||
|
||||
@Test
public void testPostQuery() throws Exception {
    // Verifies that both SQL Pre-Query and SQL Post-Query statement lists run
    // around the main select without disturbing its normal Avro output.

    // remove previous test database, if any
    final File dbLocation = new File(DB_LOCATION);
    dbLocation.delete();

    // load test data to database
    final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
    Statement stmt = con.createStatement();

    try {
        stmt.execute("drop table TEST_NULL_INT");
    } catch (final SQLException sqle) {
        // ignored: the table may not exist on the first run
    }

    stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
    stmt.execute("insert into TEST_NULL_INT values(1,2,3)");

    runner.setIncomingConnection(true);
    runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
    runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
    // Post-query list undoes the session settings made by the pre-query list
    runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)");
    runner.enqueue("test".getBytes());
    runner.run();

    runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
    MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
    firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "1");

    // Read the Avro output back and confirm exactly one record was emitted
    final InputStream in = new ByteArrayInputStream(firstFlowFile.toByteArray());
    final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
    try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
        GenericRecord record = null;
        long recordsFromStream = 0;
        while (dataFileReader.hasNext()) {
            // Reuse record object by passing it to next(). This saves us from
            // allocating and garbage collecting many objects for files with
            // many items.
            record = dataFileReader.next(record);
            recordsFromStream += 1;
        }

        assertEquals(1, recordsFromStream);
    }
}
|
||||
|
||||
@Test
public void testPreQueryFail() throws Exception {
    // Verifies that a failing SQL Pre-Query statement routes the incoming
    // FlowFile to the failure relationship.

    // remove previous test database, if any
    final File dbLocation = new File(DB_LOCATION);
    dbLocation.delete();

    // load test data to database
    final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
    Statement stmt = con.createStatement();

    try {
        stmt.execute("drop table TEST_NULL_INT");
    } catch (final SQLException sqle) {
        // ignored: the table may not exist on the first run
    }

    stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");

    runner.setIncomingConnection(true);
    // Simulate failure by omitting the procedure's required parameter
    runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
    runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
    runner.enqueue("test".getBytes());
    runner.run();

    runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1);
}
|
||||
|
||||
@Test
public void testPostQueryFail() throws Exception {
    // Verifies that a failing SQL Post-Query statement routes the incoming
    // FlowFile to the failure relationship with its original content intact
    // (i.e. the already-produced result FlowFiles are discarded).

    // remove previous test database, if any
    final File dbLocation = new File(DB_LOCATION);
    dbLocation.delete();

    // load test data to database
    final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
    Statement stmt = con.createStatement();

    try {
        stmt.execute("drop table TEST_NULL_INT");
    } catch (final SQLException sqle) {
        // ignored: the table may not exist on the first run
    }

    stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");

    runner.setIncomingConnection(true);
    runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
    runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
    // Simulate failure by omitting the procedure's required parameter
    runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
    runner.enqueue("test".getBytes());
    runner.run();

    runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1);
    MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_FAILURE).get(0);
    firstFlowFile.assertContentEquals("test");
}
|
||||
|
||||
/**
|
||||
* Simple implementation only for ExecuteSQL processor testing.
|
||||
|
|
|
@ -350,6 +350,138 @@ public class TestExecuteSQLRecord {
|
|||
assertEquals(durationTime, fetchTime + executionTime);
|
||||
}
|
||||
|
||||
@Test
public void testPreQuery() throws Exception {
    // Verifies that the SQL Pre-Query list executes before the main select and
    // that the record-oriented result FlowFile is still produced normally.

    // remove previous test database, if any
    final File dbLocation = new File(DB_LOCATION);
    dbLocation.delete();

    // load test data to database
    final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
    Statement stmt = con.createStatement();

    try {
        stmt.execute("drop table TEST_NULL_INT");
    } catch (final SQLException sqle) {
        // ignored: the table may not exist on the first run
    }

    stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
    stmt.execute("insert into TEST_NULL_INT values(1,2,3)");

    runner.setIncomingConnection(true);
    // Semicolon-delimited system procedures as the pre-query list
    runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
    runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
    MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
    runner.addControllerService("writer", recordWriter);
    runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
    runner.enableControllerService(recordWriter);
    runner.enqueue("test".getBytes());
    runner.run();

    runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
    MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
    firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "1");
}
|
||||
|
||||
@Test
public void testPostQuery() throws Exception {
    // Verifies that both pre- and post-query statement lists run around the
    // main select without disturbing the record-oriented output.

    // remove previous test database, if any
    final File dbLocation = new File(DB_LOCATION);
    dbLocation.delete();

    // load test data to database
    final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
    Statement stmt = con.createStatement();

    try {
        stmt.execute("drop table TEST_NULL_INT");
    } catch (final SQLException sqle) {
        // ignored: the table may not exist on the first run
    }

    stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
    stmt.execute("insert into TEST_NULL_INT values(1,2,3)");

    runner.setIncomingConnection(true);
    runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
    runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
    // Post-query list undoes the session settings made by the pre-query list
    runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)");
    MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
    runner.addControllerService("writer", recordWriter);
    runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
    runner.enableControllerService(recordWriter);
    runner.enqueue("test".getBytes());
    runner.run();

    runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
    MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
    firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "1");
}
|
||||
|
||||
@Test
public void testPreQueryFail() throws Exception {
    // Verifies that a failing SQL Pre-Query statement routes the incoming
    // FlowFile to the failure relationship for the record-oriented processor.

    // remove previous test database, if any
    final File dbLocation = new File(DB_LOCATION);
    dbLocation.delete();

    // load test data to database
    final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
    Statement stmt = con.createStatement();

    try {
        stmt.execute("drop table TEST_NULL_INT");
    } catch (final SQLException sqle) {
        // ignored: the table may not exist on the first run
    }

    stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");

    runner.setIncomingConnection(true);
    // Simulate failure by omitting the procedure's required parameter
    runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
    runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
    MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
    runner.addControllerService("writer", recordWriter);
    runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
    runner.enableControllerService(recordWriter);
    runner.enqueue("test".getBytes());
    runner.run();

    runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1);
}
|
||||
|
||||
@Test
public void testPostQueryFail() throws Exception {
    // Verifies that a failing SQL Post-Query statement routes the incoming
    // FlowFile to the failure relationship with its original content intact
    // (i.e. the already-produced result FlowFiles are discarded).

    // remove previous test database, if any
    final File dbLocation = new File(DB_LOCATION);
    dbLocation.delete();

    // load test data to database
    final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
    Statement stmt = con.createStatement();

    try {
        stmt.execute("drop table TEST_NULL_INT");
    } catch (final SQLException sqle) {
        // ignored: the table may not exist on the first run
    }

    stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");

    runner.setIncomingConnection(true);
    runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
    runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
    // Simulate failure by omitting the procedure's required parameter
    runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
    MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
    runner.addControllerService("writer", recordWriter);
    runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
    runner.enableControllerService(recordWriter);
    runner.enqueue("test".getBytes());
    runner.run();

    runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1);
    MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_FAILURE).get(0);
    firstFlowFile.assertContentEquals("test");
}
|
||||
|
||||
|
||||
/**
|
||||
* Simple implementation only for ExecuteSQL processor testing.
|
||||
|
|
Loading…
Reference in New Issue