NIFI-626 - query timeout added

Signed-off-by: Toivo Adams <toivo.adams@gmail.com>
Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Toivo Adams 2015-05-31 16:43:49 +03:00 committed by Mark Payne
parent 14bcad212a
commit d98757e4c7
2 changed files with 34 additions and 6 deletions

View File

@ -40,7 +40,6 @@ import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
@ -79,6 +78,16 @@ public class ExecuteSQL extends AbstractProcessor {
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
.name("Max Wait Time")
.description("The maximum amount of time allowed for a running SQL select query "
+ " , zero means there is no limit. Max time less than 1 second will be equal to zero.")
.defaultValue("0 seconds")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.sensitive(false)
.build();
private final List<PropertyDescriptor> propDescriptors;
public ExecuteSQL() {
@ -89,6 +98,7 @@ public class ExecuteSQL extends AbstractProcessor {
ArrayList<PropertyDescriptor> pds = new ArrayList<>();
pds.add(DBCP_SERVICE);
pds.add(SQL_SELECT_QUERY);
pds.add(QUERY_TIMEOUT);
propDescriptors = Collections.unmodifiableList(pds);
}
@ -113,6 +123,8 @@ public class ExecuteSQL extends AbstractProcessor {
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
final String selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(incoming).getValue();
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
final StopWatch stopWatch = new StopWatch(true);
try {
@ -120,6 +132,7 @@ public class ExecuteSQL extends AbstractProcessor {
try {
final Statement st = con.createStatement();
try {
st.setQueryTimeout(queryTimeout); // timeout in seconds
FlowFile outgoing = session.write(incoming, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {

View File

@ -34,7 +34,6 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
import org.apache.nifi.reporting.InitializationException;
@ -68,7 +67,17 @@ public class TestExecuteSQL {
}
@Test
public void test1() throws InitializationException, ClassNotFoundException, SQLException, IOException {
public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException {
invokeOnTrigger(null);
}
@Test
public void testQueryTimeout() throws InitializationException, ClassNotFoundException, SQLException, IOException {
// Does to seem to have any effect when using embedded Derby
invokeOnTrigger(1); // 1 second max time
}
public void invokeOnTrigger(Integer queryTimeout) throws InitializationException, ClassNotFoundException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ExecuteSQL.class);
final DBCPService dbcp = new DBCPServiceSimpleImpl();
@ -79,18 +88,21 @@ public class TestExecuteSQL {
runner.enableControllerService(dbcp);
runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp");
if (queryTimeout!=null)
runner.setProperty(ExecuteSQL.QUERY_TIMEOUT, queryTimeout.toString() + " secs");
// remove previous test database, if any
File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
// load test data to database
Connection con = dbcp.getConnection();
TestJdbcHugeStream.loadTestData2Database(con, 100, 100, 100);
TestJdbcHugeStream.loadTestData2Database(con, 100, 2000, 1000);
System.out.println("test data loaded");
// ResultSet size will be 1x100x100 = 10000 rows
// ResultSet size will be 1x2000x1000 = 2 000 000 rows
// because of where PER.ID = ${person.id}
final int nrOfRows = 10000;
final int nrOfRows = 2000000;
String query = "select "
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+ ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
@ -128,6 +140,9 @@ public class TestExecuteSQL {
dataFileReader.close();
}
/**
* Simple implementation only for ExecuteSQL processor testing.
*