mirror of https://github.com/apache/nifi.git
NIFI-1568: Add Filter Capability to UnpackContent
Adds a "File Filter" property to the `UnpackContent` processor to allow users to specify which files are eligible for extraction. By default, all files will be extracted. Signed-off-by: Matt Burgess <mattyb149@apache.org> Refactor how Unpacker is initialized Re-uses unpackers to avoid creating them each time a flowfile is received. Moved the package formats into an enum for clarity. Signed-off-by: Matt Burgess <mattyb149@apache.org> Fix packaging format enum warning The `AUTO_DETECT_FORMAT` enum value for PackageFormat is not valid for initialization. When this value is set, we use the mime.type attribute to determine which packaging format to use. We never pass `AUTO_DETECT_FORMAT` to the `initUnpacker` method. Signed-off-by: Matt Burgess <mattyb149@apache.org> This closes #248
This commit is contained in:
parent
86bba1b202
commit
f5060a6d63
|
@ -29,6 +29,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.compress.archivers.ArchiveEntry;
|
||||
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
|
||||
|
@ -45,6 +46,8 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
|
|||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
|
@ -57,6 +60,7 @@ import org.apache.nifi.processor.Relationship;
|
|||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.InputStreamCallback;
|
||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.stream.io.BufferedInputStream;
|
||||
import org.apache.nifi.stream.io.BufferedOutputStream;
|
||||
import org.apache.nifi.stream.io.StreamUtils;
|
||||
|
@ -78,7 +82,7 @@ import org.apache.nifi.util.ObjectHolder;
|
|||
+ "the attribute is set to application/zip, the ZIP Packaging Format will be used. If the attribute is set to application/flowfile-v3 or "
|
||||
+ "application/flowfile-v2 or application/flowfile-v1, the appropriate FlowFile Packaging Format will be used. If this attribute is missing, "
|
||||
+ "the FlowFile will be routed to 'failure'. Otherwise, if the attribute's value is not one of those mentioned above, the FlowFile will be "
|
||||
+ "routed to 'success' without being unpacked")
|
||||
+ "routed to 'success' without being unpacked. Use the File Filter property only extract files matching a specific regular expression.")
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute = "mime.type", description = "If the FlowFile is successfully unpacked, its MIME Type is no longer known, so the mime.type "
|
||||
+ "attribute is set to application/octet-stream."),
|
||||
|
@ -91,14 +95,6 @@ import org.apache.nifi.util.ObjectHolder;
|
|||
+ "the MergeContent processor automatically adds those extensions if it is used to rebuild the original FlowFile")})
|
||||
@SeeAlso(MergeContent.class)
|
||||
public class UnpackContent extends AbstractProcessor {
|
||||
|
||||
public static final String AUTO_DETECT_FORMAT = "use mime.type attribute";
|
||||
public static final String TAR_FORMAT = "tar";
|
||||
public static final String ZIP_FORMAT = "zip";
|
||||
public static final String FLOWFILE_STREAM_FORMAT_V3 = "flowfile-stream-v3";
|
||||
public static final String FLOWFILE_STREAM_FORMAT_V2 = "flowfile-stream-v2";
|
||||
public static final String FLOWFILE_TAR_FORMAT = "flowfile-tar-v1";
|
||||
|
||||
// attribute keys
|
||||
public static final String FRAGMENT_ID = "fragment.identifier";
|
||||
public static final String FRAGMENT_INDEX = "fragment.index";
|
||||
|
@ -111,8 +107,18 @@ public class UnpackContent extends AbstractProcessor {
|
|||
.name("Packaging Format")
|
||||
.description("The Packaging Format used to create the file")
|
||||
.required(true)
|
||||
.allowableValues(AUTO_DETECT_FORMAT, TAR_FORMAT, ZIP_FORMAT, FLOWFILE_STREAM_FORMAT_V3, FLOWFILE_STREAM_FORMAT_V2, FLOWFILE_TAR_FORMAT)
|
||||
.defaultValue(AUTO_DETECT_FORMAT)
|
||||
.allowableValues(PackageFormat.AUTO_DETECT_FORMAT.toString(), PackageFormat.TAR_FORMAT.toString(),
|
||||
PackageFormat.ZIP_FORMAT.toString(), PackageFormat.FLOWFILE_STREAM_FORMAT_V3.toString(),
|
||||
PackageFormat.FLOWFILE_STREAM_FORMAT_V2.toString(), PackageFormat.FLOWFILE_TAR_FORMAT.toString())
|
||||
.defaultValue(PackageFormat.AUTO_DETECT_FORMAT.toString())
|
||||
.build();
|
||||
|
||||
/**
 * Required "File Filter" property: a regular expression (validated by
 * {@code StandardValidators.REGULAR_EXPRESSION_VALIDATOR}) that the tar and zip unpackers
 * apply to each archive entry name. Defaults to {@code ".*"}, so every entry is extracted
 * unless the user narrows the pattern.
 */
public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
|
||||
.name("File Filter")
|
||||
.description("Only files whose names match the given regular expression will be extracted (tar/zip only)")
|
||||
.required(true)
|
||||
.defaultValue(".*")
|
||||
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
|
@ -131,6 +137,16 @@ public class UnpackContent extends AbstractProcessor {
|
|||
private Set<Relationship> relationships;
|
||||
private List<PropertyDescriptor> properties;
|
||||
|
||||
private Unpacker unpacker;
|
||||
private boolean addFragmentAttrs;
|
||||
private Pattern fileFilter;
|
||||
|
||||
private Unpacker tarUnpacker;
|
||||
private Unpacker zipUnpacker;
|
||||
private Unpacker flowFileStreamV3Unpacker;
|
||||
private Unpacker flowFileStreamV2Unpacker;
|
||||
private Unpacker flowFileTarUnpacker;
|
||||
|
||||
@Override
|
||||
protected void init(final ProcessorInitializationContext context) {
|
||||
final Set<Relationship> relationships = new HashSet<>();
|
||||
|
@ -141,6 +157,7 @@ public class UnpackContent extends AbstractProcessor {
|
|||
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
properties.add(PACKAGING_FORMAT);
|
||||
properties.add(FILE_FILTER);
|
||||
this.properties = Collections.unmodifiableList(properties);
|
||||
}
|
||||
|
||||
|
@ -154,6 +171,58 @@ public class UnpackContent extends AbstractProcessor {
|
|||
return properties;
|
||||
}
|
||||
|
||||
@OnStopped
|
||||
public void onStopped() {
|
||||
unpacker = null;
|
||||
fileFilter = null;
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(ProcessContext context) throws ProcessException {
|
||||
if (fileFilter == null) {
|
||||
fileFilter = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
|
||||
tarUnpacker = new TarUnpacker(fileFilter);
|
||||
zipUnpacker = new ZipUnpacker(fileFilter);
|
||||
flowFileStreamV3Unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3());
|
||||
flowFileStreamV2Unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2());
|
||||
flowFileTarUnpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1());
|
||||
}
|
||||
|
||||
PackageFormat format = PackageFormat.getFormat(context.getProperty(PACKAGING_FORMAT).getValue());
|
||||
if (format != PackageFormat.AUTO_DETECT_FORMAT && unpacker == null) {
|
||||
initUnpacker(format);
|
||||
}
|
||||
}
|
||||
|
||||
public void initUnpacker(PackageFormat packagingFormat) {
|
||||
switch (packagingFormat) {
|
||||
case TAR_FORMAT:
|
||||
case X_TAR_FORMAT:
|
||||
unpacker = tarUnpacker;
|
||||
addFragmentAttrs = true;
|
||||
break;
|
||||
case ZIP_FORMAT:
|
||||
unpacker = zipUnpacker;
|
||||
addFragmentAttrs = true;
|
||||
break;
|
||||
case FLOWFILE_STREAM_FORMAT_V2:
|
||||
unpacker = flowFileStreamV2Unpacker;
|
||||
addFragmentAttrs = false;
|
||||
break;
|
||||
case FLOWFILE_STREAM_FORMAT_V3:
|
||||
unpacker = flowFileStreamV3Unpacker;
|
||||
addFragmentAttrs = false;
|
||||
break;
|
||||
case FLOWFILE_TAR_FORMAT:
|
||||
unpacker = flowFileTarUnpacker;
|
||||
addFragmentAttrs = false;
|
||||
break;
|
||||
case AUTO_DETECT_FORMAT:
|
||||
// The format of the unpacker should be known before initialization
|
||||
throw new ProcessException(packagingFormat + " is not a valid packaging format");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
FlowFile flowFile = session.get();
|
||||
|
@ -162,8 +231,9 @@ public class UnpackContent extends AbstractProcessor {
|
|||
}
|
||||
|
||||
final ComponentLog logger = getLogger();
|
||||
String packagingFormat = context.getProperty(PACKAGING_FORMAT).getValue().toLowerCase();
|
||||
if (AUTO_DETECT_FORMAT.equals(packagingFormat)) {
|
||||
PackageFormat packagingFormat = PackageFormat.getFormat(context.getProperty(PACKAGING_FORMAT).getValue().toLowerCase());
|
||||
if (packagingFormat == PackageFormat.AUTO_DETECT_FORMAT) {
|
||||
packagingFormat = null;
|
||||
final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
|
||||
if (mimeType == null) {
|
||||
logger.error("No mime.type attribute set for {}; routing to failure", new Object[]{flowFile});
|
||||
|
@ -171,58 +241,18 @@ public class UnpackContent extends AbstractProcessor {
|
|||
return;
|
||||
}
|
||||
|
||||
switch (mimeType.toLowerCase()) {
|
||||
case "application/tar":
|
||||
packagingFormat = TAR_FORMAT;
|
||||
break;
|
||||
case "application/x-tar":
|
||||
packagingFormat = TAR_FORMAT;
|
||||
break;
|
||||
case "application/zip":
|
||||
packagingFormat = ZIP_FORMAT;
|
||||
break;
|
||||
case "application/flowfile-v3":
|
||||
packagingFormat = FLOWFILE_STREAM_FORMAT_V3;
|
||||
break;
|
||||
case "application/flowfile-v2":
|
||||
packagingFormat = FLOWFILE_STREAM_FORMAT_V2;
|
||||
break;
|
||||
case "application/flowfile-v1":
|
||||
packagingFormat = FLOWFILE_TAR_FORMAT;
|
||||
break;
|
||||
default: {
|
||||
logger.info("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked; routing to 'success'", new Object[]{flowFile, mimeType});
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
return;
|
||||
for (PackageFormat format: PackageFormat.values()) {
|
||||
if (mimeType.toLowerCase().equals(format.getMimeType())) {
|
||||
packagingFormat = format;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final Unpacker unpacker;
|
||||
final boolean addFragmentAttrs;
|
||||
switch (packagingFormat) {
|
||||
case TAR_FORMAT:
|
||||
unpacker = new TarUnpacker();
|
||||
addFragmentAttrs = true;
|
||||
break;
|
||||
case ZIP_FORMAT:
|
||||
unpacker = new ZipUnpacker();
|
||||
addFragmentAttrs = true;
|
||||
break;
|
||||
case FLOWFILE_STREAM_FORMAT_V2:
|
||||
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2());
|
||||
addFragmentAttrs = false;
|
||||
break;
|
||||
case FLOWFILE_STREAM_FORMAT_V3:
|
||||
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3());
|
||||
addFragmentAttrs = false;
|
||||
break;
|
||||
case FLOWFILE_TAR_FORMAT:
|
||||
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1());
|
||||
addFragmentAttrs = false;
|
||||
break;
|
||||
default:
|
||||
throw new AssertionError("Packaging Format was " + context.getProperty(PACKAGING_FORMAT).getValue());
|
||||
if (packagingFormat == null) {
|
||||
logger.info("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked; routing to 'success'", new Object[]{flowFile, mimeType});
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
return;
|
||||
} else {
|
||||
initUnpacker(packagingFormat);
|
||||
}
|
||||
}
|
||||
|
||||
final List<FlowFile> unpacked = new ArrayList<>();
|
||||
|
@ -248,12 +278,26 @@ public class UnpackContent extends AbstractProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
private static interface Unpacker {
|
||||
private static abstract class Unpacker {
|
||||
private Pattern fileFilter = null;
|
||||
|
||||
void unpack(ProcessSession session, FlowFile source, List<FlowFile> unpacked);
|
||||
public Unpacker() {};
|
||||
|
||||
public Unpacker(Pattern fileFilter) {
|
||||
this.fileFilter = fileFilter;
|
||||
}
|
||||
|
||||
abstract void unpack(ProcessSession session, FlowFile source, List<FlowFile> unpacked);
|
||||
|
||||
protected boolean fileMatches(ArchiveEntry entry) {
|
||||
return fileFilter == null || fileFilter.matcher(entry.getName()).find();
|
||||
}
|
||||
}
|
||||
|
||||
private static class TarUnpacker implements Unpacker {
|
||||
private static class TarUnpacker extends Unpacker {
|
||||
public TarUnpacker(Pattern fileFilter) {
|
||||
super(fileFilter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
|
||||
|
@ -265,7 +309,7 @@ public class UnpackContent extends AbstractProcessor {
|
|||
try (final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(in))) {
|
||||
TarArchiveEntry tarEntry;
|
||||
while ((tarEntry = tarIn.getNextTarEntry()) != null) {
|
||||
if (tarEntry.isDirectory()) {
|
||||
if (tarEntry.isDirectory() || !fileMatches(tarEntry)) {
|
||||
continue;
|
||||
}
|
||||
final File file = new File(tarEntry.getName());
|
||||
|
@ -304,7 +348,10 @@ public class UnpackContent extends AbstractProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
private static class ZipUnpacker implements Unpacker {
|
||||
private static class ZipUnpacker extends Unpacker {
|
||||
public ZipUnpacker(Pattern fileFilter) {
|
||||
super(fileFilter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
|
||||
|
@ -316,7 +363,7 @@ public class UnpackContent extends AbstractProcessor {
|
|||
try (final ZipArchiveInputStream zipIn = new ZipArchiveInputStream(new BufferedInputStream(in))) {
|
||||
ArchiveEntry zipEntry;
|
||||
while ((zipEntry = zipIn.getNextEntry()) != null) {
|
||||
if (zipEntry.isDirectory()) {
|
||||
if (zipEntry.isDirectory() || !fileMatches(zipEntry)) {
|
||||
continue;
|
||||
}
|
||||
final File file = new File(zipEntry.getName());
|
||||
|
@ -352,7 +399,7 @@ public class UnpackContent extends AbstractProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
private static class FlowFileStreamUnpacker implements Unpacker {
|
||||
private static class FlowFileStreamUnpacker extends Unpacker {
|
||||
|
||||
private final FlowFileUnpackager unpackager;
|
||||
|
||||
|
@ -445,4 +492,53 @@ public class UnpackContent extends AbstractProcessor {
|
|||
unpacked.add(newFF);
|
||||
}
|
||||
}
|
||||
|
||||
protected enum PackageFormat {
|
||||
AUTO_DETECT_FORMAT("use mime.type attribute"),
|
||||
TAR_FORMAT("tar", "application/tar"),
|
||||
X_TAR_FORMAT("tar", "application/x-tar"),
|
||||
ZIP_FORMAT("zip", "application/zip"),
|
||||
FLOWFILE_STREAM_FORMAT_V3("flowfile-stream-v3", "application/flowfile-v3"),
|
||||
FLOWFILE_STREAM_FORMAT_V2("flowfile-stream-v2", "application/flowfile-v2"),
|
||||
FLOWFILE_TAR_FORMAT("flowfile-tar-v1", "application/flowfile-v1");
|
||||
|
||||
|
||||
private final String textValue;
|
||||
private String mimeType;
|
||||
|
||||
PackageFormat(String textValue, String mimeType) {
|
||||
this.textValue = textValue;
|
||||
this.mimeType = mimeType;
|
||||
}
|
||||
|
||||
PackageFormat(String textValue) {
|
||||
this.textValue = textValue;
|
||||
}
|
||||
|
||||
@Override public String toString() {
|
||||
return textValue;
|
||||
}
|
||||
|
||||
public String getMimeType() {
|
||||
return mimeType;
|
||||
}
|
||||
|
||||
public static PackageFormat getFormat(String textValue) {
|
||||
switch (textValue) {
|
||||
case "use mime.type attribute":
|
||||
return AUTO_DETECT_FORMAT;
|
||||
case "tar":
|
||||
return TAR_FORMAT;
|
||||
case "zip":
|
||||
return ZIP_FORMAT;
|
||||
case "flowfile-stream-v3":
|
||||
return FLOWFILE_STREAM_FORMAT_V3;
|
||||
case "flowfile-stream-v2":
|
||||
return FLOWFILE_STREAM_FORMAT_V2;
|
||||
case "flowfile-stream-v1":
|
||||
return FLOWFILE_TAR_FORMAT;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,13 +42,13 @@ public class TestUnpackContent {
|
|||
public void testTar() throws IOException {
|
||||
final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent());
|
||||
final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new UnpackContent());
|
||||
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.TAR_FORMAT);
|
||||
autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.AUTO_DETECT_FORMAT);
|
||||
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.TAR_FORMAT.toString());
|
||||
autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
|
||||
unpackRunner.enqueue(dataPath.resolve("data.tar"));
|
||||
Map<String, String> attributes = new HashMap<>(1);
|
||||
Map<String, String> attributes2 = new HashMap<>(1);
|
||||
attributes.put("mime.type", "application/x-tar");
|
||||
attributes2.put("mime.type", "application/tar");
|
||||
attributes.put("mime.type", UnpackContent.PackageFormat.TAR_FORMAT.getMimeType());
|
||||
attributes2.put("mime.type", UnpackContent.PackageFormat.X_TAR_FORMAT.getMimeType());
|
||||
autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes);
|
||||
autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes2);
|
||||
unpackRunner.run();
|
||||
|
@ -73,12 +73,58 @@ public class TestUnpackContent {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTarWithFilter() throws IOException {
|
||||
final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent());
|
||||
final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new UnpackContent());
|
||||
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.TAR_FORMAT.toString());
|
||||
unpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/date.txt$");
|
||||
autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
|
||||
autoUnpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/cal.txt$");
|
||||
unpackRunner.enqueue(dataPath.resolve("data.tar"));
|
||||
Map<String, String> attributes = new HashMap<>(1);
|
||||
Map<String, String> attributes2 = new HashMap<>(1);
|
||||
attributes.put("mime.type", "application/x-tar");
|
||||
attributes2.put("mime.type", "application/tar");
|
||||
autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes);
|
||||
autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes2);
|
||||
unpackRunner.run();
|
||||
autoUnpackRunner.run(2);
|
||||
|
||||
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 1);
|
||||
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1);
|
||||
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
|
||||
|
||||
autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
|
||||
autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
|
||||
autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
|
||||
|
||||
List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
|
||||
for (final MockFlowFile flowFile : unpacked) {
|
||||
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
|
||||
final String folder = flowFile.getAttribute(CoreAttributes.PATH.key());
|
||||
final Path path = dataPath.resolve(folder).resolve(filename);
|
||||
assertTrue(Files.exists(path));
|
||||
assertEquals("date.txt", filename);
|
||||
flowFile.assertContentEquals(path.toFile());
|
||||
}
|
||||
unpacked = autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
|
||||
for (final MockFlowFile flowFile : unpacked) {
|
||||
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
|
||||
final String folder = flowFile.getAttribute(CoreAttributes.PATH.key());
|
||||
final Path path = dataPath.resolve(folder).resolve(filename);
|
||||
assertTrue(Files.exists(path));
|
||||
assertEquals("cal.txt", filename);
|
||||
flowFile.assertContentEquals(path.toFile());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testZip() throws IOException {
|
||||
final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent());
|
||||
final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new UnpackContent());
|
||||
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.ZIP_FORMAT);
|
||||
autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.AUTO_DETECT_FORMAT);
|
||||
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString());
|
||||
autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
|
||||
unpackRunner.enqueue(dataPath.resolve("data.zip"));
|
||||
Map<String, String> attributes = new HashMap<>(1);
|
||||
attributes.put("mime.type", "application/zip");
|
||||
|
@ -105,10 +151,53 @@ public class TestUnpackContent {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testZipWithFilter() throws IOException {
|
||||
final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent());
|
||||
final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new UnpackContent());
|
||||
unpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/date.txt$");
|
||||
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString());
|
||||
autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
|
||||
autoUnpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/cal.txt$");
|
||||
unpackRunner.enqueue(dataPath.resolve("data.zip"));
|
||||
Map<String, String> attributes = new HashMap<>(1);
|
||||
attributes.put("mime.type", "application/zip");
|
||||
autoUnpackRunner.enqueue(dataPath.resolve("data.zip"), attributes);
|
||||
unpackRunner.run();
|
||||
autoUnpackRunner.run();
|
||||
|
||||
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 1);
|
||||
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1);
|
||||
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
|
||||
|
||||
autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 1);
|
||||
autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1);
|
||||
autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
|
||||
|
||||
List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
|
||||
for (final MockFlowFile flowFile : unpacked) {
|
||||
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
|
||||
final String folder = flowFile.getAttribute(CoreAttributes.PATH.key());
|
||||
final Path path = dataPath.resolve(folder).resolve(filename);
|
||||
assertTrue(Files.exists(path));
|
||||
assertEquals("date.txt", filename);
|
||||
flowFile.assertContentEquals(path.toFile());
|
||||
}
|
||||
unpacked = autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
|
||||
for (final MockFlowFile flowFile : unpacked) {
|
||||
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
|
||||
final String folder = flowFile.getAttribute(CoreAttributes.PATH.key());
|
||||
final Path path = dataPath.resolve(folder).resolve(filename);
|
||||
assertTrue(Files.exists(path));
|
||||
assertEquals("cal.txt", filename);
|
||||
flowFile.assertContentEquals(path.toFile());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlowFileStreamV3() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new UnpackContent());
|
||||
runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.FLOWFILE_STREAM_FORMAT_V3);
|
||||
runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V3.toString());
|
||||
runner.enqueue(dataPath.resolve("data.flowfilev3"));
|
||||
|
||||
runner.run();
|
||||
|
@ -131,7 +220,7 @@ public class TestUnpackContent {
|
|||
@Test
|
||||
public void testFlowFileStreamV2() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new UnpackContent());
|
||||
runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.FLOWFILE_STREAM_FORMAT_V2);
|
||||
runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V2.toString());
|
||||
runner.enqueue(dataPath.resolve("data.flowfilev2"));
|
||||
|
||||
runner.run();
|
||||
|
@ -154,7 +243,7 @@ public class TestUnpackContent {
|
|||
@Test
|
||||
public void testTarThenMerge() throws IOException {
|
||||
final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent());
|
||||
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.TAR_FORMAT);
|
||||
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.TAR_FORMAT.toString());
|
||||
|
||||
unpackRunner.enqueue(dataPath.resolve("data.tar"));
|
||||
unpackRunner.run();
|
||||
|
@ -188,7 +277,7 @@ public class TestUnpackContent {
|
|||
@Test
|
||||
public void testZipThenMerge() throws IOException {
|
||||
final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent());
|
||||
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.ZIP_FORMAT);
|
||||
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString());
|
||||
|
||||
unpackRunner.enqueue(dataPath.resolve("data.zip"));
|
||||
unpackRunner.run();
|
||||
|
@ -222,7 +311,7 @@ public class TestUnpackContent {
|
|||
@Test
|
||||
public void testZipHandlesBadData() throws IOException {
|
||||
final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent());
|
||||
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.ZIP_FORMAT);
|
||||
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString());
|
||||
|
||||
unpackRunner.enqueue(dataPath.resolve("data.tar"));
|
||||
unpackRunner.run();
|
||||
|
@ -235,7 +324,7 @@ public class TestUnpackContent {
|
|||
@Test
|
||||
public void testTarHandlesBadData() throws IOException {
|
||||
final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent());
|
||||
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.TAR_FORMAT);
|
||||
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.TAR_FORMAT.toString());
|
||||
|
||||
unpackRunner.enqueue(dataPath.resolve("data.zip"));
|
||||
unpackRunner.run();
|
||||
|
|
Loading…
Reference in New Issue