mirror of https://github.com/apache/nifi.git
NIFI-6271, fix issue that incoming flowfile attributes don't copy into output flowfiles when Output Batch Size is set
NIFI-6271, fix incoming flowfile attributes don't copy into output flowfiles when Output Batch Size is set NIFI-6271, fix incoming flowfile attributes don't copy into output flowfiles when Output Batch Size is set replace getAttribute(uuid) with getAttribute(CoreAttributes.UUID.key() fix checkstyle violation This closes #3575. Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
parent
95f5b2278c
commit
fa1ed16e2b
|
@ -62,6 +62,7 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
|
|||
public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime";
|
||||
public static final String RESULTSET_INDEX = "executesql.resultset.index";
|
||||
public static final String RESULT_ERROR_MESSAGE = "executesql.error.message";
|
||||
public static final String INPUT_FLOWFILE_UUID = "input.flowfile.uuid";
|
||||
|
||||
public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
|
||||
public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
|
||||
|
@ -247,6 +248,8 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
|
|||
|
||||
boolean hasUpdateCount = st.getUpdateCount() != -1;
|
||||
|
||||
Map<String, String> inputFileAttrMap = fileToProcess == null ? null : fileToProcess.getAttributes();
|
||||
String inputFileUUID = fileToProcess == null ? null : fileToProcess.getAttribute(CoreAttributes.UUID.key());
|
||||
while (hasResults || hasUpdateCount) {
|
||||
//getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet
|
||||
if (hasResults) {
|
||||
|
@ -262,9 +265,13 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
|
|||
resultSetFF = session.create();
|
||||
} else {
|
||||
resultSetFF = session.create(fileToProcess);
|
||||
resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
|
||||
}
|
||||
|
||||
if (inputFileAttrMap != null) {
|
||||
resultSetFF = session.putAllAttributes(resultSetFF, inputFileAttrMap);
|
||||
}
|
||||
|
||||
|
||||
try {
|
||||
resultSetFF = session.write(resultSetFF, out -> {
|
||||
try {
|
||||
|
@ -283,6 +290,9 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
|
|||
attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
|
||||
attributesToAdd.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
|
||||
attributesToAdd.put(RESULTSET_INDEX, String.valueOf(resultCount));
|
||||
if (inputFileUUID != null) {
|
||||
attributesToAdd.put(INPUT_FLOWFILE_UUID, inputFileUUID);
|
||||
}
|
||||
attributesToAdd.putAll(sqlWriter.getAttributesToAdd());
|
||||
resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);
|
||||
sqlWriter.updateCounters(session);
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.avro.io.DatumReader;
|
|||
import org.apache.commons.compress.compressors.CompressorException;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.dbcp.DBCPService;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
|
@ -303,10 +304,15 @@ public class TestExecuteSQL {
|
|||
stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
|
||||
}
|
||||
|
||||
|
||||
Map<String, String> attrMap = new HashMap<>();
|
||||
String testAttrName = "attr1";
|
||||
String testAttrValue = "value1";
|
||||
attrMap.put(testAttrName, testAttrValue);
|
||||
runner.setIncomingConnection(true);
|
||||
runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
|
||||
runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "1");
|
||||
runner.enqueue("SELECT * FROM TEST_NULL_INT");
|
||||
MockFlowFile inputFlowFile = runner.enqueue("SELECT * FROM TEST_NULL_INT", attrMap);
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 200);
|
||||
|
@ -322,9 +328,13 @@ public class TestExecuteSQL {
|
|||
|
||||
MockFlowFile lastFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(199);
|
||||
|
||||
|
||||
lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
|
||||
lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
|
||||
lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
|
||||
lastFlowFile.assertAttributeEquals(testAttrName, testAttrValue);
|
||||
lastFlowFile.assertAttributeEquals(AbstractExecuteSQL.INPUT_FLOWFILE_UUID, inputFlowFile.getAttribute(CoreAttributes.UUID.key()));
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -240,6 +240,11 @@ public class TestExecuteSQLRecord {
|
|||
stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
|
||||
}
|
||||
|
||||
Map<String, String> attrMap = new HashMap<>();
|
||||
String testAttrName = "attr1";
|
||||
String testAttrValue = "value1";
|
||||
attrMap.put(testAttrName, testAttrValue);
|
||||
|
||||
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
|
||||
runner.addControllerService("writer", recordWriter);
|
||||
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
|
||||
|
@ -248,7 +253,7 @@ public class TestExecuteSQLRecord {
|
|||
runner.setIncomingConnection(true);
|
||||
runner.setProperty(ExecuteSQLRecord.MAX_ROWS_PER_FLOW_FILE, "5");
|
||||
runner.setProperty(ExecuteSQLRecord.OUTPUT_BATCH_SIZE, "1");
|
||||
runner.enqueue("SELECT * FROM TEST_NULL_INT");
|
||||
MockFlowFile inputFlowFile = runner.enqueue("SELECT * FROM TEST_NULL_INT", attrMap);
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 200);
|
||||
|
@ -267,6 +272,8 @@ public class TestExecuteSQLRecord {
|
|||
lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "5");
|
||||
lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
|
||||
lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX, "0");
|
||||
lastFlowFile.assertAttributeEquals(testAttrName, testAttrValue);
|
||||
lastFlowFile.assertAttributeEquals(AbstractExecuteSQL.INPUT_FLOWFILE_UUID, inputFlowFile.getAttribute(CoreAttributes.UUID.key()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue