diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index ca8c71107d..4a41be7909 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -56,7 +56,6 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -466,13 +465,6 @@ public class StandardProcessorTestRunner implements TestRunner { flowFiles.addAll(session.getFlowFilesForRelationship(relationship)); } - Collections.sort(flowFiles, new Comparator() { - @Override - public int compare(final MockFlowFile o1, final MockFlowFile o2) { - return Long.compare(o1.getCreationTime(), o2.getCreationTime()); - } - }); - return flowFiles; } @@ -483,13 +475,6 @@ public class StandardProcessorTestRunner implements TestRunner { flowFiles.addAll(session.getPenalizedFlowFiles()); } - Collections.sort(flowFiles, new Comparator() { - @Override - public int compare(final MockFlowFile o1, final MockFlowFile o2) { - return Long.compare(o1.getCreationTime(), o2.getCreationTime()); - } - }); - return flowFiles; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java index 9c0c15abac..609c1aa440 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -301,14 +301,15 @@ public class PutSQL extends AbstractSessionFactoryProcessor { void apply(final ProcessContext context, final ProcessSession session, final FunctionContext fc, final Connection conn, final List flowFiles, final List groups, - final Map sqlToEnclosure, final RoutingResult result); } - private GroupingFunction groupFragmentedTransaction = (context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result) -> { + private final GroupingFunction groupFragmentedTransaction = (context, session, fc, conn, flowFiles, groups, result) -> { final FragmentedEnclosure fragmentedEnclosure = new FragmentedEnclosure(); groups.add(fragmentedEnclosure); + final Map sqlToEnclosure = new HashMap<>(); + for (final FlowFile flowFile : flowFiles) { final String sql = context.getProperty(PutSQL.SQL_STATEMENT).isSet() ? context.getProperty(PutSQL.SQL_STATEMENT).evaluateAttributeExpressions(flowFile).getValue() @@ -321,19 +322,22 @@ public class PutSQL extends AbstractSessionFactoryProcessor { } }; - private final GroupingFunction groupFlowFilesBySQLBatch = (context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result) -> { + private final GroupingFunction groupFlowFilesBySQLBatch = (context, session, fc, conn, flowFiles, groups, result) -> { for (final FlowFile flowFile : flowFiles) { final String sql = context.getProperty(PutSQL.SQL_STATEMENT).isSet() ? context.getProperty(PutSQL.SQL_STATEMENT).evaluateAttributeExpressions(flowFile).getValue() : getSQL(session, flowFile); - // Get or create the appropriate PreparedStatement to use. - final StatementFlowFileEnclosure enclosure = sqlToEnclosure - .computeIfAbsent(sql, k -> { - final StatementFlowFileEnclosure newEnclosure = new StatementFlowFileEnclosure(sql); - groups.add(newEnclosure); - return newEnclosure; - }); + // Create a new PreparedStatement or reuse the one from the last group if that is the same. + final StatementFlowFileEnclosure enclosure; + final StatementFlowFileEnclosure lastEnclosure = groups.isEmpty() ? null : groups.get(groups.size() - 1); + + if (lastEnclosure == null || !lastEnclosure.getSql().equals(sql)) { + enclosure = new StatementFlowFileEnclosure(sql); + groups.add(enclosure); + } else { + enclosure = lastEnclosure; + } if(!exceptionHandler.execute(fc, flowFile, input -> { final PreparedStatement stmt = enclosure.getCachedStatement(conn); @@ -347,26 +351,28 @@ public class PutSQL extends AbstractSessionFactoryProcessor { } }; - private GroupingFunction groupFlowFilesBySQL = (context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result) -> { + private final GroupingFunction groupFlowFilesBySQL = (context, session, fc, conn, flowFiles, groups, result) -> { for (final FlowFile flowFile : flowFiles) { final String sql = context.getProperty(PutSQL.SQL_STATEMENT).isSet() ? context.getProperty(PutSQL.SQL_STATEMENT).evaluateAttributeExpressions(flowFile).getValue() : getSQL(session, flowFile); - // Get or create the appropriate PreparedStatement to use. - final StatementFlowFileEnclosure enclosure = sqlToEnclosure - .computeIfAbsent(sql, k -> { - final StatementFlowFileEnclosure newEnclosure = new StatementFlowFileEnclosure(sql); - groups.add(newEnclosure); - return newEnclosure; - }); + // Create a new PreparedStatement or reuse the one from the last group if that is the same. + final StatementFlowFileEnclosure enclosure; + final StatementFlowFileEnclosure lastEnclosure = groups.isEmpty() ? null : groups.get(groups.size() - 1); + + if (lastEnclosure == null || !lastEnclosure.getSql().equals(sql)) { + enclosure = new StatementFlowFileEnclosure(sql); + groups.add(enclosure); + } else { + enclosure = lastEnclosure; + } enclosure.addFlowFile(flowFile); } }; final PutGroup.GroupFlowFiles groupFlowFiles = (context, session, fc, conn, flowFiles, result) -> { - final Map sqlToEnclosure = new HashMap<>(); final List groups = new ArrayList<>(); // There are three patterns: @@ -374,11 +380,11 @@ public class PutSQL extends AbstractSessionFactoryProcessor { // 2. Obtain keys: An enclosure has multiple FlowFiles, and each FlowFile is executed separately // 3. Fragmented transaction: One FlowFile per Enclosure? if (fc.obtainKeys) { - groupFlowFilesBySQL.apply(context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result); + groupFlowFilesBySQL.apply(context, session, fc, conn, flowFiles, groups, result); } else if (fc.fragmentedTransaction) { - groupFragmentedTransaction.apply(context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result); + groupFragmentedTransaction.apply(context, session, fc, conn, flowFiles, groups, result); } else { - groupFlowFilesBySQLBatch.apply(context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result); + groupFlowFilesBySQLBatch.apply(context, session, fc, conn, flowFiles, groups, result); } return groups; @@ -974,6 +980,10 @@ public class PutSQL extends AbstractSessionFactoryProcessor { this.sql = sql; } + public String getSql() { + return sql; + } + public PreparedStatement getNewStatement(final Connection conn, final boolean obtainKeys) throws SQLException { if (obtainKeys) { // Create a new Prepared Statement, requesting that it return the generated keys. diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java index 1aa55a8de1..b1dce43e92 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java @@ -47,6 +47,7 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Function; import javax.xml.bind.DatatypeConverter; @@ -192,7 +193,7 @@ public class TestPutSQL { } @Test - public void testProvenanceEventsWithFragmentedTransactions() throws InitializationException, ProcessException, SQLException { + public void testProvenanceEventsWithFragmentedTransaction() throws InitializationException, ProcessException, SQLException { final TestRunner runner = initTestRunner(); runner.setProperty(PutSQL.BATCH_SIZE, "10"); runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "true"); @@ -211,7 +212,7 @@ public class TestPutSQL { testProvenanceEvents(runner); } - private void testProvenanceEvents(final TestRunner runner) throws InitializationException, ProcessException, SQLException { + private void testProvenanceEvents(final TestRunner runner) throws ProcessException, SQLException { recreateTable("PERSONS", createPersons); runner.enqueue("DELETE FROM PERSONS WHERE ID = 1"); @@ -227,6 +228,74 @@ public class TestPutSQL { } } + @Test + public void testKeepFlowFileOrderingWithBatchMode() throws InitializationException, ProcessException, SQLException { + final TestRunner runner = initTestRunner(); + runner.setProperty(PutSQL.BATCH_SIZE, "10"); + runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false"); + runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); + + testKeepFlowFileOrdering(runner); + } + + @Test + public void testKeepFlowFileOrderingWithFragmentedTransaction() throws InitializationException, ProcessException, SQLException { + final TestRunner runner = initTestRunner(); + runner.setProperty(PutSQL.BATCH_SIZE, "10"); + runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "true"); + runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); + + testKeepFlowFileOrdering(runner); + } + + @Test + public void testKeepFlowFileOrderingWithObtainGeneratedKeys() throws InitializationException, ProcessException, SQLException { + final TestRunner runner = initTestRunner(); + runner.setProperty(PutSQL.BATCH_SIZE, "10"); + runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false"); + runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "true"); + + testKeepFlowFileOrdering(runner); + } + + private void testKeepFlowFileOrdering(final TestRunner runner) throws ProcessException, SQLException { + recreateTable("PERSONS", createPersons); + + final String delete = "DELETE FROM PERSONS WHERE ID = ?"; + final String insert = "INSERT INTO PERSONS (ID) VALUES (?)"; + + final String[] statements = {delete, insert, insert, delete, delete, insert}; + + final Function> createSqlAttributes = (id) -> { + final Map attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", String.valueOf(id)); + return attributes; + }; + + final int flowFileCount = statements.length; + + for (int i = 0; i < flowFileCount; i++) { + runner.enqueue(statements[i], createSqlAttributes.apply(i)); + } + + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, flowFileCount); + List flowFiles = runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS); + for (int i = 0; i < flowFileCount; i++) { + MockFlowFile flowFile = flowFiles.get(i); + assertEquals(statements[i], flowFile.getContent()); + assertEquals(String.valueOf(i), flowFile.getAttribute("sql.args.1.value")); + } + List provenanceEvents = runner.getProvenanceEvents(); + assertEquals(flowFileCount, provenanceEvents.size()); + for (int i = 0; i < flowFileCount; i++) { + ProvenanceEventRecord event = provenanceEvents.get(i); + assertEquals(String.valueOf(i), event.getAttribute("sql.args.1.value")); + } + } + @Test public void testFailInMiddleWithBadStatementAndSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {