diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java index 1c77cbd00f..3401d665a3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java @@ -40,6 +40,7 @@ import java.util.TreeMap; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import java.util.zip.ZipEntry; +import java.util.zip.ZipException; import java.util.zip.ZipOutputStream; import org.apache.avro.Schema; @@ -810,6 +811,8 @@ public class MergeContent extends BinFiles { private final int compressionLevel; + private List unmerged = new ArrayList<>(); + public ZipMerge(final int compressionLevel) { this.compressionLevel = compressionLevel; } @@ -820,6 +823,7 @@ public class MergeContent extends BinFiles { final ProcessSession session = bin.getSession(); final List contents = bin.getContents(); + unmerged.addAll(contents); FlowFile bundle = session.create(contents); @@ -835,10 +839,15 @@ public class MergeContent extends BinFiles { final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key()); final ZipEntry zipEntry = new ZipEntry(entryName); zipEntry.setSize(flowFile.getSize()); - out.putNextEntry(zipEntry); + try { + out.putNextEntry(zipEntry); - bin.getSession().exportTo(flowFile, out); - out.closeEntry(); + bin.getSession().exportTo(flowFile, out); + out.closeEntry(); + unmerged.remove(flowFile); + } catch (ZipException e) { + getLogger().error("Encountered exception merging {}", new Object[]{flowFile}, e); + } } out.finish(); @@ -858,7 +867,7 @@ public class MergeContent extends BinFiles { @Override public List getUnmergedFlowFiles() { - return Collections.emptyList(); + return unmerged; } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java index 6590d47888..b6025c5482 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java @@ -487,6 +487,27 @@ public class TestMergeContent { bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/zip"); } + @Test + public void testZipException() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); + runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec"); + runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_ZIP); + + final Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text"); + attributes.put("filename", "duplicate-filename.txt"); + + runner.enqueue("Hello".getBytes("UTF-8"), attributes); + runner.enqueue(", ".getBytes("UTF-8"), attributes); + runner.enqueue("World!".getBytes("UTF-8"), attributes); + runner.run(); + + runner.assertQueueEmpty(); + runner.assertTransferCount(MergeContent.REL_MERGED, 1); + runner.assertTransferCount(MergeContent.REL_FAILURE, 2); + runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3); + } + @Test public void testTar() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent());