NIFI-3419: Routing flow files causing ZipException to failure in MergeContent

This closes #1454.
This commit is contained in:
Joe Gresock 2017-01-30 12:19:32 +00:00 committed by Pierre Villard
parent 6466931c23
commit 229b20f395
2 changed files with 34 additions and 4 deletions

View File

@ -40,6 +40,7 @@ import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
import java.util.zip.ZipException;
import java.util.zip.ZipOutputStream; import java.util.zip.ZipOutputStream;
import org.apache.avro.Schema; import org.apache.avro.Schema;
@ -810,6 +811,8 @@ public class MergeContent extends BinFiles {
private final int compressionLevel; private final int compressionLevel;
private List<FlowFile> unmerged = new ArrayList<>();
public ZipMerge(final int compressionLevel) { public ZipMerge(final int compressionLevel) {
this.compressionLevel = compressionLevel; this.compressionLevel = compressionLevel;
} }
@ -820,6 +823,7 @@ public class MergeContent extends BinFiles {
final ProcessSession session = bin.getSession(); final ProcessSession session = bin.getSession();
final List<FlowFile> contents = bin.getContents(); final List<FlowFile> contents = bin.getContents();
unmerged.addAll(contents);
FlowFile bundle = session.create(contents); FlowFile bundle = session.create(contents);
@ -835,10 +839,15 @@ public class MergeContent extends BinFiles {
final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key()); final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key());
final ZipEntry zipEntry = new ZipEntry(entryName); final ZipEntry zipEntry = new ZipEntry(entryName);
zipEntry.setSize(flowFile.getSize()); zipEntry.setSize(flowFile.getSize());
try {
out.putNextEntry(zipEntry); out.putNextEntry(zipEntry);
bin.getSession().exportTo(flowFile, out); bin.getSession().exportTo(flowFile, out);
out.closeEntry(); out.closeEntry();
unmerged.remove(flowFile);
} catch (ZipException e) {
getLogger().error("Encountered exception merging {}", new Object[]{flowFile}, e);
}
} }
out.finish(); out.finish();
@ -858,7 +867,7 @@ public class MergeContent extends BinFiles {
@Override @Override
public List<FlowFile> getUnmergedFlowFiles() { public List<FlowFile> getUnmergedFlowFiles() {
return Collections.emptyList(); return unmerged;
} }
} }

View File

@ -487,6 +487,27 @@ public class TestMergeContent {
bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/zip"); bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/zip");
} }
@Test
public void testZipException() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_ZIP);
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
attributes.put("filename", "duplicate-filename.txt");
runner.enqueue("Hello".getBytes("UTF-8"), attributes);
runner.enqueue(", ".getBytes("UTF-8"), attributes);
runner.enqueue("World!".getBytes("UTF-8"), attributes);
runner.run();
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
runner.assertTransferCount(MergeContent.REL_FAILURE, 2);
runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
}
@Test @Test
public void testTar() throws IOException { public void testTar() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); final TestRunner runner = TestRunners.newTestRunner(new MergeContent());