NIFI-12014 NullPointerException in PutSQL when adding error attributes

This closes #7666.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
krisztina-zsihovszki 2023-08-31 18:29:04 +02:00 committed by Peter Turcsanyi
parent 2b330d9fee
commit 75cfe21e4c
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
2 changed files with 59 additions and 11 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.standard;
import java.util.Optional;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
@ -75,9 +76,9 @@ import java.util.function.BiFunction;
import static java.lang.String.format;
import static java.lang.String.valueOf;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toList;
import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Failure;
import static org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnError;
@SupportsBatching
@ -462,15 +463,16 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
private ExceptionHandler.OnError<FunctionContext, FlowFile> onFlowFileError(final ProcessContext context, final ProcessSession session, final RoutingResult result) {
ExceptionHandler.OnError<FunctionContext, FlowFile> onFlowFileError = createOnError(context, session, result, REL_FAILURE, REL_RETRY);
onFlowFileError = onFlowFileError.andThen((ctx, flowFile, errorTypesResult, exception) -> {
flowFile = addErrorAttributesToFlowFile(session, flowFile, exception);
switch (errorTypesResult.destination()) {
case Failure:
getLogger().error("Failed to update database for {} due to {}; routing to failure", flowFile, exception, exception);
addErrorAttributesToFlowFile(session, flowFile, exception);
break;
case Retry:
getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry",
flowFile, exception, exception);
addErrorAttributesToFlowFile(session, flowFile, exception);
break;
case Self:
getLogger().error("Failed to update database for {} due to {};", flowFile, exception, exception);
@ -485,14 +487,26 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
ExceptionHandler.createOnGroupError(context, session, result, REL_FAILURE, REL_RETRY);
onGroupError = onGroupError.andThen((ctx, flowFileGroup, errorTypesResult, exception) -> {
Relationship relationship = errorTypesResult.destination() == Failure ? REL_FAILURE : REL_RETRY;
List<FlowFile> flowFilesToRelationship = result.getRoutedFlowFiles().get(relationship);
result.getRoutedFlowFiles().put(relationship, addErrorAttributesToFlowFilesInGroup(session, flowFilesToRelationship, flowFileGroup.getFlowFiles(), exception));
switch (errorTypesResult.destination()) {
case Failure:
List<FlowFile> flowFilesToFailure = getFlowFilesOnRelationship(result, REL_FAILURE);
result.getRoutedFlowFiles().put(REL_FAILURE, addErrorAttributesToFlowFilesInGroup(session, flowFilesToFailure, flowFileGroup.getFlowFiles(), exception));
break;
case Retry:
List<FlowFile> flowFilesToRetry = getFlowFilesOnRelationship(result, REL_RETRY);
result.getRoutedFlowFiles().put(REL_RETRY, addErrorAttributesToFlowFilesInGroup(session, flowFilesToRetry, flowFileGroup.getFlowFiles(), exception));
break;
}
});
return onGroupError;
}
private List<FlowFile> getFlowFilesOnRelationship(RoutingResult result, final Relationship relationship) {
return Optional.ofNullable(result.getRoutedFlowFiles().get(relationship))
.orElse(emptyList());
}
private List<FlowFile> addErrorAttributesToFlowFilesInGroup(ProcessSession session, List<FlowFile> flowFilesOnRelationship, List<FlowFile> flowFilesInGroup, Exception exception) {
return flowFilesOnRelationship.stream()
.map(ff -> flowFilesInGroup.contains(ff) ? addErrorAttributesToFlowFile(session, ff, exception) : ff)
@ -861,8 +875,16 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
attributes.put(ERROR_MESSAGE_ATTR, exception.getMessage());
if (exception instanceof SQLException) {
attributes.put(ERROR_CODE_ATTR, valueOf(((SQLException) exception).getErrorCode()));
attributes.put(ERROR_SQL_STATE_ATTR, valueOf(((SQLException) exception).getSQLState()));
int errorCode = ((SQLException) exception).getErrorCode();
String sqlState = ((SQLException) exception).getSQLState();
if (errorCode > 0) {
attributes.put(ERROR_CODE_ATTR, valueOf(errorCode));
}
if (sqlState != null) {
attributes.put(ERROR_SQL_STATE_ATTR, sqlState);
}
}
return session.putAllAttributes(flowFile, attributes);

View File

@ -397,6 +397,32 @@ public class TestPutSQL {
runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
}
@Test
public void testFailInMiddleWithNumberFormatException() throws InitializationException, ProcessException {
final TestRunner runner = initTestRunner();
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "false");
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
final Map<String, String> goodAttributes = new HashMap<>();
goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
goodAttributes.put("sql.args.1.value", "84");
final Map<String, String> badAttributes = new HashMap<>();
badAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
badAttributes.put("sql.args.1.value", "hello");
final byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes();
runner.enqueue(data, goodAttributes);
runner.enqueue(data, badAttributes);
runner.enqueue(data, goodAttributes);
runner.enqueue(data, goodAttributes);
runner.run();
runner.assertTransferCount(PutSQL.REL_FAILURE, 1);
runner.assertTransferCount(PutSQL.REL_SUCCESS, 3);
assertNonSQLErrorRelatedAttributes(runner, PutSQL.REL_FAILURE);
}
@Test
public void testFailInMiddleWithBadParameterValueAndSupportTransaction() throws InitializationException, ProcessException, SQLException {
final TestRunner runner = initTestRunner();
@ -1260,7 +1286,7 @@ public class TestPutSQL {
// should fail because of the semicolon
runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 1);
assertSQLExceptionRelatedAttributes(runner, PutSQL.REL_RETRY);
assertNonSQLErrorRelatedAttributes(runner, PutSQL.REL_RETRY);
}
@Test
@ -1411,7 +1437,7 @@ public class TestPutSQL {
// No FlowFiles should be transferred because there were not enough FlowFiles with the same fragment identifier
runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);
assertNonSQLExceptionRelatedAttribute(runner);
assertNonSQLErrorRelatedAttributes(runner, PutSQL.REL_FAILURE);
}
@Test
@ -1744,8 +1770,8 @@ public class TestPutSQL {
});
}
private static void assertNonSQLExceptionRelatedAttribute(final TestRunner runner) {
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutSQL.REL_FAILURE);
private static void assertNonSQLErrorRelatedAttributes(final TestRunner runner, Relationship relationship) {
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(relationship);
flowFiles.forEach(ff -> {
ff.assertAttributeExists("error.message");
});