1
0
mirror of https://github.com/apache/nifi.git synced 2025-03-02 15:39:12 +00:00

NIFI-7403:Add a function that adjust if the result is failed before we call the onFailed or onCompleted function. If the result is failed, return true and do sth

NIFI-7403:Add an extension point to adjust the result, if the result is failed then process onFailed function

NIFI-7403:Implement the AdjustFailed Function, if PutSQL set the SUPPORT_TRANSACTIONS true, then check whether the result contains REL_RETRY or REL_FAILURE.If it contains that, reroute the result and return true.

NIFI-7403: fix reroute logic in AdjustFailed function

NIFI-7403:Add and modify some unit test for PutSQL's SUPPORT_TRANSACTIONS property

NIFI-7403:Update for PR recheck

NIFI-7403:Add documentation on the Support Fragmented Transactions property to indicate the transactions rollback behavior

NIFI-7403: Fix Checkstyle issue

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes 
This commit is contained in:
zhangcheng 2020-05-12 14:47:58 +08:00 committed by Matthew Burgess
parent 2e45d959ba
commit 1ec6675f38
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
4 changed files with 125 additions and 31 deletions
nifi-nar-bundles
nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern
nifi-standard-bundle/nifi-standard-processors/src
main/java/org/apache/nifi/processors/standard
test/java/org/apache/nifi/processors/standard

