From 7819afbefd980ce68f43093302997024926d9f51 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 8 Apr 2015 13:38:33 -0400 Subject: [PATCH] NIFI-495: Fixed handling of FlowFiles if destination full by rolling back session --- .../java/org/apache/nifi/remote/StandardRemoteGroupPort.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index 740e405b87..69ba0fd4f8 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -171,6 +171,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { this.targetRunning.set(false); final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port is not in a valid state", this, url); logger.error(message); + session.rollback(); remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); return; } catch (final UnknownPortException e) { @@ -178,6 +179,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { this.targetExists.set(false); final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port no longer exists", this, url); logger.error(message); + session.rollback(); remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); return; } catch (final IOException e) { @@ -186,13 +188,14 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { if ( logger.isDebugEnabled() ) { logger.error("", e); } + session.rollback(); remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); - session.rollback(); return; } if ( transaction == null ) { logger.debug("{} Unable to create transaction to communicate with; all peers must be penalized, so yielding context", this); + session.rollback(); context.yield(); return; }