diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index 37fd0f77c0..07e6fe9ed0 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -40,7 +40,6 @@ import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.FlowFileAccessException; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; @@ -79,6 +78,16 @@ public class ExecuteSQL extends AbstractProcessor { .expressionLanguageSupported(true) .build(); + public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder() + .name("Max Wait Time") + .description("The maximum amount of time allowed for a running SQL select query " + + " , zero means there is no limit. Max time less than 1 second will be equal to zero.") + .defaultValue("0 seconds") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .sensitive(false) + .build(); + private final List propDescriptors; public ExecuteSQL() { @@ -89,6 +98,7 @@ public class ExecuteSQL extends AbstractProcessor { ArrayList pds = new ArrayList<>(); pds.add(DBCP_SERVICE); pds.add(SQL_SELECT_QUERY); + pds.add(QUERY_TIMEOUT); propDescriptors = Collections.unmodifiableList(pds); } @@ -113,6 +123,8 @@ public class ExecuteSQL extends AbstractProcessor { final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); final String selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(incoming).getValue(); + final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); + final StopWatch stopWatch = new StopWatch(true); try { @@ -120,6 +132,7 @@ public class ExecuteSQL extends AbstractProcessor { try { final Statement st = con.createStatement(); try { + st.setQueryTimeout(queryTimeout); // timeout in seconds FlowFile outgoing = session.write(incoming, new OutputStreamCallback() { @Override public void process(final OutputStream out) throws IOException { diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java index 1b6de83bdc..b9243ef3b0 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java @@ -34,7 +34,6 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.dbcp.DBCPService; -import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.standard.util.TestJdbcHugeStream; import org.apache.nifi.reporting.InitializationException; @@ -68,7 +67,17 @@ public class TestExecuteSQL { } @Test - public void test1() throws InitializationException, ClassNotFoundException, SQLException, IOException { + public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException { + invokeOnTrigger(null); + } + + @Test + public void testQueryTimeout() throws InitializationException, ClassNotFoundException, SQLException, IOException { + // Does to seem to have any effect when using embedded Derby + invokeOnTrigger(1); // 1 second max time + } + + public void invokeOnTrigger(Integer queryTimeout) throws InitializationException, ClassNotFoundException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ExecuteSQL.class); final DBCPService dbcp = new DBCPServiceSimpleImpl(); @@ -79,18 +88,21 @@ public class TestExecuteSQL { runner.enableControllerService(dbcp); runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp"); + if (queryTimeout!=null) + runner.setProperty(ExecuteSQL.QUERY_TIMEOUT, queryTimeout.toString() + " secs"); + // remove previous test database, if any File dbLocation = new File(DB_LOCATION); dbLocation.delete(); // load test data to database Connection con = dbcp.getConnection(); - TestJdbcHugeStream.loadTestData2Database(con, 100, 100, 100); + TestJdbcHugeStream.loadTestData2Database(con, 100, 2000, 1000); System.out.println("test data loaded"); - // ResultSet size will be 1x100x100 = 10000 rows + // ResultSet size will be 1x2000x1000 = 2 000 000 rows // because of where PER.ID = ${person.id} - final int nrOfRows = 10000; + final int nrOfRows = 2000000; String query = "select " + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode" @@ -128,6 +140,9 @@ public class TestExecuteSQL { dataFileReader.close(); } + + + /** * Simple implementation only for ExecuteSQL processor testing. *