mirror of https://github.com/apache/nifi.git
NIFI-8605 Add a new property for the ExecuteSQL and ExecuteSQLRecord processors to enable/disable auto-commit
- Change the default value of the auto-commit property to true
- Rename the auto-commit property and add more detail to its description
- If auto-commit is set to false, call commit() for consistency
- Add unit tests
- Fix the checkstyle issue of having more than 200 characters in a single line

Signed-off-by: Matthew Burgess <mattyb149@apache.org> This closes #5554
This commit is contained in:
parent d6efc69585
commit 0ac8f1b32c
@@ -169,6 +169,22 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {

            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    public static final PropertyDescriptor AUTO_COMMIT = new PropertyDescriptor.Builder()
            .name("esql-auto-commit")
            .displayName("Set Auto Commit")
            .description("Enables or disables the auto-commit functionality of the DB connection. Default value is 'true'. " +
                    "The default value can be used with most JDBC drivers, and this setting has no impact in most cases " +
                    "since this processor is used to read data. " +
                    "However, some JDBC drivers, such as the PostgreSQL driver, require auto-commit to be disabled " +
                    "in order to limit the number of result rows fetched at a time. " +
                    "When auto-commit is enabled, the PostgreSQL driver loads the whole result set into memory at once. " +
                    "This can lead to a large amount of memory usage when executing queries that fetch large data sets. " +
                    "More details on this behaviour of the PostgreSQL driver can be found at https://jdbc.postgresql.org//documentation/head/query.html. ")
            .allowableValues("true", "false")
            .defaultValue("true")
            .required(true)
            .build();

    protected List<PropertyDescriptor> propDescriptors;

    protected DBCPService dbcpService;
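The description above refers to the PostgreSQL JDBC driver's cursor behaviour. As a minimal plain-JDBC illustration of the pattern the property enables (not part of the commit; the connection URL, credentials, and table name are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class CursorFetchSketch {
    public static void main(String[] args) throws Exception {
        try (Connection con = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/db", "user", "password")) {
            // With auto-commit enabled, the PostgreSQL driver materializes the entire
            // result set in memory; disabling it allows cursor-based fetching so that
            // setFetchSize() below is actually honored.
            con.setAutoCommit(false);
            try (PreparedStatement st = con.prepareStatement("SELECT * FROM big_table")) {
                st.setFetchSize(1000); // rows retrieved per round trip
                try (ResultSet rs = st.executeQuery()) {
                    while (rs.next()) {
                        // process one row at a time without holding the full result set
                    }
                }
            }
            // Closing the connection ends the implicit read transaction; the processor
            // change below instead commits it explicitly for consistency.
        }
    }
}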
@@ -236,195 +252,202 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {

        }

        int resultCount = 0;
        try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes())) {
            con.setAutoCommit(context.getProperty(AUTO_COMMIT).asBoolean());
            try (final PreparedStatement st = con.prepareStatement(selectQuery)) {
                if (fetchSize != null && fetchSize > 0) {
                    try {
                        st.setFetchSize(fetchSize);
                    } catch (SQLException se) {
                        // Not all drivers support this, just log the error (at debug level) and move on
                        logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
                    }
                }
                st.setQueryTimeout(queryTimeout); // timeout in seconds

                // Execute pre-query, throw exception and cleanup Flow Files if fail
                Pair<String, SQLException> failure = executeConfigStatements(con, preQueries);
                if (failure != null) {
                    // In case of failure, assigning config query to "selectQuery" to follow current error handling
                    selectQuery = failure.getLeft();
                    throw failure.getRight();
                }

                if (fileToProcess != null) {
                    JdbcCommon.setParameters(st, fileToProcess.getAttributes());
                }
                logger.debug("Executing query {}", new Object[]{selectQuery});

                int fragmentIndex = 0;
                final String fragmentId = UUID.randomUUID().toString();

                final StopWatch executionTime = new StopWatch(true);

                boolean hasResults = st.execute();

                long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);

                boolean hasUpdateCount = st.getUpdateCount() != -1;

                Map<String, String> inputFileAttrMap = fileToProcess == null ? null : fileToProcess.getAttributes();
                String inputFileUUID = fileToProcess == null ? null : fileToProcess.getAttribute(CoreAttributes.UUID.key());
                while (hasResults || hasUpdateCount) {
                    //getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet
                    if (hasResults) {
                        final AtomicLong nrOfRows = new AtomicLong(0L);

                        try {
                            final ResultSet resultSet = st.getResultSet();
                            do {
                                final StopWatch fetchTime = new StopWatch(true);

                                FlowFile resultSetFF;
                                if (fileToProcess == null) {
                                    resultSetFF = session.create();
                                } else {
                                    resultSetFF = session.create(fileToProcess);
                                }

                                if (inputFileAttrMap != null) {
                                    resultSetFF = session.putAllAttributes(resultSetFF, inputFileAttrMap);
                                }

                                try {
                                    resultSetFF = session.write(resultSetFF, out -> {
                                        try {
                                            nrOfRows.set(sqlWriter.writeResultSet(resultSet, out, getLogger(), null));
                                        } catch (Exception e) {
                                            throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
                                        }
                                    });

                                    long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);

                                    // set attributes
                                    final Map<String, String> attributesToAdd = new HashMap<>();
                                    attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
                                    attributesToAdd.put(RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
                                    attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
                                    attributesToAdd.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
                                    attributesToAdd.put(RESULTSET_INDEX, String.valueOf(resultCount));
                                    if (inputFileUUID != null) {
                                        attributesToAdd.put(INPUT_FLOWFILE_UUID, inputFileUUID);
                                    }
                                    attributesToAdd.putAll(sqlWriter.getAttributesToAdd());
                                    resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);
                                    sqlWriter.updateCounters(session);

                                    // if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes
                                    if (maxRowsPerFlowFile > 0) {
                                        // if row count is zero and this is not the first fragment, drop it instead of committing it.
                                        if (nrOfRows.get() == 0 && fragmentIndex > 0) {
                                            session.remove(resultSetFF);
                                            break;
                                        }

                                        resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
                                        resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
                                    }

                                    logger.info("{} contains {} records; transferring to 'success'", new Object[]{resultSetFF, nrOfRows.get()});

                                    // Report a FETCH event if there was an incoming flow file, or a RECEIVE event otherwise
                                    if (context.hasIncomingConnection()) {
                                        session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
                                    } else {
                                        session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
                                    }
                                    resultSetFlowFiles.add(resultSetFF);

                                    // If we've reached the batch size, send out the flow files
                                    if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
                                        session.transfer(resultSetFlowFiles, REL_SUCCESS);
                                        // Need to remove the original input file if it exists
                                        if (fileToProcess != null) {
                                            session.remove(fileToProcess);
                                            fileToProcess = null;
                                        }

                                        session.commitAsync();
                                        resultSetFlowFiles.clear();
                                    }

                                    fragmentIndex++;
                                } catch (Exception e) {
                                    // Remove any result set flow file(s) and propagate the exception
                                    session.remove(resultSetFF);
                                    session.remove(resultSetFlowFiles);
                                    if (e instanceof ProcessException) {
                                        throw (ProcessException) e;
                                    } else {
                                        throw new ProcessException(e);
                                    }
                                }
                            } while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile);

                            // If we are splitting results but not outputting batches, set count on all FlowFiles
                            if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) {
                                for (int i = 0; i < resultSetFlowFiles.size(); i++) {
                                    resultSetFlowFiles.set(i,
                                            session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
                                }
                            }
                        } catch (final SQLException e) {
                            throw new ProcessException(e);
                        }

                        resultCount++;
                    }

                    // are there anymore result sets?
                    try {
                        hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT);
                        hasUpdateCount = st.getUpdateCount() != -1;
                    } catch (SQLException ex) {
                        hasResults = false;
                        hasUpdateCount = false;
                    }
                }

                // Execute post-query, throw exception and cleanup Flow Files if fail
                failure = executeConfigStatements(con, postQueries);
                if (failure != null) {
                    selectQuery = failure.getLeft();
                    resultSetFlowFiles.forEach(ff -> session.remove(ff));
                    throw failure.getRight();
                }

                // If the auto commit is set to false, commit() is called for consistency
                if (!con.getAutoCommit()) {
                    con.commit();
                }

                // Transfer any remaining files to SUCCESS
                session.transfer(resultSetFlowFiles, REL_SUCCESS);
                resultSetFlowFiles.clear();

                //If we had at least one result then it's OK to drop the original file, but if we had no results then
                // pass the original flow file down the line to trigger downstream processors
                if (fileToProcess != null) {
                    if (resultCount > 0) {
                        session.remove(fileToProcess);
                    } else {
                        fileToProcess = session.write(fileToProcess, out -> sqlWriter.writeEmptyResultSet(out, getLogger()));
                        fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0");
                        fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
                        session.transfer(fileToProcess, REL_SUCCESS);
                    }
                } else if (resultCount == 0) {
                    //If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only)
                    // Then generate an empty Output FlowFile
                    FlowFile resultSetFF = session.create();

                    resultSetFF = session.write(resultSetFF, out -> sqlWriter.writeEmptyResultSet(out, getLogger()));
                    resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, "0");
                    resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
                    session.transfer(resultSetFF, REL_SUCCESS);
                }
            }
        } catch (final ProcessException | SQLException e) {
            //If we had at least one result then it's OK to drop the original file, but if we had no results then
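The con.commit() added above matters because, once auto-commit is disabled, even a SELECT runs inside an implicit transaction that stays open until it is explicitly ended. A small sketch of that pattern, assuming a generic pooled DataSource (an illustration only, not code from this commit):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import javax.sql.DataSource;

public class CommitForConsistencySketch {
    // Read with auto-commit off, then commit so no transaction is left open
    // on the connection when it is returned to the pool.
    static long countRows(DataSource dataSource, String table) throws Exception {
        try (Connection con = dataSource.getConnection()) {
            con.setAutoCommit(false);
            long rows = 0;
            try (PreparedStatement st = con.prepareStatement("SELECT 1 FROM " + table);
                 ResultSet rs = st.executeQuery()) {
                while (rs.next()) {
                    rows++;
                }
            }
            con.commit(); // end the implicit read transaction for consistency
            return rows;
        }
    }
}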
@@ -134,6 +134,7 @@ public class ExecuteSQL extends AbstractExecuteSQL {

        pds.add(MAX_ROWS_PER_FLOW_FILE);
        pds.add(OUTPUT_BATCH_SIZE);
        pds.add(FETCH_SIZE);
        pds.add(AUTO_COMMIT);
        propDescriptors = Collections.unmodifiableList(pds);
    }
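For reference, a minimal sketch (not part of the commit; the class name and property values are illustrative) of how the new descriptor can be set through the mock framework used by the tests below, alongside the existing fetch-related properties:

import org.apache.nifi.processors.standard.ExecuteSQL;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class AutoCommitConfigurationSketch {
    public static void main(String[] args) {
        // Create a mock runner for ExecuteSQL and apply the new property together with
        // the existing Fetch Size and Max Rows Per Flow File properties.
        final TestRunner runner = TestRunners.newTestRunner(ExecuteSQL.class);
        runner.setProperty(ExecuteSQL.AUTO_COMMIT, "false");            // let the driver stream results
        runner.setProperty(ExecuteSQL.FETCH_SIZE, "1000");              // rows per round trip once auto-commit is off
        runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "10000"); // split large results across FlowFiles
        // A real test also registers a DBCPService controller service and sets the SQL query,
        // as the invokeOnTrigger helpers in TestExecuteSQL do; that wiring is omitted here.
    }
}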
@@ -140,6 +140,7 @@ public class ExecuteSQLRecord extends AbstractExecuteSQL {

        pds.add(MAX_ROWS_PER_FLOW_FILE);
        pds.add(OUTPUT_BATCH_SIZE);
        pds.add(FETCH_SIZE);
        pds.add(AUTO_COMMIT);
        propDescriptors = Collections.unmodifiableList(pds);
    }
@@ -175,6 +175,18 @@ public class TestExecuteSQL {

        invokeOnTrigger(1, QUERY_WITH_EL, true, null, true); // 1 second max time
    }

    @Test
    public void testAutoCommitFalse() throws InitializationException, ClassNotFoundException, SQLException, IOException {
        runner.setProperty(ExecuteSQL.AUTO_COMMIT, "false");
        invokeOnTrigger(null, QUERY_WITHOUT_EL, true, null, false);
    }

    @Test
    public void testAutoCommitTrue() throws InitializationException, ClassNotFoundException, SQLException, IOException {
        runner.setProperty(ExecuteSQL.AUTO_COMMIT, "true");
        invokeOnTrigger(null, QUERY_WITHOUT_EL, true, null, false);
    }

    @Test
    public void testWithNullIntColumn() throws SQLException {
        // remove previous test database, if any
@@ -556,6 +568,11 @@ public class TestExecuteSQL {

        SimpleCommerceDataSet.loadTestData2Database(con, 100, 200, 100);
        LOGGER.info("test data loaded");

        //commit loaded data if auto-commit is disabled
        if (!con.getAutoCommit()) {
            con.commit();
        }

        // ResultSet size will be 1x200x100 = 20 000 rows
        // because of where PER.ID = ${person.id}
        final int nrOfRows = 20000;
@@ -169,6 +169,18 @@ public class TestExecuteSQLRecord {

        assertEquals(ProvenanceEventType.FETCH, runner.getProvenanceEvents().get(1).getEventType());
    }

    @Test
    public void testAutoCommitFalse() throws InitializationException, ClassNotFoundException, SQLException, IOException {
        runner.setProperty(ExecuteSQL.AUTO_COMMIT, "false");
        invokeOnTriggerRecords(null, QUERY_WITHOUT_EL, true, null, false);
    }

    @Test
    public void testAutoCommitTrue() throws InitializationException, ClassNotFoundException, SQLException, IOException {
        runner.setProperty(ExecuteSQL.AUTO_COMMIT, "true");
        invokeOnTriggerRecords(null, QUERY_WITHOUT_EL, true, null, false);
    }

    @Test
    public void testWithOutputBatching() throws InitializationException, SQLException {
        // remove previous test database, if any
@@ -545,6 +557,11 @@ public class TestExecuteSQLRecord {

        SimpleCommerceDataSet.loadTestData2Database(con, 100, 200, 100);
        LOGGER.info("test data loaded");

        //commit loaded data if auto-commit is disabled
        if (!con.getAutoCommit()) {
            con.commit();
        }

        // ResultSet size will be 1x200x100 = 20 000 rows
        // because of where PER.ID = ${person.id}
        final int nrOfRows = 20000;