NIFI-13397 Updated PutDatabaseRecord to retry transient ProcessException causes

This closes #8964

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Jim Steinebrey 2024-06-13 12:27:17 -04:00 committed by exceptionfactory
parent a52d6a8214
commit bc95799a39
No known key found for this signature in database
2 changed files with 71 additions and 1 deletions

View File

@ -585,7 +585,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
// When an Exception is thrown, we want to route to 'retry' if we expect that attempting the same request again
// might work. Otherwise, route to failure. SQLTransientException is a specific type that indicates that a retry may work.
final Relationship relationship;
final Throwable toAnalyze = (e instanceof BatchUpdateException) ? e.getCause() : e;
final Throwable toAnalyze = (e instanceof BatchUpdateException || e instanceof ProcessException) ? e.getCause() : e;
if (toAnalyze instanceof SQLTransientException) {
relationship = REL_RETRY;
flowFile = session.penalize(flowFile);

View File

@ -58,6 +58,7 @@ import java.sql.ResultSet;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLTransientException;
import java.sql.Statement;
import java.time.LocalDate;
import java.time.ZoneOffset;
@ -263,6 +264,57 @@ public class PutDatabaseRecordTest {
}
@Test
public void testProcessExceptionRouteRetry() throws InitializationException, SQLException {
setRunner(TestCaseEnum.DEFAULT_1.getTestCase());
// This exception should route to REL_RETRY because its cause is SQLTransientException
dbcp = new DBCPServiceThrowConnectionException(new SQLTransientException("connection failed"));
final Map<String, String> dbcpProperties = new HashMap<>();
runner = TestRunners.newTestRunner(processor);
runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
runner.enableControllerService(parser);
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
runner.enqueue(new byte[0]);
runner.run();
runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_RETRY);
}
@Test
public void testProcessExceptionRouteFailure() throws InitializationException, SQLException {
setRunner(TestCaseEnum.DEFAULT_1.getTestCase());
// This exception should route to REL_FAILURE because its cause is NOT SQLTransientException
dbcp = new DBCPServiceThrowConnectionException(new NullPointerException("connection is null"));
final Map<String, String> dbcpProperties = new HashMap<>();
runner = TestRunners.newTestRunner(processor);
runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
runner.enableControllerService(parser);
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
runner.enqueue(new byte[0]);
runner.run();
runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_FAILURE);
}
public void testInsertNonRequiredColumnsUnmatchedField() throws InitializationException, ProcessException {
setRunner(TestCaseEnum.DEFAULT_2.getTestCase());
@ -2335,6 +2387,24 @@ public class PutDatabaseRecordTest {
}
}
static class DBCPServiceThrowConnectionException extends AbstractControllerService implements DBCPService {
private final Exception rootCause;
public DBCPServiceThrowConnectionException(final Exception rootCause) {
this.rootCause = rootCause;
}
@Override
public String getIdentifier() {
return DBCP_SERVICE_ID;
}
@Override
public Connection getConnection() throws ProcessException {
throw new ProcessException(rootCause);
}
}
static class DBCPServiceAutoCommitTest extends AbstractControllerService implements DBCPService {
private final String databaseLocation;