mirror of https://github.com/apache/nifi.git
NIFI-495: Fixed handling of FlowFiles if destination full by rolling back session
This commit is contained in:
parent
8d20b82095
commit
7819afbefd
|
@ -171,6 +171,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
|
||||||
this.targetRunning.set(false);
|
this.targetRunning.set(false);
|
||||||
final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port is not in a valid state", this, url);
|
final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port is not in a valid state", this, url);
|
||||||
logger.error(message);
|
logger.error(message);
|
||||||
|
session.rollback();
|
||||||
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
|
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
|
||||||
return;
|
return;
|
||||||
} catch (final UnknownPortException e) {
|
} catch (final UnknownPortException e) {
|
||||||
|
@ -178,6 +179,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
|
||||||
this.targetExists.set(false);
|
this.targetExists.set(false);
|
||||||
final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port no longer exists", this, url);
|
final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port no longer exists", this, url);
|
||||||
logger.error(message);
|
logger.error(message);
|
||||||
|
session.rollback();
|
||||||
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
|
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
|
||||||
return;
|
return;
|
||||||
} catch (final IOException e) {
|
} catch (final IOException e) {
|
||||||
|
@ -186,13 +188,14 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
|
||||||
if ( logger.isDebugEnabled() ) {
|
if ( logger.isDebugEnabled() ) {
|
||||||
logger.error("", e);
|
logger.error("", e);
|
||||||
}
|
}
|
||||||
|
session.rollback();
|
||||||
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
|
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
|
||||||
session.rollback();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( transaction == null ) {
|
if ( transaction == null ) {
|
||||||
logger.debug("{} Unable to create transaction to communicate with; all peers must be penalized, so yielding context", this);
|
logger.debug("{} Unable to create transaction to communicate with; all peers must be penalized, so yielding context", this);
|
||||||
|
session.rollback();
|
||||||
context.yield();
|
context.yield();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue