NIFI-13359 Tune ExecuteSQL/Record to create fewer transient flow files

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #8928
This commit is contained in:
Jim Steinebrey 2024-06-04 15:01:18 -04:00 committed by Matt Burgess
parent f20db691c7
commit 2363e63c86
1 changed file with 28 additions and 29 deletions

View File

@ -333,11 +333,6 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
resultSetFF = session.create(fileToProcess);
}
if (inputFileAttrMap != null) {
resultSetFF = session.putAllAttributes(resultSetFF, inputFileAttrMap);
}
try {
resultSetFF = session.write(resultSetFF, out -> {
try {
@ -347,10 +342,20 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
}
});
// if fragmented ResultSet, determine if we should keep this fragment
if (maxRowsPerFlowFile > 0 && nrOfRows.get() == 0 && fragmentIndex > 0) {
// if row count is zero and this is not the first fragment, drop it instead of committing it.
session.remove(resultSetFF);
break;
}
long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
// set attributes
final Map<String, String> attributesToAdd = new HashMap<>();
if (inputFileAttrMap != null) {
attributesToAdd.putAll(inputFileAttrMap);
}
attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
attributesToAdd.put(RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
@ -359,22 +364,15 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
if (inputFileUUID != null) {
attributesToAdd.put(INPUT_FLOWFILE_UUID, inputFileUUID);
}
if (maxRowsPerFlowFile > 0) {
// if fragmented ResultSet, set fragment attributes
attributesToAdd.put(FRAGMENT_ID, fragmentId);
attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(fragmentIndex));
}
attributesToAdd.putAll(sqlWriter.getAttributesToAdd());
resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);
sqlWriter.updateCounters(session);
// if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes
if (maxRowsPerFlowFile > 0) {
// if row count is zero and this is not the first fragment, drop it instead of committing it.
if (nrOfRows.get() == 0 && fragmentIndex > 0) {
session.remove(resultSetFF);
break;
}
resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
}
logger.info("{} contains {} records; transferring to 'success'", resultSetFF, nrOfRows.get());
// Report a FETCH event if there was an incoming flow file, or a RECEIVE event otherwise
@ -452,26 +450,19 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
session.transfer(resultSetFlowFiles, REL_SUCCESS);
resultSetFlowFiles.clear();
//If we had at least one result then it's OK to drop the original file, but if we had no results then
// pass the original flow file down the line to trigger downstream processors
if (fileToProcess != null) {
if (resultCount > 0) {
// If we had at least one result then it's OK to drop the original file
session.remove(fileToProcess);
} else {
fileToProcess = session.write(fileToProcess, out -> sqlWriter.writeEmptyResultSet(out, getLogger()));
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0");
fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
session.transfer(fileToProcess, REL_SUCCESS);
// If we had no results then transfer the original flow file downstream to trigger processors
session.transfer(setFlowFileEmptyResults(session, fileToProcess, sqlWriter), REL_SUCCESS);
}
} else if (resultCount == 0) {
//If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only)
// If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only)
// Then generate an empty Output FlowFile
FlowFile resultSetFF = session.create();
resultSetFF = session.write(resultSetFF, out -> sqlWriter.writeEmptyResultSet(out, getLogger()));
resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, "0");
resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
session.transfer(resultSetFF, REL_SUCCESS);
session.transfer(setFlowFileEmptyResults(session, resultSetFF, sqlWriter), REL_SUCCESS);
}
}
} catch (final ProcessException | SQLException e) {
@ -495,6 +486,14 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
}
}
/**
 * Writes an empty result set to the given FlowFile and sets the result attributes
 * (row count of "0" plus the writer's MIME type) via a single putAllAttributes call,
 * so only one transient FlowFile copy is created rather than one per attribute.
 *
 * @param session the ProcessSession used to write content and update attributes
 * @param flowFile the FlowFile to populate with the empty result set
 * @param sqlWriter the SqlWriter supplying the empty-result content and MIME type
 * @return the updated FlowFile as returned by the session
 */
protected FlowFile setFlowFileEmptyResults(final ProcessSession session, FlowFile flowFile, SqlWriter sqlWriter) {
// Write the empty result set content through the configured writer.
flowFile = session.write(flowFile, out -> sqlWriter.writeEmptyResultSet(out, getLogger()));
// Batch both attributes into one map so the session applies them in one update.
final Map<String, String> attributesToAdd = new HashMap<>();
attributesToAdd.put(RESULT_ROW_COUNT, "0");
attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
return session.putAllAttributes(flowFile, attributesToAdd);
}
/*
* Executes given queries using pre-defined connection.
* Returns null on success, or a query string if failed.