diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java index decbd547ba..933b027b9e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.regex.Pattern; import org.apache.commons.compress.archivers.ArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; @@ -45,6 +46,8 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -57,6 +60,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.BufferedInputStream; import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.stream.io.StreamUtils; @@ -78,7 +82,7 @@ import org.apache.nifi.util.ObjectHolder; + "the attribute is set to application/zip, the ZIP Packaging Format will be used. If the attribute is set to application/flowfile-v3 or " + "application/flowfile-v2 or application/flowfile-v1, the appropriate FlowFile Packaging Format will be used. If this attribute is missing, " + "the FlowFile will be routed to 'failure'. Otherwise, if the attribute's value is not one of those mentioned above, the FlowFile will be " - + "routed to 'success' without being unpacked") + + "routed to 'success' without being unpacked. Use the File Filter property only extract files matching a specific regular expression.") @WritesAttributes({ @WritesAttribute(attribute = "mime.type", description = "If the FlowFile is successfully unpacked, its MIME Type is no longer known, so the mime.type " + "attribute is set to application/octet-stream."), @@ -91,14 +95,6 @@ import org.apache.nifi.util.ObjectHolder; + "the MergeContent processor automatically adds those extensions if it is used to rebuild the original FlowFile")}) @SeeAlso(MergeContent.class) public class UnpackContent extends AbstractProcessor { - - public static final String AUTO_DETECT_FORMAT = "use mime.type attribute"; - public static final String TAR_FORMAT = "tar"; - public static final String ZIP_FORMAT = "zip"; - public static final String FLOWFILE_STREAM_FORMAT_V3 = "flowfile-stream-v3"; - public static final String FLOWFILE_STREAM_FORMAT_V2 = "flowfile-stream-v2"; - public static final String FLOWFILE_TAR_FORMAT = "flowfile-tar-v1"; - // attribute keys public static final String FRAGMENT_ID = "fragment.identifier"; public static final String FRAGMENT_INDEX = "fragment.index"; @@ -111,8 +107,18 @@ public class UnpackContent extends AbstractProcessor { .name("Packaging Format") .description("The Packaging Format used to create the file") .required(true) - .allowableValues(AUTO_DETECT_FORMAT, TAR_FORMAT, ZIP_FORMAT, FLOWFILE_STREAM_FORMAT_V3, FLOWFILE_STREAM_FORMAT_V2, FLOWFILE_TAR_FORMAT) - .defaultValue(AUTO_DETECT_FORMAT) + .allowableValues(PackageFormat.AUTO_DETECT_FORMAT.toString(), PackageFormat.TAR_FORMAT.toString(), + PackageFormat.ZIP_FORMAT.toString(), PackageFormat.FLOWFILE_STREAM_FORMAT_V3.toString(), + PackageFormat.FLOWFILE_STREAM_FORMAT_V2.toString(), PackageFormat.FLOWFILE_TAR_FORMAT.toString()) + .defaultValue(PackageFormat.AUTO_DETECT_FORMAT.toString()) + .build(); + + public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder() + .name("File Filter") + .description("Only files whose names match the given regular expression will be extracted (tar/zip only)") + .required(true) + .defaultValue(".*") + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() @@ -131,6 +137,16 @@ public class UnpackContent extends AbstractProcessor { private Set relationships; private List properties; + private Unpacker unpacker; + private boolean addFragmentAttrs; + private Pattern fileFilter; + + private Unpacker tarUnpacker; + private Unpacker zipUnpacker; + private Unpacker flowFileStreamV3Unpacker; + private Unpacker flowFileStreamV2Unpacker; + private Unpacker flowFileTarUnpacker; + @Override protected void init(final ProcessorInitializationContext context) { final Set relationships = new HashSet<>(); @@ -141,6 +157,7 @@ public class UnpackContent extends AbstractProcessor { final List properties = new ArrayList<>(); properties.add(PACKAGING_FORMAT); + properties.add(FILE_FILTER); this.properties = Collections.unmodifiableList(properties); } @@ -154,6 +171,58 @@ public class UnpackContent extends AbstractProcessor { return properties; } + @OnStopped + public void onStopped() { + unpacker = null; + fileFilter = null; + } + + @OnScheduled + public void onScheduled(ProcessContext context) throws ProcessException { + if (fileFilter == null) { + fileFilter = Pattern.compile(context.getProperty(FILE_FILTER).getValue()); + tarUnpacker = new TarUnpacker(fileFilter); + zipUnpacker = new ZipUnpacker(fileFilter); + flowFileStreamV3Unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3()); + flowFileStreamV2Unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2()); + flowFileTarUnpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1()); + } + + PackageFormat format = PackageFormat.getFormat(context.getProperty(PACKAGING_FORMAT).getValue()); + if (format != PackageFormat.AUTO_DETECT_FORMAT && unpacker == null) { + initUnpacker(format); + } + } + + public void initUnpacker(PackageFormat packagingFormat) { + switch (packagingFormat) { + case TAR_FORMAT: + case X_TAR_FORMAT: + unpacker = tarUnpacker; + addFragmentAttrs = true; + break; + case ZIP_FORMAT: + unpacker = zipUnpacker; + addFragmentAttrs = true; + break; + case FLOWFILE_STREAM_FORMAT_V2: + unpacker = flowFileStreamV2Unpacker; + addFragmentAttrs = false; + break; + case FLOWFILE_STREAM_FORMAT_V3: + unpacker = flowFileStreamV3Unpacker; + addFragmentAttrs = false; + break; + case FLOWFILE_TAR_FORMAT: + unpacker = flowFileTarUnpacker; + addFragmentAttrs = false; + break; + case AUTO_DETECT_FORMAT: + // The format of the unpacker should be known before initialization + throw new ProcessException(packagingFormat + " is not a valid packaging format"); + } + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { FlowFile flowFile = session.get(); @@ -162,8 +231,9 @@ public class UnpackContent extends AbstractProcessor { } final ComponentLog logger = getLogger(); - String packagingFormat = context.getProperty(PACKAGING_FORMAT).getValue().toLowerCase(); - if (AUTO_DETECT_FORMAT.equals(packagingFormat)) { + PackageFormat packagingFormat = PackageFormat.getFormat(context.getProperty(PACKAGING_FORMAT).getValue().toLowerCase()); + if (packagingFormat == PackageFormat.AUTO_DETECT_FORMAT) { + packagingFormat = null; final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()); if (mimeType == null) { logger.error("No mime.type attribute set for {}; routing to failure", new Object[]{flowFile}); @@ -171,58 +241,18 @@ public class UnpackContent extends AbstractProcessor { return; } - switch (mimeType.toLowerCase()) { - case "application/tar": - packagingFormat = TAR_FORMAT; - break; - case "application/x-tar": - packagingFormat = TAR_FORMAT; - break; - case "application/zip": - packagingFormat = ZIP_FORMAT; - break; - case "application/flowfile-v3": - packagingFormat = FLOWFILE_STREAM_FORMAT_V3; - break; - case "application/flowfile-v2": - packagingFormat = FLOWFILE_STREAM_FORMAT_V2; - break; - case "application/flowfile-v1": - packagingFormat = FLOWFILE_TAR_FORMAT; - break; - default: { - logger.info("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked; routing to 'success'", new Object[]{flowFile, mimeType}); - session.transfer(flowFile, REL_SUCCESS); - return; + for (PackageFormat format: PackageFormat.values()) { + if (mimeType.toLowerCase().equals(format.getMimeType())) { + packagingFormat = format; } } - } - - final Unpacker unpacker; - final boolean addFragmentAttrs; - switch (packagingFormat) { - case TAR_FORMAT: - unpacker = new TarUnpacker(); - addFragmentAttrs = true; - break; - case ZIP_FORMAT: - unpacker = new ZipUnpacker(); - addFragmentAttrs = true; - break; - case FLOWFILE_STREAM_FORMAT_V2: - unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2()); - addFragmentAttrs = false; - break; - case FLOWFILE_STREAM_FORMAT_V3: - unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3()); - addFragmentAttrs = false; - break; - case FLOWFILE_TAR_FORMAT: - unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1()); - addFragmentAttrs = false; - break; - default: - throw new AssertionError("Packaging Format was " + context.getProperty(PACKAGING_FORMAT).getValue()); + if (packagingFormat == null) { + logger.info("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked; routing to 'success'", new Object[]{flowFile, mimeType}); + session.transfer(flowFile, REL_SUCCESS); + return; + } else { + initUnpacker(packagingFormat); + } } final List unpacked = new ArrayList<>(); @@ -248,12 +278,26 @@ public class UnpackContent extends AbstractProcessor { } } - private static interface Unpacker { + private static abstract class Unpacker { + private Pattern fileFilter = null; - void unpack(ProcessSession session, FlowFile source, List unpacked); + public Unpacker() {}; + + public Unpacker(Pattern fileFilter) { + this.fileFilter = fileFilter; + } + + abstract void unpack(ProcessSession session, FlowFile source, List unpacked); + + protected boolean fileMatches(ArchiveEntry entry) { + return fileFilter == null || fileFilter.matcher(entry.getName()).find(); + } } - private static class TarUnpacker implements Unpacker { + private static class TarUnpacker extends Unpacker { + public TarUnpacker(Pattern fileFilter) { + super(fileFilter); + } @Override public void unpack(final ProcessSession session, final FlowFile source, final List unpacked) { @@ -265,7 +309,7 @@ public class UnpackContent extends AbstractProcessor { try (final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(in))) { TarArchiveEntry tarEntry; while ((tarEntry = tarIn.getNextTarEntry()) != null) { - if (tarEntry.isDirectory()) { + if (tarEntry.isDirectory() || !fileMatches(tarEntry)) { continue; } final File file = new File(tarEntry.getName()); @@ -304,7 +348,10 @@ public class UnpackContent extends AbstractProcessor { } } - private static class ZipUnpacker implements Unpacker { + private static class ZipUnpacker extends Unpacker { + public ZipUnpacker(Pattern fileFilter) { + super(fileFilter); + } @Override public void unpack(final ProcessSession session, final FlowFile source, final List unpacked) { @@ -316,7 +363,7 @@ public class UnpackContent extends AbstractProcessor { try (final ZipArchiveInputStream zipIn = new ZipArchiveInputStream(new BufferedInputStream(in))) { ArchiveEntry zipEntry; while ((zipEntry = zipIn.getNextEntry()) != null) { - if (zipEntry.isDirectory()) { + if (zipEntry.isDirectory() || !fileMatches(zipEntry)) { continue; } final File file = new File(zipEntry.getName()); @@ -352,7 +399,7 @@ public class UnpackContent extends AbstractProcessor { } } - private static class FlowFileStreamUnpacker implements Unpacker { + private static class FlowFileStreamUnpacker extends Unpacker { private final FlowFileUnpackager unpackager; @@ -445,4 +492,53 @@ public class UnpackContent extends AbstractProcessor { unpacked.add(newFF); } } + + protected enum PackageFormat { + AUTO_DETECT_FORMAT("use mime.type attribute"), + TAR_FORMAT("tar", "application/tar"), + X_TAR_FORMAT("tar", "application/x-tar"), + ZIP_FORMAT("zip", "application/zip"), + FLOWFILE_STREAM_FORMAT_V3("flowfile-stream-v3", "application/flowfile-v3"), + FLOWFILE_STREAM_FORMAT_V2("flowfile-stream-v2", "application/flowfile-v2"), + FLOWFILE_TAR_FORMAT("flowfile-tar-v1", "application/flowfile-v1"); + + + private final String textValue; + private String mimeType; + + PackageFormat(String textValue, String mimeType) { + this.textValue = textValue; + this.mimeType = mimeType; + } + + PackageFormat(String textValue) { + this.textValue = textValue; + } + + @Override public String toString() { + return textValue; + } + + public String getMimeType() { + return mimeType; + } + + public static PackageFormat getFormat(String textValue) { + switch (textValue) { + case "use mime.type attribute": + return AUTO_DETECT_FORMAT; + case "tar": + return TAR_FORMAT; + case "zip": + return ZIP_FORMAT; + case "flowfile-stream-v3": + return FLOWFILE_STREAM_FORMAT_V3; + case "flowfile-stream-v2": + return FLOWFILE_STREAM_FORMAT_V2; + case "flowfile-stream-v1": + return FLOWFILE_TAR_FORMAT; + } + return null; + } + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java index 0b0cc4367c..acebdeaf06 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java @@ -42,13 +42,13 @@ public class TestUnpackContent { public void testTar() throws IOException { final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent()); final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new UnpackContent()); - unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.TAR_FORMAT); - autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.AUTO_DETECT_FORMAT); + unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.TAR_FORMAT.toString()); + autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString()); unpackRunner.enqueue(dataPath.resolve("data.tar")); Map attributes = new HashMap<>(1); Map attributes2 = new HashMap<>(1); - attributes.put("mime.type", "application/x-tar"); - attributes2.put("mime.type", "application/tar"); + attributes.put("mime.type", UnpackContent.PackageFormat.TAR_FORMAT.getMimeType()); + attributes2.put("mime.type", UnpackContent.PackageFormat.X_TAR_FORMAT.getMimeType()); autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes); autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes2); unpackRunner.run(); @@ -73,12 +73,58 @@ public class TestUnpackContent { } } + @Test + public void testTarWithFilter() throws IOException { + final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent()); + final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new UnpackContent()); + unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.TAR_FORMAT.toString()); + unpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/date.txt$"); + autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString()); + autoUnpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/cal.txt$"); + unpackRunner.enqueue(dataPath.resolve("data.tar")); + Map attributes = new HashMap<>(1); + Map attributes2 = new HashMap<>(1); + attributes.put("mime.type", "application/x-tar"); + attributes2.put("mime.type", "application/tar"); + autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes); + autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes2); + unpackRunner.run(); + autoUnpackRunner.run(2); + + unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 1); + unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); + unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); + + autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); + autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2); + autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); + + List unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); + for (final MockFlowFile flowFile : unpacked) { + final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); + final String folder = flowFile.getAttribute(CoreAttributes.PATH.key()); + final Path path = dataPath.resolve(folder).resolve(filename); + assertTrue(Files.exists(path)); + assertEquals("date.txt", filename); + flowFile.assertContentEquals(path.toFile()); + } + unpacked = autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); + for (final MockFlowFile flowFile : unpacked) { + final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); + final String folder = flowFile.getAttribute(CoreAttributes.PATH.key()); + final Path path = dataPath.resolve(folder).resolve(filename); + assertTrue(Files.exists(path)); + assertEquals("cal.txt", filename); + flowFile.assertContentEquals(path.toFile()); + } + } + @Test public void testZip() throws IOException { final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent()); final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new UnpackContent()); - unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.ZIP_FORMAT); - autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.AUTO_DETECT_FORMAT); + unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString()); + autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString()); unpackRunner.enqueue(dataPath.resolve("data.zip")); Map attributes = new HashMap<>(1); attributes.put("mime.type", "application/zip"); @@ -105,10 +151,53 @@ public class TestUnpackContent { } } + @Test + public void testZipWithFilter() throws IOException { + final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent()); + final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new UnpackContent()); + unpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/date.txt$"); + unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString()); + autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString()); + autoUnpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/cal.txt$"); + unpackRunner.enqueue(dataPath.resolve("data.zip")); + Map attributes = new HashMap<>(1); + attributes.put("mime.type", "application/zip"); + autoUnpackRunner.enqueue(dataPath.resolve("data.zip"), attributes); + unpackRunner.run(); + autoUnpackRunner.run(); + + unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 1); + unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); + unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); + + autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 1); + autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); + autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); + + List unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); + for (final MockFlowFile flowFile : unpacked) { + final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); + final String folder = flowFile.getAttribute(CoreAttributes.PATH.key()); + final Path path = dataPath.resolve(folder).resolve(filename); + assertTrue(Files.exists(path)); + assertEquals("date.txt", filename); + flowFile.assertContentEquals(path.toFile()); + } + unpacked = autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); + for (final MockFlowFile flowFile : unpacked) { + final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); + final String folder = flowFile.getAttribute(CoreAttributes.PATH.key()); + final Path path = dataPath.resolve(folder).resolve(filename); + assertTrue(Files.exists(path)); + assertEquals("cal.txt", filename); + flowFile.assertContentEquals(path.toFile()); + } + } + @Test public void testFlowFileStreamV3() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new UnpackContent()); - runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.FLOWFILE_STREAM_FORMAT_V3); + runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V3.toString()); runner.enqueue(dataPath.resolve("data.flowfilev3")); runner.run(); @@ -131,7 +220,7 @@ public class TestUnpackContent { @Test public void testFlowFileStreamV2() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new UnpackContent()); - runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.FLOWFILE_STREAM_FORMAT_V2); + runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V2.toString()); runner.enqueue(dataPath.resolve("data.flowfilev2")); runner.run(); @@ -154,7 +243,7 @@ public class TestUnpackContent { @Test public void testTarThenMerge() throws IOException { final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent()); - unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.TAR_FORMAT); + unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.TAR_FORMAT.toString()); unpackRunner.enqueue(dataPath.resolve("data.tar")); unpackRunner.run(); @@ -188,7 +277,7 @@ public class TestUnpackContent { @Test public void testZipThenMerge() throws IOException { final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent()); - unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.ZIP_FORMAT); + unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString()); unpackRunner.enqueue(dataPath.resolve("data.zip")); unpackRunner.run(); @@ -222,7 +311,7 @@ public class TestUnpackContent { @Test public void testZipHandlesBadData() throws IOException { final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent()); - unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.ZIP_FORMAT); + unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString()); unpackRunner.enqueue(dataPath.resolve("data.tar")); unpackRunner.run(); @@ -235,7 +324,7 @@ public class TestUnpackContent { @Test public void testTarHandlesBadData() throws IOException { final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent()); - unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.TAR_FORMAT); + unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.TAR_FORMAT.toString()); unpackRunner.enqueue(dataPath.resolve("data.zip")); unpackRunner.run();