NIFI-8193: Fixed PutSQL reordering FlowFiles

Also removed sorting of transferred MockFlowFiles in test framework.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4803
This commit is contained in:
Peter Turcsanyi 2021-02-03 15:46:24 +01:00 committed by Matthew Burgess
parent 749d05840b
commit 40d8c41656
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
3 changed files with 103 additions and 39 deletions

View File

@ -56,7 +56,6 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -466,13 +465,6 @@ public class StandardProcessorTestRunner implements TestRunner {
flowFiles.addAll(session.getFlowFilesForRelationship(relationship));
}
Collections.sort(flowFiles, new Comparator<MockFlowFile>() {
@Override
public int compare(final MockFlowFile o1, final MockFlowFile o2) {
return Long.compare(o1.getCreationTime(), o2.getCreationTime());
}
});
return flowFiles;
}
@ -483,13 +475,6 @@ public class StandardProcessorTestRunner implements TestRunner {
flowFiles.addAll(session.getPenalizedFlowFiles());
}
Collections.sort(flowFiles, new Comparator<MockFlowFile>() {
@Override
public int compare(final MockFlowFile o1, final MockFlowFile o2) {
return Long.compare(o1.getCreationTime(), o2.getCreationTime());
}
});
return flowFiles;
}

View File

@ -301,14 +301,15 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
void apply(final ProcessContext context, final ProcessSession session, final FunctionContext fc,
final Connection conn, final List<FlowFile> flowFiles,
final List<StatementFlowFileEnclosure> groups,
final Map<String, StatementFlowFileEnclosure> sqlToEnclosure,
final RoutingResult result);
}
private GroupingFunction groupFragmentedTransaction = (context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result) -> {
private final GroupingFunction groupFragmentedTransaction = (context, session, fc, conn, flowFiles, groups, result) -> {
final FragmentedEnclosure fragmentedEnclosure = new FragmentedEnclosure();
groups.add(fragmentedEnclosure);
final Map<String, StatementFlowFileEnclosure> sqlToEnclosure = new HashMap<>();
for (final FlowFile flowFile : flowFiles) {
final String sql = context.getProperty(PutSQL.SQL_STATEMENT).isSet()
? context.getProperty(PutSQL.SQL_STATEMENT).evaluateAttributeExpressions(flowFile).getValue()
@ -321,19 +322,22 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
}
};
private final GroupingFunction groupFlowFilesBySQLBatch = (context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result) -> {
private final GroupingFunction groupFlowFilesBySQLBatch = (context, session, fc, conn, flowFiles, groups, result) -> {
for (final FlowFile flowFile : flowFiles) {
final String sql = context.getProperty(PutSQL.SQL_STATEMENT).isSet()
? context.getProperty(PutSQL.SQL_STATEMENT).evaluateAttributeExpressions(flowFile).getValue()
: getSQL(session, flowFile);
// Get or create the appropriate PreparedStatement to use.
final StatementFlowFileEnclosure enclosure = sqlToEnclosure
.computeIfAbsent(sql, k -> {
final StatementFlowFileEnclosure newEnclosure = new StatementFlowFileEnclosure(sql);
groups.add(newEnclosure);
return newEnclosure;
});
// Create a new PreparedStatement or reuse the one from the last group if that is the same.
final StatementFlowFileEnclosure enclosure;
final StatementFlowFileEnclosure lastEnclosure = groups.isEmpty() ? null : groups.get(groups.size() - 1);
if (lastEnclosure == null || !lastEnclosure.getSql().equals(sql)) {
enclosure = new StatementFlowFileEnclosure(sql);
groups.add(enclosure);
} else {
enclosure = lastEnclosure;
}
if(!exceptionHandler.execute(fc, flowFile, input -> {
final PreparedStatement stmt = enclosure.getCachedStatement(conn);
@ -347,26 +351,28 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
}
};
private GroupingFunction groupFlowFilesBySQL = (context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result) -> {
private final GroupingFunction groupFlowFilesBySQL = (context, session, fc, conn, flowFiles, groups, result) -> {
for (final FlowFile flowFile : flowFiles) {
final String sql = context.getProperty(PutSQL.SQL_STATEMENT).isSet()
? context.getProperty(PutSQL.SQL_STATEMENT).evaluateAttributeExpressions(flowFile).getValue()
: getSQL(session, flowFile);
// Get or create the appropriate PreparedStatement to use.
final StatementFlowFileEnclosure enclosure = sqlToEnclosure
.computeIfAbsent(sql, k -> {
final StatementFlowFileEnclosure newEnclosure = new StatementFlowFileEnclosure(sql);
groups.add(newEnclosure);
return newEnclosure;
});
// Create a new PreparedStatement or reuse the one from the last group if that is the same.
final StatementFlowFileEnclosure enclosure;
final StatementFlowFileEnclosure lastEnclosure = groups.isEmpty() ? null : groups.get(groups.size() - 1);
if (lastEnclosure == null || !lastEnclosure.getSql().equals(sql)) {
enclosure = new StatementFlowFileEnclosure(sql);
groups.add(enclosure);
} else {
enclosure = lastEnclosure;
}
enclosure.addFlowFile(flowFile);
}
};
final PutGroup.GroupFlowFiles<FunctionContext, Connection, StatementFlowFileEnclosure> groupFlowFiles = (context, session, fc, conn, flowFiles, result) -> {
final Map<String, StatementFlowFileEnclosure> sqlToEnclosure = new HashMap<>();
final List<StatementFlowFileEnclosure> groups = new ArrayList<>();
// There are three patterns:
@ -374,11 +380,11 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
// 2. Obtain keys: An enclosure has multiple FlowFiles, and each FlowFile is executed separately
// 3. Fragmented transaction: One FlowFile per Enclosure?
if (fc.obtainKeys) {
groupFlowFilesBySQL.apply(context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result);
groupFlowFilesBySQL.apply(context, session, fc, conn, flowFiles, groups, result);
} else if (fc.fragmentedTransaction) {
groupFragmentedTransaction.apply(context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result);
groupFragmentedTransaction.apply(context, session, fc, conn, flowFiles, groups, result);
} else {
groupFlowFilesBySQLBatch.apply(context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result);
groupFlowFilesBySQLBatch.apply(context, session, fc, conn, flowFiles, groups, result);
}
return groups;
@ -974,6 +980,10 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
this.sql = sql;
}
public String getSql() {
return sql;
}
public PreparedStatement getNewStatement(final Connection conn, final boolean obtainKeys) throws SQLException {
if (obtainKeys) {
// Create a new Prepared Statement, requesting that it return the generated keys.

View File

@ -47,6 +47,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import javax.xml.bind.DatatypeConverter;
@ -192,7 +193,7 @@ public class TestPutSQL {
}
@Test
public void testProvenanceEventsWithFragmentedTransactions() throws InitializationException, ProcessException, SQLException {
public void testProvenanceEventsWithFragmentedTransaction() throws InitializationException, ProcessException, SQLException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.BATCH_SIZE, "10");
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "true");
@ -211,7 +212,7 @@ public class TestPutSQL {
testProvenanceEvents(runner);
}
private void testProvenanceEvents(final TestRunner runner) throws InitializationException, ProcessException, SQLException {
private void testProvenanceEvents(final TestRunner runner) throws ProcessException, SQLException {
recreateTable("PERSONS", createPersons);
runner.enqueue("DELETE FROM PERSONS WHERE ID = 1");
@ -227,6 +228,74 @@ public class TestPutSQL {
}
}
@Test
public void testKeepFlowFileOrderingWithBatchMode() throws InitializationException, ProcessException, SQLException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.BATCH_SIZE, "10");
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
testKeepFlowFileOrdering(runner);
}
@Test
public void testKeepFlowFileOrderingWithFragmentedTransaction() throws InitializationException, ProcessException, SQLException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.BATCH_SIZE, "10");
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "true");
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
testKeepFlowFileOrdering(runner);
}
@Test
public void testKeepFlowFileOrderingWithObtainGeneratedKeys() throws InitializationException, ProcessException, SQLException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.BATCH_SIZE, "10");
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "true");
testKeepFlowFileOrdering(runner);
}
private void testKeepFlowFileOrdering(final TestRunner runner) throws ProcessException, SQLException {
recreateTable("PERSONS", createPersons);
final String delete = "DELETE FROM PERSONS WHERE ID = ?";
final String insert = "INSERT INTO PERSONS (ID) VALUES (?)";
final String[] statements = {delete, insert, insert, delete, delete, insert};
final Function<Integer, Map<String, String>> createSqlAttributes = (id) -> {
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", String.valueOf(id));
return attributes;
};
final int flowFileCount = statements.length;
for (int i = 0; i < flowFileCount; i++) {
runner.enqueue(statements[i], createSqlAttributes.apply(i));
}
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, flowFileCount);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS);
for (int i = 0; i < flowFileCount; i++) {
MockFlowFile flowFile = flowFiles.get(i);
assertEquals(statements[i], flowFile.getContent());
assertEquals(String.valueOf(i), flowFile.getAttribute("sql.args.1.value"));
}
List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(flowFileCount, provenanceEvents.size());
for (int i = 0; i < flowFileCount; i++) {
ProvenanceEventRecord event = provenanceEvents.get(i);
assertEquals(String.valueOf(i), event.getAttribute("sql.args.1.value"));
}
}
@Test
public void testFailInMiddleWithBadStatementAndSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {