NIFI-4988 This closes #2725. Changed the exception handling so that an invliad ZIP is handled. When an invalid zip is processed, the exception is an IllegalArgumentException which was not being handled and thus the session was being rollbacked.

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Andrew Psaltis 2018-05-20 15:10:39 +08:00 committed by joewitt
parent 6bf6a92abd
commit 6356d7b9ee
4 changed files with 35 additions and 2 deletions

View File

@ -448,6 +448,7 @@
<exclude>src/test/resources/TestIdentifyMimeType/flowfilev1.tar</exclude> <exclude>src/test/resources/TestIdentifyMimeType/flowfilev1.tar</exclude>
<exclude>src/test/resources/TestUnpackContent/data.tar</exclude> <exclude>src/test/resources/TestUnpackContent/data.tar</exclude>
<exclude>src/test/resources/TestUnpackContent/data.zip</exclude> <exclude>src/test/resources/TestUnpackContent/data.zip</exclude>
<exclude>src/test/resources/TestUnpackContent/invalid_data.zip</exclude>
<exclude>src/test/resources/TestEncryptContent/plain.txt</exclude> <exclude>src/test/resources/TestEncryptContent/plain.txt</exclude>
<exclude>src/test/resources/TestEncryptContent/salted_raw.enc</exclude> <exclude>src/test/resources/TestEncryptContent/salted_raw.enc</exclude>
<exclude>src/test/resources/TestEncryptContent/salted_128_raw.enc</exclude> <exclude>src/test/resources/TestEncryptContent/salted_128_raw.enc</exclude>

View File

@ -20,7 +20,6 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.file.InvalidPathException;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -268,7 +267,7 @@ public class UnpackContent extends AbstractProcessor {
session.transfer(flowFile, REL_ORIGINAL); session.transfer(flowFile, REL_ORIGINAL);
session.getProvenanceReporter().fork(flowFile, unpacked); session.getProvenanceReporter().fork(flowFile, unpacked);
logger.info("Unpacked {} into {} and transferred to success", new Object[]{flowFile, unpacked}); logger.info("Unpacked {} into {} and transferred to success", new Object[]{flowFile, unpacked});
} catch (final ProcessException | InvalidPathException e) { } catch (final Exception e) {
logger.error("Unable to unpack {} due to {}; routing to failure", new Object[]{flowFile, e}); logger.error("Unable to unpack {} due to {}; routing to failure", new Object[]{flowFile, e});
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
session.remove(unpacked); session.remove(unpacked);

View File

@ -168,6 +168,39 @@ public class TestUnpackContent {
flowFile.assertContentEquals(path.toFile()); flowFile.assertContentEquals(path.toFile());
} }
} }
@Test
public void testInvalidZip() throws IOException {
final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent());
final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new UnpackContent());
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString());
autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
unpackRunner.enqueue(dataPath.resolve("invalid_data.zip"));
unpackRunner.enqueue(dataPath.resolve("invalid_data.zip"));
Map<String, String> attributes = new HashMap<>(1);
attributes.put("mime.type", "application/zip");
autoUnpackRunner.enqueue(dataPath.resolve("invalid_data.zip"), attributes);
autoUnpackRunner.enqueue(dataPath.resolve("invalid_data.zip"), attributes);
unpackRunner.run(2);
autoUnpackRunner.run(2);
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 2);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 0);
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 0);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 2);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 0);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 0);
final List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_FAILURE);
for (final MockFlowFile flowFile : unpacked) {
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
// final String folder = flowFile.getAttribute(CoreAttributes.PATH.key());
final Path path = dataPath.resolve(filename);
assertTrue(Files.exists(path));
flowFile.assertContentEquals(path.toFile());
}
}
@Test @Test
public void testZipWithFilter() throws IOException { public void testZipWithFilter() throws IOException {