@ -97,6 +97,12 @@ public class PartialFunctions {
void rollback(ProcessSession session, Throwable t);
}
@FunctionalInterface
public interface AdjustFailed {
boolean apply(ProcessContext context, RoutingResult result);
}
/**
* <p>This method is identical to what {@link org.apache.nifi.processor.AbstractProcessor#onTrigger(ProcessContext, ProcessSession)} does.</p>
* <p>Create a session from ProcessSessionFactory and execute specified onTrigger function, and commit the session if onTrigger finishes successfully.</p>

@ -42,6 +42,7 @@ public class Put<FC, C extends AutoCloseable> {
protected PartialFunctions.OnCompleted<FC, C> onCompleted;
protected PartialFunctions.OnFailed<FC, C> onFailed;
protected PartialFunctions.Cleanup<FC, C> cleanup;
protected PartialFunctions.AdjustFailed adjustFailed;
protected ComponentLog logger;
/**
@ -117,8 +118,17 @@ public class Put<FC, C extends AutoCloseable> {
.filter(flowFile -> !transferredFlowFiles.contains(flowFile)).collect(Collectors.toList());
result.routeTo(unprocessedFlowFiles, Relationship.SELF);
// Extension point to adjust the result, if the result is failed then process onFailed function
boolean failed = false;
if (adjustFailed != null) {
failed = adjustFailed.apply(context, result);
}
if (failed && onFailed != null) {
onFailed.apply(context, session, functionContext, connection,null);
}
// OnCompleted processing.
if (onCompleted != null) {
if (!failed && onCompleted != null) {
onCompleted.apply(context, session, functionContext, connection);
}
@ -182,6 +192,15 @@ public class Put<FC, C extends AutoCloseable> {
this.adjustRoute = f;
}
/**
* Specify an optional function that adjust if the result is failed before we call the onFailed or onCompleted function.
* If the result is failed, return true and do sth.
* @param f Function to be called to adjust if the result is failed
*/
public void adjustFailed(PartialFunctions.AdjustFailed f) {
this.adjustFailed = f;
}
/**
* Specify an optional function responsible for transferring routed FlowFiles.
* If not specified routed FlowFiles are simply transferred to its destination by default.

@ -76,6 +76,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import static java.lang.String.format;
import static org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnError;
@ -151,7 +152,10 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
.description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. "
+ "If the fragment.count value is greater than 1, the Processor will not process any FlowFile with that fragment.identifier until all are available; "
+ "at that point, it will process all FlowFiles with that fragment.identifier as a single transaction, in the order specified by the FlowFiles' fragment.index attributes. "
+ "This Provides atomicity of those SQL statements. If this value is false, these attributes will be ignored and the updates will occur independent of one another.")
+ "This Provides atomicity of those SQL statements. Once any statement of this transaction throws exception when executing, this transaction will be rolled back. When "
+ "transaction rollback happened, none of these FlowFiles would be routed to 'success'. If the <Rollback On Failure> is set true, these FlowFiles will stay in the input "
+ "relationship. When the <Rollback On Failure> is set false,, if any of these FlowFiles will be routed to 'retry', all of these FlowFiles will be routed to 'retry'.Otherwise, "
+ "they will be routed to 'failure'. If this value is false, these attributes will be ignored and the updates will occur independent of one another.")
.allowableValues("true", "false")
.defaultValue("true")
.build();
@ -578,6 +582,20 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
}
});
process.adjustFailed((c, r) -> {
if (c.getProperty(SUPPORT_TRANSACTIONS).asBoolean()){
if (r.contains(REL_RETRY) || r.contains(REL_FAILURE)) {
final List<FlowFile> transferredFlowFiles = r.getRoutedFlowFiles().values().stream()
.flatMap(List::stream).collect(Collectors.toList());
Relationship rerouteShip = r.contains(REL_RETRY) ? REL_RETRY : REL_FAILURE;
r.getRoutedFlowFiles().clear();
r.routeTo(transferredFlowFiles, rerouteShip);
return true;
}
}
return false;
});
exceptionHandler = new ExceptionHandler<>();
exceptionHandler.mapException(e -> {
if (e instanceof SQLNonTransientException) {

@ -163,8 +163,27 @@ public class TestPutSQL {
@Test
public void testFailInMiddleWithBadStatement() throws InitializationException, ProcessException, SQLException, IOException {
public void testFailInMiddleWithBadStatementAndSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
testFailInMiddleWithBadStatement(runner);
runner.run();
runner.assertTransferCount(PutSQL.REL_FAILURE, 4);
runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
}
@Test
public void testFailInMiddleWithBadStatementAndNotSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
testFailInMiddleWithBadStatement(runner);
runner.run();
runner.assertTransferCount(PutSQL.REL_FAILURE, 1);
runner.assertTransferCount(PutSQL.REL_SUCCESS, 3);
}
private void testFailInMiddleWithBadStatement(final TestRunner runner) throws InitializationException {
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
@ -173,12 +192,10 @@ public class TestPutSQL {
runner.enqueue("INSERT INTO PERSONS_AI".getBytes()); // intentionally wrong syntax
runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Tom', 3)".getBytes());
runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Harry', 44)".getBytes());
runner.run();
runner.assertTransferCount(PutSQL.REL_FAILURE, 1);
runner.assertTransferCount(PutSQL.REL_SUCCESS, 3);
}
@Test
public void testFailInMiddleWithBadStatementRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
@ -202,10 +219,28 @@ public class TestPutSQL {
}
}
@Test
public void testFailInMiddleWithBadParameterTypeAndNotSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
testFailInMiddleWithBadParameterType(runner);
runner.run();
runner.assertTransferCount(PutSQL.REL_FAILURE, 1);
runner.assertTransferCount(PutSQL.REL_SUCCESS, 3);
}
@Test
public void testFailInMiddleWithBadParameterType() throws InitializationException, ProcessException, SQLException, IOException {
public void testFailInMiddleWithBadParameterTypeAndSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
testFailInMiddleWithBadParameterType(runner);
runner.run();
runner.assertTransferCount(PutSQL.REL_FAILURE, 4);
runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
}
private void testFailInMiddleWithBadParameterType(final TestRunner runner) throws InitializationException, ProcessException, SQLException, IOException {
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
@ -224,11 +259,6 @@ public class TestPutSQL {
runner.enqueue(data, badAttributes);
runner.enqueue(data, goodAttributes);
runner.enqueue(data, goodAttributes);
runner.run();
runner.assertTransferCount(PutSQL.REL_FAILURE, 1);
runner.assertTransferCount(PutSQL.REL_SUCCESS, 3);
}
@Test
@ -265,28 +295,28 @@ public class TestPutSQL {
}
@Test
public void testFailInMiddleWithBadParameterValue() throws InitializationException, ProcessException, SQLException, IOException {
public void testFailInMiddleWithBadParameterValueAndSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
testFailInMiddleWithBadParameterValue(runner);
runner.run();
recreateTable("PERSONS_AI",createPersonsAutoId);
runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
runner.assertTransferCount(PutSQL.REL_RETRY, 4);
final Map<String, String> goodAttributes = new HashMap<>();
goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
goodAttributes.put("sql.args.1.value", "84");
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS_AI");
assertFalse(rs.next());
}
}
}
final Map<String, String> badAttributes = new HashMap<>();
badAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
badAttributes.put("sql.args.1.value", "9999");
final byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes();
runner.enqueue(data, goodAttributes);
runner.enqueue(data, badAttributes);
runner.enqueue(data, goodAttributes);
runner.enqueue(data, goodAttributes);
@Test
public void testFailInMiddleWithBadParameterValueAndNotSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
testFailInMiddleWithBadParameterValue(runner);
runner.run();
runner.assertTransferCount(PutSQL.REL_SUCCESS, 1);
@ -305,6 +335,27 @@ public class TestPutSQL {
}
}
private void testFailInMiddleWithBadParameterValue(final TestRunner runner) throws InitializationException, ProcessException, SQLException, IOException {
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
recreateTable("PERSONS_AI",createPersonsAutoId);
final Map<String, String> goodAttributes = new HashMap<>();
goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
goodAttributes.put("sql.args.1.value", "84");
final Map<String, String> badAttributes = new HashMap<>();
badAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
badAttributes.put("sql.args.1.value", "9999");
final byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes();
runner.enqueue(data, goodAttributes);
runner.enqueue(data, badAttributes);
runner.enqueue(data, goodAttributes);
runner.enqueue(data, goodAttributes);
}
@Test
public void testFailInMiddleWithBadParameterValueRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);