NIFI-2611: Fixing bugs in UnpackContent with flow file unpackers

Signed-off-by: Mike Moser <mosermw@apache.org>

This closes #905
This commit is contained in:
Joe Gresock 2016-08-21 16:35:39 +00:00 committed by Mike Moser
parent 087622eadc
commit 17dec04939
2 changed files with 57 additions and 51 deletions

View File

@ -102,6 +102,13 @@ public class UnpackContent extends AbstractProcessor {
public static final String FRAGMENT_COUNT = "fragment.count"; public static final String FRAGMENT_COUNT = "fragment.count";
public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename"; public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
public static final String AUTO_DETECT_FORMAT_NAME = "use mime.type attribute";
public static final String TAR_FORMAT_NAME = "tar";
public static final String ZIP_FORMAT_NAME = "zip";
public static final String FLOWFILE_STREAM_FORMAT_V3_NAME = "flowfile-stream-v3";
public static final String FLOWFILE_STREAM_FORMAT_V2_NAME = "flowfile-stream-v2";
public static final String FLOWFILE_TAR_FORMAT_NAME = "flowfile-tar-v1";
public static final String OCTET_STREAM = "application/octet-stream"; public static final String OCTET_STREAM = "application/octet-stream";
public static final PropertyDescriptor PACKAGING_FORMAT = new PropertyDescriptor.Builder() public static final PropertyDescriptor PACKAGING_FORMAT = new PropertyDescriptor.Builder()
@ -144,9 +151,6 @@ public class UnpackContent extends AbstractProcessor {
private Unpacker tarUnpacker; private Unpacker tarUnpacker;
private Unpacker zipUnpacker; private Unpacker zipUnpacker;
private Unpacker flowFileStreamV3Unpacker;
private Unpacker flowFileStreamV2Unpacker;
private Unpacker flowFileTarUnpacker;
@Override @Override
protected void init(final ProcessorInitializationContext context) { protected void init(final ProcessorInitializationContext context) {
@ -184,14 +188,6 @@ public class UnpackContent extends AbstractProcessor {
fileFilter = Pattern.compile(context.getProperty(FILE_FILTER).getValue()); fileFilter = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
tarUnpacker = new TarUnpacker(fileFilter); tarUnpacker = new TarUnpacker(fileFilter);
zipUnpacker = new ZipUnpacker(fileFilter); zipUnpacker = new ZipUnpacker(fileFilter);
flowFileStreamV3Unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3());
flowFileStreamV2Unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2());
flowFileTarUnpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1());
}
PackageFormat format = PackageFormat.getFormat(context.getProperty(PACKAGING_FORMAT).getValue());
if (format != PackageFormat.AUTO_DETECT_FORMAT && unpacker == null) {
initUnpacker(format);
} }
} }
@ -207,15 +203,15 @@ public class UnpackContent extends AbstractProcessor {
addFragmentAttrs = true; addFragmentAttrs = true;
break; break;
case FLOWFILE_STREAM_FORMAT_V2: case FLOWFILE_STREAM_FORMAT_V2:
unpacker = flowFileStreamV2Unpacker; unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2());
addFragmentAttrs = false; addFragmentAttrs = false;
break; break;
case FLOWFILE_STREAM_FORMAT_V3: case FLOWFILE_STREAM_FORMAT_V3:
unpacker = flowFileStreamV3Unpacker; unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3());
addFragmentAttrs = false; addFragmentAttrs = false;
break; break;
case FLOWFILE_TAR_FORMAT: case FLOWFILE_TAR_FORMAT:
unpacker = flowFileTarUnpacker; unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1());
addFragmentAttrs = false; addFragmentAttrs = false;
break; break;
case AUTO_DETECT_FORMAT: case AUTO_DETECT_FORMAT:
@ -254,6 +250,8 @@ public class UnpackContent extends AbstractProcessor {
} else { } else {
initUnpacker(packagingFormat); initUnpacker(packagingFormat);
} }
} else {
initUnpacker(packagingFormat);
} }
final List<FlowFile> unpacked = new ArrayList<>(); final List<FlowFile> unpacked = new ArrayList<>();
@ -495,13 +493,13 @@ public class UnpackContent extends AbstractProcessor {
} }
protected enum PackageFormat { protected enum PackageFormat {
AUTO_DETECT_FORMAT("use mime.type attribute"), AUTO_DETECT_FORMAT(AUTO_DETECT_FORMAT_NAME),
TAR_FORMAT("tar", "application/tar"), TAR_FORMAT(TAR_FORMAT_NAME, "application/tar"),
X_TAR_FORMAT("tar", "application/x-tar"), X_TAR_FORMAT(TAR_FORMAT_NAME, "application/x-tar"),
ZIP_FORMAT("zip", "application/zip"), ZIP_FORMAT(ZIP_FORMAT_NAME, "application/zip"),
FLOWFILE_STREAM_FORMAT_V3("flowfile-stream-v3", "application/flowfile-v3"), FLOWFILE_STREAM_FORMAT_V3(FLOWFILE_STREAM_FORMAT_V3_NAME, "application/flowfile-v3"),
FLOWFILE_STREAM_FORMAT_V2("flowfile-stream-v2", "application/flowfile-v2"), FLOWFILE_STREAM_FORMAT_V2(FLOWFILE_STREAM_FORMAT_V2_NAME, "application/flowfile-v2"),
FLOWFILE_TAR_FORMAT("flowfile-tar-v1", "application/flowfile-v1"); FLOWFILE_TAR_FORMAT(FLOWFILE_TAR_FORMAT_NAME, "application/flowfile-v1");
private final String textValue; private final String textValue;
@ -526,17 +524,17 @@ public class UnpackContent extends AbstractProcessor {
public static PackageFormat getFormat(String textValue) { public static PackageFormat getFormat(String textValue) {
switch (textValue) { switch (textValue) {
case "use mime.type attribute": case AUTO_DETECT_FORMAT_NAME:
return AUTO_DETECT_FORMAT; return AUTO_DETECT_FORMAT;
case "tar": case TAR_FORMAT_NAME:
return TAR_FORMAT; return TAR_FORMAT;
case "zip": case ZIP_FORMAT_NAME:
return ZIP_FORMAT; return ZIP_FORMAT;
case "flowfile-stream-v3": case FLOWFILE_STREAM_FORMAT_V3_NAME:
return FLOWFILE_STREAM_FORMAT_V3; return FLOWFILE_STREAM_FORMAT_V3;
case "flowfile-stream-v2": case FLOWFILE_STREAM_FORMAT_V2_NAME:
return FLOWFILE_STREAM_FORMAT_V2; return FLOWFILE_STREAM_FORMAT_V2;
case "flowfile-stream-v1": case FLOWFILE_TAR_FORMAT_NAME:
return FLOWFILE_TAR_FORMAT; return FLOWFILE_TAR_FORMAT;
} }
return null; return null;

View File

@ -45,17 +45,18 @@ public class TestUnpackContent {
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.TAR_FORMAT.toString()); unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.TAR_FORMAT.toString());
autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString()); autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
unpackRunner.enqueue(dataPath.resolve("data.tar")); unpackRunner.enqueue(dataPath.resolve("data.tar"));
unpackRunner.enqueue(dataPath.resolve("data.tar"));
Map<String, String> attributes = new HashMap<>(1); Map<String, String> attributes = new HashMap<>(1);
Map<String, String> attributes2 = new HashMap<>(1); Map<String, String> attributes2 = new HashMap<>(1);
attributes.put("mime.type", UnpackContent.PackageFormat.TAR_FORMAT.getMimeType()); attributes.put("mime.type", UnpackContent.PackageFormat.TAR_FORMAT.getMimeType());
attributes2.put("mime.type", UnpackContent.PackageFormat.X_TAR_FORMAT.getMimeType()); attributes2.put("mime.type", UnpackContent.PackageFormat.X_TAR_FORMAT.getMimeType());
autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes); autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes);
autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes2); autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes2);
unpackRunner.run(); unpackRunner.run(2);
autoUnpackRunner.run(2); autoUnpackRunner.run(2);
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4); autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
@ -82,17 +83,18 @@ public class TestUnpackContent {
autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString()); autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
autoUnpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/cal.txt$"); autoUnpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/cal.txt$");
unpackRunner.enqueue(dataPath.resolve("data.tar")); unpackRunner.enqueue(dataPath.resolve("data.tar"));
unpackRunner.enqueue(dataPath.resolve("data.tar"));
Map<String, String> attributes = new HashMap<>(1); Map<String, String> attributes = new HashMap<>(1);
Map<String, String> attributes2 = new HashMap<>(1); Map<String, String> attributes2 = new HashMap<>(1);
attributes.put("mime.type", "application/x-tar"); attributes.put("mime.type", "application/x-tar");
attributes2.put("mime.type", "application/tar"); attributes2.put("mime.type", "application/tar");
autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes); autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes);
autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes2); autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes2);
unpackRunner.run(); unpackRunner.run(2);
autoUnpackRunner.run(2); autoUnpackRunner.run(2);
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 1); unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
@ -126,18 +128,20 @@ public class TestUnpackContent {
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString()); unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString());
autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString()); autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
unpackRunner.enqueue(dataPath.resolve("data.zip")); unpackRunner.enqueue(dataPath.resolve("data.zip"));
unpackRunner.enqueue(dataPath.resolve("data.zip"));
Map<String, String> attributes = new HashMap<>(1); Map<String, String> attributes = new HashMap<>(1);
attributes.put("mime.type", "application/zip"); attributes.put("mime.type", "application/zip");
autoUnpackRunner.enqueue(dataPath.resolve("data.zip"), attributes); autoUnpackRunner.enqueue(dataPath.resolve("data.zip"), attributes);
unpackRunner.run(); autoUnpackRunner.enqueue(dataPath.resolve("data.zip"), attributes);
autoUnpackRunner.run(); unpackRunner.run(2);
autoUnpackRunner.run(2);
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
final List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); final List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
@ -160,18 +164,20 @@ public class TestUnpackContent {
autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString()); autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
autoUnpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/cal.txt$"); autoUnpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/cal.txt$");
unpackRunner.enqueue(dataPath.resolve("data.zip")); unpackRunner.enqueue(dataPath.resolve("data.zip"));
unpackRunner.enqueue(dataPath.resolve("data.zip"));
Map<String, String> attributes = new HashMap<>(1); Map<String, String> attributes = new HashMap<>(1);
attributes.put("mime.type", "application/zip"); attributes.put("mime.type", "application/zip");
autoUnpackRunner.enqueue(dataPath.resolve("data.zip"), attributes); autoUnpackRunner.enqueue(dataPath.resolve("data.zip"), attributes);
unpackRunner.run(); autoUnpackRunner.enqueue(dataPath.resolve("data.zip"), attributes);
autoUnpackRunner.run(); unpackRunner.run(2);
autoUnpackRunner.run(2);
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 1); unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 1); autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
@ -199,11 +205,12 @@ public class TestUnpackContent {
final TestRunner runner = TestRunners.newTestRunner(new UnpackContent()); final TestRunner runner = TestRunners.newTestRunner(new UnpackContent());
runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V3.toString()); runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V3.toString());
runner.enqueue(dataPath.resolve("data.flowfilev3")); runner.enqueue(dataPath.resolve("data.flowfilev3"));
runner.enqueue(dataPath.resolve("data.flowfilev3"));
runner.run(); runner.run(2);
runner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); runner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
runner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); runner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
runner.assertTransferCount(UnpackContent.REL_FAILURE, 0); runner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
final List<MockFlowFile> unpacked = runner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); final List<MockFlowFile> unpacked = runner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
@ -222,11 +229,12 @@ public class TestUnpackContent {
final TestRunner runner = TestRunners.newTestRunner(new UnpackContent()); final TestRunner runner = TestRunners.newTestRunner(new UnpackContent());
runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V2.toString()); runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V2.toString());
runner.enqueue(dataPath.resolve("data.flowfilev2")); runner.enqueue(dataPath.resolve("data.flowfilev2"));
runner.enqueue(dataPath.resolve("data.flowfilev2"));
runner.run(); runner.run(2);
runner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); runner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
runner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); runner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
runner.assertTransferCount(UnpackContent.REL_FAILURE, 0); runner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
final List<MockFlowFile> unpacked = runner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); final List<MockFlowFile> unpacked = runner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);