diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java index 0d337d84bd..5d4506c311 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java @@ -577,6 +577,15 @@ public class MergeContent extends BinFiles { return NUMBER_PATTERN.matcher(value).matches(); } + private void removeFlowFileFromSession(final ProcessSession session, final FlowFile flowFile, final ProcessContext context) { + try { + session.remove(flowFile); + } catch (final Exception e) { + getLogger().error("Failed to remove merged FlowFile from the session after merge failure during \"" + + context.getProperty(MERGE_FORMAT).getValue() + "\" merge.", e); + } + } + private class BinaryConcatenationMerge implements MergeBin { private String mimeType = "application/octet-stream"; @@ -636,7 +645,7 @@ public class MergeContent extends BinFiles { } }); } catch (final Exception e) { - session.remove(bundle); + removeFlowFileFromSession(session, bundle, context); throw e; } @@ -781,7 +790,7 @@ public class MergeContent extends BinFiles { } }); } catch (final Exception e) { - session.remove(bundle); + removeFlowFileFromSession(session, bundle, context); throw e; } @@ -848,7 +857,7 @@ public class MergeContent extends BinFiles { } }); } catch (final Exception e) { - session.remove(bundle); + removeFlowFileFromSession(session, bundle, context); throw e; } @@ -918,7 +927,7 @@ public class MergeContent extends BinFiles { } }); } catch (final Exception e) { - session.remove(bundle); + removeFlowFileFromSession(session, bundle, context); throw e; } @@ -1042,7 +1051,7 @@ public class MergeContent extends BinFiles { } }); } catch (final Exception e) { - session.remove(bundle); + removeFlowFileFromSession(session, bundle, context); throw e; }